/*
 * Decompiled with CFR 0.152.
 */
package org.pentaho.di.trans.ael.websocket;

import com.google.common.collect.Maps;
import java.security.Principal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.pentaho.di.core.Result;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.logging.LogChannelInterface;
import org.pentaho.di.core.logging.LogLevel;
import org.pentaho.di.engine.api.events.PDIEvent;
import org.pentaho.di.engine.api.model.Operation;
import org.pentaho.di.engine.api.model.Transformation;
import org.pentaho.di.engine.api.remote.ExecutionRequest;
import org.pentaho.di.engine.api.remote.Message;
import org.pentaho.di.engine.api.remote.StopMessage;
import org.pentaho.di.engine.api.reporting.LogEntry;
import org.pentaho.di.engine.api.reporting.Status;
import org.pentaho.di.engine.model.ActingPrincipal;
import org.pentaho.di.resource.ResourceEntry;
import org.pentaho.di.trans.RowProducer;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.ael.adapters.TransMetaConverter;
import org.pentaho.di.trans.ael.websocket.DaemonMessagesClientEndpoint;
import org.pentaho.di.trans.ael.websocket.MessageEventService;
import org.pentaho.di.trans.ael.websocket.StepDataInterfaceWebSocketEngineAdapter;
import org.pentaho.di.trans.ael.websocket.StepInterfaceWebSocketEngineAdapter;
import org.pentaho.di.trans.ael.websocket.Util;
import org.pentaho.di.trans.ael.websocket.exception.HandlerRegistrationException;
import org.pentaho.di.trans.ael.websocket.exception.MessageEventHandlerExecutionException;
import org.pentaho.di.trans.ael.websocket.handler.MessageEventHandler;
import org.pentaho.di.trans.step.StepInterface;
import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.di.trans.step.StepMetaDataCombi;
import org.pentaho.di.trans.step.StepMetaInterface;

public class TransWebSocketEngineAdapter
extends Trans {
    /** Identifier prefix of the per-operation log handlers; the operation id is appended to it. */
    private static final String OPERATION_LOG = "OPERATION_LOG_TRANS_WEBSOCK_";
    /** Identifier of the transformation-level log handler. */
    private static final String TRANSFORMATION_LOG = "TRANSFORMATION_LOG_TRANS_WEBSOCK";
    /** Identifier of the transformation status handler. */
    private static final String TRANSFORMATION_STATUS = "TRANSFORMATION_STATUS_TRANS_WEBSOCK";
    /** Identifier of the transformation error handler. */
    private static final String TRANSFORMATION_ERROR = "TRANSFORMATION_ERROR_TRANS_WEBSOCK";
    /** Identifier of the stop-message handler. */
    private static final String TRANSFORMATION_STOP = "TRANSFORMATION_STOP_TRANS_WEBSOCK";
    /** Pause, in milliseconds, between two session checks of the session monitor. */
    private static final int SLEEP_TIME_MS = 10000;
    /** Number of consecutive failed session checks tolerated by the session monitor. */
    private static final int MAX_TEST_FAILED = 3;
    /** Executor running the session monitor task; {@code null} until the monitor is started. */
    private ExecutorService sessionMonitor = null;
    /** Engine model of the transformation, converted from the TransMeta given to the constructor. */
    private final Transformation transformation;
    /** Request sent to the daemon by startThreads(); built in prepareExecution(). */
    private ExecutionRequest executionRequest;
    /** Lazily created daemon endpoint; always obtain it through getDaemonEndpoint(). */
    private DaemonMessagesClientEndpoint daemonMessagesClientEndpoint = null;
    /** Registry of the message handlers installed by this adapter. */
    private final MessageEventService messageEventService;
    /** Engine-side log level placed in the execution request; {@code null} until setLogLevel() is called. */
    private org.pentaho.di.engine.api.reporting.LogLevel logLevel = null;
    private final String host;
    private final String port;
    private final boolean ssl;
    /**
     * True while stopAll() is in progress. Volatile because it is written by the thread calling
     * stopAll() and read by the session monitor task and by the stop-message handler.
     */
    private volatile boolean cancelling = false;
    /** Released once the execution is over; waitUntilFinished() blocks on it. */
    private final CountDownLatch transFinishedSignal = new CountDownLatch(1);
    /** Errors detected by the adapter itself; per-step errors are added on top by getErrors(). */
    private final AtomicInteger errors = new AtomicInteger();
    /** Translation table from Kettle log levels to engine log levels; filled in the static initializer. */
    private static final Map<LogLevel, org.pentaho.di.engine.api.reporting.LogLevel> LEVEL_MAP = new HashMap<LogLevel, org.pentaho.di.engine.api.reporting.LogLevel>();

    /**
     * Creates an adapter that runs the given transformation through a remote daemon.
     *
     * @param transMeta the transformation definition; it is converted to the engine model up front
     * @param host      host passed to the daemon endpoint
     * @param port      port passed to the daemon endpoint
     * @param ssl       SSL flag passed to the daemon endpoint
     */
    public TransWebSocketEngineAdapter(TransMeta transMeta, String host, String port, boolean ssl) {
        // Convert first so a conversion failure aborts construction before anything else is created.
        Transformation converted = TransMetaConverter.convert(transMeta);
        this.host = host;
        this.port = port;
        this.ssl = ssl;
        this.transMeta = transMeta;
        this.transformation = converted;
        this.messageEventService = new MessageEventService();
    }

    /**
     * Returns the daemon endpoint, creating it on first use.
     *
     * @return the endpoint shared by all calls on this adapter
     * @throws KettleException if the endpoint cannot be created; before rethrowing, the execution
     *                         is marked as finished and waiters on the finished signal are released
     */
    DaemonMessagesClientEndpoint getDaemonEndpoint() throws KettleException {
        if (this.daemonMessagesClientEndpoint != null) {
            return this.daemonMessagesClientEndpoint;
        }
        try {
            this.daemonMessagesClientEndpoint = new DaemonMessagesClientEndpoint(this.host, this.port, this.ssl, this.messageEventService);
        }
        catch (KettleException creationFailure) {
            // Without an endpoint nothing can run: finish everything and unblock waitUntilFinished().
            this.finishProcess(true);
            this.transFinishedSignal.countDown();
            throw creationFailure;
        }
        return this.daemonMessagesClientEndpoint;
    }

    /**
     * Stores the engine log level that corresponds to the given Kettle log level.
     *
     * @param logLogLevel the Kettle log level; levels without an entry in LEVEL_MAP map to MINIMAL
     */
    @Override
    public void setLogLevel(LogLevel logLogLevel) {
        org.pentaho.di.engine.api.reporting.LogLevel fallback = org.pentaho.di.engine.api.reporting.LogLevel.MINIMAL;
        org.pentaho.di.engine.api.reporting.LogLevel engineLevel = LEVEL_MAP.getOrDefault(logLogLevel, fallback);
        this.logLevel = engineLevel;
    }

    /**
     * Not supported by this adapter.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void killAll() {
        final String reason = "Not yet implemented";
        throw new UnsupportedOperationException(reason);
    }

    /**
     * Asks the daemon to stop the execution.
     *
     * <p>The stop reason sent is "User Request" when no errors are recorded and "Error reported"
     * otherwise. When no errors are recorded the call then blocks until the execution has finished
     * and marks every step as stopped. A KettleException is only logged at debug level.
     */
    @Override
    public void stopAll() {
        // Raised for the whole call so the stop handler and the session monitor skip their own finishProcess().
        this.cancelling = true;
        try {
            DaemonMessagesClientEndpoint endpoint = this.getDaemonEndpoint();
            String reason;
            if (this.getErrors() == 0) {
                reason = "User Request";
            } else {
                reason = "Error reported";
            }
            endpoint.sendMessage(new StopMessage(reason));
            if (this.getErrors() != 0) {
                return;
            }
            this.waitUntilFinished();
            this.finishProcess(true);
        }
        catch (KettleException stopFailure) {
            this.getLogChannel().logDebug(stopFailure.getMessage());
        }
        finally {
            this.cancelling = false;
        }
    }

    /**
     * Requests a safe stop from the daemon.
     *
     * <p>A stop message flagged as safe stop is sent, every step without input row sets is marked
     * as stopped, and a background task waits for the execution to finish before marking all steps
     * as stopped. A KettleException is only logged at debug level.
     */
    @Override
    public void safeStop() {
        try {
            this.getDaemonEndpoint().sendMessage(StopMessage.builder().reasonPhrase("User Request").safeStop(true).build());
            this.getSteps().stream().map(stepMetaDataCombi -> stepMetaDataCombi.step).filter(stepInterface -> stepInterface.getInputRowSets().isEmpty()).forEach(step -> step.setStopped(true));
            ExecutorService finisher = Executors.newSingleThreadExecutor();
            try {
                finisher.submit(() -> {
                    this.waitUntilFinished();
                    this.finishProcess(true);
                });
            }
            finally {
                // shutdown() lets the submitted task complete and then releases the worker thread;
                // without it every safeStop() call leaves a non-daemon thread alive.
                finisher.shutdown();
            }
        }
        catch (KettleException e) {
            this.getLogChannel().logDebug(e.getMessage(), new Object[]{e});
        }
    }

    /**
     * Builds the execution request and installs the message handlers; nothing is sent to the
     * daemon here.
     *
     * @param arguments not used by this implementation
     * @throws KettleException if preparing the request or registering a handler fails
     */
    @Override
    public void prepareExecution(String[] arguments) throws KettleException {
        this.activateParameters();
        this.transMeta.activateParameters();
        this.transMeta.setInternalKettleVariables();

        // Snapshot of every variable known to the transformation, keyed by variable name.
        // NOTE(review): raw types are kept because the ExecutionRequest signature is not visible here.
        String[] variableNames = this.transMeta.listVariables();
        Map env = Arrays.stream(variableNames)
            .collect(Collectors.toMap(Function.identity(), this.transMeta::getVariable));

        this.executionRequest = new ExecutionRequest(
            new HashMap(),
            env,
            this.transformation,
            new HashMap(),
            this.logLevel,
            this.getActingPrincipal(this.transMeta));

        this.setSteps(this.opsToSteps());
        this.wireStatusToTransListeners();
        this.subscribeToOpLogging();
        this.subscribeToTransLogging();
        this.setReadyToStart(true);
    }

    /**
     * Writes a remote log entry to a local log channel at the matching level.
     *
     * @param logChannel the channel to write to
     * @param data       the log entry received from the daemon; TRACE entries are written at row level
     */
    private void logToChannel(LogChannelInterface logChannel, LogEntry data) {
        org.pentaho.di.engine.api.reporting.LogLevel entryLevel = data.getLogLogLevel();
        String text = data.getMessage();
        switch (entryLevel) {
            case ERROR:
                // Only errors may carry a throwable; pass it along when it is there.
                if (data.getThrowable() == null) {
                    logChannel.logError(text);
                } else {
                    logChannel.logError(text, data.getThrowable());
                }
                break;
            case MINIMAL:
                logChannel.logMinimal(text);
                break;
            case BASIC:
                logChannel.logBasic(text);
                break;
            case DETAILED:
                logChannel.logDetailed(text);
                break;
            case DEBUG:
                logChannel.logDebug(text);
                break;
            case TRACE:
                logChannel.logRowlevel(text);
                break;
            default:
                break;
        }
    }

    /**
     * Registers one log handler per operation of the transformation.
     *
     * <p>Each handler forwards incoming log entries to the log channel of the step with the
     * operation's id (copy 0), or to the transformation's own channel when no such step is found.
     * A failed registration is logged and does not stop the remaining registrations.
     *
     * @throws KettleException declared for callers; registration failures are handled internally
     */
    private void subscribeToOpLogging() throws KettleException {
        this.transformation.getOperations().forEach(operation -> {
            try {
                this.messageEventService.addHandler((Message)Util.getOperationLogEvent(operation.getId()), new MessageEventHandler(){

                    @Override
                    public void execute(Message message) throws MessageEventHandlerExecutionException {
                        PDIEvent event = (PDIEvent)message;
                        LogEntry logEntry = (LogEntry)event.getData();
                        StepInterface stepInterface = TransWebSocketEngineAdapter.this.findStepInterface(operation.getId(), 0);
                        if (stepInterface != null) {
                            LogChannelInterface logChannel = stepInterface.getLogChannel();
                            TransWebSocketEngineAdapter.this.logToChannel(logChannel, logEntry);
                        } else {
                            TransWebSocketEngineAdapter.this.logToChannel(TransWebSocketEngineAdapter.this.getLogChannel(), logEntry);
                        }
                    }

                    @Override
                    public String getIdentifier() {
                        return TransWebSocketEngineAdapter.OPERATION_LOG + operation.getId();
                    }
                });
            }
            catch (HandlerRegistrationException e) {
                this.getLogChannel().logError("Error registering message handlers", e);
            }
        });
    }

    /**
     * Registers the handler that forwards transformation-level log entries to this
     * transformation's log channel.
     *
     * @throws KettleException if the handler cannot be registered
     */
    private void subscribeToTransLogging() throws KettleException {
        MessageEventHandler transLogHandler = new MessageEventHandler(){

            @Override
            public void execute(Message message) throws MessageEventHandlerExecutionException {
                LogEntry entry = (LogEntry)((PDIEvent)message).getData();
                LogChannelInterface transChannel = TransWebSocketEngineAdapter.this.getLogChannel();
                TransWebSocketEngineAdapter.this.logToChannel(transChannel, entry);
            }

            @Override
            public String getIdentifier() {
                return TransWebSocketEngineAdapter.TRANSFORMATION_LOG;
            }
        };
        this.messageEventService.addHandler((Message)Util.getTransformationLogEvent(), transLogHandler);
    }

    /**
     * Registers the three handlers that drive the life cycle of this adapter: transformation
     * status events, transformation error events and stop messages.
     *
     * @throws KettleException if a handler cannot be registered
     */
    private void wireStatusToTransListeners() throws KettleException {
        // Status handler: translates remote status changes into calls on the trans listeners.
        this.messageEventService.addHandler((Message)Util.getTransformationStatusEvent(), new MessageEventHandler(){

            @Override
            public void execute(Message message) throws MessageEventHandlerExecutionException {
                PDIEvent transStatusEvent = (PDIEvent)message;
                TransWebSocketEngineAdapter.this.addStepPerformanceSnapShot();
                TransWebSocketEngineAdapter.this.getTransListeners().forEach(l -> {
                    try {
                        switch ((Status)transStatusEvent.getData()) {
                            case RUNNING: {
                                l.transStarted(TransWebSocketEngineAdapter.this);
                                l.transActive(TransWebSocketEngineAdapter.this);
                                break;
                            }
                            // PAUSED and STOPPED are not forwarded to the listeners.
                            case PAUSED: {
                                break;
                            }
                            case STOPPED: {
                                break;
                            }
                            // FAILED falls through: both end states notify the listener and mark the trans finished.
                            case FAILED: 
                            case FINISHED: {
                                l.transFinished(TransWebSocketEngineAdapter.this);
                                TransWebSocketEngineAdapter.this.setFinished(true);
                            }
                        }
                    }
                    catch (KettleException e) {
                        // The forEach lambda cannot throw a checked exception, so it is wrapped.
                        throw new RuntimeException(e);
                    }
                });
            }

            @Override
            public String getIdentifier() {
                return TransWebSocketEngineAdapter.TRANSFORMATION_STATUS;
            }
        });
        // Error handler: logs the remote error, counts it and finishes the execution on every step.
        this.messageEventService.addHandler((Message)Util.getTransformationErrorEvent(), new MessageEventHandler(){

            @Override
            public void execute(Message message) throws MessageEventHandlerExecutionException {
                Throwable throwable = ((LogEntry)((PDIEvent)message).getData()).getThrowable();
                TransWebSocketEngineAdapter.this.getLogChannel().logError("Error Executing Transformation", throwable);
                TransWebSocketEngineAdapter.this.errors.incrementAndGet();
                TransWebSocketEngineAdapter.this.finishProcess(true);
            }

            @Override
            public String getIdentifier() {
                return TransWebSocketEngineAdapter.TRANSFORMATION_ERROR;
            }
        });
        // Stop handler: final step of an execution; closes the endpoint and the session monitor
        // and releases threads blocked in waitUntilFinished().
        this.messageEventService.addHandler((Message)Util.getStopMessage(), new MessageEventHandler(){

            @Override
            public void execute(Message message) throws MessageEventHandlerExecutionException {
                StopMessage stopMessage = (StopMessage)message;
                // A killed session or a failed operation is reported as an error, anything else as basic info.
                if (stopMessage.sessionWasKilled() || stopMessage.operationFailed()) {
                    TransWebSocketEngineAdapter.this.getLogChannel().logError("Finalizing execution: " + stopMessage.getReasonPhrase());
                } else {
                    TransWebSocketEngineAdapter.this.getLogChannel().logBasic("Finalizing execution: " + stopMessage.getReasonPhrase());
                }
                // While stopAll() is in progress (cancelling is true) finishProcess is not called from here.
                if (!TransWebSocketEngineAdapter.this.cancelling) {
                    TransWebSocketEngineAdapter.this.finishProcess(false);
                }
                try {
                    TransWebSocketEngineAdapter.this.getDaemonEndpoint().close(stopMessage.getReasonPhrase());
                }
                catch (KettleException e) {
                    TransWebSocketEngineAdapter.this.getLogChannel().logError("Error finalizing", (Throwable)e);
                }
                TransWebSocketEngineAdapter.this.closeSessionMonitor();
                TransWebSocketEngineAdapter.this.transFinishedSignal.countDown();
            }

            @Override
            public String getIdentifier() {
                return TransWebSocketEngineAdapter.TRANSFORMATION_STOP;
            }
        });
    }

    /**
     * Builds the step combinations for the operations of this adapter's transformation.
     *
     * @return one combination per operation
     */
    private List<StepMetaDataCombi> opsToSteps() {
        return this.opsToSteps(this.transformation);
    }

    /**
     * Builds one step combination per operation of the given transformation.
     *
     * <p>The step meta is read from the "StepMeta" entry of the operation's config. When creating
     * the websocket adapters for an operation fails, the failure is logged and the combination is
     * still returned, with whatever could not be created left unset.
     *
     * @param transformation the transformation whose operations are converted
     * @return the combinations, in no particular order
     */
    private List<StepMetaDataCombi> opsToSteps(Transformation transformation) {
        Map<Operation, StepMetaDataCombi> operationToCombi = transformation.getOperations().stream().collect(Collectors.toMap(Function.identity(), op -> {
            StepMetaDataCombi combi = new StepMetaDataCombi();
            combi.stepMeta = StepMeta.fromXml((String)op.getConfig().get("StepMeta"));
            try {
                combi.data = new StepDataInterfaceWebSocketEngineAdapter(op, this.messageEventService);
                List<StepMetaDataCombi> subSteps = this.getSubSteps(transformation, combi);
                combi.step = new StepInterfaceWebSocketEngineAdapter(op, this.messageEventService, combi.stepMeta, this.transMeta, combi.data, this, subSteps);
            }
            catch (KettleException e) {
                // A checked exception cannot leave the collector lambda; report it through the
                // transformation log instead of printing to stderr.
                this.getLogChannel().logError("Error creating step adapters for operation " + op.getId(), e);
            }
            combi.meta = combi.stepMeta.getStepMetaInterface();
            combi.stepname = combi.stepMeta.getName();
            return combi;
        }));
        return new ArrayList<StepMetaDataCombi>(operationToCombi.values());
    }

    /**
     * Collects the step combinations of every sub-transformation the given step depends on.
     *
     * <p>Sub-transformations are read from the "SubTransformations" config entry of the
     * transformation. Each key of that map is checked against the step's resource dependencies and
     * the matching values are converted with opsToSteps.
     *
     * @param transformation the transformation holding the "SubTransformations" config entry
     * @param combi          the step whose dependencies are inspected
     * @return the combinations of all matching sub-transformations; empty when there are none
     */
    private List<StepMetaDataCombi> getSubSteps(Transformation transformation, StepMetaDataCombi combi) {
        Object rawConfig = transformation.getConfig("SubTransformations").orElse(null);
        // The config API is untyped at this point; the entry is expected to be a
        // HashMap<String, Transformation> — NOTE(review): confirm against TransMetaConverter.
        @SuppressWarnings("unchecked")
        HashMap<String, Transformation> config = rawConfig == null
            ? Maps.<String, Transformation>newHashMap()
            : (HashMap<String, Transformation>)rawConfig;
        StepMetaInterface smi = combi.stepMeta.getStepMetaInterface();
        return config.keySet().stream()
            .filter(key -> this.stepHasDependency(combi, smi, key))
            .flatMap(key -> this.opsToSteps(config.get(key)).stream())
            .collect(Collectors.toList());
    }

    /**
     * Tells whether the step declares an ACTIONFILE resource dependency on the given path.
     *
     * @param combi the step being inspected
     * @param smi   the step's meta interface, asked for its resource dependencies
     * @param path  the resource to look for
     * @return true when at least one ACTIONFILE entry of the step's dependencies equals {@code path}
     */
    private boolean stepHasDependency(StepMetaDataCombi combi, StepMetaInterface smi, String path) {
        return smi.getResourceDependencies(this.transMeta, combi.stepMeta)
            .stream()
            .flatMap(reference -> reference.getEntries().stream())
            .anyMatch(candidate ->
                ResourceEntry.ResourceType.ACTIONFILE == candidate.getResourcetype()
                    && candidate.getResource().equals(path));
    }

    /**
     * Starts the remote execution: sends the execution request built by prepareExecution to the
     * daemon and then starts the session monitor.
     *
     * @throws KettleException if the daemon endpoint cannot be obtained or the request cannot be sent
     */
    @Override
    public void startThreads() throws KettleException {
        this.getDaemonEndpoint().sendMessage(this.executionRequest);
        this.startSessionMonitor();
    }

    /**
     * Starts the background task that watches the daemon session.
     *
     * <p>Until the transformation is finished the task sleeps SLEEP_TIME_MS milliseconds and then
     * validates the session. Once more than MAX_TEST_FAILED consecutive checks have failed,
     * the communication is considered lost: an error is counted and logged, the execution is
     * finished (unless stopAll() is in progress) and waiters on the finished signal are released.
     * The task ends when the transformation is finished, when the communication is lost or when
     * its thread is interrupted.
     */
    private void startSessionMonitor() {
        this.getLogChannel().logDebug("Starting Session Monitor.");
        this.sessionMonitor = Executors.newSingleThreadExecutor();
        this.sessionMonitor.submit(() -> {
            int failedTests = 0;
            while (!this.isFinished()) {
                if (failedTests > MAX_TEST_FAILED) {
                    this.errors.incrementAndGet();
                    this.getLogChannel().logError("Session Monitor detected that communication with the server was lost. Finalizing execution.");
                    if (!this.cancelling) {
                        this.finishProcess(false);
                    }
                    this.transFinishedSignal.countDown();
                    // Leave right away: while stopAll() is in progress the transformation is not
                    // yet flagged as finished, and looping would count and log the loss repeatedly.
                    break;
                }
                try {
                    TimeUnit.MILLISECONDS.sleep(SLEEP_TIME_MS);
                    this.getDaemonEndpoint().sessionValid();
                    if (failedTests > 0) {
                        this.getLogChannel().logDebug("Session Monitor - Server Communication restored.");
                    }
                    failedTests = 0;
                }
                catch (KettleException e) {
                    this.getLogChannel().logDebug("Session Monitor detected communication problem with the server. Retry (" + ++failedTests + "/" + MAX_TEST_FAILED + ").");
                }
                catch (InterruptedException e) {
                    this.getLogChannel().logDebug("Session Monitor was interrupted.");
                    // The interrupt comes from the executor being shut down: keep the flag and stop monitoring.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            this.closeSessionMonitor();
        });
    }

    /**
     * Shuts the session monitor executor down. Does nothing when the monitor was never started or
     * is already shut down.
     */
    private void closeSessionMonitor() {
        boolean nothingToClose = this.sessionMonitor == null || this.sessionMonitor.isShutdown();
        if (nothingToClose) {
            return;
        }
        try {
            this.getLogChannel().logDebug("Shutting down the Session Monitor.");
            this.sessionMonitor.shutdown();
        }
        finally {
            // The orderly shutdown does not wait for the task: force it when it is still running.
            boolean stillRunning = !this.sessionMonitor.isTerminated();
            if (stillRunning) {
                this.sessionMonitor.shutdownNow();
            }
            this.getLogChannel().logDebug("Session Monitor shutdown.");
        }
    }

    /**
     * Blocks the calling thread until the finished signal has been released.
     *
     * @throws RuntimeException if the calling thread is interrupted while waiting; the thread's
     *                          interrupt status is restored before the exception is thrown
     */
    @Override
    public void waitUntilFinished() {
        try {
            this.transFinishedSignal.await();
        }
        catch (InterruptedException e) {
            // await() cleared the flag; restore it so callers further up can still see the interrupt.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Waiting for transformation to be finished interrupted!", e);
        }
    }

    /**
     * Returns the number of errors of this execution.
     *
     * @return the errors counted by the adapter itself plus the errors of every step, narrowed to int
     */
    @Override
    public int getErrors() {
        long total = this.errors.get();
        List<StepMetaDataCombi> combis = this.getSteps();
        if (combis == null) {
            // Steps are not available yet: only the adapter's own counter applies.
            return (int)total;
        }
        for (StepMetaDataCombi combi : combis) {
            total += combi.step.getErrors();
        }
        return (int)total;
    }

    /**
     * Builds the result of this execution.
     *
     * @return a new Result on which only the number of errors is set
     */
    @Override
    public Result getResult() {
        long errorCount = this.getErrors();
        Result result = new Result();
        result.setNrErrors(errorCount);
        return result;
    }

    /**
     * Marks the transformation as finished and tells every trans listener about it.
     *
     * @param emitToAllSteps when true, every available step is additionally flagged as stopped and
     *                       not running
     */
    private void finishProcess(boolean emitToAllSteps) {
        this.setFinished(true);
        if (emitToAllSteps) {
            // getSteps() may still return null when this runs before the steps were set (getErrors()
            // guards against the same case), and a combi's step stays unset when opsToSteps failed on it.
            List<StepMetaDataCombi> steps = this.getSteps();
            if (steps != null) {
                steps.stream().map(stepMetaDataCombi -> stepMetaDataCombi.step).filter(step -> step != null).forEach(step -> {
                    step.setStopped(true);
                    step.setRunning(false);
                });
            }
        }
        this.getTransListeners().forEach(l -> {
            try {
                l.transFinished(this);
            }
            catch (KettleException e1) {
                this.getLogChannel().logError("Error notifying trans listener", (Throwable)e1);
            }
        });
    }

    /**
     * Not supported by this adapter.
     *
     * @param stepname ignored
     * @param copynr   ignored
     * @throws UnsupportedOperationException always
     */
    @Override
    public RowProducer addRowProducer(String stepname, int copynr) throws KettleException {
        final String reason = "Not yet implemented";
        throw new UnsupportedOperationException(reason);
    }

    /**
     * Determines the principal on whose behalf the execution request is made.
     *
     * @param transMeta the transformation whose repository is inspected
     * @return a principal named after the repository user, or the anonymous principal when the
     *         transformation has no repository or the repository has no user info
     */
    private Principal getActingPrincipal(TransMeta transMeta) {
        boolean userKnown = transMeta.getRepository() != null && transMeta.getRepository().getUserInfo() != null;
        if (userKnown) {
            String userName = transMeta.getRepository().getUserInfo().getName();
            return new ActingPrincipal(userName);
        }
        return ActingPrincipal.ANONYMOUS;
    }

    // Fills the Kettle -> engine log level table, listed from least to most verbose.
    static {
        LEVEL_MAP.put(LogLevel.ERROR, org.pentaho.di.engine.api.reporting.LogLevel.ERROR);
        LEVEL_MAP.put(LogLevel.MINIMAL, org.pentaho.di.engine.api.reporting.LogLevel.MINIMAL);
        LEVEL_MAP.put(LogLevel.BASIC, org.pentaho.di.engine.api.reporting.LogLevel.BASIC);
        LEVEL_MAP.put(LogLevel.DETAILED, org.pentaho.di.engine.api.reporting.LogLevel.DETAILED);
        LEVEL_MAP.put(LogLevel.DEBUG, org.pentaho.di.engine.api.reporting.LogLevel.DEBUG);
        // Kettle's row level has no namesake on the engine side; TRACE is its counterpart.
        LEVEL_MAP.put(LogLevel.ROWLEVEL, org.pentaho.di.engine.api.reporting.LogLevel.TRACE);
    }
}

