/*
 * Decompiled with CFR 0.152.
 */
package org.pentaho.di.trans.dataservice;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.MultimapBuilder;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.subjects.PublishSubject;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.pentaho.di.core.Condition;
import org.pentaho.di.core.RowMetaAndData;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.exception.KettleFileException;
import org.pentaho.di.core.exception.KettleStepException;
import org.pentaho.di.core.logging.LogLevel;
import org.pentaho.di.core.parameters.NamedParams;
import org.pentaho.di.core.row.RowMetaInterface;
import org.pentaho.di.core.row.ValueMetaAndData;
import org.pentaho.di.core.row.ValueMetaInterface;
import org.pentaho.di.core.sql.SQL;
import org.pentaho.di.core.util.Utils;
import org.pentaho.di.i18n.BaseMessages;
import org.pentaho.di.trans.RowProducer;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransAdapter;
import org.pentaho.di.trans.TransListener;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.dataservice.Context;
import org.pentaho.di.trans.dataservice.DataServiceMeta;
import org.pentaho.di.trans.dataservice.SqlTransGenerator;
import org.pentaho.di.trans.dataservice.client.api.IDataServiceClientService;
import org.pentaho.di.trans.dataservice.clients.TransMutators;
import org.pentaho.di.trans.dataservice.execution.CopyParameters;
import org.pentaho.di.trans.dataservice.execution.DefaultTransWiring;
import org.pentaho.di.trans.dataservice.execution.PrepareExecution;
import org.pentaho.di.trans.dataservice.execution.TransStarter;
import org.pentaho.di.trans.dataservice.optimization.OptimizationImpactInfo;
import org.pentaho.di.trans.dataservice.optimization.PushDownOptimizationMeta;
import org.pentaho.di.trans.dataservice.optimization.ValueMetaResolver;
import org.pentaho.di.trans.dataservice.streaming.StreamServiceKey;
import org.pentaho.di.trans.dataservice.streaming.WindowParametersHelper;
import org.pentaho.di.trans.dataservice.streaming.execution.StreamingGeneratedTransExecution;
import org.pentaho.di.trans.dataservice.streaming.execution.StreamingServiceTransExecutor;
import org.pentaho.di.trans.dataservice.utils.KettleUtils;
import org.pentaho.di.trans.step.RowAdapter;
import org.pentaho.di.trans.step.RowListener;
import org.pentaho.di.trans.step.StepInterface;
import org.pentaho.metastore.api.IMetaStore;

public class DataServiceExecutor {
    /** Commons-logging logger; used when the push-based consumer in {@code executeQuery(DataOutputStream)} reports an error. */
    private static final Log logger = LogFactory.getLog(DataServiceExecutor.class);
    /** Class passed to {@code BaseMessages.getString} when the builder formats its error messages. */
    private static final Class<?> PKG = DataServiceExecutor.class;
    /** Transformation that backs the data service. */
    private final Trans serviceTrans;
    /** Transformation generated from the SQL query; its result step supplies the rows handed to consumers. */
    private final Trans genTrans;
    /** Metadata of the data service being queried. */
    private final DataServiceMeta service;
    /** The parsed SQL query. */
    private final SQL sql;
    /** Parameters copied onto the service transformation at the PREPARE stage. */
    private final Map<String, String> parameters;
    /** Generator that produced {@link #genTrans}; also the source of the injector/result step names and row limits. */
    private final SqlTransGenerator sqlTransGenerator;
    /** Tasks registered per {@link ExecutionPoint}; {@link #executeQuery()} runs them stage by stage. */
    private final ListMultimap<ExecutionPoint, Runnable> listenerMap;
    /** Shared context holding the streaming executors and the generated-transformation cache. */
    private final Context context;
    /** Streaming window mode; the builder requires it for streaming services. */
    private IDataServiceClientService.StreamingMode windowMode;
    /** Streaming window size (unit depends on the window mode - TODO confirm). */
    private long windowSize;
    /** Streaming window "every" setting, forwarded to the window cache key and stream wiring. */
    private long windowEvery;
    /** Streaming window limit, forwarded to the window cache key and stream wiring. */
    private long windowLimit;
    /** Key used to look up the streaming service executor in {@link #context}. */
    private StreamServiceKey streamServiceKey;
    /** Set once the push-based consumer has completed, been wrapped up, or been stopped; polled by {@link #waitUntilFinished()}. */
    private final AtomicBoolean genTransformationPushBasedIsFinished = new AtomicBoolean(false);
    /** Ensures the metadata-writing finish listener is added to the generated transformation only once. */
    private final AtomicBoolean transListenerFinishTransAdded = new AtomicBoolean(false);

    /**
     * Creates an executor from the state accumulated by a {@link Builder}.
     * No validation happens here; the builder's {@code build()} performs it beforehand.
     *
     * @param builder source of every field value
     */
    private DataServiceExecutor(Builder builder) {
        // Query and service definition.
        service = builder.service;
        sql = builder.sql;
        parameters = builder.parameters;
        context = builder.context;

        // Transformations and the generator that ties them together.
        serviceTrans = builder.serviceTrans;
        genTrans = builder.genTrans;
        sqlTransGenerator = builder.sqlTransGenerator;

        // Streaming window configuration.
        streamServiceKey = builder.streamServiceKey;
        windowMode = builder.windowMode;
        windowSize = builder.windowSize;
        windowEvery = builder.windowEvery;
        windowLimit = builder.windowLimit;

        // Enum-keyed multimap whose per-stage values keep insertion order.
        listenerMap = MultimapBuilder.enumKeys(ExecutionPoint.class).linkedListValues().build();
    }

    /**
     * Applies a log level to each transformation that is present, and to that transformation's metadata.
     *
     * @param logLevel level to set; the service transformation is handled first, then the generated one
     */
    private void setLogLevel(LogLevel logLevel) {
        for (Trans trans : new Trans[]{serviceTrans, genTrans}) {
            if (trans == null) {
                continue;
            }
            trans.setLogLevel(logLevel);
            trans.getTransMeta().setLogLevel(logLevel);
        }
    }

    /**
     * Walks a condition tree and converts the right-hand value of every atomic condition.
     *
     * @param condition root of the (sub)tree to convert
     * @param resolver  resolver used to look up field metadata and typed values
     */
    private static void convertCondition(Condition condition, ValueMetaResolver resolver) {
        if (!condition.isAtomic()) {
            // Composite condition: recurse into each child.
            for (Condition nested : condition.getChildren()) {
                convertCondition(nested, resolver);
            }
            return;
        }
        // Function code 9 is routed to the list conversion
        // (presumably Condition.FUNC_IN_LIST - TODO confirm).
        if (condition.getFunction() == 9) {
            convertListCondition(condition, resolver);
            return;
        }
        convertAtomicCondition(condition, resolver);
    }

    /**
     * Re-types the exact right-hand value of an atomic condition so that it matches the
     * value metadata the resolver reports for the left-hand field.
     *
     * <p>Conversion is best effort: if the resolver throws a {@link KettleException} the
     * condition is left as parsed. The failure is logged at debug level instead of being
     * dropped silently.
     *
     * @param condition atomic condition to adjust in place; ignored when it has no exact right-hand value
     * @param resolver  resolver providing the field's value metadata and typed value
     */
    private static void convertAtomicCondition(Condition condition, ValueMetaResolver resolver) {
        ValueMetaAndData rhs = condition.getRightExact();
        if (rhs == null) {
            return;
        }
        String fieldName = condition.getLeftValuename();
        try {
            ValueMetaInterface resolvedValueMeta = resolver.getValueMeta(fieldName);
            // Field type 5 with a right-hand value of type 1 is deliberately left alone
            // (presumably integer field vs. number literal - TODO confirm the constants).
            if (5 == resolvedValueMeta.getType() && 1 == rhs.getValueMeta().getType()) {
                return;
            }
            Object resolvedValue = resolver.getTypedValue(fieldName, rhs.getValueMeta().getType(), rhs.getValueData());
            // Storage type 0 is forced on a clone so the resolver's own metadata is not mutated
            // (presumably the "normal" storage type - TODO confirm).
            if (resolvedValueMeta.getStorageType() != 0) {
                resolvedValueMeta = resolvedValueMeta.clone();
                resolvedValueMeta.setStorageType(0);
            }
            condition.setRightExact(new ValueMetaAndData(resolvedValueMeta, resolvedValue));
        } catch (KettleException e) {
            // Best effort: keep the original condition, but leave a trace of why conversion was skipped.
            if (logger.isDebugEnabled()) {
                logger.debug("Unable to convert condition value for field '" + fieldName + "'; leaving it unchanged", e);
            }
        }
    }

    /**
     * Rewrites the right-hand value of a list condition as a {@code ';'}-joined string of
     * values formatted through the resolved field's value metadata.
     *
     * <p>Conversion is best effort: if the resolver or the formatting throws a
     * {@link KettleException} the condition is left as parsed. The failure is logged at
     * debug level instead of being dropped silently.
     *
     * @param condition list condition to adjust in place
     * @param resolver  resolver providing the field's value metadata and the typed list values
     */
    private static void convertListCondition(Condition condition, ValueMetaResolver resolver) {
        String fieldName = condition.getLeftValuename();
        try {
            ValueMetaInterface resolvedValueMeta = resolver.getValueMeta(fieldName);
            Object[] typedValues = resolver.inListToTypedObjectArray(fieldName, condition.getRightExactString());
            String[] typedValueStrings = new String[typedValues.length];
            for (int i = 0; i < typedValues.length; i++) {
                typedValueStrings[i] = resolvedValueMeta.getCompatibleString(typedValues[i]);
            }
            condition.getRightExact().setValueData(StringUtils.join((Object[]) typedValueStrings, ';'));
        } catch (KettleException e) {
            // Best effort: keep the original condition, but leave a trace of why conversion was skipped.
            if (logger.isDebugEnabled()) {
                logger.debug("Unable to convert list condition values for field '" + fieldName + "'; leaving them unchanged", e);
            }
        }
    }

    /**
     * Registers the default tasks for the PREPARE, READY and START stages.
     *
     * <ul>
     *   <li>PREPARE: copy parameters onto the service transformation, then prepare both transformations.</li>
     *   <li>READY: wire the service transformation to the generated one.</li>
     *   <li>START: start the generated transformation, then the service transformation.</li>
     * </ul>
     *
     * @throws KettleException declared for callers; propagated if task construction fails
     */
    protected void prepareExecution() throws KettleException {
        Runnable[] prepareTasks = {
            new CopyParameters(parameters, (NamedParams) serviceTrans, new NamedParams[0]),
            new PrepareExecution(serviceTrans),
            new PrepareExecution(genTrans)
        };
        Runnable[] readyTasks = {new DefaultTransWiring(this)};
        Runnable[] startTasks = {new TransStarter(genTrans), new TransStarter(serviceTrans)};

        // Collect everything first so the listener map is updated in a single call.
        ImmutableMultimap.Builder<ExecutionPoint, Runnable> tasksByStage = ImmutableMultimap.builder();
        tasksByStage.putAll(ExecutionPoint.PREPARE, prepareTasks);
        tasksByStage.putAll(ExecutionPoint.READY, readyTasks);
        tasksByStage.putAll(ExecutionPoint.START, startTasks);
        listenerMap.putAll(tasksByStage.build());
    }

    /**
     * Writes each metadata string to the stream, in argument order, using
     * {@link DataOutputStream#writeUTF(String)}.
     *
     * @param dos       stream to write to
     * @param metadatas strings to write; nothing is written when empty
     * @throws IOException if the underlying stream fails
     */
    public static void writeMetadata(DataOutputStream dos, String ... metadatas) throws IOException {
        for (int index = 0; index < metadatas.length; index++) {
            dos.writeUTF(metadatas[index]);
        }
    }

    /**
     * Executes the query and serialises its result to a stream.
     *
     * <p>A header of five strings is written first: service name, service transformation
     * name, service transformation container id, generated transformation name and generated
     * transformation container id. Row metadata is written once, either before the first row,
     * when the consumer completes, or when the generated transformation finishes. Every row
     * is then written with that row's metadata.
     *
     * @param dos stream receiving the header, the row metadata and the rows
     * @return the result of {@link #executeQuery(Observer)} for the internal consumer
     * @throws IOException if writing the header fails
     */
    public DataServiceExecutor executeQuery(final DataOutputStream dos) throws IOException {
        writeMetadata(
            dos,
            getServiceName(),
            calculateTransname(getSql(), true),
            getServiceTrans().getContainerObjectId(),
            calculateTransname(getSql(), false),
            getGenTrans().getContainerObjectId());

        final AtomicBoolean rowMetaWritten = new AtomicBoolean(false);
        genTransformationPushBasedIsFinished.set(false);

        // The finish listener is attached to the generated transformation at most once per executor.
        if (transListenerFinishTransAdded.compareAndSet(false, true)) {
            getGenTrans().addTransListener(new TransAdapter() {
                @Override
                public void transFinished(Trans trans) throws KettleException {
                    DataServiceExecutor.this.writeMeta(trans, dos, rowMetaWritten);
                }
            });
        }

        final PublishSubject<RowMetaAndData> consumer = PublishSubject.create();
        // One-element holder so the lambdas below can share the subscription handle.
        final Disposable[] subscription = new Disposable[1];
        consumer
            .doOnError(t -> {
                logger.error("Error consuming data from the generated transformation into the consumer", t);
                wrapupConsumerResources(consumer);
            })
            .doOnComplete(() -> {
                // Make sure metadata reaches the client even when no row was produced.
                writeMeta(getGenTrans(), dos, rowMetaWritten);
                genTransformationPushBasedIsFinished.set(true);
                if (subscription[0] != null) {
                    subscription[0].dispose();
                }
            })
            .doOnSubscribe(disposable -> {
                subscription[0] = disposable;
            })
            .subscribe(rowMetaAndData -> {
                try {
                    RowMetaInterface rowMeta = rowMetaAndData.getRowMeta();
                    writeMeta(rowMeta, dos, rowMetaWritten);
                    rowMeta.writeData(dos, rowMetaAndData.getData());
                } catch (Exception e) {
                    // Write failures are ignored once the service transformation has been stopped.
                    if (!getServiceTrans().isStopped()) {
                        throw new KettleStepException(e);
                    }
                }
            });
        return executeQuery(consumer);
    }

    /**
     * Writes the result step's row metadata, looked up from the given transformation,
     * unless metadata has already been written.
     *
     * @param generatedTransformation transformation whose result step fields describe the rows
     * @param dos                     stream to write the metadata to
     * @param rowMetaWritten          shared flag recording whether metadata was already written
     * @throws RuntimeException wrapping any exception raised while resolving or writing the metadata
     */
    private void writeMeta(Trans generatedTransformation, DataOutputStream dos, AtomicBoolean rowMetaWritten) {
        try {
            if (rowMetaWritten.get()) {
                return;
            }
            TransMeta generatedMeta = generatedTransformation.getTransMeta();
            RowMetaInterface resultFields = generatedMeta.getStepFields(getResultStepName());
            writeMeta(resultFields, dos, rowMetaWritten);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Writes row metadata to the stream exactly once per {@code rowMetaWritten} flag.
     * The flag is flipped atomically, so only the first caller writes.
     *
     * @param rowMetaInterface metadata to write
     * @param dos              stream to write to
     * @param rowMetaWritten   shared flag; set to {@code true} by the caller that performs the write
     * @throws KettleFileException if writing the metadata fails
     */
    private void writeMeta(RowMetaInterface rowMetaInterface, DataOutputStream dos, AtomicBoolean rowMetaWritten) throws KettleFileException {
        boolean firstWriter = rowMetaWritten.compareAndSet(false, true);
        if (!firstWriter) {
            return;
        }
        rowMetaInterface.writeMeta(dos);
    }

    /**
     * Completes the consumer and releases the streaming resources tied to this query:
     * the cached generated transformation for this query's cache key and the service
     * transformation executor registered in the context.
     *
     * <p>The service executor may already have been removed from the context (other code
     * paths in this class treat the lookup as nullable), so cleanup is skipped when it is
     * absent. The "finished" flag is always set, even if cleanup throws, because
     * {@link #waitUntilFinished()} polls it.
     *
     * @param consumer consumer to complete
     */
    @VisibleForTesting
    protected void wrapupConsumerResources(Observer<RowMetaAndData> consumer) {
        consumer.onComplete();
        try {
            StreamingServiceTransExecutor serviceTransExecutor = this.context.getServiceTransExecutor(this.streamServiceKey);
            if (serviceTransExecutor != null) {
                String streamingGenTransCacheKey = this.getStreamingGenTransCacheKey();
                serviceTransExecutor.clearCacheByKey(streamingGenTransCacheKey);
                this.context.removeServiceTransExecutor(serviceTransExecutor.getKey().getDataServiceId());
            }
        } finally {
            this.genTransformationPushBasedIsFinished.set(true);
        }
    }

    /**
     * Executes the query and pushes the resulting rows to the consumer.
     *
     * <p>Non-streaming services delegate to {@link #executeDefaultQuery(Observer)}. For
     * streaming services the first window delivered by the streaming execution is forwarded
     * row by row, after which the consumer and the internal accumulator are completed and
     * the subscription is disposed.
     *
     * @param consumer receiver of the result rows
     * @return the result of the delegated execution
     */
    public DataServiceExecutor executeQuery(Observer<RowMetaAndData> consumer) {
        if (!service.isStreaming()) {
            return executeDefaultQuery(consumer);
        }

        // One-element holder so the lambdas below can share the subscription handle.
        final Disposable[] subscription = new Disposable[1];
        final PublishSubject<List<RowMetaAndData>> accumulator = PublishSubject.create();
        accumulator
            .doOnNext(window -> {
                for (RowMetaAndData row : window) {
                    consumer.onNext(row);
                }
                consumer.onComplete();
                accumulator.onComplete();
                if (subscription[0] != null) {
                    subscription[0].dispose();
                }
            })
            .doOnSubscribe(disposable -> {
                subscription[0] = disposable;
            })
            .subscribe();
        return executeStreamingQuery(accumulator, true);
    }

    /**
     * Attaches a window consumer to the streaming execution of this query.
     *
     * <p>If no generated-transformation execution is cached for this query's cache key, a
     * new one is created, registered at the READY stage, cached in the context, and the
     * query is started through {@link #executeQuery()}. Otherwise the cached execution is
     * reused: its service listener is touched and the consumer is added to it.
     *
     * <p>All cache access happens while holding the lock on the shared context.
     *
     * @param streamingConsumer receiver of row windows
     * @param pollingMode       passed through to the streaming execution
     * @return {@code null} when no cache key can be computed (see
     *         {@link #getStreamingGenTransCacheKey()}); otherwise this executor, or the
     *         result of {@link #executeQuery()} when a new execution was started
     */
    public DataServiceExecutor executeStreamingQuery(Observer<List<RowMetaAndData>> streamingConsumer, boolean pollingMode) {
        String streamingGenTransCacheKey = this.getStreamingGenTransCacheKey();
        if (streamingGenTransCacheKey == null) {
            return null;
        }
        Context context = this.context;
        synchronized (context) {
            StreamingGeneratedTransExecution streamingGenTransFromCache = this.context.getStreamingGeneratedTransExecution(streamingGenTransCacheKey);
            if (streamingGenTransFromCache == null) {
                StreamingGeneratedTransExecution streamWiring = new StreamingGeneratedTransExecution(this.context.getServiceTransExecutor(this.streamServiceKey), this.genTrans, streamingConsumer, pollingMode, this.sqlTransGenerator.getInjectorStepName(), this.sqlTransGenerator.getResultStepName(), this.sqlTransGenerator.getSql().getSqlString(), this.windowMode, this.windowSize, this.windowEvery, this.windowLimit, streamingGenTransCacheKey);
                this.serviceTrans.addTransListener((TransListener)new TransAdapter(){

                    /*
                     * When the service transformation finishes, drop its executor from the
                     * context, provided one is still registered and its transformation is
                     * no longer running. Runs under the context lock.
                     */
                    public void transFinished(Trans trans) throws KettleException {
                        Context context = DataServiceExecutor.this.context;
                        synchronized (context) {
                            StreamingServiceTransExecutor streamingServiceTransExecutor = DataServiceExecutor.this.context.getServiceTransExecutor(DataServiceExecutor.this.streamServiceKey);
                            if (streamingServiceTransExecutor != null && !streamingServiceTransExecutor.getServiceTrans().isRunning()) {
                                DataServiceExecutor.this.context.removeServiceTransExecutor(DataServiceExecutor.this.streamServiceKey);
                            }
                        }
                    }
                });
                // The wiring runs at the READY stage of executeQuery(); it is also cached so later queries reuse it.
                this.listenerMap.get((Object)ExecutionPoint.READY).add(streamWiring);
                this.context.addStreamingGeneratedTransExecution(streamingGenTransCacheKey, streamWiring);
                return this.executeQuery();
            }
            // Cache hit: reuse the existing execution and just add this consumer.
            this.context.getServiceTransExecutor(this.streamServiceKey).touchServiceListener(streamingGenTransCacheKey);
            streamingGenTransFromCache.addNewRowConsumer(streamingConsumer, pollingMode);
        }
        return this;
    }

    /**
     * Computes the cache key identifying this query's streaming generated transformation.
     * The key combines the SQL string, the window settings, the service executor's window
     * limits and the hash code of the service executor's key.
     *
     * @return the cache key, or {@code null} when no service executor is registered for the
     *         stream service key or that executor has no key
     */
    public String getStreamingGenTransCacheKey() {
        StreamingServiceTransExecutor serviceExecutor = context.getServiceTransExecutor(streamServiceKey);
        if (serviceExecutor == null || serviceExecutor.getKey() == null) {
            return null;
        }
        return WindowParametersHelper.getCacheKey(
            sqlTransGenerator.getSql().getSqlString(),
            windowMode,
            windowSize,
            windowEvery,
            (int) serviceExecutor.getWindowMaxRowLimit(),
            serviceExecutor.getWindowMaxTimeLimit(),
            windowLimit,
            serviceExecutor.getKey().hashCode());
    }

    /**
     * Executes a non-streaming query, pushing every row written by the result step to the
     * consumer and completing the consumer when the generated transformation finishes.
     *
     * <p>The listeners are attached at the READY stage. If the result step has no run
     * thread at that point, nothing is attached.
     *
     * @param consumer receiver of regular and error rows from the result step
     * @return the result of {@link #executeQuery()}
     */
    public DataServiceExecutor executeDefaultQuery(final Observer<RowMetaAndData> consumer) {
        Runnable attachConsumer = () -> {
            StepInterface resultStep = genTrans.findRunThread(getResultStepName());
            if (resultStep == null) {
                return;
            }
            resultStep.addRowListener(new RowAdapter() {
                @Override
                public void rowWrittenEvent(RowMetaInterface rowMeta, Object[] row) throws KettleStepException {
                    consumer.onNext(new RowMetaAndData(rowMeta, row));
                }

                @Override
                public void errorRowWrittenEvent(RowMetaInterface rowMeta, Object[] row) throws KettleStepException {
                    // Error rows are forwarded exactly like regular rows.
                    consumer.onNext(new RowMetaAndData(rowMeta, row));
                }
            });
            genTrans.addTransListener(new TransAdapter() {
                @Override
                public void transFinished(Trans trans) throws KettleException {
                    consumer.onComplete();
                }
            });
        };
        listenerMap.get(ExecutionPoint.READY).add(attachConsumer);
        return executeQuery();
    }

    /**
     * Activates every enabled push-down optimization of the service, then runs the
     * registered tasks of all execution points in enum order.
     *
     * @return this executor
     */
    public DataServiceExecutor executeQuery() {
        for (PushDownOptimizationMeta optimization : service.getPushDownOptimizationMeta()) {
            if (optimization.isEnabled()) {
                optimization.activate(this);
            }
        }
        executeListeners(ExecutionPoint.values());
        return this;
    }

    /**
     * Runs the tasks registered for each given stage, in the order given.
     *
     * <p>Tasks run from a snapshot taken before the first one starts, so tasks added or
     * removed while the stage is running do not affect it. If the registered list differs
     * from the snapshot afterwards, an error is logged on the generated transformation's
     * log channel.
     *
     * @param stages stages to execute
     */
    public void executeListeners(ExecutionPoint ... stages) {
        for (ExecutionPoint stage : stages) {
            List<Runnable> snapshot = ImmutableList.copyOf(listenerMap.get(stage));
            snapshot.forEach(Runnable::run);

            List<Runnable> current = listenerMap.get(stage);
            if (!current.equals(snapshot)) {
                getGenTrans().getLogChannel().logError("Listeners were modified while executing {0}. Started with {1} and ended with {2}", new Object[]{stage, snapshot, listenerMap.get(stage)});
            }
        }
    }

    /**
     * Adds a row producer to copy 0 of the generated transformation's injector step.
     *
     * @return the producer created by the generated transformation
     * @throws KettleException if the producer cannot be added
     */
    public RowProducer addRowProducer() throws KettleException {
        String injectorStepName = sqlTransGenerator.getInjectorStepName();
        return genTrans.addRowProducer(injectorStepName, 0);
    }

    /**
     * Blocks until the query has finished.
     *
     * <p>For non-streaming services this waits on the service transformation and then on
     * the generated transformation. For streaming services it polls every 50 ms until the
     * generated transformation is finished or stopped, or until the push-based consumer
     * has been marked as finished.
     *
     * @throws RuntimeException wrapping the {@link InterruptedException} if the waiting
     *         thread is interrupted; the thread's interrupt flag is restored first so
     *         callers further up the stack can still observe the interruption
     */
    public void waitUntilFinished() {
        if (!this.service.isStreaming()) {
            this.serviceTrans.waitUntilFinished();
            this.genTrans.waitUntilFinished();
            return;
        }
        try {
            while (!this.genTrans.isFinishedOrStopped() && !this.genTransformationPushBasedIsFinished.get()) {
                Thread.sleep(50L);
            }
        } catch (InterruptedException e) {
            // Thread.sleep clears the interrupt flag when it throws; put it back before propagating.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /** Returns the metadata of the service transformation; fails with a NullPointerException if there is no service transformation. */
    public TransMeta getServiceTransMeta() {
        return this.serviceTrans.getTransMeta();
    }

    /** Returns the metadata of the generated transformation; fails with a NullPointerException if there is no generated transformation. */
    public TransMeta getGenTransMeta() {
        return this.genTrans.getTransMeta();
    }

    /** Returns the metadata of the data service being queried. */
    public DataServiceMeta getService() {
        return this.service;
    }

    /** Returns the service transformation. */
    public Trans getServiceTrans() {
        return this.serviceTrans;
    }

    /** Returns the transformation generated from the SQL query. */
    public Trans getGenTrans() {
        return this.genTrans;
    }

    /** Returns the service name as reported by the SQL query. */
    public String getServiceName() {
        return this.sql.getServiceName();
    }

    /** Returns the executor's own parameter map (not a copy), so changes made by callers are visible to the executor. */
    public Map<String, String> getParameters() {
        return this.parameters;
    }

    /** Returns the container object id of the service transformation. */
    public String getId() {
        return this.serviceTrans.getContainerObjectId();
    }

    /** Returns {@code true} (boxed) when either the service or the generated transformation reports at least one error. */
    public Boolean hasErrors() {
        return this.serviceTrans.getErrors() > 0 || this.genTrans.getErrors() > 0;
    }

    /**
     * Builds a transformation name of the form
     * {@code "<service name> - <Service|SQL> - <hash code of the SQL string>"},
     * with every line feed and carriage return replaced by a space.
     *
     * @param sql       query providing the service name and SQL string
     * @param isService {@code true} for the service transformation name, {@code false} for the generated one
     * @return the computed name
     */
    public static String calculateTransname(SQL sql, boolean isService) {
        StringBuilder name = new StringBuilder(sql.getServiceName())
            .append(" - ")
            .append(isService ? "Service" : "SQL")
            .append(" - ")
            .append(sql.getSqlString().hashCode());
        // Only the service name can contribute line breaks; flatten them so the name stays on one line.
        return name.toString().replace('\n', ' ').replace('\r', ' ');
    }

    /**
     * Stops the query while holding the lock on the service transformation.
     *
     * <p>Nothing happens for a streaming service unless {@code stopTrans} is {@code true}.
     * Otherwise both transformations are stopped if running. For a streaming service the
     * push-based execution is additionally marked as finished, and, when a service executor
     * is still registered, this query's cached generated transformation is removed.
     *
     * @param stopTrans whether to stop the transformations even though the service is streaming
     */
    public void stop(boolean stopTrans) {
        synchronized (getServiceTrans()) {
            if (!stopTrans && service.isStreaming()) {
                return;
            }
            if (serviceTrans.isRunning()) {
                serviceTrans.stopAll();
            }
            if (genTrans.isRunning()) {
                genTrans.stopAll();
            }
            if (!service.isStreaming()) {
                return;
            }

            // Streaming cleanup: release anyone blocked in waitUntilFinished() and drop the cached execution.
            genTransformationPushBasedIsFinished.set(true);
            StreamingServiceTransExecutor serviceExecutor = context.getServiceTransExecutor(streamServiceKey);
            if (serviceExecutor != null) {
                String cacheKey = getStreamingGenTransCacheKey();
                context.removeStreamingGeneratedTransExecution(cacheKey);
                serviceExecutor.clearCacheByKey(cacheKey);
            }
        }
    }

    /** Reports whether the generated transformation is stopped; the service transformation is not consulted. */
    public boolean isStopped() {
        return this.genTrans.isStopped();
    }

    /** Returns the SQL query this executor runs. */
    public SQL getSql() {
        return this.sql;
    }

    /** Returns the name of the generated transformation's result step, as reported by the SQL transformation generator. */
    public String getResultStepName() {
        return this.sqlTransGenerator.getResultStepName();
    }

    /** Returns the row limit held by the SQL transformation generator. */
    public int getRowLimit() {
        return this.sqlTransGenerator.getRowLimit();
    }

    /** Returns the service row limit held by the SQL transformation generator. */
    public int getServiceRowLimit() {
        return this.sqlTransGenerator.getServiceRowLimit();
    }

    /** Returns the live multimap of tasks per execution point; tasks added to it run on the next {@link #executeListeners(ExecutionPoint...)} call for that stage. */
    public ListMultimap<ExecutionPoint, Runnable> getListenerMap() {
        return this.listenerMap;
    }

    public static enum ExecutionPoint {
        PREPARE,
        OPTIMIZE,
        READY,
        START;

    }

    public static class Builder {
        /** Query to execute; never null (checked in the constructor). */
        private final SQL sql;
        /** Data service being queried; never null (checked in the constructor). */
        private final DataServiceMeta service;
        /** Shared context; not null-checked in the constructor, but dereferenced by {@code build()}. */
        private final Context context;
        /** Service transformation; created from the service's metadata in {@code build()} when not supplied. */
        private Trans serviceTrans;
        /** Generated transformation; created from the SQL generator in {@code build()} when not supplied. */
        private Trans genTrans;
        /** Caller-requested row limit; 0 when not set. */
        private int rowLimit = 0;
        /** Caller-requested time limit, used only for the streaming window limit; 0 when not set. */
        private long timeLimit = 0L;
        /** Streaming window mode; must be set for streaming services. */
        private IDataServiceClientService.StreamingMode windowMode;
        /** Streaming window size; 0 when not set. */
        private long windowSize = 0L;
        /** Streaming window "every" setting; 0 when not set. */
        private long windowEvery = 0L;
        /** Streaming window limit; 0 when not set. */
        private long windowLimit = 0L;
        /** Parameters for the service transformation; extended by {@code build()} with condition-derived parameters. */
        private Map<String, String> parameters = new HashMap<String, String>();
        /** Log level applied to the built executor when non-null. */
        private LogLevel logLevel;
        /** Generator for the SQL transformation; created in {@code build()} when not supplied. */
        private SqlTransGenerator sqlTransGenerator;
        /** Key of the streaming service executor; computed in {@code build()} for streaming services. */
        private StreamServiceKey streamServiceKey;
        /** Whether {@code build()} converts WHERE/HAVING condition values to the service field types. */
        private boolean normalizeConditions = true;
        /** Whether both transformations gather metrics. */
        private boolean enableMetrics = false;
        /** Metastore set on both transformations. */
        private IMetaStore metastore;
        /** Mutation applied to the cloned service transformation metadata; by default disables all hops unrelated to the service step. */
        private BiConsumer<String, TransMeta> transMutator = (stepName, transMeta) -> TransMutators.disableAllUnrelatedHops(stepName, transMeta, true);
        /** Kettle utility accessor (not used by the members visible in this part of the file). */
        private KettleUtils kettleUtils = KettleUtils.getInstance();

        /**
         * Starts a builder for the given query and service.
         *
         * @param sql     query to execute; must not be null
         * @param service data service to query; must not be null
         * @param context shared execution context; stored as given, without a null check
         * @throws NullPointerException if {@code sql} or {@code service} is null
         */
        public Builder(SQL sql, DataServiceMeta service, Context context) {
            Preconditions.checkNotNull(sql, "SQL must not be null.");
            Preconditions.checkNotNull(service, "Service must not be null.");
            this.sql = sql;
            this.service = service;
            this.context = context;
        }

        /** Replaces the parameters with a new hash-map copy of the given map, so later changes to the argument are not seen. */
        public Builder parameters(Map<String, String> parameters) {
            this.parameters = Maps.newHashMap(parameters);
            return this;
        }

        /** Sets the caller-requested row limit: the SQL generator's row limit for non-streaming services, an input to the window row limit for streaming ones. */
        public Builder rowLimit(int rowLimit) {
            this.rowLimit = rowLimit;
            return this;
        }

        /** Sets the caller-requested time limit, an input to the streaming window time limit (unit not visible here - TODO confirm). */
        public Builder timeLimit(long timeLimit) {
            this.timeLimit = timeLimit;
            return this;
        }

        /** Sets the streaming window mode; {@code build()} rejects a streaming service without one. */
        public Builder windowMode(IDataServiceClientService.StreamingMode windowMode) {
            this.windowMode = windowMode;
            return this;
        }

        /** Sets the streaming window size (unit presumably depends on the window mode - TODO confirm). */
        public Builder windowSize(long windowSize) {
            this.windowSize = windowSize;
            return this;
        }

        /** Sets the streaming window "every" value passed on to the executor. */
        public Builder windowEvery(long windowEvery) {
            this.windowEvery = windowEvery;
            return this;
        }

        /** Sets the streaming window limit passed on to the executor. */
        public Builder windowLimit(long windowLimit) {
            this.windowLimit = windowLimit;
            return this;
        }

        /** Sets the log level applied to the built executor; when left null the log level is not touched. */
        public Builder logLevel(LogLevel logLevel) {
            this.logLevel = logLevel;
            return this;
        }

        /** Sets the metastore that {@code build()} assigns to both transformations. */
        public Builder metastore(IMetaStore metastore) {
            this.metastore = metastore;
            return this;
        }

        /** Uses the given transformation as the service transformation as-is (no cloning, renaming or hop mutation). */
        public Builder serviceTrans(Trans serviceTrans) {
            this.serviceTrans = serviceTrans;
            return this;
        }

        /** Replaces the mutation that {@code serviceTrans(TransMeta)} applies to the cloned metadata; package-private. */
        Builder serviceTransMutator(BiConsumer<String, TransMeta> transMutator) {
            this.transMutator = transMutator;
            return this;
        }

        /**
         * Creates the service transformation from a private copy of the given metadata.
         * The copy has its name-changed listeners cleared, is renamed to the calculated
         * service transformation name, has its parameters activated, and is passed through
         * the configured mutator together with the service step name.
         *
         * @param serviceTransMeta metadata to clone; the argument itself is not modified by this method
         * @return this builder
         */
        public Builder serviceTrans(TransMeta serviceTransMeta) {
            TransMeta isolatedCopy = (TransMeta) serviceTransMeta.realClone(false);
            isolatedCopy.clearNameChangedListeners();
            isolatedCopy.setName(DataServiceExecutor.calculateTransname(sql, true));
            isolatedCopy.activateParameters();
            transMutator.accept(service.getStepname(), isolatedCopy);

            Trans trans = new Trans(isolatedCopy);
            return serviceTrans(trans);
        }

        /** Supplies the SQL transformation generator; when not set, {@code build()} creates one from the query and limits. */
        public Builder sqlTransGenerator(SqlTransGenerator sqlTransGenerator) {
            this.sqlTransGenerator = sqlTransGenerator;
            return this;
        }

        /** Supplies the generated transformation; when not set, {@code build()} creates one from the SQL generator's metadata. */
        public Builder genTrans(Trans trans) {
            this.genTrans = trans;
            return this;
        }

        /** Sets whether both transformations gather metrics. */
        public Builder enableMetrics(boolean enable) {
            this.enableMetrics = enable;
            return this;
        }

        /** Sets whether {@code build()} converts WHERE and HAVING condition values to the service field types (on by default). */
        public Builder normalizeConditions(boolean enable) {
            this.normalizeConditions = enable;
            return this;
        }

        /**
         * Validates the configuration, creates whatever transformations were not supplied,
         * and returns a registered executor.
         *
         * <p>Steps, in order: validate window mode / service name / service transformation;
         * parse the SQL against the service step's fields; optionally normalise WHERE and
         * HAVING condition values; resolve the service transformation (for streaming
         * services via the executor cached in the context, under the context lock); create
         * the SQL generator and generated transformation if needed; assign fresh container
         * ids, metastore and metrics flag; add condition-derived parameters; register the
         * executor with the context; apply the log level; and, for non-streaming services
         * only, register the default execution tasks.
         *
         * @return the configured executor
         * @throws KettleException if a streaming service has no window mode, the SQL service
         *         name does not match the service, the service has no transformation, or any
         *         underlying Kettle call fails
         */
        public DataServiceExecutor build() throws KettleException {
            int serviceRowLimit = this.getServiceRowLimit(this.service);
            if (this.service.isStreaming() && this.windowMode == null) {
                throw new KettleException(BaseMessages.getString((Class)PKG, (String)"DataServiceExecutor.Error.WindowModeMandatory", (String[])new String[]{this.sql.getServiceName(), this.service.getName()}));
            }
            if (this.sql.getServiceName() == null || !this.sql.getServiceName().equals(this.service.getName())) {
                throw new KettleException(BaseMessages.getString((Class)PKG, (String)"DataServiceExecutor.Error.TableNameAndDataServiceNameDifferent", (String[])new String[]{this.sql.getServiceName(), this.service.getName()}));
            }
            if (this.service.getServiceTrans() == null) {
                throw new KettleException(BaseMessages.getString((Class)PKG, (String)"DataServiceExecutor.Error.NoServiceTransformation", (String[])new String[]{this.sql.getServiceName(), this.service.getName()}));
            }
            // Parse the query against the fields produced by the service step.
            RowMetaInterface serviceFields = this.service.getServiceTrans().getStepFields(this.service.getStepname());
            ValueMetaResolver resolver = new ValueMetaResolver(serviceFields);
            this.sql.parse(resolver.getRowMeta());
            if (this.normalizeConditions) {
                if (this.sql.getWhereCondition() != null && this.sql.getWhereCondition().getCondition() != null) {
                    DataServiceExecutor.convertCondition(this.sql.getWhereCondition().getCondition(), resolver);
                }
                if (this.sql.getHavingCondition() != null && this.sql.getHavingCondition().getCondition() != null) {
                    DataServiceExecutor.convertCondition(this.sql.getHavingCondition().getCondition(), resolver);
                }
            }
            if (this.service.isStreaming()) {
                Context context = this.context;
                synchronized (context) {
                    // Inline parameters feed into the stream service key, so they are added before the key is computed.
                    this.addInlinePushDownParameters();
                    this.streamServiceKey = this.getStreamingServiceKey();
                    StreamingServiceTransExecutor serviceTransExecutor = this.context.getServiceTransExecutor(this.streamServiceKey);
                    // A cached executor whose transformation has a different modified date than the service's is discarded and stopped.
                    if (serviceTransExecutor != null && !serviceTransExecutor.getServiceTrans().getTransMeta().getModifiedDate().equals(this.service.getServiceTrans().getModifiedDate())) {
                        this.context.removeServiceTransExecutor(serviceTransExecutor.getKey());
                        serviceTransExecutor.stopAll();
                        serviceTransExecutor = null;
                    }
                    if (serviceTransExecutor == null) {
                        if (this.serviceTrans == null && this.service.getServiceTrans() != null) {
                            this.serviceTrans(this.service.getServiceTrans());
                        }
                        if (this.serviceTrans != null) {
                            // Window limits combine the user, service and Kettle limits, with defaults of 50000 and 100000
                            // (units are not visible here - TODO confirm).
                            int windowMaxRowLimit = (int)this.getStreamingLimit(this.rowLimit, this.service.getRowLimit(), this.getKettleRowLimit(), 50000L);
                            long windowMaxTimeLimit = this.getStreamingLimit(this.timeLimit, this.service.getTimeLimit(), this.getKettleTimeLimit(), 100000L);
                            serviceTransExecutor = new StreamingServiceTransExecutor(this.streamServiceKey, this.serviceTrans, this.service.getStepname(), windowMaxRowLimit, windowMaxTimeLimit, this.context);
                            this.context.addServiceTransExecutor(serviceTransExecutor);
                        }
                    } else {
                        // Reuse the transformation of the cached executor.
                        this.serviceTrans(serviceTransExecutor.getServiceTrans());
                    }
                }
            } else if (this.serviceTrans == null && this.service.getServiceTrans() != null) {
                this.serviceTrans(this.service.getServiceTrans());
            }
            if (this.sqlTransGenerator == null) {
                // Streaming: both limits are 0. Otherwise: the requested row limit, and the service row limit
                // falling back to 50000 for services that are not user defined.
                this.sqlTransGenerator = new SqlTransGenerator(this.sql, this.service.isStreaming() ? 0 : this.rowLimit, this.service.isStreaming() ? 0 : (serviceRowLimit > 0 ? serviceRowLimit : (!this.service.isUserDefined() ? 50000 : 0)));
            }
            if (this.genTrans == null) {
                this.genTrans = new Trans(this.sqlTransGenerator.generateTransMeta());
            }
            // Both transformations get a fresh random container id on every build, including a reused streaming service transformation.
            this.serviceTrans.setContainerObjectId(UUID.randomUUID().toString());
            this.serviceTrans.setMetaStore(this.metastore);
            this.serviceTrans.setGatheringMetrics(this.enableMetrics);
            this.genTrans.setContainerObjectId(UUID.randomUUID().toString());
            this.genTrans.setMetaStore(this.metastore);
            this.genTrans.setGatheringMetrics(this.enableMetrics);
            this.parameters.putAll(this.getWhereConditionParameters());
            DataServiceExecutor dataServiceExecutor = new DataServiceExecutor(this);
            this.context.addExecutor(dataServiceExecutor);
            if (this.logLevel != null) {
                dataServiceExecutor.setLogLevel(this.logLevel);
            }
            // Streaming executors get their READY task later, in executeStreamingQuery().
            if (!this.service.isStreaming()) {
                dataServiceExecutor.prepareExecution();
            }
            return dataServiceExecutor;
        }

        /**
         * Computes the key identifying the streaming service execution, from the service
         * name, the current parameters and the previewed impact of every enabled push-down
         * optimization.
         *
         * <p>Side effect: creates the service transformation from the service's metadata
         * when none has been set yet. The optimizations are previewed against a throwaway
         * executor built from the builder's current state.
         *
         * @return the stream service key
         */
        private StreamServiceKey getStreamingServiceKey() {
            if (serviceTrans == null) {
                serviceTrans(service.getServiceTrans());
            }
            DataServiceExecutor previewExecutor = new DataServiceExecutor(this);

            List<OptimizationImpactInfo> impacts = new ArrayList<>();
            for (PushDownOptimizationMeta optimization : service.getPushDownOptimizationMeta()) {
                if (optimization.isEnabled()) {
                    impacts.add(optimization.preview(previewExecutor));
                }
            }
            return StreamServiceKey.create(service.getName(), parameters, impacts);
        }

        /**
         * Copies inline parameter assignments found in the SQL WHERE clause into the parameter map.
         *
         * <p>Every condition in the WHERE tree is visited; for each one accepted by
         * {@code isParameterFunction} the left value name becomes the key and the right exact
         * string becomes the value. Nothing happens when the query has no WHERE condition.
         */
        private void addInlinePushDownParameters() {
            if (this.sql.getWhereCondition() == null) {
                return;
            }
            Condition root = this.sql.getWhereCondition().getCondition();
            this.traverseConditions(root, condition -> {
                if (this.isParameterFunction(condition)) {
                    this.parameters.put(condition.getLeftValuename(), condition.getRightExactString());
                }
            });
        }

        /**
         * Tells whether a condition has the shape of an inline parameter assignment.
         *
         * <p>All of the following must hold: the function code is 14, the left value name is
         * non-null and equal to the right value name, and the right exact value is non-null.
         *
         * @param cond the condition to inspect
         * @return {@code true} only if every check above passes
         */
        private boolean isParameterFunction(Condition cond) {
            // NOTE(review): 14 is presumably the "TRUE" function code (the sibling
            // stripFieldNamesFromTrueFunction asserts the same value) - confirm against Condition.
            if (cond.getFunction() != 14) {
                return false;
            }
            String leftName = cond.getLeftValuename();
            if (leftName == null) {
                return false;
            }
            if (!leftName.equals(cond.getRightValuename())) {
                return false;
            }
            return cond.getRightExact() != null;
        }

        /**
         * Visits a condition tree in pre-order: the node itself first, then each child subtree
         * in the order returned by {@code getChildren()}.
         *
         * @param cond     root of the (sub)tree to visit
         * @param consumer callback invoked once for every visited condition
         */
        private void traverseConditions(Condition cond, Consumer<Condition> consumer) {
            // The parent is handed to the callback before any of its descendants.
            consumer.accept(cond);
            for (Condition nested : cond.getChildren()) {
                traverseConditions(nested, consumer);
            }
        }

        /**
         * Picks the effective streaming limit from up to three candidate limits.
         *
         * <p>A candidate only counts when it is strictly positive. The smallest counting
         * candidate wins; if none counts, {@code defaultLimit} is returned unchanged.
         *
         * @param userLimit    candidate limit, ignored unless greater than zero
         * @param serviceLimit candidate limit, ignored unless greater than zero
         * @param kettleLimit  candidate limit, ignored unless greater than zero
         * @param defaultLimit value returned when no candidate is greater than zero
         * @return the smallest positive candidate, or {@code defaultLimit}
         * @throws KettleException declared by the signature; this body does not throw it
         */
        private long getStreamingLimit(long userLimit, long serviceLimit, long kettleLimit, long defaultLimit) throws KettleException {
            boolean anyPositive = false;
            long smallest = Long.MAX_VALUE;
            for (long candidate : new long[] {userLimit, serviceLimit, kettleLimit}) {
                if (candidate > 0L) {
                    anyPositive = true;
                    smallest = Math.min(smallest, candidate);
                }
            }
            return anyPositive ? smallest : defaultLimit;
        }

        /**
         * Resolves the row limit that applies to a service.
         *
         * <p>A positive row limit configured on the service wins. Otherwise the Kettle-wide limit
         * from {@code getKettleRowLimit()} is used, except for user-defined, non-streaming
         * services, which get 0.
         *
         * @param service the service whose limit is being resolved
         * @return the service's own positive limit, the Kettle limit, or 0
         * @throws KettleException if reading the Kettle limit fails
         */
        private int getServiceRowLimit(DataServiceMeta service) throws KettleException {
            int configured = service.getRowLimit();
            if (configured > 0) {
                return configured;
            }
            // Only user-defined services that are not streaming skip the Kettle-wide limit.
            if (service.isUserDefined() && !service.isStreaming()) {
                return 0;
            }
            return this.getKettleRowLimit();
        }

        /**
         * Reads the dynamic row limit from the Kettle properties.
         *
         * <p>The property {@code dataservice.dynamic.limit} is consulted first; when it is null
         * or empty, {@code det.dataservice.dynamic.limit} is read instead. The value is parsed
         * with {@link Integer#parseInt(String)} and returned unchanged.
         *
         * <p>If the value cannot be parsed, an error naming the property that was actually read
         * is logged (when a context with a log channel is available) and 0 is returned.
         *
         * @return the parsed limit, or 0 when no value is configured or it is not a valid int
         * @throws KettleException if the property lookup fails
         */
        @VisibleForTesting
        protected int getKettleRowLimit() throws KettleException {
            String property = "dataservice.dynamic.limit";
            String limit = this.kettleUtils.getKettleProperty(property);
            if (limit == null || limit.isEmpty()) {
                // Fall back to the alternate key and remember it so a parse error is reported
                // against the property the value really came from.
                property = "det.dataservice.dynamic.limit";
                limit = this.kettleUtils.getKettleProperty(property);
            }
            if (Utils.isEmpty((CharSequence)limit)) {
                return 0;
            }
            try {
                return Integer.parseInt(limit);
            }
            catch (NumberFormatException e) {
                // Best effort: a malformed limit is reported, never propagated.
                if (this.context != null && this.context.getLogChannel() != null) {
                    this.context.getLogChannel().logError(String.format("%s: %s ", property, e));
                }
                return 0;
            }
        }

        /**
         * Reads the dynamic time limit from the Kettle property
         * {@code dataservice.dynamic.timelimitmilli}.
         *
         * <p>The value is parsed with {@link Long#parseLong(String)}. A value that cannot be
         * parsed is logged as an error (when a context with a log channel is available) and
         * treated as 0.
         *
         * @return the parsed limit, or 0 when the property is empty or not a valid long
         * @throws KettleException if the property lookup fails
         */
        @VisibleForTesting
        protected long getKettleTimeLimit() throws KettleException {
            String configured = this.kettleUtils.getKettleProperty("dataservice.dynamic.timelimitmilli");
            if (Utils.isEmpty((CharSequence)configured)) {
                return 0L;
            }
            long parsed = 0L;
            try {
                parsed = Long.parseLong(configured);
            }
            catch (NumberFormatException e) {
                // Best effort: report the malformed value if a log channel exists, keep 0.
                boolean canLog = this.context != null && this.context.getLogChannel() != null;
                if (canLog) {
                    this.context.getLogChannel().logError(String.format("%s: %s ", "dataservice.dynamic.timelimitmilli", e));
                }
            }
            return parsed;
        }

        /**
         * Collects the parameters embedded in the SQL WHERE clause.
         *
         * <p>Delegates to {@code extractConditionParameters}, which also clears the value names
         * on every condition it extracts from, so calling this method modifies the WHERE tree.
         *
         * @return a new mutable map of parameter name to value; empty when there is no WHERE
         *         condition
         */
        private Map<String, String> getWhereConditionParameters() {
            Map<String, String> collected = new HashMap<>();
            if (this.sql.getWhereCondition() == null) {
                return collected;
            }
            this.extractConditionParameters(this.sql.getWhereCondition().getCondition(), collected);
            return collected;
        }

        /**
         * Walks a condition tree and records every atomic condition whose function code is 14
         * as a parameter.
         *
         * <p>For each such condition the left value name is stored as the key and the right
         * exact string as the value, then the condition's value names are cleared via
         * {@code stripFieldNamesFromTrueFunction}. Non-atomic conditions are only descended into.
         *
         * @param condition  root of the (sub)tree to scan; modified in place
         * @param parameters map that receives the extracted name/value pairs
         */
        private void extractConditionParameters(Condition condition, Map<String, String> parameters) {
            if (!condition.isAtomic()) {
                // Composite node: nothing to record here, recurse into each child.
                for (Condition child : condition.getChildren()) {
                    this.extractConditionParameters(child, parameters);
                }
                return;
            }
            if (condition.getFunction() != 14) {
                return;
            }
            parameters.put(condition.getLeftValuename(), condition.getRightExactString());
            this.stripFieldNamesFromTrueFunction(condition);
        }

        /**
         * Clears both value names on a condition by setting them to {@code null}.
         *
         * <p>With assertions enabled, the condition must carry function code 14.
         *
         * @param condition the condition to modify in place
         */
        private void stripFieldNamesFromTrueFunction(Condition condition) {
            // Precondition is only enforced when the JVM runs with assertions enabled.
            assert condition.getFunction() == 14;
            condition.setLeftValuename(null);
            condition.setRightValuename(null);
        }
    }
}

