/*
 * Decompiled with CFR 0.152.
 */
package org.pentaho.amazon;

import java.io.File;
import java.io.IOException;
import org.apache.commons.vfs2.FileObject;
import org.apache.commons.vfs2.FileSystemException;
import org.apache.commons.vfs2.FileSystemOptions;
import org.apache.commons.vfs2.UserAuthenticator;
import org.apache.commons.vfs2.auth.StaticUserAuthenticator;
import org.apache.commons.vfs2.impl.DefaultFileSystemConfigBuilder;
import org.apache.log4j.Appender;
import org.pentaho.amazon.AbstractAmazonJobEntry;
import org.pentaho.amazon.client.ClientFactoriesManager;
import org.pentaho.amazon.client.ClientType;
import org.pentaho.amazon.client.api.EmrClient;
import org.pentaho.amazon.client.api.S3Client;
import org.pentaho.di.core.Const;
import org.pentaho.di.core.Result;
import org.pentaho.di.core.ResultFile;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.exception.KettleFileException;
import org.pentaho.di.core.logging.Log4jFileAppender;
import org.pentaho.di.core.logging.LogWriter;
import org.pentaho.di.core.variables.VariableSpace;
import org.pentaho.di.core.vfs.KettleVFS;
import org.pentaho.di.i18n.BaseMessages;

/**
 * Base class for Amazon job entries that upload a staging file to S3 and submit it to EMR,
 * either by starting a new job flow or by adding a step to an existing one, and that can
 * optionally block while reporting the cluster and step state.
 *
 * <p>Subclasses supply the staging file, the step type, the main class and the bootstrap
 * actions through the abstract methods declared here.
 */
public abstract class AbstractAmazonJobExecutor
extends AbstractAmazonJobEntry {
    /** Class handed to {@link BaseMessages#getString} to look up this entry's messages. */
    private static Class<?> PKG = AbstractAmazonJobExecutor.class;

    /** Polling interval, in seconds, used when the configured value is missing or invalid. */
    private static final int DEFAULT_LOGGING_INTERVAL_SECONDS = 10;

    /** Kept as a {@code long} so that converting seconds to milliseconds cannot overflow. */
    private static final long MILLIS_PER_SECOND = 1000L;

    /** File appender attached for the duration of {@link #execute}; {@code null} if none was created. */
    private Log4jFileAppender appender = null;
    private S3Client s3Client;
    protected EmrClient emrClient;

    /** S3 object key of the staging file; assigned by {@link #setS3BucketKey(FileObject)}. */
    protected String key;

    /**
     * Instance count parsed from {@code numInstances} when running on a new cluster. Keeps its
     * previous value (initially 2) when that setting cannot be parsed.
     */
    protected int numInsts = 2;

    /**
     * Creates a file appender named {@code "pdi-" + getName()}, registers it with the
     * {@link LogWriter} and aligns this entry's log level with the parent job's.
     *
     * <p>Failures are logged and leave {@link #appender} as {@code null}, so {@link #execute}
     * does not try to close an appender that belongs to an earlier run.
     */
    public void setupLogFile() {
        // Forget any appender from a previous run: if creation fails below, execute() must not
        // close it a second time or publish its file as a result of this run.
        this.appender = null;
        String logFileName = "pdi-" + this.getName();
        try {
            this.appender = LogWriter.createFileAppender(logFileName, true, false);
            LogWriter.getInstance().addAppender((Appender)this.appender);
            this.log.setLogLevel(this.parentJob.getLogLevel());
        }
        catch (Exception e) {
            this.logError(BaseMessages.getString(PKG, "AbstractAmazonJobExecutor.FailedToOpenLogFile", new String[]{logFileName, e.toString()}));
            this.logError(Const.getStackTracker((Throwable)e));
        }
    }

    /**
     * Returns the first path segment of the resolved staging directory, which is used as the
     * name of the staging bucket.
     *
     * @return the text between the leading character of the path and the next {@code '/'};
     *         empty when the path has no further segment
     * @throws FileSystemException if the staging directory cannot be resolved
     * @throws KettleException if the staging directory cannot be resolved
     */
    public String getStagingBucketName() throws FileSystemException, KettleException {
        String pathToStagingDir = this.getS3FileObjectPath();
        // Drop the leading separator, then keep everything up to the next one.
        return pathToStagingDir.substring(1).split("/")[0];
    }

    /**
     * Resolves {@code stagingDir} through {@link KettleVFS}, authenticating with the configured
     * AWS access key and secret key, and returns the path of the resulting file name.
     */
    private String getS3FileObjectPath() throws FileSystemException, KettleFileException {
        FileSystemOptions opts = new FileSystemOptions();
        UserAuthenticator authenticator = new StaticUserAuthenticator(null, this.getAWSAccessKeyId(), this.getAWSSecretKey());
        DefaultFileSystemConfigBuilder.getInstance().setUserAuthenticator(opts, authenticator);
        FileObject stagingDirFileObject = KettleVFS.getFileObject(this.stagingDir, (VariableSpace)this.getVariables(), opts);
        return stagingDirFileObject.getName().getPath();
    }

    /**
     * Returns the part of the staging directory path that follows the bucket segment.
     *
     * @return the remaining path, or {@code null} when the path holds nothing after the
     *         first segment
     */
    private String getKeyFromS3StagingDir() throws KettleFileException, FileSystemException {
        String pathToStagingDir = this.getS3FileObjectPath();
        // Strip the leading character (the root separator) unless the path is empty.
        String withoutRoot = pathToStagingDir.isEmpty() ? pathToStagingDir : pathToStagingDir.substring(1);
        int bucketEnd = withoutRoot.indexOf('/');
        if (bucketEnd == -1) {
            return null;
        }
        String remainder = withoutRoot.substring(bucketEnd + 1);
        return remainder.isEmpty() ? null : remainder;
    }

    /**
     * Computes {@link #key} for the given staging file: the key prefix taken from the staging
     * directory (if any), a {@code '/'}, and the file's base name.
     *
     * @param stagingFile file whose base name becomes the last segment of the key
     * @throws KettleFileException if the staging directory cannot be resolved
     * @throws FileSystemException if the staging directory cannot be resolved
     */
    protected void setS3BucketKey(FileObject stagingFile) throws KettleFileException, FileSystemException {
        String keyFromStagingDir = this.getKeyFromS3StagingDir();
        StringBuilder sb = new StringBuilder(keyFromStagingDir == null ? "" : keyFromStagingDir);
        if (sb.length() > 0) {
            sb.append("/");
        }
        sb.append(stagingFile.getName().getBaseName());
        this.key = sb.toString();
    }

    /**
     * @param stagingBucketName bucket name
     * @return {@code "s3://"} followed by the bucket name
     */
    public String getStagingS3BucketUrl(String stagingBucketName) {
        return "s3://" + stagingBucketName;
    }

    /**
     * @param stagingBucketName bucket name
     * @return {@code "s3://"}, the bucket name, {@code '/'} and the current {@link #key}
     */
    public String getStagingS3FileUrl(String stagingBucketName) {
        return "s3://" + stagingBucketName + "/" + this.key;
    }

    /**
     * @param filename file name that may contain variable references
     * @return the file name after variable substitution
     */
    public String buildFilename(String filename) {
        return this.environmentSubstitute(filename);
    }

    /** Produces the local file that {@link #execute} uploads to the staging bucket. */
    public abstract File createStagingFile() throws IOException, KettleException;

    /** Bootstrap actions passed to the EMR client when a new job flow is started. */
    public abstract String getStepBootstrapActions();

    /** Main class passed to the EMR client for the submitted step. */
    public abstract String getMainClass() throws Exception;

    /** Step type passed to the EMR client for the submitted step. */
    public abstract String getStepType();

    /** Starts a new job flow for the staged file. */
    private void runNewJobFlow(String stagingS3FileUrl, String stagingS3BucketUrl) throws Exception {
        this.emrClient.runJobFlow(stagingS3FileUrl, stagingS3BucketUrl, this.getStepType(), this.getMainClass(), this.getStepBootstrapActions(), this);
    }

    /** Adds a step for the staged file to the already existing job flow. */
    private void addStepToExistingJobFlow(String stagingS3FileUrl, String stagingS3BucketUrl) throws Exception {
        this.emrClient.addStepToExistingJobFlow(stagingS3FileUrl, stagingS3BucketUrl, this.getStepType(), this.getMainClass(), this);
    }

    /**
     * Reads the logs of the given step through the S3 client and writes them to the error log.
     *
     * @param stagingBucketName forwarded as the first argument of {@code readStepLogsFromS3};
     *                          the only caller passes {@code emrClient.getJobFlowLogUri()}
     * @param stepId id of the step whose logs are read
     */
    private void logStepLogsFromS3(String stagingBucketName, String stepId) {
        this.logError(this.s3Client.readStepLogsFromS3(stagingBucketName, this.hadoopJobFlowId, stepId));
    }

    /** Creates the S3 and EMR clients from the configured credentials and region. */
    private void initAmazonClients() {
        ClientFactoriesManager manager = ClientFactoriesManager.getInstance();
        this.s3Client = (S3Client)manager.createClient(this.getAWSAccessKeyId(), this.getAWSSecretKey(), this.region, ClientType.S3);
        this.emrClient = (EmrClient)manager.createClient(this.getAWSAccessKeyId(), this.getAWSSecretKey(), this.region, ClientType.EMR);
    }

    /**
     * Stages the file in S3, submits it to EMR and, when {@code blocking} is set, waits while
     * reporting progress.
     *
     * <p>Any {@link Throwable} raised while staging or submitting marks {@code result} as
     * failed and is logged; it is not rethrown. The log file appender created by
     * {@link #setupLogFile()} is always detached and its file is added to the result files.
     *
     * @param result result object updated in place and returned
     * @param arg1 not used by this implementation
     * @return the same {@code result} instance
     * @throws KettleException declared by the signature; not thrown by the code in this method
     */
    public Result execute(Result result, int arg1) throws KettleException {
        this.setupLogFile();
        try {
            this.initAmazonClients();
            String stagingBucketName = this.getStagingBucketName();
            String stagingS3BucketUrl = this.getStagingS3BucketUrl(stagingBucketName);
            this.s3Client.createBucketIfNotExists(stagingBucketName);
            this.uploadStagingFile(stagingBucketName);
            String stagingS3FileUrl = this.getStagingS3FileUrl(stagingBucketName);
            this.submitToCluster(stagingS3FileUrl, stagingS3BucketUrl);
            // Resolved even for non-blocking runs so an invalid setting is always reported.
            int loggingIntervalSeconds = this.resolveLoggingIntervalSeconds();
            if (this.blocking) {
                try {
                    this.waitForJobFlowToFinish(result, loggingIntervalSeconds);
                }
                catch (Exception e) {
                    this.logError(e.getMessage(), e);
                }
            }
        }
        catch (Throwable t) {
            // The failure is reported through the job log only; no separate stderr dump.
            this.setResultError(result);
            this.logError(t.getMessage(), t);
        }
        this.detachLogFileAppender(result);
        return result;
    }

    /**
     * Creates the staging file and stores it in the bucket under {@link #key}, first removing
     * any object already stored under that key.
     */
    private void uploadStagingFile(String stagingBucketName) throws Exception {
        File tmpFile = this.createStagingFile();
        try {
            this.s3Client.deleteObjectFromBucket(stagingBucketName, this.key);
        }
        catch (Exception ex) {
            // Best effort: a failed delete is logged and the upload is still attempted.
            this.logError(Const.getStackTracker((Throwable)ex));
        }
        this.s3Client.putObjectInBucket(stagingBucketName, this.key, tmpFile);
    }

    /**
     * Submits the staged file either to a new job flow (recording its id in
     * {@code hadoopJobFlowId}) or as a step of the existing one, depending on
     * {@code runOnNewCluster}.
     */
    private void submitToCluster(String stagingS3FileUrl, String stagingS3BucketUrl) throws Exception {
        if (!this.runOnNewCluster) {
            this.addStepToExistingJobFlow(stagingS3FileUrl, stagingS3BucketUrl);
            return;
        }
        String numInstancesS = this.environmentSubstitute(this.numInstances);
        try {
            this.numInsts = Integer.parseInt(numInstancesS);
        }
        catch (NumberFormatException e) {
            // numInsts keeps its previous value; the job flow is still started.
            this.logError(BaseMessages.getString(PKG, "AbstractAmazonJobExecutor.InstanceNumber.Error", new String[]{numInstancesS}));
        }
        this.runNewJobFlow(stagingS3FileUrl, stagingS3BucketUrl);
        this.hadoopJobFlowId = this.emrClient.getHadoopJobFlowId();
    }

    /**
     * Parses the configured logging interval.
     *
     * @return the interval in seconds, or {@link #DEFAULT_LOGGING_INTERVAL_SECONDS} (after
     *         logging an error) when the setting is not an integer or is negative
     */
    private int resolveLoggingIntervalSeconds() {
        String loggingIntervalS = this.environmentSubstitute(this.loggingInterval);
        int seconds = -1;
        try {
            seconds = Integer.parseInt(loggingIntervalS);
        }
        catch (NumberFormatException ignored) {
            // Reported below together with negative values.
        }
        if (seconds < 0) {
            // A negative value would make Thread.sleep throw and silently end the wait.
            this.logError(BaseMessages.getString(PKG, "AbstractAmazonJobExecutor.LoggingInterval.Error", new String[]{loggingIntervalS}));
            return DEFAULT_LOGGING_INTERVAL_SECONDS;
        }
        return seconds;
    }

    /**
     * Polls the EMR client while it reports a running job, logging cluster and step state every
     * {@code loggingIntervalSeconds}, then marks {@code result} as failed when the step did not
     * succeed.
     */
    private void waitForJobFlowToFinish(Result result, int loggingIntervalSeconds) throws Exception {
        // NOTE(review): the wait and the final status check only happen at basic log level or
        // above; at lower levels a blocking run returns immediately. Confirm this is intended.
        if (!this.log.isBasic()) {
            return;
        }
        while (this.emrClient.isRunning()) {
            if (this.isJobStoppedByUser()) {
                this.setResultError(result);
                break;
            }
            if (this.emrClient.getCurrentClusterState() == null || this.emrClient.getCurrentClusterState().isEmpty()) {
                break;
            }
            this.logBasic(this.hadoopJobName + " " + BaseMessages.getString(PKG, "AbstractAmazonJobExecutor.JobFlowExecutionStatus", new String[]{this.hadoopJobFlowId}) + this.emrClient.getCurrentClusterState() + " ");
            this.logBasic(this.hadoopJobName + " " + BaseMessages.getString(PKG, "AbstractAmazonJobExecutor.JobFlowStepStatus", new String[]{this.emrClient.getStepId()}) + this.emrClient.getCurrentStepState() + " ");
            try {
                // long arithmetic: an int product wraps negative for very large intervals.
                Thread.sleep(loggingIntervalSeconds * MILLIS_PER_SECOND);
            }
            catch (InterruptedException ie) {
                // NOTE(review): the interrupt is logged and polling continues, so the thread's
                // interrupt status is lost. It is not restored here because the loop keeps
                // sleeping and every later sleep would then fail immediately; decide whether an
                // interrupt should end the wait.
                this.logError(Const.getStackTracker((Throwable)ie));
            }
        }
        if (this.emrClient.isClusterTerminated() && this.emrClient.isStepNotSuccess()) {
            this.setResultError(result);
            this.logError(this.hadoopJobName + " " + BaseMessages.getString(PKG, "AbstractAmazonJobExecutor.JobFlowExecutionStatus", new String[]{this.hadoopJobFlowId}) + this.emrClient.getCurrentClusterState());
        }
        if (this.emrClient.isStepNotSuccess()) {
            this.setResultError(result);
            this.logBasic(this.hadoopJobName + " " + BaseMessages.getString(PKG, "AbstractAmazonJobExecutor.JobFlowStepStatus", new String[]{this.emrClient.getStepId()}) + this.emrClient.getCurrentStepState() + " ");
            if (this.emrClient.isStepFailed()) {
                this.logStepLogsFromS3(this.emrClient.getJobFlowLogUri(), this.emrClient.getStepId());
            }
        }
    }

    /**
     * Unregisters and closes the log file appender, if one was created, and adds its file to
     * the result files of {@code result}.
     */
    private void detachLogFileAppender(Result result) {
        if (this.appender == null) {
            return;
        }
        LogWriter.getInstance().removeAppender((Appender)this.appender);
        this.appender.close();
        // NOTE(review): 1 is presumably ResultFile.FILE_TYPE_LOG; confirm and use the constant.
        ResultFile resultFile = new ResultFile(1, this.appender.getFile(), this.parentJob.getJobname(), this.getName());
        result.getResultFiles().put(resultFile.getFile().toString(), resultFile);
    }

    /**
     * Checks whether the parent job was interrupted or stopped and, if so, asks the EMR client
     * to stop its steps.
     *
     * @return the outcome of {@code emrClient.stopSteps()} when a stop was requested,
     *         {@code false} otherwise
     */
    private boolean isJobStoppedByUser() {
        if (this.getParentJob().isInterrupted() || this.getParentJob().isStopped()) {
            return this.emrClient.stopSteps();
        }
        return false;
    }

    /** Marks the result as stopped and failed with exactly one error. */
    private void setResultError(Result result) {
        result.setStopped(true);
        result.setNrErrors(1L);
        result.setResult(false);
    }
}

