/*
 * Decompiled with CFR 0.152.
 */
package org.apache.oozie.service;

import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.BundleActionBean;
import org.apache.oozie.BundleJobBean;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.Job;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.coord.CoordActionInputCheckCommand;
import org.apache.oozie.command.coord.CoordActionInputCheckXCommand;
import org.apache.oozie.command.coord.CoordActionReadyCommand;
import org.apache.oozie.command.coord.CoordActionReadyXCommand;
import org.apache.oozie.command.coord.CoordActionStartCommand;
import org.apache.oozie.command.coord.CoordActionStartXCommand;
import org.apache.oozie.command.coord.CoordKillXCommand;
import org.apache.oozie.command.coord.CoordResumeXCommand;
import org.apache.oozie.command.coord.CoordSubmitXCommand;
import org.apache.oozie.command.coord.CoordSuspendXCommand;
import org.apache.oozie.command.wf.ActionEndCommand;
import org.apache.oozie.command.wf.ActionEndXCommand;
import org.apache.oozie.command.wf.ActionStartCommand;
import org.apache.oozie.command.wf.ActionStartXCommand;
import org.apache.oozie.command.wf.KillXCommand;
import org.apache.oozie.command.wf.ResumeXCommand;
import org.apache.oozie.command.wf.SignalCommand;
import org.apache.oozie.command.wf.SignalXCommand;
import org.apache.oozie.command.wf.SuspendXCommand;
import org.apache.oozie.executor.jpa.BundleActionsGetWaitingOlderJPAExecutor;
import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionsGetForRecoveryJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionsGetReadyGroupbyJobIDJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowActionsGetPendingJPAExecutor;
import org.apache.oozie.service.CallableQueueService;
import org.apache.oozie.service.InstrumentationService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.SchedulerService;
import org.apache.oozie.service.Service;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.JobUtils;
import org.apache.oozie.util.XCallable;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XmlUtils;
import org.jdom.Attribute;
import org.jdom.Element;
import org.jdom.JDOMException;

public class RecoveryService
implements Service {
    public static final String CONF_PREFIX = "oozie.service.RecoveryService.";
    public static final String CONF_PREFIX_WF_ACTIONS = "oozie.service.wf.actions.";
    public static final String CONF_PREFIX_COORD = "oozie.service.coord.";
    public static final String CONF_PREFIX_BUNDLE = "oozie.service.bundle.";
    public static final String CONF_SERVICE_INTERVAL = "oozie.service.RecoveryService.interval";
    public static final String CONF_CALLABLE_BATCH_SIZE = "oozie.service.RecoveryService.callable.batch.size";
    public static final String CONF_WF_ACTIONS_OLDER_THAN = "oozie.service.wf.actions.older.than";
    public static final String CONF_COORD_OLDER_THAN = "oozie.service.coord.older.than";
    public static final String CONF_BUNDLE_OLDER_THAN = "oozie.service.bundle.older.than";
    private static final String INSTRUMENTATION_GROUP = "recovery";
    private static final String INSTR_RECOVERED_ACTIONS_COUNTER = "actions";
    private static final String INSTR_RECOVERED_COORD_ACTIONS_COUNTER = "coord_actions";
    private static final String INSTR_RECOVERED_BUNDLE_ACTIONS_COUNTER = "bundle_actions";
    private static boolean useXCommand = true;

    @Override
    public void init(Services services) {
        Configuration conf = services.getConf();
        RecoveryRunnable recoveryRunnable = new RecoveryRunnable(conf.getInt(CONF_WF_ACTIONS_OLDER_THAN, 120), conf.getInt(CONF_COORD_OLDER_THAN, 600), conf.getInt(CONF_BUNDLE_OLDER_THAN, 600));
        services.get(SchedulerService.class).schedule(recoveryRunnable, 10L, (long)conf.getInt(CONF_SERVICE_INTERVAL, 600), SchedulerService.Unit.SEC);
        if (!Services.get().getConf().getBoolean("oozie.useXCommand", true)) {
            useXCommand = false;
        }
    }

    @Override
    public void destroy() {
    }

    @Override
    public Class<? extends Service> getInterface() {
        return RecoveryService.class;
    }

    private static Configuration mergeConfig(Element coordElem, BundleJobBean bundleJob) throws CommandException {
        XLog.Info.get().clear();
        XLog log = XLog.getLog("RecoveryService");
        String jobConf = bundleJob.getConf();
        XConfiguration runConf = null;
        try {
            runConf = new XConfiguration(new StringReader(jobConf));
        }
        catch (IOException e1) {
            log.warn("Configuration parse error in:" + jobConf);
            throw new CommandException(ErrorCode.E1306, e1.getMessage(), e1);
        }
        Element localConfigElement = coordElem.getChild("configuration", coordElem.getNamespace());
        if (localConfigElement != null) {
            XConfiguration localConf;
            String strConfig = XmlUtils.prettyPrint(localConfigElement).toString();
            try {
                localConf = new XConfiguration(new StringReader(strConfig));
            }
            catch (IOException e1) {
                log.warn("Configuration parse error in:" + strConfig);
                throw new CommandException(ErrorCode.E1307, e1.getMessage(), e1);
            }
            XConfiguration.copy(localConf, runConf);
        }
        String appPath = coordElem.getChild("app-path", coordElem.getNamespace()).getValue();
        runConf.set("oozie.coord.application.path", appPath);
        try {
            JobUtils.normalizeAppPath(runConf.get("user.name"), runConf.get("group.name"), runConf);
        }
        catch (IOException e) {
            throw new CommandException(ErrorCode.E1001, runConf.get("oozie.coord.application.path"));
        }
        return runConf;
    }

    static class RecoveryRunnable
    implements Runnable {
        private final long olderThan;
        private final long coordOlderThan;
        private final long bundleOlderThan;
        private long delay = 0L;
        private List<XCallable<?>> callables;
        private List<XCallable<?>> delayedCallables;
        private StringBuilder msg = null;
        private JPAService jpaService = null;

        public RecoveryRunnable(long olderThan, long coordOlderThan, long bundleOlderThan) {
            this.olderThan = olderThan;
            this.coordOlderThan = coordOlderThan;
            this.bundleOlderThan = bundleOlderThan;
        }

        @Override
        public void run() {
            XLog.Info.get().clear();
            XLog log = XLog.getLog(this.getClass());
            this.msg = new StringBuilder();
            this.jpaService = Services.get().get(JPAService.class);
            this.runWFRecovery();
            this.runCoordActionRecovery();
            this.runCoordActionRecoveryForReady();
            this.runBundleRecovery();
            log.debug("QUEUING [{0}] for potential recovery", this.msg.toString());
            boolean ret = false;
            if (null != this.callables) {
                ret = Services.get().get(CallableQueueService.class).queueSerial(this.callables);
                if (!ret) {
                    log.warn("Unable to queue the callables commands for RecoveryService. Most possibly command queue is full. Queue size is :" + Services.get().get(CallableQueueService.class).queueSize());
                }
                this.callables = null;
            }
            if (null != this.delayedCallables) {
                ret = Services.get().get(CallableQueueService.class).queueSerial(this.delayedCallables, this.delay);
                if (!ret) {
                    log.warn("Unable to queue the delayedCallables commands for RecoveryService. Most possibly Callable queue is full. Queue size is :" + Services.get().get(CallableQueueService.class).queueSize());
                }
                this.delayedCallables = null;
                this.delay = 0L;
            }
        }

        private void runBundleRecovery() {
            XLog.Info.get().clear();
            XLog log = XLog.getLog(this.getClass());
            try {
                List<BundleActionBean> bactions = this.jpaService.execute(new BundleActionsGetWaitingOlderJPAExecutor(this.bundleOlderThan));
                this.msg.append(", BUNDLE_ACTIONS : " + bactions.size());
                for (BundleActionBean baction : bactions) {
                    Services.get().get(InstrumentationService.class).get().incr(RecoveryService.INSTRUMENTATION_GROUP, RecoveryService.INSTR_RECOVERED_BUNDLE_ACTIONS_COUNTER, 1L);
                    if (baction.getStatus() == Job.Status.PREP) {
                        BundleJobBean bundleJob = null;
                        try {
                            if (this.jpaService != null) {
                                bundleJob = this.jpaService.execute(new BundleJobGetJPAExecutor(baction.getBundleId()));
                            }
                            if (bundleJob == null) continue;
                            Element bAppXml = XmlUtils.parseXml(bundleJob.getJobXml());
                            List coordElems = bAppXml.getChildren("coordinator", bAppXml.getNamespace());
                            for (Element coordElem : coordElems) {
                                Attribute name = coordElem.getAttribute("name");
                                if (!name.getValue().equals(baction.getCoordName())) continue;
                                Configuration coordConf = RecoveryService.mergeConfig(coordElem, bundleJob);
                                coordConf.set("oozie.bundle.id", baction.getBundleId());
                                this.queueCallable(new CoordSubmitXCommand(coordConf, bundleJob.getAuthToken(), bundleJob.getId(), name.getValue()));
                            }
                            continue;
                        }
                        catch (JDOMException jex) {
                            throw new CommandException(ErrorCode.E1301, new Object[]{jex});
                        }
                        catch (JPAExecutorException je) {
                            throw new CommandException(je);
                        }
                    }
                    if (baction.getStatus() == Job.Status.KILLED) {
                        this.queueCallable(new CoordKillXCommand(baction.getCoordId()));
                        continue;
                    }
                    if (baction.getStatus() == Job.Status.SUSPENDED) {
                        this.queueCallable(new CoordSuspendXCommand(baction.getCoordId()));
                        continue;
                    }
                    if (baction.getStatus() != Job.Status.RUNNING) continue;
                    this.queueCallable(new CoordResumeXCommand(baction.getCoordId()));
                }
            }
            catch (Exception ex) {
                log.error("Exception, {0}", ex.getMessage(), ex);
            }
        }

        private void runCoordActionRecovery() {
            XLog.Info.get().clear();
            XLog log = XLog.getLog(this.getClass());
            try {
                List<CoordinatorActionBean> cactions = this.jpaService.execute(new CoordActionsGetForRecoveryJPAExecutor(this.coordOlderThan));
                this.msg.append(", COORD_ACTIONS : " + cactions.size());
                for (CoordinatorActionBean caction : cactions) {
                    Services.get().get(InstrumentationService.class).get().incr(RecoveryService.INSTRUMENTATION_GROUP, RecoveryService.INSTR_RECOVERED_COORD_ACTIONS_COUNTER, 1L);
                    if (caction.getStatus() == CoordinatorAction.Status.WAITING) {
                        if (useXCommand) {
                            this.queueCallable(new CoordActionInputCheckXCommand(caction.getId()));
                        } else {
                            this.queueCallable(new CoordActionInputCheckCommand(caction.getId()));
                        }
                        log.info("Recover a WAITTING coord action and resubmit CoordActionInputCheckXCommand :" + caction.getId());
                        continue;
                    }
                    if (caction.getStatus() == CoordinatorAction.Status.SUBMITTED) {
                        CoordinatorJobBean coordJob = this.jpaService.execute(new CoordJobGetJPAExecutor(caction.getJobId()));
                        if (useXCommand) {
                            this.queueCallable(new CoordActionStartXCommand(caction.getId(), coordJob.getUser(), coordJob.getAuthToken()));
                        } else {
                            this.queueCallable(new CoordActionStartCommand(caction.getId(), coordJob.getUser(), coordJob.getAuthToken()));
                        }
                        log.info("Recover a SUBMITTED coord action and resubmit CoordActionStartCommand :" + caction.getId());
                        continue;
                    }
                    if (caction.getStatus() == CoordinatorAction.Status.SUSPENDED) {
                        if (caction.getExternalId() == null) continue;
                        this.queueCallable(new SuspendXCommand(caction.getExternalId()));
                        log.debug("Recover a SUSPENDED coord action and resubmit SuspendXCommand :" + caction.getId());
                        continue;
                    }
                    if (caction.getStatus() == CoordinatorAction.Status.KILLED) {
                        if (caction.getExternalId() == null) continue;
                        this.queueCallable(new KillXCommand(caction.getExternalId()));
                        log.debug("Recover a KILLED coord action and resubmit KillXCommand :" + caction.getId());
                        continue;
                    }
                    if (caction.getStatus() != CoordinatorAction.Status.RUNNING || caction.getExternalId() == null) continue;
                    this.queueCallable(new ResumeXCommand(caction.getExternalId()));
                    log.debug("Recover a RUNNING coord action and resubmit ResumeXCommand :" + caction.getId());
                }
            }
            catch (Exception ex) {
                log.error("Exception, {0}", ex.getMessage(), ex);
            }
        }

        private void runCoordActionRecoveryForReady() {
            XLog.Info.get().clear();
            XLog log = XLog.getLog(this.getClass());
            try {
                List<String> jobids = this.jpaService.execute(new CoordActionsGetReadyGroupbyJobIDJPAExecutor(this.coordOlderThan));
                this.msg.append(", COORD_READY_JOBS : " + jobids.size());
                for (String jobid : jobids) {
                    if (useXCommand) {
                        this.queueCallable(new CoordActionReadyXCommand(jobid));
                    } else {
                        this.queueCallable(new CoordActionReadyCommand(jobid));
                    }
                    log.info("Recover READY coord actions for jobid :" + jobid);
                }
            }
            catch (Exception ex) {
                log.error("Exception, {0}", ex.getMessage(), ex);
            }
        }

        private void runWFRecovery() {
            XLog.Info.get().clear();
            XLog log = XLog.getLog(this.getClass());
            try {
                List<WorkflowActionBean> actions = null;
                try {
                    actions = this.jpaService.execute(new WorkflowActionsGetPendingJPAExecutor(this.olderThan));
                }
                catch (JPAExecutorException ex) {
                    log.warn((Object)"Exception while reading pending actions from storage", ex);
                }
                this.msg.append(" WF_ACTIONS " + actions.size());
                for (WorkflowActionBean action : actions) {
                    Date nextRunTime;
                    Services.get().get(InstrumentationService.class).get().incr(RecoveryService.INSTRUMENTATION_GROUP, RecoveryService.INSTR_RECOVERED_ACTIONS_COUNTER, 1L);
                    if (action.getStatus() == WorkflowAction.Status.PREP || action.getStatus() == WorkflowAction.Status.START_MANUAL) {
                        if (useXCommand) {
                            this.queueCallable(new ActionStartXCommand(action.getId(), action.getType()));
                            continue;
                        }
                        this.queueCallable(new ActionStartCommand(action.getId(), action.getType()));
                        continue;
                    }
                    if (action.getStatus() == WorkflowAction.Status.START_RETRY) {
                        nextRunTime = action.getPendingAge();
                        if (useXCommand) {
                            this.queueCallable(new ActionStartXCommand(action.getId(), action.getType()), nextRunTime.getTime() - System.currentTimeMillis());
                            continue;
                        }
                        this.queueCallable(new ActionStartCommand(action.getId(), action.getType()), nextRunTime.getTime() - System.currentTimeMillis());
                        continue;
                    }
                    if (action.getStatus() == WorkflowAction.Status.DONE || action.getStatus() == WorkflowAction.Status.END_MANUAL) {
                        if (useXCommand) {
                            this.queueCallable(new ActionEndXCommand(action.getId(), action.getType()));
                            continue;
                        }
                        this.queueCallable(new ActionEndCommand(action.getId(), action.getType()));
                        continue;
                    }
                    if (action.getStatus() == WorkflowAction.Status.END_RETRY) {
                        nextRunTime = action.getPendingAge();
                        if (useXCommand) {
                            this.queueCallable(new ActionEndXCommand(action.getId(), action.getType()), nextRunTime.getTime() - System.currentTimeMillis());
                            continue;
                        }
                        this.queueCallable(new ActionEndCommand(action.getId(), action.getType()), nextRunTime.getTime() - System.currentTimeMillis());
                        continue;
                    }
                    if (action.getStatus() == WorkflowAction.Status.OK || action.getStatus() == WorkflowAction.Status.ERROR) {
                        if (useXCommand) {
                            this.queueCallable(new SignalXCommand(action.getJobId(), action.getId()));
                            continue;
                        }
                        this.queueCallable(new SignalCommand(action.getJobId(), action.getId()));
                        continue;
                    }
                    if (action.getStatus() != WorkflowAction.Status.USER_RETRY) continue;
                    this.queueCallable(new ActionStartXCommand(action.getId(), action.getType()));
                }
            }
            catch (Exception ex) {
                log.error("Exception, {0}", ex.getMessage(), ex);
            }
        }

        private void queueCallable(XCallable<?> callable) {
            if (this.callables == null) {
                this.callables = new ArrayList();
            }
            this.callables.add(callable);
            if (this.callables.size() == Services.get().getConf().getInt(RecoveryService.CONF_CALLABLE_BATCH_SIZE, 10)) {
                boolean ret = Services.get().get(CallableQueueService.class).queueSerial(this.callables);
                if (!ret) {
                    XLog.getLog(this.getClass()).warn("Unable to queue the callables commands for RecoveryService. Most possibly command queue is full. Queue size is :" + Services.get().get(CallableQueueService.class).queueSize());
                }
                this.callables = new ArrayList();
            }
        }

        private void queueCallable(XCallable<?> callable, long delay) {
            if (this.delayedCallables == null) {
                this.delayedCallables = new ArrayList();
            }
            this.delay = Math.max(this.delay, delay);
            this.delayedCallables.add(callable);
            if (this.delayedCallables.size() == Services.get().getConf().getInt(RecoveryService.CONF_CALLABLE_BATCH_SIZE, 10)) {
                boolean ret = Services.get().get(CallableQueueService.class).queueSerial(this.delayedCallables, this.delay);
                if (!ret) {
                    XLog.getLog(this.getClass()).warn("Unable to queue the delayedCallables commands for RecoveryService. Most possibly Callable queue is full. Queue size is :" + Services.get().get(CallableQueueService.class).queueSize());
                }
                this.delayedCallables = new ArrayList();
                this.delay = 0L;
            }
        }
    }
}

