/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pig.backend.hadoop.executionengine.physicalLayer.util;

import com.google.common.base.Function;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.pig.EvalFunc;
import org.apache.pig.builtin.MonitoredUDF;
import org.apache.pig.data.Tuple;
import org.apache.pig.tools.pigstats.PigStatusReporter;

public class MonitoredUDFExecutor
implements Serializable {
    private final transient ListeningExecutorService exec = MoreExecutors.listeningDecorator((ExecutorService)MoreExecutors.getExitingExecutorService((ThreadPoolExecutor)new ScheduledThreadPoolExecutor(1)));
    private final transient TimeUnit timeUnit;
    private final transient long duration;
    private final transient Object defaultValue;
    private final transient EvalFunc evalFunc;
    private final transient Function<Tuple, Object> closure;
    private final transient Class<? extends ErrorCallback> errorCallback;
    private final transient Method errorHandler;
    private final transient Method timeoutHandler;

    public MonitoredUDFExecutor(EvalFunc udf) {
        this.evalFunc = udf;
        MonitoredUDF anno = udf.getClass().getAnnotation(MonitoredUDF.class);
        this.timeUnit = anno.timeUnit();
        this.duration = anno.duration();
        this.errorCallback = anno.errorCallback();
        try {
            this.errorHandler = this.errorCallback.getMethod("handleError", EvalFunc.class, Exception.class);
            this.timeoutHandler = this.errorCallback.getMethod("handleTimeout", EvalFunc.class, Exception.class);
        }
        catch (SecurityException e1) {
            throw new RuntimeException("Unable to use the monitored callback due to a Security Exception while working with " + this.evalFunc.getClass().getName());
        }
        catch (NoSuchMethodException e1) {
            throw new RuntimeException("Unable to use the monitored callback because a required method not found while working with " + this.evalFunc.getClass().getName());
        }
        Type retType = udf.getReturnType();
        this.defaultValue = this.getDefaultValue(anno, retType);
        this.closure = new Function<Tuple, Object>(){

            public Object apply(Tuple input) {
                try {
                    return MonitoredUDFExecutor.this.evalFunc.exec(input);
                }
                catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        };
    }

    private Object getDefaultValue(MonitoredUDF anno, Type retType) {
        if (retType.equals(Integer.TYPE) || retType.equals(Integer.class)) {
            return anno.intDefault().length == 0 ? null : Integer.valueOf(anno.intDefault()[0]);
        }
        if (retType.equals(Double.TYPE) || retType.equals(Double.class)) {
            return anno.doubleDefault().length == 0 ? null : Double.valueOf(anno.doubleDefault()[0]);
        }
        if (retType.equals(Float.TYPE) || retType.equals(Float.class)) {
            return anno.floatDefault().length == 0 ? null : Float.valueOf(anno.floatDefault()[0]);
        }
        if (retType.equals(Long.TYPE) || retType.equals(Long.class)) {
            return anno.longDefault().length == 0 ? null : Long.valueOf(anno.longDefault()[0]);
        }
        if (retType.equals(String.class)) {
            return anno.stringDefault().length == 0 ? null : anno.stringDefault()[0];
        }
        return null;
    }

    public void terminate() {
        this.exec.shutdownNow();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Object monitorExec(final Tuple input) throws IOException {
        CheckedFuture f = Futures.makeChecked((ListenableFuture)this.exec.submit((Callable)new Callable<Object>(){

            @Override
            public Object call() throws Exception {
                return MonitoredUDFExecutor.this.closure.apply((Object)input);
            }
        }), (Function)new Function<Exception, Exception>(){

            public Exception apply(Exception e) {
                return e;
            }
        });
        Object result = this.defaultValue;
        try {
            try {
                result = f.get(this.duration, this.timeUnit);
            }
            catch (TimeoutException e) {
                this.timeoutHandler.invoke(null, this.evalFunc, e);
            }
            catch (Exception e) {
                this.errorHandler.invoke(null, this.evalFunc, e);
            }
            finally {
                f.cancel(true);
            }
        }
        catch (IllegalArgumentException e) {
            throw new IOException(e);
        }
        catch (IllegalAccessException e) {
            throw new IOException(e);
        }
        catch (InvocationTargetException e) {
            throw new IOException(e);
        }
        return result;
    }

    public static class ErrorCallback {
        public static void handleError(EvalFunc evalFunc, Exception e) {
            evalFunc.getLogger().error((Object)e);
            PigStatusReporter reporter = PigStatusReporter.getInstance();
            if (reporter != null && reporter.getCounter(evalFunc.getClass().getName(), e.toString()) != null) {
                reporter.getCounter(evalFunc.getClass().getName(), e.toString()).increment(1L);
            }
        }

        public static void handleTimeout(EvalFunc evalFunc, Exception e) {
            evalFunc.getLogger().error((Object)e);
            PigStatusReporter reporter = PigStatusReporter.getInstance();
            if (reporter != null && reporter.getCounter(evalFunc.getClass().getName(), "MonitoredUDF Timeout") != null) {
                reporter.getCounter(evalFunc.getClass().getName(), "MonitoredUDF Timeout").increment(1L);
            }
        }
    }
}

