/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer;

import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.CombinerPackager;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.JoinPackager;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezEdgeDescriptor;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POLocalRearrangeTez;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueOutputTez;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.TezParallelismEstimator;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.PlanWalker;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.tez.dag.api.EdgeProperty;

/**
 * Estimates the parallelism of a Tez vertex from the effective parallelism of
 * its predecessors.
 *
 * <p>For every incoming SCATTER_GATHER or ONE_TO_ONE edge the predecessor's
 * effective parallelism is taken, optionally scaled by the predecessor's
 * parallelism factor and raised to a size-based minimum, and the per-edge
 * values are summed. The sum is then clamped by the configured maximum task
 * count and, for intermediate reducers, reconciled with the user-specified
 * parallelism.
 */
public class TezOperDependencyParallelismEstimator
implements TezParallelismEstimator {
    /** Multiplier used for flattening foreach, replicated/merge joins and join packagers. */
    static final double DEFAULT_FLATTEN_FACTOR = 10.0;
    /** Multiplier used for filters that are not a constant {@code true}. */
    static final double DEFAULT_FILTER_FACTOR = 0.7;
    /** Factor assigned when a limit is encountered. */
    static final double DEFAULT_LIMIT_FACTOR = 0.1;
    /** Multiplier used for combiner-backed edges/packagers of distinct operators. */
    static final double DEFAULT_DISTINCT_FACTOR = 0.9;
    /** Multiplier used for combiner-backed edges/packagers of non-distinct operators. */
    static final double DEFAULT_AGGREGATION_FACTOR = 0.7;

    private PigContext pc;
    /** Upper bound on the estimate; read from {@code pig.exec.reducers.max} on every call. */
    private int maxTaskCount;
    /** Bytes per task for the size-based minimum; read from {@code pig.exec.reducers.bytes.per.reducer}. */
    private long bytesPerReducer;

    /**
     * Stores the Pig context whose {@code defaultParallel} is used during estimation.
     *
     * @param pc the Pig context to use
     */
    @Override
    public void setPigContext(PigContext pc) {
        this.pc = pc;
    }

    /**
     * Estimates the parallelism for {@code tezOper}.
     *
     * @param plan    the plan containing {@code tezOper}
     * @param tezOper the operator to estimate
     * @param conf    configuration supplying {@code pig.exec.reducers.max} (default 999) and
     *                {@code pig.exec.reducers.bytes.per.reducer} (default 1000000000)
     * @return {@code -1} for a vertex group; the requested parallelism when one is set and the
     *         operator is not an intermediate reducer; a previously stored estimate when present;
     *         otherwise the computed estimate, never less than 1 on that path
     * @throws IOException if the operator has no predecessors, if an incoming edge cannot be
     *                     matched to a predecessor, or if a predecessor's effective parallelism
     *                     is {@code -1}
     */
    @Override
    public int estimateParallelism(TezOperPlan plan, TezOperator tezOper, Configuration conf) throws IOException {
        if (tezOper.isVertexGroup()) {
            return -1;
        }
        this.maxTaskCount = conf.getInt("pig.exec.reducers.max", 999);
        this.bytesPerReducer = conf.getLong("pig.exec.reducers.bytes.per.reducer", 1000000000L);

        // An explicit request wins, except for intermediate reducers which are handled below.
        if (!tezOper.isIntermediateReducer() && tezOper.getRequestedParallelism() != -1) {
            return tezOper.getRequestedParallelism();
        }
        if (tezOper.getEstimatedParallelism() != -1) {
            return tezOper.getEstimatedParallelism();
        }
        if (plan.getPredecessors(tezOper) == null) {
            throw new IOException("Cannot estimate parallelism for source vertex");
        }

        // Union operators take the predecessors' parallelism as-is, without any scaling.
        final boolean applyFactor = !tezOper.isUnion();
        double estimatedParallelism = 0.0;
        for (Map.Entry<OperatorKey, TezEdgeDescriptor> entry : tezOper.inEdges.entrySet()) {
            TezEdgeDescriptor edge = entry.getValue();
            // Only SCATTER_GATHER and ONE_TO_ONE edges contribute to the estimate.
            if (edge.dataMovementType != EdgeProperty.DataMovementType.SCATTER_GATHER
                    && edge.dataMovementType != EdgeProperty.DataMovementType.ONE_TO_ONE) {
                continue;
            }
            String inputKey = entry.getKey().toString();
            TezOperator pred = TezOperDependencyParallelismEstimator.getPredecessorWithKey(plan, tezOper, inputKey);
            if (pred == null) {
                // Fail with a diagnosable message instead of a bare NullPointerException.
                throw new IOException("Cannot estimate parallelism for " + tezOper.getOperatorKey().toString()
                        + ", no predecessor found for input " + inputKey);
            }
            double predParallelism = pred.getEffectiveParallelism(this.pc.defaultParallel);
            if (predParallelism == -1.0) {
                throw new IOException("Cannot estimate parallelism for " + tezOper.getOperatorKey().toString()
                        + ", effective parallelism for predecessor " + pred.getOperatorKey().toString() + " is -1");
            }
            if (applyFactor && !pred.isVertexGroup()) {
                predParallelism *= pred.getParallelismFactor(tezOper);
                long inputBytes = pred.getTotalInputFilesSize();
                if (inputBytes > 0L) {
                    // Never go below what the predecessor's input size alone would require.
                    int parallelismBySize = (int) Math.ceil((double) inputBytes / (double) this.bytesPerReducer);
                    predParallelism = Math.max(predParallelism, (double) parallelismBySize);
                }
            }
            estimatedParallelism += predParallelism;
        }

        int roundedEstimatedParallelism = Math.min((int) Math.ceil(estimatedParallelism), this.maxTaskCount);
        if (tezOper.isIntermediateReducer() && tezOper.isOverrideIntermediateParallelism()) {
            int userSpecifiedParallelism = this.pc.defaultParallel;
            if (tezOper.getRequestedParallelism() != -1) {
                userSpecifiedParallelism = tezOper.getRequestedParallelism();
            }
            int intermediateParallelism = Math.max(userSpecifiedParallelism, roundedEstimatedParallelism);
            // Above 200 tasks, do not exceed twice what the user asked for.
            if (userSpecifiedParallelism != -1 && intermediateParallelism > 200
                    && intermediateParallelism > 2 * userSpecifiedParallelism) {
                intermediateParallelism = 2 * userSpecifiedParallelism;
            }
            roundedEstimatedParallelism = intermediateParallelism;
        }
        return roundedEstimatedParallelism == 0 ? 1 : roundedEstimatedParallelism;
    }

    /**
     * Finds the operator feeding {@code tezOper} whose key string equals {@code inputKey}.
     * For a vertex-group predecessor the group's members are searched and the matching
     * member operator is returned instead of the group itself.
     *
     * @return the matching operator, or {@code null} when none matches
     */
    private static TezOperator getPredecessorWithKey(TezOperPlan plan, TezOperator tezOper, String inputKey) {
        for (TezOperator candidate : plan.getPredecessors(tezOper)) {
            if (!candidate.isVertexGroup()) {
                if (candidate.getOperatorKey().toString().equals(inputKey)) {
                    return candidate;
                }
                continue;
            }
            for (OperatorKey memberKey : candidate.getVertexGroupMembers()) {
                if (memberKey.toString().equals(inputKey)) {
                    return (TezOperator)plan.getOperator(memberKey);
                }
            }
        }
        return null;
    }

    /**
     * Walks the physical plan of a Tez operator and accumulates a multiplicative factor
     * from the operators it visits; the result is available through {@link #getFactor()}.
     */
    public static class TezParallelismFactorVisitor
    extends PhyPlanVisitor {
        private double factor = 1.0;
        private String outputKey;
        private TezOperator tezOp;

        /**
         * @param tezOp     the operator whose physical plan is walked
         * @param successor the downstream operator, or {@code null}; when given and the edge to
         *                  it has a non-empty combine plan or needs a distinct combiner, the
         *                  initial factor is the distinct or aggregation factor
         */
        public TezParallelismFactorVisitor(TezOperator tezOp, TezOperator successor) {
            super(tezOp.plan, (PlanWalker<PhysicalOperator, PhysicalPlan>)new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(tezOp.plan));
            this.tezOp = tezOp;
            this.outputKey = tezOp.getOperatorKey().toString();
            if (successor == null) {
                return;
            }
            TezEdgeDescriptor edge = tezOp.outEdges.get(successor.getOperatorKey());
            if (!edge.combinePlan.isEmpty() || edge.needsDistinctCombiner()) {
                this.factor = successor.isDistinct() ? DEFAULT_DISTINCT_FACTOR : DEFAULT_AGGREGATION_FACTOR;
            }
        }

        /** Applies the filter factor unless the filter condition is the single constant {@code true}. */
        @Override
        public void visitFilter(POFilter fl) throws VisitorException {
            PhysicalPlan condition = fl.getPlan();
            if (condition.size() == 1 && condition.getRoots().get(0) instanceof ConstantExpression) {
                ConstantExpression cons = (ConstantExpression)condition.getRoots().get(0);
                // Constant-first comparison so a null constant value cannot raise an NPE.
                if (Boolean.TRUE.equals(cons.getValue())) {
                    return;
                }
            }
            this.factor *= DEFAULT_FILTER_FACTOR;
        }

        /**
         * Applies the flatten factor once if any flattened input plan is rooted at a
         * {@link POProject} whose result type is 120.
         */
        @Override
        public void visitPOForEach(POForEach nfe) throws VisitorException {
            List<Boolean> flattens = nfe.getToBeFlattened();
            List<PhysicalPlan> inputPlans = nfe.getInputPlans();
            for (int i = 0; i < flattens.size(); ++i) {
                if (!flattens.get(i)) {
                    continue;
                }
                PhysicalOperator root = (PhysicalOperator)inputPlans.get(i).getRoots().get(0);
                // NOTE(review): 120 is presumably DataType.BAG - confirm and use the constant.
                if (root instanceof POProject && root.getResultType() == 120) {
                    this.factor *= DEFAULT_FLATTEN_FACTOR;
                    return;
                }
            }
        }

        /**
         * Sets the factor to the limit factor.
         * NOTE(review): unlike the other visit methods this overwrites rather than multiplies -
         * kept as-is; confirm this is intended.
         */
        @Override
        public void visitLimit(POLimit lim) throws VisitorException {
            this.factor = DEFAULT_LIMIT_FACTOR;
        }

        /** Applies the flatten factor for a replicated join. */
        @Override
        public void visitFRJoin(POFRJoin join) throws VisitorException {
            this.factor *= DEFAULT_FLATTEN_FACTOR;
        }

        /** Applies the flatten factor for a merge join. */
        @Override
        public void visitMergeJoin(POMergeJoin join) throws VisitorException {
            this.factor *= DEFAULT_FLATTEN_FACTOR;
        }

        /**
         * Applies the flatten factor for a {@link JoinPackager}; for a {@link CombinerPackager}
         * applies the distinct or aggregation factor depending on the visited operator.
         */
        @Override
        public void visitPackage(POPackage pkg) throws VisitorException {
            if (pkg.getPkgr() instanceof JoinPackager) {
                this.factor *= DEFAULT_FLATTEN_FACTOR;
            } else if (pkg.getPkgr() instanceof CombinerPackager) {
                this.factor *= this.tezOp.isDistinct() ? DEFAULT_DISTINCT_FACTOR : DEFAULT_AGGREGATION_FACTOR;
            }
        }

        /** Descends into the selected branch of a split; does nothing when no branch is selected. */
        @Override
        public void visitSplit(POSplit sp) throws VisitorException {
            PhysicalPlan branch = TezParallelismFactorVisitor.getSplitBranch(sp, this.outputKey);
            if (branch == null) {
                // No branch with a Tez output operator: nothing to walk, leave the factor untouched.
                return;
            }
            this.pushWalker(this.mCurrentWalker.spawnChildWalker(branch));
            this.visit();
            this.popWalker();
        }

        /**
         * Returns the first sub-plan of {@code split} that contains a {@link POLocalRearrangeTez}
         * or a {@link POValueOutputTez}, or {@code null} if there is none.
         * NOTE(review): {@code outputKey} is not consulted here - confirm whether the branch
         * should be matched against it.
         */
        private static PhysicalPlan getSplitBranch(POSplit split, String outputKey) throws VisitorException {
            for (PhysicalPlan branch : split.getPlans()) {
                if (!PlanHelper.getPhysicalOperators(branch, POLocalRearrangeTez.class).isEmpty()
                        || !PlanHelper.getPhysicalOperators(branch, POValueOutputTez.class).isEmpty()) {
                    return branch;
                }
            }
            return null;
        }

        /** @return the factor accumulated so far (1.0 if nothing has adjusted it) */
        public double getFactor() {
            return this.factor;
        }
    }
}

