package org.apache.kafka.streams.processor.internals;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.requests.ApiVersionsRequest;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.CreateTopicsRequest;
import org.apache.kafka.common.requests.CreateTopicsResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.BrokerNotFoundException;
import org.apache.kafka.streams.errors.StreamsException;

/* loaded from: input_file:BOOT-INF/lib/kafka-streams-1.0.1.jar:org/apache/kafka/streams/processor/internals/StreamsKafkaClient.class */
public class StreamsKafkaClient {
    private static final ConfigDef CONFIG = StreamsConfig.configDef().withClientSslSupport().withClientSaslSupport();
    private final KafkaClient kafkaClient;
    private final List<MetricsReporter> reporters;
    private final Config streamsConfig;
    private final Map<String, String> defaultTopicConfigs = new HashMap();
    private static final int MAX_INFLIGHT_REQUESTS = 100;

    /* loaded from: input_file:BOOT-INF/lib/kafka-streams-1.0.1.jar:org/apache/kafka/streams/processor/internals/StreamsKafkaClient$Config.class */
    public static class Config extends AbstractConfig {
        static Config fromStreamsConfig(StreamsConfig streamsConfig) {
            return new Config(streamsConfig.originals());
        }

        Config(Map<?, ?> map) {
            super(StreamsKafkaClient.CONFIG, map, false);
        }
    }

    StreamsKafkaClient(Config config, KafkaClient kafkaClient, List<MetricsReporter> list) {
        this.streamsConfig = config;
        this.kafkaClient = kafkaClient;
        this.reporters = list;
        extractDefaultTopicConfigs(config.originalsWithPrefix(StreamsConfig.TOPIC_PREFIX));
    }

    private void extractDefaultTopicConfigs(Map<String, Object> map) {
        for (Map.Entry<String, Object> entry : map.entrySet()) {
            if (entry.getValue() != null) {
                this.defaultTopicConfigs.put(entry.getKey(), entry.getValue().toString());
            }
        }
    }

    public static StreamsKafkaClient create(Config config) {
        SystemTime systemTime = new SystemTime();
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        String string = config.getString("client.id");
        linkedHashMap.put("client-id", string);
        Metadata metadata = new Metadata(config.getLong("retry.backoff.ms").longValue(), config.getLong("metadata.max.age.ms").longValue(), false);
        metadata.update(Cluster.bootstrap(ClientUtils.parseAndValidateAddresses(config.getList("bootstrap.servers"))), Collections.emptySet(), systemTime.milliseconds());
        MetricConfig tags = new MetricConfig().samples(config.getInt("metrics.num.samples").intValue()).timeWindow(config.getLong("metrics.sample.window.ms").longValue(), TimeUnit.MILLISECONDS).tags(linkedHashMap);
        List configuredInstances = config.getConfiguredInstances("metric.reporters", MetricsReporter.class);
        configuredInstances.add(new JmxReporter("kafka.admin.client"));
        Metrics metrics = new Metrics(tags, configuredInstances, systemTime);
        ChannelBuilder createChannelBuilder = ClientUtils.createChannelBuilder(config);
        LogContext createLogContext = createLogContext(string);
        return new StreamsKafkaClient(config, new NetworkClient((Selectable) new Selector(config.getLong("connections.max.idle.ms").longValue(), metrics, systemTime, "kafka-client", createChannelBuilder, createLogContext), metadata, string, 100, config.getLong("reconnect.backoff.ms").longValue(), config.getLong("reconnect.backoff.max.ms").longValue(), config.getInt("send.buffer.bytes").intValue(), config.getInt("receive.buffer.bytes").intValue(), config.getInt("request.timeout.ms").intValue(), (Time) systemTime, true, new ApiVersions(), createLogContext), configuredInstances);
    }

    private static LogContext createLogContext(String str) {
        return new LogContext("[StreamsKafkaClient clientId=" + str + "] ");
    }

    public static StreamsKafkaClient create(StreamsConfig streamsConfig) {
        return create(Config.fromStreamsConfig(streamsConfig));
    }

    public void close() throws IOException {
        try {
            this.kafkaClient.close();
            Iterator<MetricsReporter> it = this.reporters.iterator();
            while (it.hasNext()) {
                it.next().close();
            }
        } catch (Throwable th) {
            Iterator<MetricsReporter> it2 = this.reporters.iterator();
            while (it2.hasNext()) {
                it2.next().close();
            }
            throw th;
        }
    }

    public void createTopics(Map<InternalTopicConfig, Integer> map, int i, long j, MetadataResponse metadataResponse) {
        HashMap hashMap = new HashMap();
        for (Map.Entry<InternalTopicConfig, Integer> entry : map.entrySet()) {
            InternalTopicConfig key = entry.getKey();
            Integer value = entry.getValue();
            Properties properties = key.toProperties(j);
            HashMap hashMap2 = new HashMap(this.defaultTopicConfigs);
            for (String str : properties.stringPropertyNames()) {
                hashMap2.put(str, properties.getProperty(str));
            }
            hashMap.put(key.name(), new CreateTopicsRequest.TopicDetails(value.intValue(), (short) i, hashMap2));
        }
        ClientResponse sendRequest = sendRequest(this.kafkaClient.newClientRequest(getControllerReadyBrokerId(metadataResponse), new CreateTopicsRequest.Builder(hashMap, this.streamsConfig.getInt("request.timeout.ms").intValue()), Time.SYSTEM.milliseconds(), true));
        if (!sendRequest.hasResponse()) {
            throw new StreamsException("Empty response for client request.");
        }
        if (!(sendRequest.responseBody() instanceof CreateTopicsResponse)) {
            throw new StreamsException("Inconsistent response type for internal topic creation request. Expected CreateTopicsResponse but received " + sendRequest.responseBody().getClass().getName());
        }
        CreateTopicsResponse createTopicsResponse = (CreateTopicsResponse) sendRequest.responseBody();
        for (InternalTopicConfig internalTopicConfig : map.keySet()) {
            ApiError apiError = createTopicsResponse.errors().get(internalTopicConfig.name());
            if (apiError.isFailure() && !apiError.is(Errors.TOPIC_ALREADY_EXISTS)) {
                throw new StreamsException("Could not create topic: " + internalTopicConfig.name() + " due to " + apiError.messageWithFallback());
            }
        }
    }

    private String ensureOneNodeIsReady(List<Node> list) {
        String str = null;
        long milliseconds = Time.SYSTEM.milliseconds() + this.streamsConfig.getInt("request.timeout.ms").intValue();
        boolean z = false;
        while (!z && Time.SYSTEM.milliseconds() < milliseconds) {
            for (Node node : list) {
                if (this.kafkaClient.ready(node, Time.SYSTEM.milliseconds())) {
                    str = Integer.toString(node.id());
                    z = true;
                    break;
                }
            }
            try {
                this.kafkaClient.poll(50L, Time.SYSTEM.milliseconds());
            } catch (Exception e) {
                throw new StreamsException("Could not poll.", e);
            }
        }
        if (str == null) {
            throw new BrokerNotFoundException("Could not find any available broker. Check your StreamsConfig setting 'bootstrap.servers'. This error might also occur, if you try to connect to pre-0.10 brokers. Kafka Streams requires broker version 0.10.1.x or higher.");
        }
        return str;
    }

    private String getControllerReadyBrokerId(MetadataResponse metadataResponse) {
        return ensureOneNodeIsReady(Collections.singletonList(metadataResponse.controller()));
    }

    private String getAnyReadyBrokerId() {
        Metadata metadata = new Metadata(this.streamsConfig.getLong("retry.backoff.ms").longValue(), this.streamsConfig.getLong("metadata.max.age.ms").longValue(), false);
        metadata.update(Cluster.bootstrap(ClientUtils.parseAndValidateAddresses(this.streamsConfig.getList("bootstrap.servers"))), Collections.emptySet(), Time.SYSTEM.milliseconds());
        return ensureOneNodeIsReady(metadata.fetch().nodes());
    }

    private ClientResponse sendRequest(ClientRequest clientRequest) {
        try {
            this.kafkaClient.send(clientRequest, Time.SYSTEM.milliseconds());
            long milliseconds = Time.SYSTEM.milliseconds() + this.streamsConfig.getInt("request.timeout.ms").intValue();
            while (Time.SYSTEM.milliseconds() < milliseconds) {
                try {
                    List<ClientResponse> poll = this.kafkaClient.poll(100L, Time.SYSTEM.milliseconds());
                    if (!poll.isEmpty()) {
                        if (poll.size() > 1) {
                            throw new StreamsException("Sent one request but received multiple or no responses.");
                        }
                        ClientResponse clientResponse = poll.get(0);
                        if (clientResponse.requestHeader().correlationId() == clientRequest.correlationId()) {
                            return clientResponse;
                        }
                        throw new StreamsException("Inconsistent response received from the broker " + clientRequest.destination() + ", expected correlation id " + clientRequest.correlationId() + ", but received " + clientResponse.requestHeader().correlationId());
                    }
                } catch (IllegalStateException e) {
                    throw new StreamsException("Could not poll.", e);
                }
            }
            throw new StreamsException("Failed to get response from broker within timeout");
        } catch (Exception e2) {
            throw new StreamsException("Could not send request.", e2);
        }
    }

    public MetadataResponse fetchMetadata() {
        ClientResponse sendRequest = sendRequest(this.kafkaClient.newClientRequest(getAnyReadyBrokerId(), MetadataRequest.Builder.allTopics(), Time.SYSTEM.milliseconds(), true));
        if (!sendRequest.hasResponse()) {
            throw new StreamsException("Empty response for client request.");
        }
        if (sendRequest.responseBody() instanceof MetadataResponse) {
            return (MetadataResponse) sendRequest.responseBody();
        }
        throw new StreamsException("Inconsistent response type for internal topic metadata request. Expected MetadataResponse but received " + sendRequest.responseBody().getClass().getName());
    }

    public void checkBrokerCompatibility(boolean z) throws StreamsException {
        ClientResponse sendRequest = sendRequest(this.kafkaClient.newClientRequest(getAnyReadyBrokerId(), new ApiVersionsRequest.Builder(), Time.SYSTEM.milliseconds(), true));
        if (!sendRequest.hasResponse()) {
            throw new StreamsException("Empty response for client request.");
        }
        if (!(sendRequest.responseBody() instanceof ApiVersionsResponse)) {
            throw new StreamsException("Inconsistent response type for API versions request. Expected ApiVersionsResponse but received " + sendRequest.responseBody().getClass().getName());
        }
        ApiVersionsResponse apiVersionsResponse = (ApiVersionsResponse) sendRequest.responseBody();
        if (apiVersionsResponse.apiVersion(ApiKeys.CREATE_TOPICS.id) == null) {
            throw new StreamsException("Kafka Streams requires broker version 0.10.1.x or higher.");
        }
        if (z && !brokerSupportsTransactions(apiVersionsResponse)) {
            throw new StreamsException("Setting processing.guarantee=exactly_once requires broker version 0.11.0.x or higher.");
        }
    }

    private boolean brokerSupportsTransactions(ApiVersionsResponse apiVersionsResponse) {
        return (apiVersionsResponse.apiVersion(ApiKeys.INIT_PRODUCER_ID.id) == null || apiVersionsResponse.apiVersion(ApiKeys.ADD_PARTITIONS_TO_TXN.id) == null || apiVersionsResponse.apiVersion(ApiKeys.ADD_OFFSETS_TO_TXN.id) == null || apiVersionsResponse.apiVersion(ApiKeys.END_TXN.id) == null || apiVersionsResponse.apiVersion(ApiKeys.WRITE_TXN_MARKERS.id) == null || apiVersionsResponse.apiVersion(ApiKeys.TXN_OFFSET_COMMIT.id) == null) ? false : true;
    }
}
