/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pig.backend.hadoop.executionengine.spark.converter;

import java.io.IOException;
import java.io.Serializable;
import java.util.Comparator;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigContext;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.IndexedKey;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.PigSecondaryKeyComparatorSpark;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.RDDConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.SecondaryKeySortUtil;
import org.apache.pig.backend.hadoop.executionengine.spark.operator.POReduceBySpark;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DefaultBagFactory;
import org.apache.pig.data.DefaultTuple;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.spark.HashPartitioner;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.PairRDDFunctions;
import org.apache.spark.rdd.RDD;
import scala.Function1;
import scala.Function2;
import scala.Tuple2;
import scala.runtime.AbstractFunction1;
import scala.runtime.AbstractFunction2;

public class ReduceByConverter
implements RDDConverter<Tuple, Tuple, POReduceBySpark> {
    private static final Log LOG = LogFactory.getLog(ReduceByConverter.class);
    private static final TupleFactory tf = TupleFactory.getInstance();

    /**
     * Builds the output RDD for a {@link POReduceBySpark} operator from its single predecessor.
     *
     * <p>Each input tuple is first mapped to an {@code (IndexedKey, Tuple)} pair by
     * {@code LocalRearrangeFunction}. If the operator uses a secondary key, the pairs are handed to
     * {@code SecondaryKeySortUtil.handleSecondarySort}. Otherwise the pairs are combined per key with
     * {@code MergeValuesFunction} through {@code reduceByKey}, and every combined pair is turned back
     * into a tuple by {@code ToTupleFunction}.
     *
     * @param predecessors input RDDs; the size is asserted to be exactly one
     * @param op the reduce-by operator supplying the local rearrange, package operator,
     *           custom partitioner and secondary-key settings
     * @return the RDD of tuples produced for {@code op}
     * @throws IOException if one of the helpers called here reports it
     */
    @Override
    public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, POReduceBySpark op) throws IOException {
        SparkUtil.assertPredecessorSize(predecessors, op, 1);
        // The returned context is not used here; the call is kept exactly as the original made it.
        SparkPigContext.get();
        int parallelism = SparkPigContext.getParallelism(predecessors, op);
        RDD<Tuple> rdd = predecessors.get(0);
        RDD rddPair = rdd.map((Function1)new LocalRearrangeFunction(op.getLROp(), op.isUseSecondaryKey(), op.getSecondarySortOrder()), SparkUtil.getTuple2Manifest());
        if (op.isUseSecondaryKey()) {
            return SecondaryKeySortUtil.handleSecondarySort((RDD<Tuple2<IndexedKey, Tuple>>)rddPair, op.getPKGOp());
        }
        PairRDDFunctions pairRDDFunctions = new PairRDDFunctions(rddPair, SparkUtil.getManifest(IndexedKey.class), SparkUtil.getManifest(Tuple.class), null);
        RDD tupleRDD = pairRDDFunctions.reduceByKey(SparkUtil.getPartitioner(op.getCustomPartitioner(), parallelism), (Function2)new MergeValuesFunction(op));
        // Guarded like the logging in LocalRearrangeFunction so the message is only built when needed.
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("Custom Partitioner and parallelims used : " + op.getCustomPartitioner() + ", " + parallelism));
        }
        return tupleRDD.map((Function1)new ToTupleFunction(op), SparkUtil.getManifest(Tuple.class));
    }

    /**
     * Sorts the tuples of {@code rdd} inside hash partitions and re-keys them.
     *
     * <p>Every tuple becomes the key of a {@code (tuple, null)} pair, the pairs are repartitioned
     * with a {@link HashPartitioner} of {@code parallelism} partitions and ordered within each
     * partition by a {@link PigSecondaryKeyComparatorSpark} built from the operator's secondary
     * sort order; the sorted keys are then mapped through {@code ToKeyValueFunction}.
     *
     * <p>This private method is not called from {@code convert} in this class, which delegates
     * to {@code SecondaryKeySortUtil} instead.
     *
     * @param rdd tuples to sort
     * @param op operator providing the secondary sort order
     * @param parallelism number of partitions for the hash partitioner
     * @return the sorted tuples as {@code (IndexedKey, Tuple)} pairs
     */
    private JavaRDD<Tuple2<IndexedKey, Tuple>> handleSecondarySort(RDD<Tuple> rdd, POReduceBySpark op, int parallelism) {
        RDD keyedByTuple = rdd.map((Function1)new ToKeyNullValueFunction(), SparkUtil.getTuple2Manifest());
        JavaPairRDD sortedPairs = new JavaPairRDD(keyedByTuple, SparkUtil.getManifest(Tuple.class), SparkUtil.getManifest(Object.class))
                .repartitionAndSortWithinPartitions((Partitioner)new HashPartitioner(parallelism), (Comparator)new PigSecondaryKeyComparatorSpark(op.getSecondarySortOrder()));
        // Only the keys carry data; the values were null placeholders.
        return sortedPairs.keys().map((Function)new ToKeyValueFunction(op));
    }

    /**
     * Serializable Scala function that pushes one tuple through a {@link POLocalRearrange} and
     * converts the operator's output into an {@code (IndexedKey, Tuple)} pair.
     */
    private static class LocalRearrangeFunction
    extends AbstractFunction1<Tuple, Tuple2<IndexedKey, Tuple>>
    implements Serializable {
        private final POLocalRearrange lra;
        private boolean useSecondaryKey;
        private boolean[] secondarySortOrder;

        /**
         * @param lra the local rearrange operator applied to every tuple
         * @param useSecondaryKey whether produced keys are flagged as using a secondary key
         * @param secondarySortOrder sort order stored on the keys; only retained when
         *                           {@code useSecondaryKey} is true
         */
        public LocalRearrangeFunction(POLocalRearrange lra, boolean useSecondaryKey, boolean[] secondarySortOrder) {
            this.lra = lra;
            this.useSecondaryKey = useSecondaryKey;
            // The sort order is ignored (left null) unless the secondary key is in use.
            this.secondarySortOrder = useSecondaryKey ? secondarySortOrder : null;
        }

        /**
         * Runs the local rearrange on {@code t}.
         *
         * @param t input tuple
         * @return a pair whose key is an {@link IndexedKey} built from fields 0 and 1 of the
         *         operator's result, and whose value is a new tuple holding fields 1 and 2
         * @throws RuntimeException if the operator returns null, returns a status other than 0,
         *                          or raises an {@link ExecException}
         */
        public Tuple2<IndexedKey, Tuple> apply(Tuple t) {
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("LocalRearrangeFunction in " + t));
            }
            try {
                this.lra.setInputs(null);
                this.lra.attachInput(t);
                Result rearranged = this.lra.getNextTuple();
                if (rearranged == null) {
                    throw new RuntimeException("Null response found for LocalRearange on tuple: " + t);
                }
                // Status 0 is the only accepted outcome (presumably POStatus.STATUS_OK - confirm).
                if (rearranged.returnStatus != 0) {
                    throw new RuntimeException("Unexpected response code from operator " + this.lra + " : " + rearranged);
                }
                Tuple fields = (Tuple)rearranged.result;
                Object key = fields.get(1);
                IndexedKey indexedKey = new IndexedKey((Byte)fields.get(0), key);
                if (this.useSecondaryKey) {
                    indexedKey.setUseSecondaryKey(this.useSecondaryKey);
                    indexedKey.setSecondarySortOrder(this.secondarySortOrder);
                }
                // The value repeats the key in front of the rearranged payload.
                Tuple keyAndValue = TupleFactory.getInstance().newTuple();
                keyAndValue.append(key);
                keyAndValue.append(fields.get(2));
                Tuple2 pair = new Tuple2((Object)indexedKey, (Object)keyAndValue);
                if (LOG.isDebugEnabled()) {
                    LOG.debug((Object)("LocalRearrangeFunction out " + pair));
                }
                return pair;
            }
            catch (ExecException e) {
                throw new RuntimeException("Couldn't do LocalRearange on tuple: " + t, e);
            }
        }
    }

    /**
     * Serializable Scala function that turns one reduced {@code (IndexedKey, Tuple)} pair into
     * the tuple produced by the operator's packager.
     */
    private static final class ToTupleFunction
    extends AbstractFunction1<Tuple2<IndexedKey, Tuple>, Tuple>
    implements Serializable {
        private final POReduceBySpark poReduce;

        /**
         * @param poReduce operator whose package operator's packager builds the output tuples
         */
        public ToTupleFunction(POReduceBySpark poReduce) {
            this.poReduce = poReduce;
        }

        /**
         * Packages the value of {@code v1}.
         *
         * <p>Field 0 of the pair's value is used as the key and field 1 is placed in a fresh
         * bag; both are attached to the packager and its next result is returned.
         *
         * @param v1 pair whose value is a tuple of (key, value tuple)
         * @return the {@code result} of the packager's next output
         * @throws RuntimeException wrapping any {@link ExecException} raised while packaging
         */
        public Tuple apply(Tuple2<IndexedKey, Tuple> v1) {
            // Guarded so the per-record message is only built when debug logging is on.
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("ToTupleFunction in : " + v1));
            }
            DataBag bag = DefaultBagFactory.getInstance().newDefaultBag();
            Tuple packagedTuple;
            try {
                Tuple keyAndValue = (Tuple)v1._2();
                Object key = keyAndValue.get(0);
                bag.add((Tuple)keyAndValue.get(1));
                // The bag is passed directly; no intermediate tuple is needed to carry it.
                this.poReduce.getPKGOp().getPkgr().attachInput(key, new DataBag[]{bag}, new boolean[]{true});
                packagedTuple = (Tuple)this.poReduce.getPKGOp().getPkgr().getNext().result;
            }
            catch (ExecException e) {
                throw new RuntimeException(e);
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("ToTupleFunction out : " + packagedTuple));
            }
            return packagedTuple;
        }
    }

    /**
     * Serializable Scala function used with {@code reduceByKey}: merges two values of the same
     * key by packaging them and running them through the {@link POReduceBySpark} operator.
     */
    private static final class MergeValuesFunction
    extends AbstractFunction2<Tuple, Tuple, Tuple>
    implements Serializable {
        private final POReduceBySpark poReduce;

        /**
         * @param poReduce operator that packages and reduces the two values
         */
        public MergeValuesFunction(POReduceBySpark poReduce) {
            this.poReduce = poReduce;
        }

        /**
         * Combines {@code v1} and {@code v2} into a single (key, value tuple) tuple.
         *
         * <p>Field 1 of each input goes into one bag, the bag is packaged under the key taken
         * from field 0 of {@code v1}, the packaged tuple is fed to the operator, and the fields
         * of the operator's result that are not equal to the key are collected into the value
         * tuple.
         *
         * @param v1 first value, a tuple of (key, value tuple)
         * @param v2 second value with the same layout
         * @return a two-field tuple: the key (left unset when the key is null) and the value tuple
         * @throws RuntimeException wrapping any {@link ExecException} raised along the way
         */
        public Tuple apply(Tuple v1, Tuple v2) {
            // Guarded so the per-merge messages are only built when debug logging is on.
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("MergeValuesFunction in : " + v1 + " , " + v2));
            }
            Tuple result = tf.newTuple(2);
            DataBag bag = DefaultBagFactory.getInstance().newDefaultBag();
            try {
                Object key = v1.get(0);
                if (key == null) {
                    // A null key is replaced by "" for packaging; field 0 of the result stays unset.
                    key = "";
                } else {
                    result.set(0, key);
                }
                bag.add((Tuple)v1.get(1));
                bag.add((Tuple)v2.get(1));
                this.poReduce.getPKGOp().getPkgr().attachInput(key, new DataBag[]{bag}, new boolean[]{true});
                Tuple packagedTuple = (Tuple)this.poReduce.getPKGOp().getPkgr().getNext().result;
                if (LOG.isDebugEnabled()) {
                    LOG.debug((Object)("MergeValuesFunction packagedTuple : " + packagedTuple));
                }
                this.poReduce.attachInput(packagedTuple);
                Result r = this.poReduce.getNext(this.poReduce.getResultType());
                Tuple valueTuple = tf.newTuple();
                for (Object o : ((Tuple)r.result).getAll()) {
                    // key is never null here, so comparing from the key side tolerates null fields
                    // (the reverse call would throw a NullPointerException on a null field).
                    // NOTE(review): this also drops any value that happens to equal the key - confirm intended.
                    if (key.equals(o)) {
                        continue;
                    }
                    valueTuple.append(o);
                }
                result.set(1, valueTuple);
                if (LOG.isDebugEnabled()) {
                    LOG.debug((Object)("MergeValuesFunction out : " + result));
                }
                return result;
            }
            catch (ExecException e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Serializable Spark function that rebuilds an {@code (IndexedKey, Tuple)} pair from a
     * three-field tuple: field 0 is cast to {@code Byte} for the index, field 1 supplies the
     * key and field 2 the value.
     */
    private static class ToKeyValueFunction
    implements Function<Tuple, Tuple2<IndexedKey, Tuple>>,
    Serializable {
        private POReduceBySpark poReduce = null;

        /**
         * @param poReduce operator consulted for the secondary-key setting; may be null
         */
        public ToKeyValueFunction(POReduceBySpark poReduce) {
            this.poReduce = poReduce;
        }

        /**
         * Converts {@code t} into a keyed pair.
         *
         * @param t tuple read at fields 0, 1 and 2
         * @return pair of an {@link IndexedKey} and a new tuple holding the key and field 2
         * @throws RuntimeException wrapping any {@link ExecException} from reading {@code t}
         */
        public Tuple2<IndexedKey, Tuple> call(Tuple t) {
            try {
                if (LOG.isDebugEnabled()) {
                    LOG.debug((Object)("ToKeyValueFunction in " + t));
                }
                Object key;
                if (this.poReduce == null || !this.poReduce.isUseSecondaryKey()) {
                    key = t.get(1);
                } else {
                    // With a secondary key, field 1 is itself a tuple whose first field is the key.
                    key = ((Tuple)t.get(1)).get(0);
                }
                Tuple keyAndValue = tf.newTuple();
                keyAndValue.append(key);
                keyAndValue.append(t.get(2));
                IndexedKey indexedKey = new IndexedKey((Byte)t.get(0), key);
                Tuple2 pair = new Tuple2((Object)indexedKey, (Object)keyAndValue);
                if (LOG.isDebugEnabled()) {
                    LOG.debug((Object)("ToKeyValueFunction out " + pair));
                }
                return pair;
            }
            catch (ExecException e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Serializable Scala function that wraps a tuple as the key of a pair whose value is null.
     */
    private static class ToKeyNullValueFunction
    extends AbstractFunction1<Tuple, Tuple2<Tuple, Object>>
    implements Serializable {
        private ToKeyNullValueFunction() {
        }

        /**
         * @param t tuple to use as the pair's key
         * @return a pair of {@code t} and null
         */
        public Tuple2<Tuple, Object> apply(Tuple t) {
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("ToKeyNullValueFunction in " + t));
            }
            Tuple2 keyOnly = new Tuple2((Object)t, null);
            if (!LOG.isDebugEnabled()) {
                return keyOnly;
            }
            LOG.debug((Object)("ToKeyNullValueFunction out " + keyOnly));
            return keyOnly;
        }
    }
}

