package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.state.internals.ThreadCache;

/* loaded from: input_file:BOOT-INF/lib/kafka-streams-1.1.0.jar:org/apache/kafka/streams/processor/internals/ProcessorContextImpl.class */
/**
 * Processor context backed by a single {@link StreamTask}.
 *
 * <p>This class forwards records to children of the node returned by {@code currentNode()},
 * resolves state stores through the inherited {@code stateManager}, and delegates commit
 * requests and punctuation scheduling to the task supplied at construction. It also exposes
 * the {@link RecordCollector} supplied at construction via {@link RecordCollector.Supplier}.
 */
public class ProcessorContextImpl extends AbstractProcessorContext implements RecordCollector.Supplier {
    private final StreamTask task;
    private final RecordCollector collector;

    /**
     * Creates a context for the given task.
     *
     * <p>NOTE(review): the decompiler reported that this constructor was originally
     * package-private; it is kept {@code public} so that existing callers keep compiling.
     *
     * @param taskId                passed through to the superclass constructor
     * @param streamTask            task that receives {@link #commit()} and {@code schedule} calls
     * @param streamsConfig         passed through to the superclass constructor
     * @param recordCollector       collector returned by {@link #recordCollector()}
     * @param processorStateManager passed through to the superclass constructor
     * @param streamsMetrics        passed through to the superclass constructor
     * @param threadCache           passed through to the superclass constructor
     */
    public ProcessorContextImpl(TaskId taskId, StreamTask streamTask, StreamsConfig streamsConfig, RecordCollector recordCollector, ProcessorStateManager processorStateManager, StreamsMetrics streamsMetrics, ThreadCache threadCache) {
        super(taskId, streamsConfig, streamsMetrics, processorStateManager, threadCache);
        this.task = streamTask;
        this.collector = recordCollector;
    }

    /**
     * Returns the inherited {@code stateManager} cast to {@link ProcessorStateManager}.
     *
     * @return the state manager viewed as a {@link ProcessorStateManager}
     */
    public ProcessorStateManager getStateMgr() {
        return (ProcessorStateManager) this.stateManager;
    }

    /**
     * Returns the record collector supplied at construction.
     *
     * @return the record collector of this context
     */
    @Override // org.apache.kafka.streams.processor.internals.RecordCollector.Supplier
    public RecordCollector recordCollector() {
        return this.collector;
    }

    /**
     * Looks up a state store by name on behalf of the current node.
     *
     * <p>A global store with the given name is returned if the state manager has one; otherwise
     * the store is returned only when the current node lists the name in its {@code stateStores}.
     *
     * @param str name of the requested state store
     * @return the global store with that name, or else the store returned by the state manager
     * @throws TopologyBuilderException if there is no current node, or if the name is neither a
     *                                  global store nor listed in the current node's state stores
     */
    @Override // org.apache.kafka.streams.processor.ProcessorContext
    public StateStore getStateStore(String str) {
        final ProcessorNode<?, ?> node = currentNode();
        if (node == null) {
            throw new TopologyBuilderException("Accessing from an unknown node");
        }

        // Global stores are checked first and returned without consulting the node's store list.
        final StateStore globalStore = this.stateManager.getGlobalStore(str);
        if (globalStore != null) {
            return globalStore;
        }

        if (!node.stateStores.contains(str)) {
            throw new TopologyBuilderException("Processor " + node.name() + " has no access to StateStore " + str);
        }
        return this.stateManager.getStore(str);
    }

    /**
     * Forwards a record to the child of the current node at the given position.
     *
     * <p>The current node is restored after the child returns or throws.
     *
     * @param k   record key
     * @param v   record value
     * @param i   index into the current node's list of children
     * @param <K> key type
     * @param <V> value type
     */
    @Override // org.apache.kafka.streams.processor.ProcessorContext
    public <K, V> void forward(K k, V v, int i) {
        // The index lookup happens before the current node is switched, so a bad index
        // leaves the context untouched.
        final ProcessorNode<?, ?> child = currentNode().children().get(i);
        processInChild(child, k, v);
    }

    /**
     * Forwards a record to the first child of the current node whose name equals {@code str}.
     *
     * <p>The current node is restored after the child returns or throws. If no child has the
     * given name the call returns without doing anything.
     * NOTE(review): silently ignoring an unknown child name may hide wiring mistakes; confirm
     * whether callers rely on this before changing it.
     *
     * @param k   record key
     * @param v   record value
     * @param str name of the child node to forward to
     * @param <K> key type
     * @param <V> value type
     */
    @Override // org.apache.kafka.streams.processor.ProcessorContext
    public <K, V> void forward(K k, V v, String str) {
        for (ProcessorNode<?, ?> candidate : currentNode().children()) {
            if (candidate.name().equals(str)) {
                processInChild(candidate, k, v);
                return;
            }
        }
    }

    /**
     * Makes {@code child} the current node, lets it process the record, and then restores the
     * node that was current on entry, whether processing returned or threw.
     */
    private <K, V> void processInChild(ProcessorNode<?, ?> child, K k, V v) {
        final ProcessorNode<?, ?> previousNode = currentNode();

        // The child's key/value types are not known statically here, so the compiler cannot
        // verify this cast; the suppression is limited to this one declaration.
        @SuppressWarnings("unchecked")
        final ProcessorNode<K, V> typedChild = (ProcessorNode<K, V>) child;

        setCurrentNode(typedChild);
        try {
            typedChild.process(k, v);
        } finally {
            setCurrentNode(previousNode);
        }
    }

    /**
     * Requests a commit by calling {@code needCommit()} on the task.
     */
    @Override // org.apache.kafka.streams.processor.ProcessorContext
    public void commit() {
        this.task.needCommit();
    }

    /**
     * Schedules a punctuation by delegating to the task.
     *
     * @param j               interval passed to the task's {@code schedule}
     * @param punctuationType punctuation type passed to the task's {@code schedule}
     * @param punctuator      callback passed to the task's {@code schedule}
     * @return the handle returned by the task
     */
    @Override // org.apache.kafka.streams.processor.ProcessorContext
    public Cancellable schedule(long j, PunctuationType punctuationType, Punctuator punctuator) {
        return this.task.schedule(j, punctuationType, punctuator);
    }

    /**
     * Schedules a {@link PunctuationType#STREAM_TIME} punctuation that calls {@code punctuate}
     * on the processor of whichever node is current when the punctuation fires.
     *
     * @param j interval passed to {@link #schedule(long, PunctuationType, Punctuator)}
     * @deprecated use {@link #schedule(long, PunctuationType, Punctuator)} instead
     */
    @Override // org.apache.kafka.streams.processor.ProcessorContext
    @Deprecated
    public void schedule(long j) {
        schedule(j, PunctuationType.STREAM_TIME, new Punctuator() {
            @Override // org.apache.kafka.streams.processor.Punctuator
            public void punctuate(long j2) {
                // currentNode() is read here, at punctuation time, not when schedule(long) is called.
                ProcessorContextImpl.this.currentNode().processor().punctuate(j2);
            }
        });
    }
}
