/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pig.backend.hadoop.executionengine.tez.plan.operator;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.WritableComparator;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.AccumulativeTupleBuffer;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.CombinerPackager;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.LitePackager;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.BloomPackager;
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
import org.apache.pig.backend.hadoop.executionengine.util.AccumulatorOptimizerUtil;
import org.apache.pig.data.AccumulativeBag;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.InternalCachedBag;
import org.apache.pig.data.ReadOnceBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.io.PigNullableWritable;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.library.api.KeyValuesReader;
import org.apache.tez.runtime.library.common.ConfigUtils;

public class POShuffleTezLoad
extends POPackage
implements TezInput {
    private static final long serialVersionUID = 1L;
    private static final Log LOG = LogFactory.getLog(POShuffleTezLoad.class);
    // Keys used by attachInputs() to look up the LogicalInputs that feed this operator.
    protected List<String> inputKeys = new ArrayList<String>();
    // Flag exposed through setSkewedJoins()/isSkewedJoin(); not read anywhere else in this class.
    private boolean isSkewedJoin = false;
    // Everything below is transient runtime state that attachInputs() (re)initialises.
    // Distinct LogicalInputs resolved from inputKeys (duplicates are attached only once).
    private transient List<LogicalInput> inputs;
    // One KeyValuesReader per entry of 'inputs', in the same order.
    private transient List<KeyValuesReader> readers;
    // Number of distinct attached inputs, i.e. inputs.size().
    private transient int numTezInputs;
    // finished[i] is true once readers.get(i).next() has returned false.
    private transient boolean[] finished;
    // Per-bag flags handed to the packager in attachInput(); only index 0 is ever set to true here.
    private transient boolean[] readOnce;
    // Comparator used to pick the smallest current key across readers.
    private transient WritableComparator comparator = null;
    // Comparator used to decide whether a reader's current key belongs to the key being packaged.
    private transient WritableComparator groupingComparator = null;
    // Configuration passed to attachInputs(); read again when all inputs are exhausted.
    private transient Configuration conf;
    // Batch size obtained from AccumulatorOptimizerUtil, used when the operator is accumulative.
    private transient int accumulativeBatchSize;
    // True when there is exactly one package input and the packager is a Combiner-, Lite- or BloomPackager.
    private transient boolean readOnceOneBag;

    /**
     * Creates the operator by handing {@code pack} to the {@link POPackage} constructor.
     *
     * @param pack the package operator passed to the superclass constructor
     */
    public POShuffleTezLoad(POPackage pack) {
        super(pack);
    }

    /**
     * Returns the configured input keys as a freshly allocated array.
     *
     * @return a new array holding every entry of the input key list, in list order
     */
    @Override
    public String[] getTezInputs() {
        final List<String> keys = this.inputKeys;
        // toArray allocates an array of the right size when the one passed in is too small.
        return keys.toArray(new String[0]);
    }

    /**
     * Replaces every occurrence of {@code oldInputKey} in the input key list with
     * {@code newInputKey}.
     *
     * <p>Each occurrence is removed and the replacement is appended to the end of the list,
     * so the replaced entries do not keep their original position.
     *
     * <p>When both keys are equal the list is left untouched. Without this guard every
     * appended key would be found and removed again by the next iteration and the loop
     * would never terminate.
     *
     * @param oldInputKey the key to remove
     * @param newInputKey the key to add once for each removed occurrence
     */
    @Override
    public void replaceInput(String oldInputKey, String newInputKey) {
        // Null-safe equality check; List.remove(Object) also matches null elements.
        boolean sameKey = oldInputKey == null ? newInputKey == null : oldInputKey.equals(newInputKey);
        if (sameKey) {
            return;
        }
        while (this.inputKeys.remove(oldInputKey)) {
            this.inputKeys.add(newInputKey);
        }
    }

    /**
     * Intentionally does nothing: this operator never adds anything to {@code inputsToSkip}.
     *
     * @param inputsToSkip set supplied by the caller; left unmodified
     */
    @Override
    public void addInputsToSkip(Set<String> inputsToSkip) {
    }

    /**
     * Binds this operator to its runtime inputs and primes every reader.
     *
     * <p>For each configured input key the matching {@link LogicalInput} is looked up in
     * {@code inputs}; an input that was already attached under another key is skipped, so
     * each distinct input contributes exactly one reader. Every reader is then advanced once
     * and {@code finished} records which readers had no data at all.
     *
     * @param inputs logical inputs keyed by input key
     * @param conf   configuration from which the key comparator and the secondary grouping
     *               comparator are obtained; also retained for later use
     * @throws ExecException wrapping any exception raised while attaching or priming the readers
     */
    @Override
    public void attachInputs(Map<String, LogicalInput> inputs, Configuration conf) throws ExecException {
        this.conf = conf;
        this.inputs = new ArrayList<LogicalInput>();
        this.readers = new ArrayList<KeyValuesReader>();
        this.comparator = (WritableComparator) ConfigUtils.getIntermediateInputKeyComparator(conf);
        this.groupingComparator = (WritableComparator) ConfigUtils.getInputKeySecondaryGroupingComparator(conf);
        this.accumulativeBatchSize = AccumulatorOptimizerUtil.getAccumulativeBatchSize();
        try {
            for (String vertexKey : this.inputKeys) {
                LogicalInput logicalInput = inputs.get(vertexKey);
                if (!this.inputs.contains(logicalInput)) {
                    this.inputs.add(logicalInput);
                    KeyValuesReader kvReader = (KeyValuesReader) logicalInput.getReader();
                    this.readers.add(kvReader);
                    LOG.info("Attached input from vertex " + vertexKey + " : input=" + logicalInput + ", reader=" + kvReader);
                }
            }

            this.numInputs = this.pkgr.getKeyInfo().size();
            this.numTezInputs = this.inputs.size();

            // A new boolean[] is already all-false, so no explicit reset is needed.
            this.readOnce = new boolean[this.numInputs];

            // Position every reader on its first key; remember the ones that are empty.
            this.finished = new boolean[this.numTezInputs];
            for (int readerIdx = 0; readerIdx < this.numTezInputs; readerIdx++) {
                this.finished[readerIdx] = !this.readers.get(readerIdx).next();
            }

            boolean singleBagPackager = this.pkgr instanceof CombinerPackager
                    || this.pkgr instanceof LitePackager
                    || this.pkgr instanceof BloomPackager;
            this.readOnceOneBag = this.numInputs == 1 && singleBagPackager;
            if (this.readOnceOneBag) {
                this.readOnce[0] = true;
            }
        } catch (Exception e) {
            throw new ExecException(e);
        }
    }

    /**
     * Returns the next result from the packager, feeding it the next key group from the
     * attached readers whenever it reports status 3.
     *
     * <p>For each new group the smallest current key over all unfinished readers is chosen
     * with {@code comparator}; values whose key equals it under {@code groupingComparator}
     * are then exposed to the packager in one of three ways: lazily through
     * {@link AccumulativeBag}s (accumulative mode), lazily through a {@code TezReadOnceBag}
     * (single read-once bag), or eagerly copied into {@link InternalCachedBag}s.
     *
     * @return the packager's result, or {@code RESULT_EOP} once every reader is exhausted
     * @throws ExecException wrapping any failure while reading keys or values
     */
    @Override
    public Result getNextTuple() throws ExecException {
        // Ask the packager first; new input is attached below only while it reports status 3.
        Result res = this.pkgr.getNext();
        TezAccumulativeTupleBuffer buffer = null;
        if (this.isAccumulative()) {
            buffer = new TezAccumulativeTupleBuffer(this.accumulativeBatchSize);
        }
        // NOTE(review): 3 is presumably POStatus.STATUS_EOP inlined by the decompiler - confirm.
        while (res.returnStatus == 3) {
            boolean hasData = false;
            Object cur = null;
            PigNullableWritable min = null;
            try {
                if (this.numTezInputs == 1) {
                    // Single reader: its current key is the next key by definition.
                    if (!this.finished[0]) {
                        hasData = true;
                        cur = this.readers.get(0).getCurrentKey();
                        min = ((PigNullableWritable)cur).clone();
                    }
                } else {
                    // Several readers: keep a clone of the smallest current key seen so far.
                    for (int i = 0; i < this.numTezInputs; ++i) {
                        if (this.finished[i]) continue;
                        hasData = true;
                        cur = this.readers.get(i).getCurrentKey();
                        if (min != null && this.comparator.compare((Object)min, cur) <= 0) continue;
                        min = ((PigNullableWritable)cur).clone();
                    }
                }
            }
            catch (Exception e) {
                throw new ExecException(e);
            }
            if (!hasData) {
                // Every reader is exhausted. If the configuration flag is set, mark the parent
                // plan as having seen all input before signalling end of processing.
                if (Boolean.valueOf(this.conf.get("pig.invoke.close.in.map", "false")).booleanValue()) {
                    this.parentPlan.endOfAllInput = true;
                }
                return RESULT_EOP;
            }
            this.key = this.pkgr.getKey(min);
            this.keyWritable = min;
            try {
                int i;
                DataBag[] bags = new DataBag[this.numInputs];
                if (this.isAccumulative()) {
                    // Accumulative mode: bags pull their tuples from the shared buffer.
                    buffer.setCurrentKey(min);
                    for (i = 0; i < this.numInputs; ++i) {
                        bags[i] = new AccumulativeBag(buffer, i);
                    }
                } else if (this.readOnceOneBag) {
                    // Single read-once bag: values are read from reader 0 while the bag is iterated.
                    bags[0] = new TezReadOnceBag(this.pkgr, min);
                } else {
                    // Default mode: copy all values of this key group into cached bags.
                    // NOTE(review): numInputs is passed to the InternalCachedBag constructor,
                    // presumably as the number of bags sharing memory - confirm.
                    for (i = 0; i < this.numInputs; ++i) {
                        bags[i] = new InternalCachedBag(this.numInputs);
                    }
                    if (this.numTezInputs == 1) {
                        // Consume reader 0 until it is exhausted or its key leaves the group.
                        do {
                            Iterable vals = this.readers.get(0).getCurrentValues();
                            for (Object val : vals) {
                                NullableTuple nTup = (NullableTuple)val;
                                // The tuple's index selects the bag it goes into.
                                byte index = nTup.getIndex();
                                Tuple tup = this.pkgr.getValueTuple(this.keyWritable, nTup, index);
                                bags[index].add(tup);
                            }
                            boolean bl = this.finished[0] = !this.readers.get(0).next();
                        } while (!this.finished[0] && this.groupingComparator.compare((Object)min, cur = this.readers.get(0).getCurrentKey()) == 0);
                    } else {
                        // For each unfinished reader, consume every consecutive key that belongs
                        // to the current group.
                        block10: for (i = 0; i < this.numTezInputs; ++i) {
                            if (this.finished[i]) continue;
                            cur = this.readers.get(i).getCurrentKey();
                            while (this.groupingComparator.compare((Object)min, cur) == 0) {
                                Iterable vals = this.readers.get(i).getCurrentValues();
                                for (Object val : vals) {
                                    NullableTuple nTup = (NullableTuple)val;
                                    byte index = nTup.getIndex();
                                    Tuple tup = this.pkgr.getValueTuple(this.keyWritable, nTup, index);
                                    bags[index].add(tup);
                                }
                                boolean bl = this.finished[i] = !this.readers.get(i).next();
                                // Reader exhausted: move on to the next reader.
                                if (this.finished[i]) continue block10;
                                cur = this.readers.get(i).getCurrentKey();
                            }
                        }
                    }
                }
                // Hand the group to the packager and ask it again for a result.
                this.pkgr.attachInput(this.key, bags, this.readOnce);
                res = this.pkgr.getNext();
            }
            catch (IOException e) {
                throw new ExecException(e);
            }
        }
        return res;
    }

    /**
     * Replaces the input key list with the given list.
     *
     * <p>The list is stored by reference (no copy is made), so later calls to
     * {@code addInputKey} and {@code replaceInput} modify the caller's list.
     *
     * @param inputKeys the list to use as this operator's input keys
     */
    public void setInputKeys(List<String> inputKeys) {
        this.inputKeys = inputKeys;
    }

    /**
     * Appends an input key to the input key list.
     *
     * @param inputKey the key to append; no duplicate check is performed
     */
    public void addInputKey(String inputKey) {
        this.inputKeys.add(inputKey);
    }

    /**
     * Sets the skewed-join flag returned by {@link #isSkewedJoin()}.
     *
     * @param isSkewedJoin the new flag value
     */
    public void setSkewedJoins(boolean isSkewedJoin) {
        this.isSkewedJoin = isSkewedJoin;
    }

    /**
     * Returns the skewed-join flag.
     *
     * @return the value last passed to {@code setSkewedJoins}, or {@code false} if it was never set
     */
    public boolean isSkewedJoin() {
        return this.isSkewedJoin;
    }

    /**
     * Reports that this operator supports multiple inputs.
     *
     * @return always {@code true}
     */
    @Override
    public boolean supportsMultipleInputs() {
        return true;
    }

    /**
     * Bag that streams the values of one key group straight from reader 0 instead of
     * materialising them.
     *
     * <p>All iterators handed out by {@link #iterator()} share the same underlying value
     * iterator, and iterating advances reader 0 and updates {@code finished[0]} of the
     * enclosing operator.
     */
    private class TezReadOnceBag
    extends ReadOnceBag {
        private static final long serialVersionUID = 1L;
        // Values of the reader record currently being drained.
        private Iterator<Object> iter;

        /**
         * @param pkgr       packager used to turn raw values into tuples
         * @param currentKey key of the group this bag represents
         * @throws IOException if the current values of reader 0 cannot be obtained
         */
        public TezReadOnceBag(Packager pkgr, PigNullableWritable currentKey) throws IOException {
            this.pkgr = pkgr;
            this.keyWritable = currentKey;
            KeyValuesReader reader = POShuffleTezLoad.this.readers.get(0);
            this.iter = reader.getCurrentValues().iterator();
        }

        @Override
        public Iterator<Tuple> iterator() {
            return new TezReadOnceBagIterator();
        }

        /** Iterator view over the bag; holds no state of its own. */
        private class TezReadOnceBagIterator
        implements Iterator<Tuple> {

            /**
             * Reports whether another value exists for the bag's key. When the current value
             * iterator is drained, reader 0 is advanced and, if its new key still belongs to
             * the same group, iteration continues with that record's values.
             */
            @Override
            public boolean hasNext() {
                if (TezReadOnceBag.this.iter.hasNext()) {
                    return true;
                }
                try {
                    boolean exhausted = !POShuffleTezLoad.this.readers.get(0).next();
                    POShuffleTezLoad.this.finished[0] = exhausted;
                    if (exhausted) {
                        return false;
                    }
                    Object nextKey = POShuffleTezLoad.this.readers.get(0).getCurrentKey();
                    boolean sameGroup =
                            POShuffleTezLoad.this.groupingComparator.compare((Object) TezReadOnceBag.this.keyWritable, nextKey) == 0;
                    if (!sameGroup) {
                        return false;
                    }
                    TezReadOnceBag.this.iter = POShuffleTezLoad.this.readers.get(0).getCurrentValues().iterator();
                    if (!TezReadOnceBag.this.iter.hasNext()) {
                        throw new RuntimeException("Unexpected. Key " + TezReadOnceBag.this.keyWritable + " does not have any values");
                    }
                    return true;
                } catch (IOException e) {
                    throw new RuntimeException("ReadOnceBag failed to get value tuple : ", e);
                }
            }

            /** Converts the next raw value into a tuple through the packager. */
            @Override
            public Tuple next() {
                NullableTuple rawValue = (NullableTuple) TezReadOnceBag.this.iter.next();
                byte bagIndex = rawValue.getIndex();
                try {
                    return TezReadOnceBag.this.pkgr.getValueTuple(TezReadOnceBag.this.keyWritable, rawValue, bagIndex);
                } catch (ExecException e) {
                    throw new RuntimeException("ReadOnceBag failed to get value tuple : ", e);
                }
            }

            /** Removal is not supported. */
            @Override
            public void remove() {
                throw new RuntimeException("ReadOnceBag.iterator().remove() is not allowed");
            }
        }
    }

    private class TezAccumulativeTupleBuffer
    implements AccumulativeTupleBuffer {
        private int batchSize;
        private List<Tuple>[] bags;
        private PigNullableWritable min;
        private boolean clearedCurrent = true;

        public TezAccumulativeTupleBuffer(int batchSize) {
            this.batchSize = batchSize;
            this.bags = new List[POShuffleTezLoad.this.numInputs];
            for (int i = 0; i < POShuffleTezLoad.this.numInputs; ++i) {
                this.bags[i] = new ArrayList<Tuple>(batchSize);
            }
        }

        public void setCurrentKey(PigNullableWritable curKey) {
            if (!this.clearedCurrent) {
                this.clear();
            }
            this.min = curKey;
            this.clearedCurrent = false;
        }

        @Override
        public boolean hasNextBatch() {
            Object cur = null;
            try {
                for (int i = 0; i < POShuffleTezLoad.this.numTezInputs; ++i) {
                    if (POShuffleTezLoad.this.finished[i]) continue;
                    cur = ((KeyValuesReader)POShuffleTezLoad.this.readers.get(i)).getCurrentKey();
                    if (POShuffleTezLoad.this.groupingComparator.compare((Object)this.min, cur) != 0) continue;
                    return true;
                }
            }
            catch (IOException e) {
                throw new RuntimeException("Error while checking for next Accumulator batch", e);
            }
            return false;
        }

        @Override
        public void nextBatch() throws IOException {
            int i;
            Object cur = null;
            for (i = 0; i < this.bags.length; ++i) {
                this.bags[i].clear();
            }
            try {
                block3: for (i = 0; i < POShuffleTezLoad.this.numTezInputs; ++i) {
                    if (POShuffleTezLoad.this.finished[i]) continue;
                    cur = ((KeyValuesReader)POShuffleTezLoad.this.readers.get(i)).getCurrentKey();
                    int batchCount = 0;
                    while (POShuffleTezLoad.this.groupingComparator.compare((Object)this.min, cur) == 0) {
                        Iterator iter = ((KeyValuesReader)POShuffleTezLoad.this.readers.get(i)).getCurrentValues().iterator();
                        while (iter.hasNext() && batchCount < this.batchSize) {
                            NullableTuple nTup = (NullableTuple)iter.next();
                            byte index = nTup.getIndex();
                            this.bags[index].add(POShuffleTezLoad.this.pkgr.getValueTuple(POShuffleTezLoad.this.keyWritable, nTup, index));
                            ++batchCount;
                        }
                        if (batchCount == this.batchSize) {
                            if (iter.hasNext()) continue block3;
                            ((POShuffleTezLoad)POShuffleTezLoad.this).finished[i] = !((KeyValuesReader)POShuffleTezLoad.this.readers.get(i)).next();
                            continue block3;
                        }
                        boolean bl = ((POShuffleTezLoad)POShuffleTezLoad.this).finished[i] = !((KeyValuesReader)POShuffleTezLoad.this.readers.get(i)).next();
                        if (POShuffleTezLoad.this.finished[i]) continue block3;
                        cur = ((KeyValuesReader)POShuffleTezLoad.this.readers.get(i)).getCurrentKey();
                    }
                }
            }
            catch (IOException e) {
                throw new RuntimeException("Error while reading next Accumulator batch", e);
            }
        }

        @Override
        public void clear() {
            for (int i = 0; i < this.bags.length; ++i) {
                this.bags[i].clear();
            }
            Object cur = null;
            try {
                block3: for (int i = 0; i < POShuffleTezLoad.this.numTezInputs; ++i) {
                    if (POShuffleTezLoad.this.finished[i]) continue;
                    cur = ((KeyValuesReader)POShuffleTezLoad.this.readers.get(i)).getCurrentKey();
                    while (POShuffleTezLoad.this.groupingComparator.compare((Object)this.min, cur) == 0) {
                        boolean bl = ((POShuffleTezLoad)POShuffleTezLoad.this).finished[i] = !((KeyValuesReader)POShuffleTezLoad.this.readers.get(i)).next();
                        if (POShuffleTezLoad.this.finished[i]) continue block3;
                        cur = ((KeyValuesReader)POShuffleTezLoad.this.readers.get(i)).getCurrentKey();
                    }
                }
            }
            catch (IOException e) {
                throw new RuntimeException("Error while cleaning up for next Accumulator batch", e);
            }
            this.clearedCurrent = true;
        }

        @Override
        public Iterator<Tuple> getTuples(int index) {
            return this.bags[index].iterator();
        }
    }
}

