/*
 * Decompiled with CFR 0.152.
 */
package org.apache.oozie.service;

import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import org.apache.oozie.BundleJobBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.command.bundle.BundlePauseXCommand;
import org.apache.oozie.command.bundle.BundleStartXCommand;
import org.apache.oozie.command.bundle.BundleUnpauseXCommand;
import org.apache.oozie.command.coord.CoordPauseXCommand;
import org.apache.oozie.command.coord.CoordUnpauseXCommand;
import org.apache.oozie.executor.jpa.BundleJobsGetNeedStartJPAExecutor;
import org.apache.oozie.executor.jpa.BundleJobsGetPausedJPAExecutor;
import org.apache.oozie.executor.jpa.BundleJobsGetUnpausedJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobsGetPausedJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobsGetUnpausedJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.lock.LockToken;
import org.apache.oozie.service.CallableQueueService;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.MemoryLocksService;
import org.apache.oozie.service.SchedulerService;
import org.apache.oozie.service.Service;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.ConfigUtils;
import org.apache.oozie.util.XCallable;
import org.apache.oozie.util.XLog;

public class PauseTransitService
implements Service {
    public static final String CONF_PREFIX = "oozie.service.PauseTransitService.";
    public static final String CONF_BUNDLE_PAUSE_START_INTERVAL = "oozie.service.PauseTransitService.PauseTransit.interval";
    private static final XLog LOG = XLog.getLog(PauseTransitService.class);
    public static final String CONF_CALLABLE_BATCH_SIZE = "oozie.service.PauseTransitService.callable.batch.size";

    @Override
    public void init(Services services) {
        PauseTransitRunnable bundlePauseStartRunnable = new PauseTransitRunnable();
        services.get(SchedulerService.class).schedule(bundlePauseStartRunnable, 10L, (long)ConfigurationService.getInt(services.getConf(), CONF_BUNDLE_PAUSE_START_INTERVAL), SchedulerService.Unit.SEC);
    }

    @Override
    public void destroy() {
    }

    @Override
    public Class<? extends Service> getInterface() {
        return PauseTransitService.class;
    }

    @VisibleForTesting
    public static class PauseTransitRunnable
    implements Runnable {
        private JPAService jpaService = Services.get().get(JPAService.class);
        private LockToken lock;
        private List<XCallable<Void>> callables;

        public PauseTransitRunnable() {
            if (this.jpaService == null) {
                LOG.error("Missing JPAService");
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            block9: {
                try {
                    this.lock = Services.get().get(MemoryLocksService.class).getWriteLock(PauseTransitService.class.getName(), Service.lockTimeout);
                    if (this.lock == null) {
                        LOG.info("This PauseTransitService instance willnot run since there is already an instance running");
                    } else {
                        LOG.info("Acquired lock for [{0}]", PauseTransitService.class.getName());
                        this.updateBundle();
                        this.updateCoord();
                        if (null != this.callables) {
                            boolean ret = Services.get().get(CallableQueueService.class).queueSerial(this.callables);
                            if (!ret) {
                                XLog.getLog(this.getClass()).warn("Unable to queue the callables commands for PauseTransitService. Most possibly command queue is full. Queue size is :" + Services.get().get(CallableQueueService.class).queueSize());
                            }
                            this.callables = null;
                        }
                    }
                    if (this.lock == null) break block9;
                    this.lock.release();
                }
                catch (Exception ex) {
                    try {
                        LOG.warn((Object)"Exception happened when pausing/unpausing/starting bundle/coord jobs", ex);
                        if (this.lock == null) break block9;
                        this.lock.release();
                    }
                    catch (Throwable throwable) {
                        if (this.lock != null) {
                            this.lock.release();
                            LOG.info("Released lock for [{0}]", PauseTransitService.class.getName());
                        }
                        throw throwable;
                    }
                    LOG.info("Released lock for [{0}]", PauseTransitService.class.getName());
                }
                LOG.info("Released lock for [{0}]", PauseTransitService.class.getName());
            }
        }

        private void updateBundle() {
            Date d = new Date();
            List<BundleJobBean> jobList = null;
            try {
                jobList = this.jpaService.execute(new BundleJobsGetUnpausedJPAExecutor(-1));
                if (jobList != null) {
                    for (BundleJobBean bundleJob : jobList) {
                        if (bundleJob.getPauseTime() == null || bundleJob.getPauseTime().after(d)) continue;
                        this.queueCallable(new BundlePauseXCommand(bundleJob));
                        LOG.debug("Queuing BundlePauseXCommand for bundle job = " + bundleJob.getId());
                    }
                }
            }
            catch (JPAExecutorException ex) {
                LOG.warn((Object)"JPAExecutorException happened when pausing/unpausing/starting Bundle jobs", ex);
            }
            try {
                jobList = this.jpaService.execute(new BundleJobsGetPausedJPAExecutor(-1));
                if (jobList != null) {
                    for (BundleJobBean bundleJob : jobList) {
                        if (bundleJob.getPauseTime() != null && !bundleJob.getPauseTime().after(d)) continue;
                        this.queueCallable(new BundleUnpauseXCommand(bundleJob));
                        LOG.debug("Queuing BundleUnpauseXCommand for bundle job = " + bundleJob.getId());
                    }
                }
            }
            catch (JPAExecutorException ex) {
                LOG.warn((Object)"JPAExecutorException happened when pausing/unpausing/starting Bundle jobs", ex);
            }
            try {
                jobList = this.jpaService.execute(new BundleJobsGetNeedStartJPAExecutor(d));
                if (jobList != null) {
                    for (BundleJobBean bundleJob : jobList) {
                        this.queueCallable(new BundleStartXCommand(bundleJob.getId()));
                        LOG.debug("Queuing BundleStartXCommand for bundle job = " + bundleJob.getId());
                    }
                }
            }
            catch (JPAExecutorException ex) {
                LOG.warn((Object)"JPAExecutorException happened when pausing/unpausing/starting Bundle jobs", ex);
            }
        }

        private void updateCoord() {
            Date d = new Date();
            List<CoordinatorJobBean> jobList = null;
            boolean backwardSupportForCoordStatus = ConfigUtils.isBackwardSupportForCoordStatus();
            try {
                jobList = this.jpaService.execute(new CoordJobsGetUnpausedJPAExecutor(-1));
                if (jobList != null) {
                    for (CoordinatorJobBean coordJob : jobList) {
                        if (backwardSupportForCoordStatus && coordJob.getAppNamespace() != null && coordJob.getAppNamespace().equals("uri:oozie:coordinator:0.1") || coordJob.getPauseTime() == null || coordJob.getPauseTime().after(d)) continue;
                        this.queueCallable(new CoordPauseXCommand(coordJob));
                        LOG.debug("Queuing CoordPauseXCommand for coordinator job = " + coordJob.getId());
                    }
                }
            }
            catch (JPAExecutorException ex) {
                LOG.warn((Object)"JPAExecutorException happened when pausing/unpausing Coordinator jobs", ex);
            }
            try {
                jobList = this.jpaService.execute(new CoordJobsGetPausedJPAExecutor(-1));
                if (jobList != null) {
                    for (CoordinatorJobBean coordJob : jobList) {
                        if (backwardSupportForCoordStatus && coordJob.getAppNamespace() != null && coordJob.getAppNamespace().equals("uri:oozie:coordinator:0.1") || coordJob.getPauseTime() != null && !coordJob.getPauseTime().after(d)) continue;
                        this.queueCallable(new CoordUnpauseXCommand(coordJob));
                        LOG.debug("Queuing CoordUnpauseXCommand for coordinator job = " + coordJob.getId());
                    }
                }
            }
            catch (JPAExecutorException ex) {
                LOG.warn((Object)"JPAExecutorException happened when pausing/unpausing Coordinator jobs", ex);
            }
        }

        private void queueCallable(XCallable<Void> callable) {
            if (this.callables == null) {
                this.callables = new ArrayList<XCallable<Void>>();
            }
            this.callables.add(callable);
            if (this.callables.size() == ConfigurationService.getInt(PauseTransitService.CONF_CALLABLE_BATCH_SIZE)) {
                boolean ret = Services.get().get(CallableQueueService.class).queueSerial(this.callables);
                if (!ret) {
                    XLog.getLog(this.getClass()).warn("Unable to queue the callables commands for PauseTransitService. Most possibly command queue is full. Queue size is :" + Services.get().get(CallableQueueService.class).queueSize());
                }
                this.callables = new ArrayList<XCallable<Void>>();
            }
        }
    }
}

