package org.apache.kafka.streams.processor.internals;

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.internals.ChangedSerializer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.internals.RecordCollector;

/* loaded from: input_file:BOOT-INF/lib/kafka-streams-1.0.1.jar:org/apache/kafka/streams/processor/internals/SinkNode.class */
/**
 * A processor node that writes every record it receives to a fixed topic through the
 * {@link RecordCollector} supplied by the processor context.
 *
 * <p>A sink node does not accept children: {@link #addChild(ProcessorNode)} always throws.
 *
 * @param <K> type of the record keys handed to this node
 * @param <V> type of the record values handed to this node
 */
public class SinkNode<K, V> extends ProcessorNode<K, V> {
    private final String topic;
    // Not final: a null serializer is replaced in init() by the context's default serde.
    private Serializer<K> keySerializer;
    private Serializer<V> valSerializer;
    private final StreamPartitioner<? super K, ? super V> partitioner;
    // Assigned in init(); process() reads it.
    private ProcessorContext context;

    /**
     * Creates a sink node.
     *
     * @param name          node name, passed to the {@link ProcessorNode} constructor
     * @param topic         topic every record is sent to
     * @param keySerializer key serializer; if {@code null}, {@link #init(ProcessorContext)}
     *                      substitutes the serializer of the context's key serde
     * @param valSerializer value serializer; if {@code null}, {@link #init(ProcessorContext)}
     *                      substitutes the serializer of the context's value serde
     * @param partitioner   partitioner handed unchanged to the record collector on every send
     */
    public SinkNode(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<? super K, ? super V> partitioner) {
        super(name);
        this.topic = topic;
        this.keySerializer = keySerializer;
        this.valSerializer = valSerializer;
        this.partitioner = partitioner;
    }

    /**
     * Always rejects the call.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // org.apache.kafka.streams.processor.internals.ProcessorNode
    public void addChild(ProcessorNode<?, ?> processorNode) {
        throw new UnsupportedOperationException("sink node does not allow addChild");
    }

    /**
     * Initializes the node, stores the context, and fills in any serializer that was not
     * supplied at construction time from the context's serdes.
     *
     * @param processorContext context used for default serdes and, later, by
     *                         {@link #process(Object, Object)}
     */
    @Override // org.apache.kafka.streams.processor.internals.ProcessorNode
    @SuppressWarnings("unchecked") // the default serdes are not statically typed to K / V
    public void init(ProcessorContext processorContext) {
        super.init(processorContext);
        this.context = processorContext;

        if (this.keySerializer == null) {
            this.keySerializer = (Serializer<K>) processorContext.keySerde().serializer();
        }
        if (this.valSerializer == null) {
            this.valSerializer = (Serializer<V>) processorContext.valueSerde().serializer();
        }

        // A ChangedSerializer without an inner serializer gets the context's value serializer.
        if (this.valSerializer instanceof ChangedSerializer) {
            ChangedSerializer changedSerializer = (ChangedSerializer) this.valSerializer;
            if (changedSerializer.inner() == null) {
                changedSerializer.setInner(processorContext.valueSerde().serializer());
            }
        }
    }

    /**
     * Sends the record to this node's topic using the context's current timestamp.
     *
     * @param k record key
     * @param v record value
     * @throws StreamsException if the context timestamp is negative, or if the send raises a
     *                          {@link ClassCastException} (reported as a serializer/type mismatch)
     */
    @Override // org.apache.kafka.streams.processor.internals.ProcessorNode
    public void process(K k, V v) {
        RecordCollector recordCollector = ((RecordCollector.Supplier) this.context).recordCollector();
        long timestamp = this.context.timestamp();
        if (timestamp < 0) {
            throw new StreamsException("Invalid (negative) timestamp of " + timestamp + " for output record <" + k + ":" + v + ">.");
        }
        try {
            // Key and value are passed with their declared types. Casting the key to String
            // here would be a real runtime check and would reject every non-String key.
            recordCollector.send(this.topic, k, v, Long.valueOf(timestamp), this.keySerializer, this.valSerializer, this.partitioner);
        } catch (ClassCastException e) {
            String keyType = k == null ? "unknown because key is null" : k.getClass().getName();
            String valueType = v == null ? "unknown because value is null" : v.getClass().getName();
            throw new StreamsException(String.format("A serializer (key: %s / value: %s) is not compatible to the actual key or value type (key type: %s / value type: %s). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.", this.keySerializer.getClass().getName(), this.valSerializer.getClass().getName(), keyType, valueType), e);
        }
    }

    /**
     * @return the description produced by {@link #toString(String)} with an empty indent
     */
    @Override // org.apache.kafka.streams.processor.internals.ProcessorNode
    public String toString() {
        return toString("");
    }

    /**
     * Builds the superclass description followed by a line naming the target topic.
     *
     * @param indent prefix passed to the superclass and prepended to the topic line
     * @return the multi-line description, terminated by a line feed
     */
    @Override // org.apache.kafka.streams.processor.internals.ProcessorNode
    public String toString(String indent) {
        StringBuilder sb = new StringBuilder(super.toString(indent));
        sb.append(indent).append("\ttopic:\t\t");
        sb.append(this.topic);
        sb.append(StringUtils.LF);
        return sb.toString();
    }
}
