package com.vortex.device.lib.kafka.listener;

import com.vortex.device.lib.kafka.consumer.SimpleConsumer;
import java.util.Collections;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/vortex/device/lib/kafka/listener/BatchAbsKafkaMsgListener.class */
/**
 * Base listener that hands the records of each partition to {@link #handleMessage} as one batch
 * and, when auto-commit is disabled, commits and/or seeks according to the offset it returns.
 */
public abstract class BatchAbsKafkaMsgListener implements IKafkaMsgListener {
    protected final Logger logger = LoggerFactory.getLogger(getClass());

    /**
     * Processes all records polled for a single partition.
     *
     * @param topicPartition the partition the records belong to
     * @param list the records of that partition, as returned by the poll
     * @return the offset that drives confirmation when auto-commit is disabled: {@code null} or a
     *     value below the first record's offset means nothing was processed; a value at or beyond
     *     the last record's offset means the whole batch was processed; anything in between means
     *     the batch was processed in part
     */
    protected abstract Long handleMessage(TopicPartition topicPartition, List<ConsumerRecord<String, String>> list);

    /**
     * Runs {@link #handleMessage} once per partition present in {@code consumerRecords} and, when
     * the consumer configuration disables auto-commit, confirms the result for that partition.
     *
     * @param simpleConsumer consumer used to read the configuration, commit offsets and seek
     * @param consumerRecords the polled records; {@code null} or empty input is ignored
     */
    @Override
    public void onProcessAndConfirm(SimpleConsumer simpleConsumer, ConsumerRecords<String, String> consumerRecords) {
        if (consumerRecords == null || consumerRecords.isEmpty()) {
            return;
        }
        // A missing "enable.auto.commit" entry is treated the same as "true".
        Object autoCommitSetting = simpleConsumer.m2getConfig().get("enable.auto.commit");
        boolean autoCommit = autoCommitSetting == null || Boolean.parseBoolean(autoCommitSetting.toString());
        for (TopicPartition topicPartition : consumerRecords.partitions()) {
            List<ConsumerRecord<String, String>> records = consumerRecords.records(topicPartition);
            Long result = handleMessage(topicPartition, records);
            if (!autoCommit) {
                confirm(simpleConsumer, topicPartition, records, result);
            }
        }
    }

    /**
     * Logs the failure. Subclasses may override this to add their own handling.
     *
     * @param th the failure reported to this listener
     */
    @Override
    public void onFailed(Throwable th) {
        // Previously the throwable was dropped without any trace.
        this.logger.error("kafka batch listener failed", th);
    }

    /** Commits and/or seeks one partition according to the offset returned by handleMessage. */
    private void confirm(SimpleConsumer simpleConsumer, TopicPartition topicPartition,
            List<ConsumerRecord<String, String>> records, Long result) {
        long firstOffset = records.get(0).offset();
        long lastOffset = records.get(records.size() - 1).offset();
        String topic = topicPartition.topic();
        int partition = topicPartition.partition();

        if (result == null || result < firstOffset) {
            // Nothing to commit: rewind so the same batch is fetched again.
            simpleConsumer.seek(topicPartition, firstOffset);
            this.logger.info("topic[{}] partition[{}] to offset[{}]: processed none", topic, partition, firstOffset);
            return;
        }

        long processed = result;
        if (processed >= lastOffset) {
            // Whole batch done: commit the offset following the returned one.
            long next = processed + 1;
            simpleConsumer.doCommitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(next)));
            this.logger.info("topic[{}] partition[{}] to offset[{}]: processed all", topic, partition, next);
            return;
        }

        // Partial batch: commit the returned offset itself and resume fetching from it.
        // NOTE(review): unlike the "processed all" case this does not add 1, so the record at the
        // returned offset is fetched again — confirm this matches the handleMessage contract.
        simpleConsumer.doCommitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(processed)));
        simpleConsumer.seek(topicPartition, processed);
        this.logger.info("topic[{}] partition[{}] to offset[{}]: processed part", topic, partition, processed);
    }
}
