/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pig.backend.hadoop.executionengine.spark.converter;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Objects;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.IndexedKey;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.io.PigNullableWritable;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.rdd.RDD;
import scala.Tuple2;

public class SecondaryKeySortUtil {
    private static final Log LOG = LogFactory.getLog(SecondaryKeySortUtil.class);

    /**
     * Packages the tuples of a secondary-sort input, one output tuple per main key.
     *
     * <p>The input is wrapped as a pair RDD, repartitioned with an
     * {@code IndexedKeyPartitioner} sized to the current partition count and
     * sorted within each partition; every partition is then streamed through
     * {@code AccumulateByKey}.
     *
     * @param rdd   (indexed key, value tuple) pairs to package
     * @param pkgOp package operator handed to {@code AccumulateByKey}
     * @return the RDD of tuples that {@code AccumulateByKey} emits for the partitions
     */
    public static RDD<Tuple> handleSecondarySort(RDD<Tuple2<IndexedKey, Tuple>> rdd, POPackage pkgOp) {
        JavaPairRDD<IndexedKey, Tuple> keyed = JavaPairRDD.fromRDD(
                rdd, SparkUtil.getManifest(IndexedKey.class), SparkUtil.getManifest(Tuple.class));

        // The partition count is kept; only the assignment of keys to partitions changes.
        int partitionCount = keyed.partitions().size();
        Partitioner byMainKey = new IndexedKeyPartitioner(partitionCount);
        JavaPairRDD<IndexedKey, Tuple> sortedWithinPartitions =
                keyed.repartitionAndSortWithinPartitions(byMainKey);

        // Second argument of mapPartitions is Spark's preservesPartitioning flag.
        FlatMapFunction<Iterator<Tuple2<IndexedKey, Tuple>>, Tuple> packager = new AccumulateByKey(pkgOp);
        return sortedWithinPartitions.mapPartitions(packager, true).rdd();
    }

    private static class IndexedKeyPartitioner
    extends Partitioner {
        private int partition;

        public IndexedKeyPartitioner(int partition) {
            this.partition = partition;
        }

        public int getPartition(Object obj) {
            IndexedKey indexedKey = (IndexedKey)obj;
            Tuple key = (Tuple)indexedKey.getKey();
            int hashCode = 0;
            try {
                hashCode = Objects.hashCode(key.get(0));
            }
            catch (ExecException e) {
                throw new RuntimeException("IndexedKeyPartitioner#getPartition: ", e);
            }
            return Math.abs(hashCode) % this.partition;
        }

        public int numPartitions() {
            return this.partition;
        }
    }

    /**
     * Per-partition function that collapses consecutive (key, value) pairs
     * sharing the same main key (field 0 of the key tuple) into one tuple
     * produced by the wrapped {@link POPackage}.
     *
     * <p>Only adjacent records are compared, so records with equal main keys
     * must arrive next to each other in the input iterator.
     */
    private static class AccumulateByKey
    implements FlatMapFunction<Iterator<Tuple2<IndexedKey, Tuple>>, Tuple>,
    Serializable {
        /** Package operator used to build the output tuple of each group. */
        private final POPackage pkgOp;

        /**
         * @param pkgOp package operator that assembles each group's output tuple
         */
        public AccumulateByKey(POPackage pkgOp) {
            this.pkgOp = pkgOp;
        }

        /**
         * Lazily groups the records of one partition.
         *
         * <p>The grouping state lives on the returned {@code Iterable} and the
         * source iterator is consumed as results are pulled, so the result
         * supports a single pass only.
         *
         * @param it records of the partition
         * @return a lazy sequence with one packaged tuple per run of equal main keys
         */
        @Override
        public Iterable<Tuple> call(final Iterator<Tuple2<IndexedKey, Tuple>> it) throws Exception {
            return new Iterable<Tuple>(){
                /** Main key of the group currently being buffered; may legitimately be null. */
                Object curKey = null;
                /** Values buffered for {@link #curKey}. */
                final ArrayList<Tuple> curValues = new ArrayList<Tuple>();
                /**
                 * True while {@link #curValues} holds a group that has not been emitted.
                 * Tracked separately from {@link #curKey} because a null key is valid
                 * and therefore cannot double as the "nothing buffered" marker.
                 */
                boolean hasPendingGroup = false;

                @Override
                public Iterator<Tuple> iterator() {
                    return new Iterator<Tuple>(){

                        @Override
                        public boolean hasNext() {
                            return it.hasNext() || hasPendingGroup;
                        }

                        @Override
                        public Tuple next() {
                            while (it.hasNext()) {
                                Tuple2<IndexedKey, Tuple> t = it.next();
                                try {
                                    Object tMainKey = ((Tuple)t._1().getKey()).get(0);
                                    if (hasPendingGroup && !Objects.equals(curKey, tMainKey)) {
                                        // Key changed: emit the buffered group from a copy of the
                                        // values, then start buffering the new group with this record.
                                        Tuple result = AccumulateByKey.this.restructTuple(curKey, new ArrayList<Tuple>(curValues));
                                        curValues.clear();
                                        curKey = tMainKey;
                                        curValues.add(t._2());
                                        return result;
                                    }
                                    curKey = tMainKey;
                                    curValues.add(t._2());
                                    hasPendingGroup = true;
                                }
                                catch (ExecException e) {
                                    throw new RuntimeException("AccumulateByKey throw exception: ", e);
                                }
                            }
                            if (!hasPendingGroup) {
                                throw new RuntimeException("No tuples seen");
                            }
                            // Input exhausted: flush the last buffered group.
                            Tuple res = AccumulateByKey.this.restructTuple(curKey, curValues);
                            curKey = null;
                            hasPendingGroup = false;
                            return res;
                        }

                        @Override
                        public void remove() {
                            throw new UnsupportedOperationException();
                        }
                    };
                }
            };
        }

        /**
         * Feeds one group to the package operator and returns the tuple it produces.
         *
         * @param curKey    main key of the group (may be null)
         * @param curValues values of the group, in arrival order
         * @return the operator's result tuple, or {@code null} when the returned
         *         status is not 0
         * @throws RuntimeException wrapping any {@link ExecException} raised by the operator
         */
        private Tuple restructTuple(final Object curKey, ArrayList<Tuple> curValues) {
            try {
                Tuple retVal = null;
                // Minimal writable that only exposes the main key as the Pig-typed value.
                PigNullableWritable retKey = new PigNullableWritable(){

                    @Override
                    public Object getValueAsPigType() {
                        return curKey;
                    }
                };
                final Iterator<Tuple> tupleIterator = curValues.iterator();
                // Adapts the buffered tuples to the NullableTuple iterator handed to attachInput.
                Iterator<NullableTuple> iterator = new Iterator<NullableTuple>(){

                    @Override
                    public boolean hasNext() {
                        return tupleIterator.hasNext();
                    }

                    @Override
                    public NullableTuple next() {
                        return new NullableTuple(tupleIterator.next());
                    }

                    @Override
                    public void remove() {
                        throw new UnsupportedOperationException();
                    }
                };
                this.pkgOp.setInputs(null);
                this.pkgOp.attachInput(retKey, iterator);
                Result res = this.pkgOp.getNextTuple();
                // NOTE(review): 0 is presumably POStatus.STATUS_OK inlined by the decompiler - confirm.
                if (res.returnStatus == 0) {
                    retVal = (Tuple)res.result;
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("AccumulateByKey out: " + retVal);
                }
                return retVal;
            }
            catch (ExecException e) {
                throw new RuntimeException("AccumulateByKey#restructTuple throws exception: ", e);
            }
        }
    }
}

