/*
 * Decompiled with CFR 0.152.
 */
package org.pentaho.amazon.client.impl;

import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.model.ActionOnFailure;
import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsRequest;
import com.amazonaws.services.elasticmapreduce.model.Application;
import com.amazonaws.services.elasticmapreduce.model.BootstrapActionConfig;
import com.amazonaws.services.elasticmapreduce.model.CancelStepsRequest;
import com.amazonaws.services.elasticmapreduce.model.ClusterState;
import com.amazonaws.services.elasticmapreduce.model.DescribeClusterRequest;
import com.amazonaws.services.elasticmapreduce.model.DescribeClusterResult;
import com.amazonaws.services.elasticmapreduce.model.DescribeStepRequest;
import com.amazonaws.services.elasticmapreduce.model.DescribeStepResult;
import com.amazonaws.services.elasticmapreduce.model.HadoopJarStepConfig;
import com.amazonaws.services.elasticmapreduce.model.JobFlowInstancesConfig;
import com.amazonaws.services.elasticmapreduce.model.ListStepsRequest;
import com.amazonaws.services.elasticmapreduce.model.ListStepsResult;
import com.amazonaws.services.elasticmapreduce.model.RunJobFlowRequest;
import com.amazonaws.services.elasticmapreduce.model.RunJobFlowResult;
import com.amazonaws.services.elasticmapreduce.model.ScriptBootstrapActionConfig;
import com.amazonaws.services.elasticmapreduce.model.StepConfig;
import com.amazonaws.services.elasticmapreduce.model.StepExecutionState;
import com.amazonaws.services.elasticmapreduce.model.StepSummary;
import com.amazonaws.services.elasticmapreduce.model.TerminateJobFlowsRequest;
import com.amazonaws.services.elasticmapreduce.util.StepFactory;
import com.google.common.annotations.VisibleForTesting;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.StringTokenizer;
import java.util.stream.Collectors;
import org.pentaho.amazon.AbstractAmazonJobEntry;
import org.pentaho.amazon.client.api.EmrClient;
import org.pentaho.di.core.Const;
import org.pentaho.di.core.util.StringUtil;
import org.pentaho.di.core.util.Utils;

public class EmrClientImpl
implements EmrClient {
    private static final String EMR_EC2_DEFAULT_ROLE = "EMR_EC2_DefaultRole";
    private static final String EMR_EFAULT_ROLE = "EMR_DefaultRole";
    private static final String STEP_HIVE = "hive";
    private static final String STEP_EMR = "emr";
    private AmazonElasticMapReduce emrClient;
    private String currentClusterState;
    private String currentStepState;
    private RunJobFlowResult runJobFlowResult;
    private String hadoopJobFlowId;
    private String stepId;
    private List<StepSummary> stepSummaries = null;
    private boolean alive;
    private boolean requestClusterShutdown = false;
    private boolean requestStepCancell = false;

    public EmrClientImpl(AmazonElasticMapReduce emrClient) {
        this.emrClient = emrClient;
    }

    @Override
    public void runJobFlow(String stagingS3FileUrl, String stagingS3BucketUrl, String stepType, String mainClass, String bootstrapActions, AbstractAmazonJobEntry jobEntry) {
        this.alive = jobEntry.getAlive();
        RunJobFlowRequest runJobFlowRequest = this.initEmrCluster(stagingS3FileUrl, stagingS3BucketUrl, stepType, mainClass, bootstrapActions, jobEntry);
        this.runJobFlowResult = this.emrClient.runJobFlow(runJobFlowRequest);
        this.hadoopJobFlowId = this.runJobFlowResult.getJobFlowId();
        this.stepId = this.getCurrentlyRunningStepId();
    }

    @Override
    public String getHadoopJobFlowId() {
        return this.runJobFlowResult.getJobFlowId();
    }

    @Override
    public String getStepId() {
        return this.stepId;
    }

    @Override
    public void addStepToExistingJobFlow(String stagingS3FileUrl, String stagingS3BucketUrl, String stepType, String mainClass, AbstractAmazonJobEntry jobEntry) {
        this.alive = jobEntry.getAlive();
        this.hadoopJobFlowId = jobEntry.getHadoopJobFlowId();
        this.setStepsFromCluster();
        List<StepConfig> steps = this.initSteps(stagingS3FileUrl, stepType, mainClass, jobEntry);
        AddJobFlowStepsRequest addJobFlowStepsRequest = new AddJobFlowStepsRequest();
        addJobFlowStepsRequest.setJobFlowId(this.hadoopJobFlowId);
        addJobFlowStepsRequest.setSteps(steps);
        this.emrClient.addJobFlowSteps(addJobFlowStepsRequest);
        this.stepId = this.getSpecifiedRunningStep();
    }

    @Override
    public boolean isClusterRunning() {
        if (ClusterState.WAITING.name().equalsIgnoreCase(this.currentClusterState)) {
            return false;
        }
        if (ClusterState.TERMINATED.name().equalsIgnoreCase(this.currentClusterState)) {
            return false;
        }
        return !ClusterState.TERMINATED_WITH_ERRORS.name().equalsIgnoreCase(this.currentClusterState);
    }

    @Override
    public boolean isStepRunning() {
        if (StepExecutionState.CANCELLED.name().equalsIgnoreCase(this.currentStepState)) {
            return false;
        }
        if (StepExecutionState.INTERRUPTED.name().equalsIgnoreCase(this.currentStepState)) {
            return false;
        }
        if (StepExecutionState.COMPLETED.name().equalsIgnoreCase(this.currentStepState)) {
            return false;
        }
        return !StepExecutionState.FAILED.name().equalsIgnoreCase(this.currentStepState);
    }

    @Override
    public boolean isRunning() {
        this.currentStepState = this.getActualStepState();
        this.currentClusterState = this.getActualClusterState();
        boolean isClusterRunning = this.isClusterRunning();
        boolean isStepRunning = this.isStepRunning();
        if (!this.alive && !this.requestClusterShutdown && ClusterState.WAITING.name().equalsIgnoreCase(this.currentClusterState) && !isStepRunning) {
            this.terminateJobFlows();
            return this.isClusterRunning();
        }
        return isClusterRunning || isStepRunning;
    }

    @Override
    public String getCurrentClusterState() {
        return this.currentClusterState;
    }

    @Override
    public String getCurrentStepState() {
        return this.currentStepState;
    }

    @Override
    public boolean isClusterTerminated() {
        return ClusterState.TERMINATED.name().equalsIgnoreCase(this.currentClusterState) || ClusterState.TERMINATED_WITH_ERRORS.name().equalsIgnoreCase(this.currentClusterState);
    }

    @Override
    public boolean isStepFailed() {
        return StepExecutionState.FAILED.name().equalsIgnoreCase(this.currentStepState);
    }

    @Override
    public boolean isStepNotSuccess() {
        this.currentStepState = this.getActualStepState();
        if (StepExecutionState.CANCELLED.name().equalsIgnoreCase(this.currentStepState)) {
            return true;
        }
        if (StepExecutionState.INTERRUPTED.name().equalsIgnoreCase(this.currentStepState)) {
            return true;
        }
        return StepExecutionState.FAILED.name().equalsIgnoreCase(this.currentStepState);
    }

    private JobFlowInstancesConfig initEC2Instance(Integer numInsts, String masterInstanceType, String slaveInstanceType) {
        JobFlowInstancesConfig instances = new JobFlowInstancesConfig();
        instances.setInstanceCount(numInsts);
        instances.setMasterInstanceType(masterInstanceType);
        instances.setSlaveInstanceType(slaveInstanceType);
        instances.setKeepJobFlowAliveWhenNoSteps(Boolean.valueOf(this.alive));
        return instances;
    }

    @VisibleForTesting
    RunJobFlowRequest initEmrCluster(String stagingS3FileUrl, String stagingS3BucketUrl, String stepType, String mainClass, String bootstrapActions, AbstractAmazonJobEntry jobEntry) {
        String ec2Role;
        RunJobFlowRequest runJobFlowRequest = new RunJobFlowRequest();
        runJobFlowRequest.setName(jobEntry.getHadoopJobName());
        runJobFlowRequest.setReleaseLabel(jobEntry.getEmrRelease());
        runJobFlowRequest.setLogUri(stagingS3BucketUrl);
        JobFlowInstancesConfig instances = this.initEC2Instance(Integer.parseInt(jobEntry.getNumInstances()), jobEntry.getMasterInstanceType(), jobEntry.getSlaveInstanceType());
        runJobFlowRequest.setInstances(instances);
        List<StepConfig> steps = this.initSteps(stagingS3FileUrl, stepType, mainClass, jobEntry);
        if (steps.size() > 0) {
            runJobFlowRequest.setSteps(steps);
        }
        if (stepType.equals(STEP_HIVE)) {
            List<BootstrapActionConfig> stepBootstrapActions;
            List<Application> applications = this.initApplications();
            if (applications.size() > 0) {
                runJobFlowRequest.setApplications(applications);
            }
            if ((stepBootstrapActions = this.initBootstrapActions(bootstrapActions)) != null && stepBootstrapActions.size() > 0) {
                runJobFlowRequest.setBootstrapActions(stepBootstrapActions);
            }
        }
        if ((ec2Role = jobEntry.getEc2Role()) == null || ec2Role.trim().isEmpty()) {
            runJobFlowRequest.setJobFlowRole(EMR_EC2_DEFAULT_ROLE);
        } else {
            runJobFlowRequest.setJobFlowRole(ec2Role);
        }
        String emrRole = jobEntry.getEmrRole();
        if (emrRole == null || emrRole.trim().isEmpty()) {
            runJobFlowRequest.setServiceRole(EMR_EFAULT_ROLE);
        } else {
            runJobFlowRequest.setServiceRole(emrRole);
        }
        runJobFlowRequest.setVisibleToAllUsers(Boolean.valueOf(true));
        return runJobFlowRequest;
    }

    private StepConfig configureHiveStep(String stagingS3qUrl, String cmdLineArgs) {
        String[] cmdLineArgsArr;
        if (cmdLineArgs == null) {
            cmdLineArgsArr = new String[]{""};
        } else {
            List<String> cmdArgs = Arrays.asList(cmdLineArgs.split("\\s+"));
            List<String> updatedCmdArgs = cmdArgs.stream().map(e -> EmrClientImpl.replaceDoubleS3(e)).collect(Collectors.toList());
            cmdLineArgsArr = updatedCmdArgs.toArray(new String[updatedCmdArgs.size()]);
        }
        StepConfig hiveStepConfig = new StepConfig("Hive", new StepFactory().newRunHiveScriptStep(stagingS3qUrl, cmdLineArgsArr));
        if (this.alive) {
            hiveStepConfig.withActionOnFailure(ActionOnFailure.CANCEL_AND_WAIT);
        } else {
            hiveStepConfig.withActionOnFailure(ActionOnFailure.TERMINATE_JOB_FLOW);
        }
        return hiveStepConfig;
    }

    private StepConfig initHiveStep(String stagingS3qUrl, String cmdLineArgs) {
        StepConfig hiveStepConfig = this.configureHiveStep(stagingS3qUrl, cmdLineArgs);
        return hiveStepConfig;
    }

    private static HadoopJarStepConfig configureHadoopStep(String stagingS3Jar, String mainClass, List<String> jarStepArgs) {
        HadoopJarStepConfig hadoopJarStepConfig = new HadoopJarStepConfig();
        hadoopJarStepConfig.setJar(stagingS3Jar);
        hadoopJarStepConfig.setMainClass(mainClass);
        hadoopJarStepConfig.setArgs(jarStepArgs);
        return hadoopJarStepConfig;
    }

    private StepConfig initHadoopStep(String jarUrl, String mainClass, List<String> jarStepArgs) {
        StepConfig stepConfig = new StepConfig();
        stepConfig.setName("custom jar: " + jarUrl);
        stepConfig.setHadoopJarStep(EmrClientImpl.configureHadoopStep(jarUrl, mainClass, jarStepArgs));
        if (this.alive) {
            stepConfig.withActionOnFailure(ActionOnFailure.CANCEL_AND_WAIT);
        } else {
            stepConfig.withActionOnFailure(ActionOnFailure.TERMINATE_JOB_FLOW);
        }
        return stepConfig;
    }

    @VisibleForTesting
    public static String removeLineBreaks(String multiLineFieldValue) {
        if (StringUtil.isEmpty((String)multiLineFieldValue)) {
            return multiLineFieldValue;
        }
        return multiLineFieldValue.replaceAll("\\s+", " ").trim();
    }

    private List<StepConfig> initSteps(String stagingS3FileUrl, String stepType, String mainClass, AbstractAmazonJobEntry jobEntry) {
        ArrayList<StepConfig> steps = new ArrayList<StepConfig>();
        StepConfig config = null;
        String cmdLineArgs = EmrClientImpl.removeLineBreaks(jobEntry.getCmdLineArgs());
        if (stepType.equals(STEP_HIVE)) {
            config = this.initHiveStep(stagingS3FileUrl, cmdLineArgs);
        }
        if (stepType.equals(STEP_EMR)) {
            List<String> jarStepArgs = this.parseJarStepArgs(cmdLineArgs);
            config = this.initHadoopStep(stagingS3FileUrl, mainClass, jarStepArgs);
        }
        steps.add(config);
        return steps;
    }

    private List<String> parseJarStepArgs(String cmdLineArgs) {
        ArrayList<String> jarStepArgs = new ArrayList<String>();
        if (!StringUtil.isEmpty((String)cmdLineArgs)) {
            StringTokenizer st = new StringTokenizer(cmdLineArgs, " ");
            while (st.hasMoreTokens()) {
                String token = st.nextToken();
                jarStepArgs.add(EmrClientImpl.replaceDoubleS3(token));
            }
        }
        return jarStepArgs;
    }

    private static String replaceDoubleS3(String token) {
        if (token.contains("s3://s3/")) {
            token = token.replace("s3://s3/", "s3://");
        }
        return token;
    }

    private List<Application> initApplications() {
        ArrayList<Application> applications = new ArrayList<Application>();
        Application hive = new Application().withName("Hive");
        applications.add(hive);
        return applications;
    }

    private List<BootstrapActionConfig> initBootstrapActions(String bootstrapActions) {
        List<BootstrapActionConfig> actionConfigs = EmrClientImpl.configBootstrapActions(EmrClientImpl.removeLineBreaks(bootstrapActions));
        return actionConfigs;
    }

    private static List<BootstrapActionConfig> configBootstrapActions(String bootstrapActions) {
        ArrayList<BootstrapActionConfig> bootstrapActionConfigs = new ArrayList<BootstrapActionConfig>();
        if (!StringUtil.isEmpty((String)bootstrapActions)) {
            StringTokenizer st = new StringTokenizer(bootstrapActions, " ");
            String path = "";
            String name = "";
            List<String> args = null;
            int actionCount = 0;
            while (st.hasMoreTokens()) {
                String value;
                String key;
                block13: {
                    key = st.nextToken();
                    value = st.nextToken();
                    try {
                        if (value.startsWith("\"")) {
                            while (!value.endsWith("\"")) {
                                if (st.hasMoreTokens()) {
                                    value = value + " " + st.nextToken();
                                    continue;
                                }
                                throw new RuntimeException("Argument does not end with a double quote: " + key + " " + value);
                            }
                            value = value.substring(1, value.length() - 1);
                        }
                        if (!key.equals("--bootstrap-action")) break block13;
                        if (!Const.isEmpty((String)path)) {
                            ++actionCount;
                            if (name.equals("")) {
                                name = "Bootstrap Action " + actionCount;
                            }
                            BootstrapActionConfig bootstrapActionConfig = EmrClientImpl.configureBootstrapAction(path, name, args);
                            bootstrapActionConfigs.add(bootstrapActionConfig);
                            name = "";
                            args = null;
                        }
                        if (value.startsWith("s3://")) {
                            path = value = EmrClientImpl.replaceDoubleS3(value);
                            break block13;
                        }
                        throw new RuntimeException("s3:// path expected for bootstrap action: " + key + " " + value);
                    }
                    catch (RuntimeException e) {
                        e.printStackTrace();
                        return null;
                    }
                }
                if (key.equals("--bootstrap-name")) {
                    name = value;
                }
                if (!key.equals("--args")) continue;
                args = EmrClientImpl.configArgs(value, ",");
            }
            if (!Utils.isEmpty((CharSequence)path)) {
                ++actionCount;
                if (name.equals("")) {
                    name = "Bootstrap Action " + actionCount;
                }
                BootstrapActionConfig bootstrapActionConfig = EmrClientImpl.configureBootstrapAction(path, name, args);
                bootstrapActionConfigs.add(bootstrapActionConfig);
            }
        }
        return bootstrapActionConfigs;
    }

    private static BootstrapActionConfig configureBootstrapAction(String path, String name, List<String> args) {
        ScriptBootstrapActionConfig scriptBootstrapActionConfig = new ScriptBootstrapActionConfig();
        BootstrapActionConfig bootstrapActionConfig = new BootstrapActionConfig();
        scriptBootstrapActionConfig.setPath(path);
        scriptBootstrapActionConfig.setArgs(args);
        bootstrapActionConfig.setName(name);
        bootstrapActionConfig.setScriptBootstrapAction(scriptBootstrapActionConfig);
        return bootstrapActionConfig;
    }

    private static List<String> configArgs(String args, String separator) {
        ArrayList<String> argList = new ArrayList<String>();
        if (!StringUtil.isEmpty((String)args)) {
            StringTokenizer st = new StringTokenizer(args, separator);
            while (st.hasMoreTokens()) {
                String token = st.nextToken();
                argList.add(token);
            }
        }
        return argList;
    }

    private static BootstrapActionConfig createBootstrapAction(String path, String name, List<String> args) {
        ScriptBootstrapActionConfig scriptBootstrapActionConfig = new ScriptBootstrapActionConfig();
        BootstrapActionConfig bootstrapActionConfig = new BootstrapActionConfig();
        if (!path.isEmpty()) {
            scriptBootstrapActionConfig.setPath(path);
            scriptBootstrapActionConfig.setArgs(args);
        }
        bootstrapActionConfig.setName(name);
        bootstrapActionConfig.setScriptBootstrapAction(scriptBootstrapActionConfig);
        return bootstrapActionConfig;
    }

    private List<StepSummary> getSteps() {
        ListStepsRequest listStepsRequest = new ListStepsRequest();
        listStepsRequest.setClusterId(this.hadoopJobFlowId);
        ListStepsResult listStepsResult = this.emrClient.listSteps(listStepsRequest);
        List stepSummaries = listStepsResult.getSteps();
        if (stepSummaries.isEmpty()) {
            return null;
        }
        return stepSummaries;
    }

    private void setStepsFromCluster() {
        this.stepSummaries = this.getSteps();
    }

    private String getCurrentlyRunningStepId() {
        return this.getSteps().get(0).getId();
    }

    private String getSpecifiedRunningStep() {
        List<StepSummary> currentSteps = this.getSteps();
        currentSteps.removeAll(this.stepSummaries);
        if (currentSteps.isEmpty()) {
            return null;
        }
        return currentSteps.get(0).getId();
    }

    private void terminateJobFlows() {
        if (!this.requestClusterShutdown) {
            TerminateJobFlowsRequest terminateJobFlowsRequest = new TerminateJobFlowsRequest();
            terminateJobFlowsRequest.withJobFlowIds(new String[]{this.hadoopJobFlowId});
            this.emrClient.terminateJobFlows(terminateJobFlowsRequest);
            this.currentClusterState = this.getActualClusterState();
            this.requestClusterShutdown = true;
        }
    }

    private void cancelStepExecution() {
        if (!this.requestStepCancell) {
            CancelStepsRequest cancelStepsRequest = new CancelStepsRequest();
            cancelStepsRequest.setClusterId(this.hadoopJobFlowId);
            ArrayList<String> stepIds = new ArrayList<String>();
            stepIds.add(this.stepId);
            cancelStepsRequest.setStepIds(stepIds);
            this.emrClient.cancelSteps(cancelStepsRequest);
            this.requestStepCancell = true;
        }
    }

    @Override
    public boolean stopSteps() {
        if (this.alive) {
            this.cancelStepExecution();
            return true;
        }
        this.terminateJobFlows();
        return false;
    }

    private String getActualClusterState() {
        String clusterState = null;
        DescribeClusterRequest describeClusterRequest = new DescribeClusterRequest();
        describeClusterRequest.setClusterId(this.hadoopJobFlowId);
        DescribeClusterResult describeClusterResult = this.emrClient.describeCluster(describeClusterRequest);
        if (describeClusterResult != null) {
            clusterState = describeClusterResult.getCluster().getStatus().getState();
        }
        return clusterState;
    }

    private String getActualStepState() {
        String stepState = null;
        DescribeStepRequest describeStepRequest = new DescribeStepRequest();
        describeStepRequest.setClusterId(this.hadoopJobFlowId);
        describeStepRequest.setStepId(this.stepId);
        DescribeStepResult describeStepResult = this.emrClient.describeStep(describeStepRequest);
        if (describeStepResult != null) {
            stepState = describeStepResult.getStep().getStatus().getState();
        }
        return stepState;
    }

    @Override
    public String getJobFlowLogUri() throws URISyntaxException {
        DescribeClusterRequest clusterRequest = new DescribeClusterRequest();
        clusterRequest.setClusterId(this.hadoopJobFlowId);
        DescribeClusterResult clusterResult = this.emrClient.describeCluster(clusterRequest);
        String clusterLogUri = clusterResult.getCluster().getLogUri();
        String clusterLogBucket = new URI(clusterLogUri).getHost();
        return clusterLogBucket;
    }
}

