package org.wso2.extension.siddhi.io.kafka.multidc.source;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.log4j.Logger;
import org.wso2.extension.siddhi.io.kafka.source.KafkaSource;
import org.wso2.siddhi.annotation.Example;
import org.wso2.siddhi.annotation.Extension;
import org.wso2.siddhi.annotation.Parameter;
import org.wso2.siddhi.annotation.util.DataType;
import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.exception.ConnectionUnavailableException;
import org.wso2.siddhi.core.stream.input.source.Source;
import org.wso2.siddhi.core.stream.input.source.SourceEventListener;
import org.wso2.siddhi.core.util.config.ConfigReader;
import org.wso2.siddhi.core.util.transport.OptionHolder;
import org.wso2.siddhi.query.api.exception.SiddhiAppValidationException;

@Extension(name = "kafkaMultiDC", namespace = "source", description = "The Kafka Multi Data Center(DC) Source receives records from the same topic in brokers deployed in two different kafka cluster. It will filter out all duplicate messages and try to ensurethat the events are received in the correct order by using sequence numbers. events are received in format such as `text`, `XML` JSON` and `Binary`.The Kafka Source will create the default partition '0' for a given topic, if the topic is not already been created in the Kafka cluster.", parameters = {@Parameter(name = KafkaSource.ADAPTOR_SUBSCRIBER_ZOOKEEPER_CONNECT_SERVERS, description = "This should contain the kafka server list which the kafka source should be listening to. This should be given in comma separated values. eg: 'localhost:9092,localhost:9093' ", type = {DataType.STRING}), @Parameter(name = KafkaMultiDCSource.KAFKA_TOPIC, description = "The topic  which the source would be listening to. eg: 'topic_one' ", type = {DataType.STRING}), @Parameter(name = KafkaMultiDCSource.KAFKA_PARTITION_NO, description = "The partition number for the given topic", type = {DataType.INT}, optional = true, defaultValue = "0"), @Parameter(name = KafkaSource.IS_BINARY_MESSAGE, description = "To receive the binary events via KafkaMultiDCSource, it is needed to set this parameter value to `true`.", type = {DataType.BOOL}, optional = true, defaultValue = "false"), @Parameter(name = KafkaSource.ADAPTOR_OPTIONAL_CONFIGURATION_PROPERTIES, description = "This may contain all the other possible configurations which the consumer should be created with.eg: producer.type:async,batch.size:200", type = {DataType.STRING}, optional = true, defaultValue = "null")}, examples = {@Example(description = "The following query will listen to 'kafka_topic' topic deployed in broker host1:9092 and host1:9093 with partition 1. There will be a thread created for each broker. 
The receiving xml events will be mapped to a siddhi event and will be send to the FooStream.", syntax = "@App:name('TestExecutionPlan') \ndefine stream BarStream (symbol string, price float, volume long); \n@info(name = 'query1') \n@source(type='kafkaMultiDC', topic='kafka_topic', bootstrap.servers='host1:9092,host1:9093', partition.no='1', @map(type='xml'))\nDefine stream FooStream (symbol string, price float, volume long);\nfrom FooStream select symbol, price, volume insert into BarStream;\n")})
/* loaded from: input_file:org/wso2/extension/siddhi/io/kafka/multidc/source/KafkaMultiDCSource.class */
/*
 * Siddhi source that reads the same topic from exactly two Kafka bootstrap servers.
 *
 * One KafkaSource is created per bootstrap server. Each of them delivers its records to an
 * Interceptor that is bound to a single shared SourceSynchronizer, which in turn is handed the
 * real SourceEventListener. Lifecycle calls (connect, disconnect, destroy, pause, resume) and
 * state snapshots are fanned out to both underlying sources.
 */
public class KafkaMultiDCSource extends Source {
    private static final String KAFKA_TOPIC = "topic";
    private static final String KAFKA_PARTITION_NO = "partition.no";
    private static final Logger LOG = Logger.getLogger(KafkaMultiDCSource.class);
    private static final String LAST_RECEIVED_SEQ_NO_KEY = "lastConsumedSeqNo";

    /** Number of entries that must be present in the bootstrap server list. */
    private static final int REQUIRED_SERVER_COUNT = 2;

    /** Listener of the stream this source is attached to; also supplies the stream definition. */
    private SourceEventListener eventListener;

    /** Underlying Kafka sources, keyed by the bootstrap server string each one consumes from. */
    private final Map<String, KafkaSource> sources = new HashMap<>();

    /** Bootstrap servers in the order they were configured. */
    private String[] bootstrapServers;

    /** Shared synchronizer that both interceptors are constructed with. */
    private SourceSynchronizer synchronizer;

    /**
     * Validates the configuration and initialises one {@link KafkaSource} per bootstrap server.
     *
     * @param sourceEventListener listener that is passed on to the synchronizer
     * @param optionHolder        options declared on the {@code @source} annotation
     * @param strArr              passed unchanged to every underlying {@code KafkaSource.init}
     * @param configReader        passed unchanged to every underlying {@code KafkaSource.init}
     * @param siddhiAppContext    passed unchanged to every underlying {@code KafkaSource.init}
     * @throws SiddhiAppValidationException if the server list does not contain exactly two entries
     */
    public void init(SourceEventListener sourceEventListener, OptionHolder optionHolder, String[] strArr, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        this.eventListener = sourceEventListener;
        String serverList = optionHolder.validateAndGetStaticValue(KafkaSource.ADAPTOR_SUBSCRIBER_ZOOKEEPER_CONNECT_SERVERS);
        boolean binaryMessage = Boolean.parseBoolean(optionHolder.validateAndGetStaticValue(KafkaSource.IS_BINARY_MESSAGE, "false"));
        this.bootstrapServers = serverList.split(KafkaSource.HEADER_SEPARATOR);
        if (this.bootstrapServers.length != REQUIRED_SERVER_COUNT) {
            throw new SiddhiAppValidationException("There should be two servers listed in 'bootstrap.servers' configuration to ensure fault tolerant kafka messaging.");
        }
        // NOTE(review): 1000 and 10 are tuning values of SourceSynchronizer; their exact meaning
        // is not visible from this class - confirm against SourceSynchronizer's constructor.
        this.synchronizer = new SourceSynchronizer(sourceEventListener, this.bootstrapServers, 1000, 10);

        // Same steps for both servers, performed in configuration order.
        for (String server : this.bootstrapServers) {
            LOG.info("Initializing kafka source for bootstrap server :" + server);
            Interceptor interceptor = new Interceptor(server, this.synchronizer, binaryMessage);
            OptionHolder serverOptions = createOptionHolders(server, optionHolder);
            KafkaSource kafkaSource = new KafkaSource();
            kafkaSource.init(interceptor, serverOptions, strArr, configReader, siddhiAppContext);
            this.sources.put(server, kafkaSource);
        }
    }

    /**
     * @return the payload classes this source reports: {@code String} and {@code byte[]}
     */
    public Class[] getOutputEventClasses() {
        return new Class[]{String.class, byte[].class};
    }

    /**
     * Connects every underlying source. A failure of one source does not prevent the attempt on
     * the other; all failures are collected and reported together afterwards.
     *
     * @param connectionCallback passed unchanged to every underlying {@code KafkaSource.connect}
     * @throws ConnectionUnavailableException if at least one source could not connect; the
     *         message lists every failing server and the individual exceptions are attached as
     *         suppressed exceptions
     */
    public void connect(Source.ConnectionCallback connectionCallback) throws ConnectionUnavailableException {
        StringBuilder errors = new StringBuilder();
        Map<String, ConnectionUnavailableException> failures = new HashMap<>();
        for (Map.Entry<String, KafkaSource> entry : this.sources.entrySet()) {
            String server = entry.getKey();
            try {
                entry.getValue().connect(connectionCallback);
                LOG.info("Connect to bootstrap server " + server);
            } catch (ConnectionUnavailableException e) {
                errors.append("Error occurred while connecting to ").append(server).append(":").append(e.getMessage()).append("\n");
                failures.put(server, e);
            }
        }
        if (failures.isEmpty()) {
            return;
        }
        String message = errors.toString();
        LOG.error("Error while trying to connect boot strap server(s): " + message);
        ConnectionUnavailableException aggregate = new ConnectionUnavailableException(message);
        // Keep the original stack traces reachable instead of flattening them to message text.
        for (ConnectionUnavailableException failure : failures.values()) {
            aggregate.addSuppressed(failure);
        }
        throw aggregate;
    }

    /** Disconnects every underlying source. */
    public void disconnect() {
        this.sources.values().forEach(KafkaSource::disconnect);
    }

    /** Destroys every underlying source. */
    public void destroy() {
        this.sources.values().forEach(KafkaSource::destroy);
    }

    /** Pauses every underlying source. */
    public void pause() {
        this.sources.values().forEach(KafkaSource::pause);
    }

    /** Resumes every underlying source. */
    public void resume() {
        this.sources.values().forEach(KafkaSource::resume);
    }

    /**
     * Builds a snapshot containing the state of each underlying source, stored under its
     * bootstrap server string, plus the synchronizer's last consumed sequence number stored
     * under {@code "lastConsumedSeqNo"}.
     *
     * @return a new map holding the snapshot
     */
    public Map<String, Object> currentState() {
        Map<String, Object> state = new HashMap<>();
        for (Map.Entry<String, KafkaSource> entry : this.sources.entrySet()) {
            state.put(entry.getKey(), entry.getValue().currentState());
        }
        state.put(LAST_RECEIVED_SEQ_NO_KEY, this.synchronizer.getLastConsumedSeqNo());
        return state;
    }

    /**
     * Restores a snapshot in the layout produced by {@link #currentState()}.
     *
     * @param map snapshot to restore; if it has no sequence number entry, the synchronizer's
     *            current sequence number is left untouched and a warning is logged
     */
    public void restoreState(Map<String, Object> map) {
        Long lastConsumedSeqNo = (Long) map.get(LAST_RECEIVED_SEQ_NO_KEY);
        if (lastConsumedSeqNo != null) {
            this.synchronizer.setLastConsumedSeqNo(lastConsumedSeqNo.longValue());
        } else {
            // Unboxing a missing entry would fail with a bare NullPointerException.
            LOG.warn("No '" + LAST_RECEIVED_SEQ_NO_KEY + "' entry found in the state to restore; keeping the current sequence number.");
        }
        for (String server : this.bootstrapServers) {
            @SuppressWarnings("unchecked") // currentState() stores each source's own state map under its server key
            Map<String, Object> sourceState = (Map<String, Object>) map.get(server);
            this.sources.get(server).restoreState(sourceState);
        }
    }

    /**
     * Builds the option set for the {@link KafkaSource} that talks to a single bootstrap server.
     * Every source gets a freshly generated random group id, single-threaded mode and sequence
     * handling switched off; topic, partition, optional properties and the binary flag are
     * copied from the options of this source.
     *
     * @param str          bootstrap server the underlying source should consume from
     * @param optionHolder options declared for this multi-DC source
     * @return option holder validated against the {@code @Extension} annotation of KafkaSource
     */
    private OptionHolder createOptionHolders(String str, OptionHolder optionHolder) {
        Map<String, String> options = new HashMap<>();
        options.put(KafkaSource.ADAPTOR_SUBSCRIBER_ZOOKEEPER_CONNECT_SERVERS, str);
        options.put(KafkaSource.ADAPTOR_SUBSCRIBER_GROUP_ID, UUID.randomUUID().toString());
        options.put(KafkaSource.THREADING_OPTION, KafkaSource.SINGLE_THREADED);
        options.put(KafkaSource.SEQ_ENABLED, "false");
        options.put(KafkaSource.ADAPTOR_SUBSCRIBER_PARTITION_NO_LIST, optionHolder.validateAndGetStaticValue(KAFKA_PARTITION_NO, "0"));
        options.put(KafkaSource.ADAPTOR_SUBSCRIBER_TOPIC, optionHolder.validateAndGetStaticValue(KAFKA_TOPIC));
        options.put(KafkaSource.ADAPTOR_OPTIONAL_CONFIGURATION_PROPERTIES, optionHolder.validateAndGetStaticValue(KafkaSource.ADAPTOR_OPTIONAL_CONFIGURATION_PROPERTIES, (String) null));
        options.put(KafkaSource.IS_BINARY_MESSAGE, optionHolder.validateAndGetStaticValue(KafkaSource.IS_BINARY_MESSAGE, "false"));
        Map<String, String> dynamicOptions = new HashMap<>();
        return new OptionHolder(this.eventListener.getStreamDefinition(), options, dynamicOptions, KafkaSource.class.getAnnotation(Extension.class));
    }
}
