/*
 * Decompiled with CFR 0.152.
 */
package org.pentaho.big.data.kettle.plugins.kafka;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.pentaho.big.data.kettle.plugins.kafka.KafkaConsumerField;
import org.pentaho.big.data.kettle.plugins.kafka.KafkaConsumerInput;
import org.pentaho.big.data.kettle.plugins.kafka.KafkaConsumerInputData;
import org.pentaho.big.data.kettle.plugins.kafka.KafkaConsumerInputMeta;
import org.pentaho.di.core.row.RowDataUtil;
import org.pentaho.di.core.row.ValueMetaInterface;
import org.pentaho.di.core.variables.VariableSpace;
import org.pentaho.di.trans.streaming.common.BaseStreamStep;
import org.pentaho.di.trans.streaming.common.BlockingQueueStreamSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaStreamSource
extends BlockingQueueStreamSource<List<Object>> {
    private final Logger logger = LoggerFactory.getLogger(((Object)((Object)this)).getClass());
    private final VariableSpace variables;
    private KafkaConsumerInputMeta kafkaConsumerInputMeta;
    private KafkaConsumerInputData kafkaConsumerInputData;
    private Map<KafkaConsumerField.Name, Integer> positions;
    private Consumer consumer;
    private final ExecutorService executorService = Executors.newCachedThreadPool();
    private KafkaConsumerCallable callable;
    private Future<Void> future;

    public KafkaStreamSource(Consumer consumer, KafkaConsumerInputMeta inputMeta, KafkaConsumerInputData kafkaConsumerInputData, VariableSpace variables, KafkaConsumerInput kafkaStep) {
        super((BaseStreamStep)kafkaStep);
        this.positions = new HashMap<KafkaConsumerField.Name, Integer>();
        this.consumer = consumer;
        this.variables = variables;
        this.kafkaConsumerInputData = kafkaConsumerInputData;
        this.kafkaConsumerInputMeta = inputMeta;
    }

    public void close() {
        this.callable.shutdown();
    }

    public void open() {
        if (this.future != null) {
            this.logger.warn("open() called more than once");
            return;
        }
        List valueMetas = this.kafkaConsumerInputData.outputRowMeta.getValueMetaList();
        this.positions = new HashMap<KafkaConsumerField.Name, Integer>(valueMetas.size());
        IntStream.range(0, valueMetas.size()).forEach(idx -> {
            Optional<KafkaConsumerField.Name> match = Arrays.stream(KafkaConsumerField.Name.values()).filter(name -> {
                KafkaConsumerField f = name.getFieldFromMeta(this.kafkaConsumerInputMeta);
                String fieldName = this.variables.environmentSubstitute(f.getOutputName());
                return fieldName != null && fieldName.equals(((ValueMetaInterface)valueMetas.get(idx)).getName());
            }).findFirst();
            match.ifPresent(name -> this.positions.put((KafkaConsumerField.Name)((Object)((Object)name)), idx));
        });
        this.callable = new KafkaConsumerCallable(this.consumer, () -> super.close());
        this.future = this.executorService.submit(this.callable);
    }

    public void commitOffsets(List<List<Object>> rows) {
        Map<Object, Map<Object, Optional<List>>> maxRows = rows.stream().collect(Collectors.groupingBy(row -> row.get(this.positions.get((Object)KafkaConsumerField.Name.TOPIC)), Collectors.groupingBy(row -> row.get(this.positions.get((Object)KafkaConsumerField.Name.PARTITION)), Collectors.maxBy(Comparator.comparingLong(row -> (Long)row.get(this.positions.get((Object)KafkaConsumerField.Name.OFFSET)))))));
        Map<TopicPartition, OffsetAndMetadata> offsets = maxRows.values().stream().flatMap(m -> m.values().stream()).map(Optional::get).collect(Collectors.toMap(row -> new TopicPartition((String)row.get(this.positions.get((Object)KafkaConsumerField.Name.TOPIC)), ((Long)row.get(this.positions.get((Object)KafkaConsumerField.Name.PARTITION))).intValue()), row -> new OffsetAndMetadata((Long)row.get(this.positions.get((Object)KafkaConsumerField.Name.OFFSET)) + 1L)));
        this.callable.queueCommit(offsets);
    }

    List<Object> processMessageAsRow(ConsumerRecord<String, String> record) {
        Object[] rowData = RowDataUtil.allocateRowData((int)this.kafkaConsumerInputData.outputRowMeta.size());
        if (this.positions.get((Object)KafkaConsumerField.Name.KEY) != null) {
            rowData[this.positions.get((Object)((Object)KafkaConsumerField.Name.KEY)).intValue()] = record.key();
        }
        if (this.positions.get((Object)KafkaConsumerField.Name.MESSAGE) != null) {
            rowData[this.positions.get((Object)((Object)KafkaConsumerField.Name.MESSAGE)).intValue()] = record.value();
        }
        if (this.positions.get((Object)KafkaConsumerField.Name.TOPIC) != null) {
            rowData[this.positions.get((Object)((Object)KafkaConsumerField.Name.TOPIC)).intValue()] = record.topic();
        }
        if (this.positions.get((Object)KafkaConsumerField.Name.PARTITION) != null) {
            rowData[this.positions.get((Object)((Object)KafkaConsumerField.Name.PARTITION)).intValue()] = (long)record.partition();
        }
        if (this.positions.get((Object)KafkaConsumerField.Name.OFFSET) != null) {
            rowData[this.positions.get((Object)((Object)KafkaConsumerField.Name.OFFSET)).intValue()] = record.offset();
        }
        if (this.positions.get((Object)KafkaConsumerField.Name.TIMESTAMP) != null) {
            rowData[this.positions.get((Object)((Object)KafkaConsumerField.Name.TIMESTAMP)).intValue()] = record.timestamp();
        }
        return Arrays.asList(rowData);
    }

    class KafkaConsumerCallable
    implements Callable<Void> {
        private final AtomicBoolean closed = new AtomicBoolean(false);
        private final Consumer consumer;
        private Runnable onClose;
        private ConcurrentLinkedQueue<Map<TopicPartition, OffsetAndMetadata>> toCommit = new ConcurrentLinkedQueue();

        public KafkaConsumerCallable(Consumer consumer, Runnable onClose) {
            this.consumer = consumer;
            this.onClose = onClose;
        }

        public void queueCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
            this.toCommit.add(offsets);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public Void call() {
            try {
                ConsumerRecords records;
                while (!this.closed.get()) {
                    this.commitOffsets();
                    records = this.consumer.poll(1000L);
                    ArrayList<List<Object>> rows = new ArrayList<List<Object>>();
                    for (ConsumerRecord record : records) {
                        rows.add(KafkaStreamSource.this.processMessageAsRow((ConsumerRecord<String, String>)record));
                    }
                    KafkaStreamSource.this.acceptRows(rows);
                }
                records = null;
                return records;
            }
            catch (WakeupException e) {
                if (!this.closed.get()) {
                    throw e;
                }
                Void void_ = null;
                return void_;
            }
            finally {
                this.commitOffsets();
                this.consumer.close();
                this.onClose.run();
            }
        }

        private void commitOffsets() {
            while (!this.toCommit.isEmpty()) {
                this.consumer.commitSync(this.toCommit.poll());
            }
        }

        public void shutdown() {
            this.closed.set(true);
            this.consumer.wakeup();
        }
    }
}

