package org.apache.kafka.streams.processor.internals;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.SecurityDisabledException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.slf4j.Logger;

/* loaded from: input_file:BOOT-INF/lib/kafka-streams-1.1.0.jar:org/apache/kafka/streams/processor/internals/RecordCollectorImpl.class */
/**
 * {@link RecordCollector} that serializes records, forwards them to a byte-array
 * {@link Producer} and remembers, per {@link TopicPartition}, the offset reported by the
 * most recent successful send acknowledgement.
 *
 * <p>Send failures reported through the producer callback are not thrown from the callback.
 * They are stored in {@link #sendException} and rethrown by the next call to
 * {@code send}, {@link #flush()} or {@link #close()}. Once an error has been stored, later
 * acknowledgements no longer update the offsets map.
 */
public class RecordCollectorImpl implements RecordCollector {
    private static final String LOG_MESSAGE = "Error sending record (key {} value {} timestamp {}) to topic {} due to {}; No more records will be sent and no more offsets will be recorded for this task.";
    private static final String EXCEPTION_MESSAGE = "%sAbort sending since %s with a previous record (key %s value %s timestamp %d) to topic %s due to %s";
    private static final String PARAMETER_HINT = "\nYou can increase producer parameter `retries` and `retry.backoff.ms` to avoid this error.";
    private static final String HANDLER_CONTINUED_MESSAGE = "Error sending records (key {} value {} timestamp {}) to topic {} due to {}; The exception handler chose to CONTINUE processing in spite of this error.";

    private final Logger log;
    private final Producer<byte[], byte[]> producer;
    private final Map<TopicPartition, Long> offsets = new HashMap<>();
    private final String logPrefix;
    private final ProductionExceptionHandler productionExceptionHandler;

    // Written inside the producer callback and read by send/flush/close, hence volatile.
    private volatile KafkaException sendException;

    /**
     * @param producer                   producer that receives the serialized records
     * @param streamTaskId               identifier rendered as {@code "task [<id>] "} in exception messages
     * @param logContext                 factory for this collector's logger
     * @param productionExceptionHandler consulted for send errors that are neither a producer fence
     *                                   nor classified as fatal by {@link #productionExceptionIsFatal(Exception)}
     */
    public RecordCollectorImpl(Producer<byte[], byte[]> producer, String streamTaskId, LogContext logContext, ProductionExceptionHandler productionExceptionHandler) {
        this.producer = producer;
        this.logPrefix = String.format("task [%s] ", streamTaskId);
        this.log = logContext.logger(getClass());
        this.productionExceptionHandler = productionExceptionHandler;
    }

    /**
     * Sends a record, letting {@code partitioner} (when non-null) choose the target partition
     * from the partition count the producer reports for {@code topic}. With a null partitioner
     * the partition is left unset ({@code null}).
     *
     * @throws StreamsException if the producer reports no partitions for {@code topic}, or as
     *                          described on the partition-taking {@code send} overload
     */
    @Override // org.apache.kafka.streams.processor.internals.RecordCollector
    public <K, V> void send(String topic, K key, V value, Long timestamp, Serializer<K> keySerializer, Serializer<V> valueSerializer, StreamPartitioner<? super K, ? super V> partitioner) {
        Integer partition = null;
        if (partitioner != null) {
            final List<PartitionInfo> partitions = this.producer.partitionsFor(topic);
            if (partitions.isEmpty()) {
                throw new StreamsException("Could not get partition information for topic '" + topic + "'. This can happen if the topic does not exist.");
            }
            partition = partitioner.partition(key, value, partitions.size());
        }
        // Delegate with the original generic types. Casting the key to String here would
        // throw ClassCastException for every non-String key.
        send(topic, key, value, partition, timestamp, keySerializer, valueSerializer);
    }

    /**
     * Classifies a send error as fatal, i.e. one that is recorded as a failure without
     * consulting the {@link ProductionExceptionHandler}.
     *
     * <p>Internal helper; it is public only because the decompiler widened its access from private.
     *
     * @return {@code true} for authentication, authorization and security-disabled errors, and for
     *         invalid-topic, unknown-server, serialization, offset-metadata-too-large and
     *         illegal-state errors
     */
    public boolean productionExceptionIsFatal(Exception exception) {
        final boolean securityFailure = exception instanceof AuthenticationException
                || exception instanceof AuthorizationException
                || exception instanceof SecurityDisabledException;
        final boolean unrecoverableFailure = exception instanceof InvalidTopicException
                || exception instanceof UnknownServerException
                || exception instanceof SerializationException
                || exception instanceof OffsetMetadataTooLarge
                || exception instanceof IllegalStateException;
        return securityFailure || unrecoverableFailure;
    }

    /**
     * Logs a send failure and stores it as a {@link StreamsException} (with {@code exception}
     * as cause) so that the next {@code send}/{@code flush}/{@code close} rethrows it. For
     * {@link RetriableException}s a hint about the producer retry settings is appended to both
     * the log line and the exception message.
     *
     * <p>Internal helper; it is public only because the decompiler widened its access from private.
     */
    public <K, V> void recordSendError(K key, V value, Long timestamp, String topic, Exception exception) {
        String logMessage = LOG_MESSAGE;
        String exceptionMessage = EXCEPTION_MESSAGE;
        if (exception instanceof RetriableException) {
            logMessage += PARAMETER_HINT;
            exceptionMessage += PARAMETER_HINT;
        }
        // SLF4J consumes a trailing Throwable as the stack-trace argument, so the exception text
        // is passed separately to fill the final "{}" placeholder.
        this.log.error(logMessage, key, value, timestamp, topic, exception.toString(), exception);
        this.sendException = new StreamsException(
                String.format(exceptionMessage, this.logPrefix, "an error caught", key, value, timestamp, topic, exception.getMessage()),
                exception);
    }

    /**
     * Serializes {@code key} and {@code value} and sends them to {@code topic}.
     *
     * <p>On a successful acknowledgement the acknowledged offset is stored for its partition,
     * unless an error has already been recorded. On a failed acknowledgement (and only if no
     * error has been recorded yet) the error is handled in this order: a
     * {@link ProducerFencedException} is stored as a new {@link ProducerFencedException}; a fatal
     * error is stored via {@link #recordSendError}; anything else is passed to the
     * {@link ProductionExceptionHandler}, which either fails (error stored) or continues (error
     * only logged at debug level).
     *
     * @param partition target partition, or {@code null} to leave it unset on the record
     * @throws KafkaException   the previously recorded send error, if any
     * @throws StreamsException if {@code producer.send} itself throws
     */
    @Override // org.apache.kafka.streams.processor.internals.RecordCollector
    public <K, V> void send(final String topic, final K key, final V value, Integer partition, final Long timestamp, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        checkForException();
        final byte[] keyBytes = keySerializer.serialize(topic, key);
        final byte[] valueBytes = valueSerializer.serialize(topic, value);
        final ProducerRecord<byte[], byte[]> serializedRecord = new ProducerRecord<>(topic, partition, timestamp, keyBytes, valueBytes);
        try {
            this.producer.send(serializedRecord, new Callback() {
                @Override // org.apache.kafka.clients.producer.Callback
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        // Stop recording offsets once a send error has been captured.
                        if (RecordCollectorImpl.this.sendException == null) {
                            final TopicPartition tp = new TopicPartition(metadata.topic(), metadata.partition());
                            RecordCollectorImpl.this.offsets.put(tp, Long.valueOf(metadata.offset()));
                        }
                        return;
                    }
                    if (RecordCollectorImpl.this.sendException != null) {
                        // Only the first error is kept; later ones are ignored.
                        return;
                    }
                    if (exception instanceof ProducerFencedException) {
                        RecordCollectorImpl.this.log.warn(LOG_MESSAGE, key, value, timestamp, topic, exception.getMessage());
                        RecordCollectorImpl.this.sendException = new ProducerFencedException(
                                String.format(EXCEPTION_MESSAGE, RecordCollectorImpl.this.logPrefix, "producer got fenced", key, value, timestamp, topic, exception.getMessage()));
                    } else if (productionExceptionIsFatal(exception)) {
                        recordSendError(key, value, timestamp, topic, exception);
                    } else if (RecordCollectorImpl.this.productionExceptionHandler.handle(serializedRecord, exception) == ProductionExceptionHandler.ProductionExceptionHandlerResponse.FAIL) {
                        recordSendError(key, value, timestamp, topic, exception);
                    } else {
                        // exception.toString() fills the final "{}"; the trailing Throwable supplies the stack trace.
                        RecordCollectorImpl.this.log.debug(HANDLER_CONTINUED_MESSAGE, key, value, timestamp, topic, exception.toString(), exception);
                    }
                }
            });
        } catch (TimeoutException e) {
            this.log.error("Timeout exception caught when sending record to topic {}. This might happen if the producer cannot send data to the Kafka cluster and thus, its internal buffer fills up. You can increase producer parameter `max.block.ms` to increase this timeout.", topic);
            // Keep the timeout as the cause so the original stack trace is not lost.
            throw new StreamsException(String.format("%sFailed to send record to topic %s due to timeout.", this.logPrefix, topic), e);
        } catch (Exception e) {
            throw new StreamsException(
                    String.format(EXCEPTION_MESSAGE, this.logPrefix, "an error caught", key, value, timestamp, topic, e.getMessage()),
                    e);
        }
    }

    /** Rethrows the send error recorded by the producer callback, if there is one. */
    private void checkForException() {
        final KafkaException recorded = this.sendException;
        if (recorded != null) {
            throw recorded;
        }
    }

    /**
     * Flushes the producer, then rethrows any recorded send error.
     *
     * @throws KafkaException the recorded send error, if any
     */
    @Override // org.apache.kafka.streams.processor.internals.RecordCollector
    public void flush() {
        this.log.debug("Flushing producer");
        this.producer.flush();
        checkForException();
    }

    /**
     * Closes the producer, then rethrows any recorded send error.
     *
     * @throws KafkaException the recorded send error, if any
     */
    @Override // org.apache.kafka.streams.processor.internals.RecordCollector
    public void close() {
        this.log.debug("Closing producer");
        this.producer.close();
        checkForException();
    }

    /**
     * @return the live (not copied) map of the last acknowledged offset per partition
     */
    @Override // org.apache.kafka.streams.processor.internals.RecordCollector
    public Map<TopicPartition, Long> offsets() {
        return this.offsets;
    }

    /** Package-private accessor for the underlying producer. */
    Producer<byte[], byte[]> producer() {
        return this.producer;
    }
}
