/*
 * Decompiled with CFR 0.152.
 */
package org.apache.twill.internal.kafka;

import com.google.common.base.Throwables;
import com.google.common.util.concurrent.AbstractIdleService;
import java.util.Properties;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.Time;
import org.I0Itec.zkclient.exception.ZkTimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class EmbeddedKafkaServer
extends AbstractIdleService {
    public static final String START_TIMEOUT_RETRIES = "twill.kafka.start.timeout.retries";
    private static final Logger LOG = LoggerFactory.getLogger(EmbeddedKafkaServer.class);
    private static final String DEFAULT_START_TIMEOUT_RETRIES = "5";
    private final int startTimeoutRetries;
    private final KafkaConfig kafkaConfig;
    private KafkaServer server;

    public EmbeddedKafkaServer(Properties properties) {
        this.startTimeoutRetries = Integer.parseInt(properties.getProperty(START_TIMEOUT_RETRIES, DEFAULT_START_TIMEOUT_RETRIES));
        this.kafkaConfig = new KafkaConfig(properties);
    }

    protected void startUp() throws Exception {
        int tries = 0;
        do {
            KafkaServer kafkaServer = this.createKafkaServer(this.kafkaConfig);
            try {
                kafkaServer.startup();
                this.server = kafkaServer;
            }
            catch (Exception e) {
                kafkaServer.shutdown();
                kafkaServer.awaitShutdown();
                Throwable rootCause = Throwables.getRootCause((Throwable)e);
                if (rootCause instanceof ZkTimeoutException) {
                    LOG.warn("Timeout when connecting to ZooKeeper from KafkaServer. Attempt number {}.", (Object)tries, (Object)rootCause);
                    continue;
                }
                throw e;
            }
        } while (this.server == null && ++tries < this.startTimeoutRetries);
        if (this.server == null) {
            throw new IllegalStateException("Failed to start Kafka server after " + tries + " attempts.");
        }
    }

    protected void shutDown() throws Exception {
        if (this.server != null) {
            this.server.shutdown();
            this.server.awaitShutdown();
        }
    }

    private KafkaServer createKafkaServer(KafkaConfig kafkaConfig) {
        return new KafkaServer(kafkaConfig, new Time(){

            public long milliseconds() {
                return System.currentTimeMillis();
            }

            public long nanoseconds() {
                return System.nanoTime();
            }

            public void sleep(long ms) {
                try {
                    Thread.sleep(ms);
                }
                catch (InterruptedException e) {
                    Thread.interrupted();
                }
            }
        });
    }
}

