package org.wso2.extension.siddhi.io.kafka.source;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.log4j.Logger;
import org.wso2.siddhi.core.stream.input.source.SourceEventListener;

/* loaded from: input_file:org/wso2/extension/siddhi/io/kafka/source/ConsumerKafkaGroup.class */
/**
 * Creates and manages the {@link KafkaConsumerThread}s that serve one set of topics and partitions.
 *
 * <p>How many consumer threads are created depends on the threading option passed to the
 * constructor (see {@link #run(SourceEventListener)}). Lifecycle calls ({@link #pause()},
 * {@link #resume()}, {@link #restore(Map)}, {@link #shutdown()}) are forwarded to every consumer
 * thread created so far.
 */
public class ConsumerKafkaGroup {
    private static final Logger LOG = Logger.getLogger(ConsumerKafkaGroup.class);

    private final String[] topics;
    private final String[] partitions;
    private final Properties props;
    private final List<KafkaConsumerThread> kafkaConsumerThreadList = new ArrayList<>();
    private final ScheduledExecutorService executorService;
    private final String threadingOption;
    private final boolean isBinaryMessage;
    /** May be {@code null}; in that case sequence numbers are neither handed to threads nor collected. */
    private final Map<String, Map<SequenceKey, Integer>> perConsumerLastReceivedSeqNo;
    private Map<String, Map<Integer, Long>> topicOffsetMap;

    /**
     * Stores the configuration; no consumer thread is created until {@link #run(SourceEventListener)}.
     *
     * @param topics                       topics to consume from
     * @param partitions                   partitions to consume from
     * @param props                        properties handed to every consumer thread
     * @param topicOffsetMap               topic to (partition to offset) map handed to every consumer thread
     * @param perConsumerLastReceivedSeqNo consumer thread id to last-received sequence numbers; may be
     *                                     {@code null}
     * @param threadingOption              compared against {@code KafkaSource.SINGLE_THREADED},
     *                                     {@code KafkaSource.TOPIC_WISE} and {@code KafkaSource.PARTITION_WISE}
     * @param executorService              executor the consumer threads are submitted to
     * @param isBinaryMessage              binary-message flag forwarded to every consumer thread
     */
    public ConsumerKafkaGroup(String[] topics, String[] partitions, Properties props,
                              Map<String, Map<Integer, Long>> topicOffsetMap,
                              Map<String, Map<SequenceKey, Integer>> perConsumerLastReceivedSeqNo,
                              String threadingOption, ScheduledExecutorService executorService,
                              boolean isBinaryMessage) {
        this.threadingOption = threadingOption;
        this.topicOffsetMap = topicOffsetMap;
        this.perConsumerLastReceivedSeqNo = perConsumerLastReceivedSeqNo;
        this.topics = topics;
        this.partitions = partitions;
        this.props = props;
        this.executorService = executorService;
        this.isBinaryMessage = isBinaryMessage;
    }

    /**
     * Replaces the offset map that will be handed to consumer threads created by a later
     * {@link #run(SourceEventListener)} call. Threads that already exist are not touched here.
     *
     * @param map topic to (partition to offset) map
     */
    public void setTopicOffsetMap(Map<String, Map<Integer, Long>> map) {
        this.topicOffsetMap = map;
    }

    /** Calls {@code pause()} on every consumer thread in this group. */
    public void pause() {
        this.kafkaConsumerThreadList.forEach(KafkaConsumerThread::pause);
    }

    /** Calls {@code resume()} on every consumer thread in this group. */
    public void resume() {
        this.kafkaConsumerThreadList.forEach(KafkaConsumerThread::resume);
    }

    /**
     * Calls {@code restore(map)} on every consumer thread in this group.
     *
     * @param map topic to (partition to offset) map passed to each thread
     */
    public void restore(Map<String, Map<Integer, Long>> map) {
        for (KafkaConsumerThread consumerThread : this.kafkaConsumerThreadList) {
            consumerThread.restore(map);
        }
    }

    /** Calls {@code shutdownConsumer()} on every consumer thread in this group. */
    public void shutdown() {
        this.kafkaConsumerThreadList.forEach(KafkaConsumerThread::shutdownConsumer);
    }

    /**
     * Creates the consumer threads dictated by the threading option and submits all threads in the
     * group to the executor service.
     *
     * <ul>
     *   <li>{@code KafkaSource.SINGLE_THREADED}: one thread for all topics and all partitions.</li>
     *   <li>{@code KafkaSource.TOPIC_WISE}: one thread per topic, each with all partitions.</li>
     *   <li>{@code KafkaSource.PARTITION_WISE}: one thread per topic/partition pair.</li>
     *   <li>Any other value: no thread is created.</li>
     * </ul>
     *
     * <p>Any {@link Throwable} raised while creating or submitting threads is logged and not rethrown.
     *
     * @param sourceEventListener listener handed to every consumer thread
     */
    public void run(SourceEventListener sourceEventListener) {
        try {
            if (KafkaSource.SINGLE_THREADED.equals(this.threadingOption)) {
                addConsumerThread(sourceEventListener, this.topics, this.partitions, false);
                LOG.info("Kafka Consumer thread starting to listen on topic(s): " + Arrays.toString(this.topics)
                        + " with partition/s: " + Arrays.toString(this.partitions));
            } else if (KafkaSource.TOPIC_WISE.equals(this.threadingOption)) {
                for (String topic : this.topics) {
                    addConsumerThread(sourceEventListener, new String[]{topic}, this.partitions, false);
                    LOG.info("Kafka Consumer thread starting to listen on topic: " + topic
                            + " with partition/s: " + Arrays.toString(this.partitions));
                }
            } else if (KafkaSource.PARTITION_WISE.equals(this.threadingOption)) {
                for (String topic : this.topics) {
                    for (String partition : this.partitions) {
                        addConsumerThread(sourceEventListener, new String[]{topic}, new String[]{partition}, true);
                        LOG.info("Kafka Consumer thread starting to listen on topic: " + topic
                                + " with partition: " + partition);
                    }
                }
            }
            for (KafkaConsumerThread consumerThread : this.kafkaConsumerThreadList) {
                if (this.perConsumerLastReceivedSeqNo != null) {
                    Map<SequenceKey, Integer> lastSeqNos =
                            this.perConsumerLastReceivedSeqNo.get(consumerThread.getConsumerThreadId());
                    // A thread with no recorded sequence numbers starts from an empty map.
                    consumerThread.setLastReceivedSeqNoMap(lastSeqNos != null ? lastSeqNos : new HashMap<>());
                }
                this.executorService.submit(consumerThread);
            }
        } catch (Throwable th) {
            LOG.error("Error while creating KafkaConsumerThread for topic(s): " + Arrays.toString(this.topics), th);
        }
    }

    /** Builds one consumer thread from the group's shared configuration and adds it to the group. */
    private void addConsumerThread(SourceEventListener sourceEventListener, String[] threadTopics,
                                   String[] threadPartitions, boolean partitionWise) {
        this.kafkaConsumerThreadList.add(new KafkaConsumerThread(sourceEventListener, threadTopics,
                threadPartitions, this.props, this.topicOffsetMap, partitionWise, this.isBinaryMessage));
    }

    /**
     * Collects the offset maps reported by every consumer thread into a new map.
     *
     * <p>When several threads report the same topic, the entry from the thread later in the list
     * replaces the earlier one. The per-topic partition maps are the threads' own instances, not copies.
     *
     * @return a new topic to (partition to offset) map; empty when the group has no threads
     */
    public Map<String, Map<Integer, Long>> getTopicOffsetMap() {
        Map<String, Map<Integer, Long>> combined = new HashMap<>();
        for (KafkaConsumerThread consumerThread : this.kafkaConsumerThreadList) {
            combined.putAll(consumerThread.getTopicOffsetMap());
        }
        return combined;
    }

    /**
     * Refreshes the stored sequence-number map with the current value reported by every consumer
     * thread, keyed by consumer thread id, and returns it.
     *
     * @return the map supplied at construction, updated in place; {@code null} if {@code null} was supplied
     */
    public Map<String, Map<SequenceKey, Integer>> getPerConsumerLastReceivedSeqNo() {
        if (this.perConsumerLastReceivedSeqNo != null) {
            for (KafkaConsumerThread consumerThread : this.kafkaConsumerThreadList) {
                this.perConsumerLastReceivedSeqNo.put(consumerThread.getConsumerThreadId(),
                        consumerThread.getLastReceivedSeqNoMap());
            }
        }
        return this.perConsumerLastReceivedSeqNo;
    }
}
