/*
 * Decompiled with CFR 0.152.
 */
package org.apache.twill.internal.kafka.client;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Futures;
import java.nio.channels.ClosedByInterruptException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;
import org.apache.twill.common.Cancellable;
import org.apache.twill.common.Threads;
import org.apache.twill.internal.kafka.client.BasicFetchedMessage;
import org.apache.twill.kafka.client.BrokerInfo;
import org.apache.twill.kafka.client.BrokerService;
import org.apache.twill.kafka.client.FetchedMessage;
import org.apache.twill.kafka.client.KafkaConsumer;
import org.apache.twill.kafka.client.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class SimpleKafkaConsumer
implements KafkaConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(SimpleKafkaConsumer.class);
    private static final int FETCH_SIZE = 0x100000;
    private static final int SO_TIMEOUT = 5000;
    private static final int MAX_WAIT = 1000;
    private static final long CONSUMER_EXPIRE_MINUTES = 1L;
    private static final long CONSUMER_FAILURE_RETRY_INTERVAL = 2000L;
    private static final long EMPTY_FETCH_WAIT = 500L;
    private final BrokerService brokerService;
    private final LoadingCache<BrokerInfo, SimpleConsumer> consumers;
    private final BlockingQueue<Cancellable> consumerCancels;

    SimpleKafkaConsumer(BrokerService brokerService) {
        this.brokerService = brokerService;
        this.consumers = CacheBuilder.newBuilder().expireAfterAccess(1L, TimeUnit.MINUTES).removalListener(this.createRemovalListener()).build(this.createConsumerLoader());
        this.consumerCancels = new LinkedBlockingQueue<Cancellable>();
    }

    @Override
    public KafkaConsumer.Preparer prepare() {
        return new SimplePreparer();
    }

    void stop() {
        LOG.info("Stopping Kafka consumer");
        LinkedList cancels = Lists.newLinkedList();
        this.consumerCancels.drainTo(cancels);
        for (Cancellable cancel : cancels) {
            cancel.cancel();
        }
        this.consumers.invalidateAll();
        LOG.info("Kafka Consumer stopped");
    }

    private CacheLoader<BrokerInfo, SimpleConsumer> createConsumerLoader() {
        return new CacheLoader<BrokerInfo, SimpleConsumer>(){

            public SimpleConsumer load(BrokerInfo key) throws Exception {
                return new SimpleConsumer(key.getHost(), key.getPort(), 5000, 0x100000, "simple-kafka-client");
            }
        };
    }

    private RemovalListener<BrokerInfo, SimpleConsumer> createRemovalListener() {
        return new RemovalListener<BrokerInfo, SimpleConsumer>(){

            public void onRemoval(RemovalNotification<BrokerInfo, SimpleConsumer> notification) {
                SimpleConsumer consumer = (SimpleConsumer)notification.getValue();
                if (consumer != null) {
                    consumer.close();
                }
            }
        };
    }

    private long getLastOffset(TopicPartition topicPart, long timestamp) {
        long[] offsets;
        SimpleConsumer consumer;
        BrokerInfo brokerInfo = this.brokerService.getLeader(topicPart.getTopic(), topicPart.getPartition());
        SimpleConsumer simpleConsumer = consumer = brokerInfo == null ? null : (SimpleConsumer)this.consumers.getUnchecked((Object)brokerInfo);
        if (consumer == null) {
            LOG.warn("Failed to talk to any broker. Default offset to 0 for {}", (Object)topicPart);
            return 0L;
        }
        OffsetRequest request = new OffsetRequest((Map)ImmutableMap.of((Object)new TopicAndPartition(topicPart.getTopic(), topicPart.getPartition()), (Object)new PartitionOffsetRequestInfo(timestamp, 1)), kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());
        OffsetResponse response = consumer.getOffsetsBefore(request);
        long[] lArray = offsets = response.hasError() ? null : response.offsets(topicPart.getTopic(), topicPart.getPartition());
        if (offsets == null || offsets.length <= 0) {
            short errorCode = response.errorCode(topicPart.getTopic(), topicPart.getPartition());
            if (errorCode != ErrorMapping.UnknownTopicOrPartitionCode()) {
                this.consumers.refresh((Object)brokerInfo);
                LOG.warn("Failed to fetch offset for {} with timestamp {}. Error: {}. Default offset to 0.", new Object[]{topicPart, timestamp, errorCode});
            }
            return 0L;
        }
        LOG.debug("Offset {} fetched for {} with timestamp {}.", new Object[]{offsets[0], topicPart, timestamp});
        return offsets[0];
    }

    private final class ConsumerThread
    extends Thread {
        private final TopicPartition topicPart;
        private final long startOffset;
        private final KafkaConsumer.MessageCallback callback;
        private final BasicFetchedMessage fetchedMessage;
        private volatile boolean running;

        private ConsumerThread(TopicPartition topicPart, long startOffset, KafkaConsumer.MessageCallback callback) {
            super(String.format("Kafka-Consumer-%s-%d", topicPart.getTopic(), topicPart.getPartition()));
            this.topicPart = topicPart;
            this.startOffset = startOffset;
            this.callback = callback;
            this.running = true;
            this.fetchedMessage = new BasicFetchedMessage(topicPart);
        }

        @Override
        public void run() {
            AtomicLong offset = new AtomicLong(this.startOffset);
            Map.Entry<BrokerInfo, SimpleConsumer> consumerEntry = null;
            while (this.running) {
                if (consumerEntry == null && (consumerEntry = this.getConsumerEntry()) == null) {
                    LOG.debug("No leader for topic partition {}.", (Object)this.topicPart);
                    try {
                        TimeUnit.MILLISECONDS.sleep(2000L);
                    }
                    catch (InterruptedException e) {
                        LOG.trace("Consumer sleep interrupted.", (Throwable)e);
                    }
                    continue;
                }
                long off = offset.get();
                if (off < 0L) {
                    offset.set(SimpleKafkaConsumer.this.getLastOffset(this.topicPart, off));
                }
                SimpleConsumer consumer = consumerEntry.getValue();
                try {
                    FetchResponse response = this.fetchMessages(consumer, offset.get());
                    if (response.hasError()) {
                        short errorCode = response.errorCode(this.topicPart.getTopic(), this.topicPart.getPartition());
                        LOG.info("Failed to fetch message on {}. Error: {}", (Object)this.topicPart, (Object)errorCode);
                        if (errorCode == ErrorMapping.OffsetOutOfRangeCode()) {
                            offset.set(kafka.api.OffsetRequest.EarliestTime());
                        }
                        SimpleKafkaConsumer.this.consumers.refresh((Object)consumerEntry.getKey());
                        consumerEntry = null;
                        continue;
                    }
                    ByteBufferMessageSet messages = response.messageSet(this.topicPart.getTopic(), this.topicPart.getPartition());
                    if (this.sleepIfEmpty(messages)) continue;
                    this.invokeCallback(messages, offset);
                }
                catch (Throwable t) {
                    if (this.running || !(t instanceof ClosedByInterruptException)) {
                        LOG.info("Exception when fetching message on {}.", (Object)this.topicPart, (Object)t);
                    }
                    SimpleKafkaConsumer.this.consumers.refresh((Object)consumerEntry.getKey());
                    consumerEntry = null;
                }
            }
            try {
                this.callback.finished();
            }
            catch (Throwable t) {
                LOG.error("Exception thrown from MessageCallback.finished({})", (Object)this.running, (Object)t);
            }
        }

        public void terminate() {
            LOG.info("Terminate requested {}", (Object)this.getName());
            this.running = false;
            this.interrupt();
        }

        private Map.Entry<BrokerInfo, SimpleConsumer> getConsumerEntry() {
            BrokerInfo leader = SimpleKafkaConsumer.this.brokerService.getLeader(this.topicPart.getTopic(), this.topicPart.getPartition());
            return leader == null ? null : Maps.immutableEntry((Object)leader, (Object)SimpleKafkaConsumer.this.consumers.getUnchecked((Object)leader));
        }

        private FetchResponse fetchMessages(SimpleConsumer consumer, long offset) {
            FetchRequest request = new FetchRequestBuilder().clientId(consumer.clientId()).addFetch(this.topicPart.getTopic(), this.topicPart.getPartition(), offset, 0x100000).maxWait(1000).build();
            return consumer.fetch(request);
        }

        private boolean sleepIfEmpty(ByteBufferMessageSet messages) {
            if (Iterables.isEmpty((Iterable)messages)) {
                LOG.trace("No message fetched. Sleep for {} ms before next fetch.", (Object)500L);
                try {
                    TimeUnit.MILLISECONDS.sleep(500L);
                }
                catch (InterruptedException interruptedException) {
                    // empty catch block
                }
                return true;
            }
            return false;
        }

        private void invokeCallback(ByteBufferMessageSet messages, AtomicLong offset) {
            long savedOffset = offset.get();
            try {
                this.callback.onReceived(this.createFetchedMessages(messages, offset));
            }
            catch (Throwable t) {
                LOG.error("Callback throws exception. Retry from offset {} for {}", new Object[]{this.startOffset, this.topicPart, t});
                offset.set(savedOffset);
            }
        }

        private Iterator<FetchedMessage> createFetchedMessages(ByteBufferMessageSet messageSet, final AtomicLong offset) {
            final Iterator messages = messageSet.iterator();
            return new AbstractIterator<FetchedMessage>(){

                protected FetchedMessage computeNext() {
                    while (messages.hasNext()) {
                        MessageAndOffset message = (MessageAndOffset)messages.next();
                        long msgOffset = message.offset();
                        if (msgOffset < offset.get()) {
                            LOG.trace("Received old offset {}, expecting {} on {}. Message Ignored.", new Object[]{msgOffset, offset.get(), ConsumerThread.this.topicPart});
                            continue;
                        }
                        offset.set(message.nextOffset());
                        ConsumerThread.this.fetchedMessage.setPayload(message.message().payload());
                        ConsumerThread.this.fetchedMessage.setNextOffset(offset.get());
                        return ConsumerThread.this.fetchedMessage;
                    }
                    return (FetchedMessage)this.endOfData();
                }
            };
        }
    }

    private final class SimplePreparer
    implements KafkaConsumer.Preparer {
        private final Map<TopicPartition, Long> requests = Maps.newHashMap();
        private final ThreadFactory threadFactory = Threads.createDaemonThreadFactory((String)"message-callback-%d");

        private SimplePreparer() {
        }

        @Override
        public KafkaConsumer.Preparer add(String topic, int partition, long offset) {
            this.requests.put(new TopicPartition(topic, partition), offset);
            return this;
        }

        @Override
        public KafkaConsumer.Preparer addFromBeginning(String topic, int partition) {
            TopicPartition topicPartition = new TopicPartition(topic, partition);
            this.requests.put(topicPartition, kafka.api.OffsetRequest.EarliestTime());
            return this;
        }

        @Override
        public KafkaConsumer.Preparer addLatest(String topic, int partition) {
            TopicPartition topicPartition = new TopicPartition(topic, partition);
            this.requests.put(topicPartition, kafka.api.OffsetRequest.LatestTime());
            return this;
        }

        @Override
        public Cancellable consume(KafkaConsumer.MessageCallback callback) {
            final ExecutorService executor = Executors.newSingleThreadExecutor(this.threadFactory);
            final ArrayList pollers = Lists.newArrayList();
            final AtomicBoolean cancelled = new AtomicBoolean();
            Cancellable cancellable = new Cancellable(){

                public void cancel() {
                    if (!cancelled.compareAndSet(false, true)) {
                        return;
                    }
                    SimpleKafkaConsumer.this.consumerCancels.remove(this);
                    LOG.info("Requesting stop of all consumer threads.");
                    for (ConsumerThread consumerThread : pollers) {
                        consumerThread.terminate();
                    }
                    LOG.info("Wait for all consumer threads to stop.");
                    for (ConsumerThread consumerThread : pollers) {
                        try {
                            consumerThread.join();
                        }
                        catch (InterruptedException e) {
                            LOG.warn("Interrupted exception while waiting for thread to complete.", (Throwable)e);
                        }
                    }
                    LOG.info("All consumer threads stopped.");
                    executor.shutdown();
                }
            };
            KafkaConsumer.MessageCallback messageCallback = this.wrapCallback(callback, executor, cancellable);
            for (Map.Entry<TopicPartition, Long> entry : this.requests.entrySet()) {
                ConsumerThread consumerThread = new ConsumerThread(entry.getKey(), entry.getValue(), messageCallback);
                consumerThread.setDaemon(true);
                consumerThread.start();
                pollers.add(consumerThread);
            }
            SimpleKafkaConsumer.this.consumerCancels.add(cancellable);
            return cancellable;
        }

        private KafkaConsumer.MessageCallback wrapCallback(final KafkaConsumer.MessageCallback callback, final ExecutorService executor, final Cancellable cancellable) {
            final AtomicBoolean stopped = new AtomicBoolean();
            return new KafkaConsumer.MessageCallback(){

                @Override
                public void onReceived(final Iterator<FetchedMessage> messages) {
                    if (stopped.get()) {
                        return;
                    }
                    Futures.getUnchecked(executor.submit(new Runnable(){

                        @Override
                        public void run() {
                            if (stopped.get()) {
                                return;
                            }
                            callback.onReceived(messages);
                        }
                    }));
                }

                @Override
                public void finished() {
                    if (!stopped.compareAndSet(false, true)) {
                        return;
                    }
                    Futures.getUnchecked(executor.submit(new Runnable(){

                        @Override
                        public void run() {
                            callback.finished();
                            cancellable.cancel();
                        }
                    }));
                }
            };
        }
    }
}

