package com.vortex.base.kafka.consumer.kafka;

import com.vortex.base.kafka.consumer.dto.KafkaRecord;
import com.vortex.base.kafka.consumer.dto.ValueEvent;
import java.util.Collections;
import java.util.List;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;

@ConditionalOnProperty(value = {"spring.kafka.consumer.enable-auto-commit"}, havingValue = "false")
@Service
/* loaded from: input_file:com/vortex/base/kafka/consumer/kafka/ManualCommitServiceImpl.class */
public class ManualCommitServiceImpl implements IManualCommitService {
    private static final Logger LOGGER = LoggerFactory.getLogger(ManualCommitServiceImpl.class);

    @Autowired
    private KafkaConsumer consumer;

    @Override // com.vortex.base.kafka.consumer.kafka.IManualCommitService
    public void commitOffset(ValueEvent valueEvent) {
        if (valueEvent == null) {
            LOGGER.error("valueEvent is null");
            return;
        }
        Object value = valueEvent.getValue();
        if (value == null) {
            LOGGER.error("value is null");
            return;
        }
        if (!(value instanceof KafkaRecord)) {
            LOGGER.error("value Class is not KafkaRecord");
            return;
        }
        KafkaRecord kafkaRecord = (KafkaRecord) value;
        String str = kafkaRecord.getTopicPartition().topic();
        int partition = kafkaRecord.getTopicPartition().partition();
        List<ConsumerRecord<String, String>> recordList = kafkaRecord.getRecordList();
        if (CollectionUtils.isEmpty(recordList)) {
            LOGGER.error("unbelievable, recordList is empty. {}-{}-{}", str, Integer.valueOf(partition));
            return;
        }
        ConsumerRecord<String, String> consumerRecord = recordList.get(0);
        ConsumerRecord<String, String> consumerRecord2 = kafkaRecord.getRecordList().get(kafkaRecord.getRecordList().size() - 1);
        Long successfulOffset = kafkaRecord.getSuccessfulOffset();
        if (successfulOffset == null) {
            this.consumer.seek(kafkaRecord.getTopicPartition(), consumerRecord.offset());
            LOGGER.error("successfulOffset is null. seek to first of list, {}-{}-{}", new Object[]{str, Integer.valueOf(partition), Long.valueOf(consumerRecord.offset())});
            return;
        }
        if (successfulOffset.longValue() < consumerRecord.offset() || successfulOffset.longValue() > consumerRecord2.offset()) {
            this.consumer.seek(kafkaRecord.getTopicPartition(), consumerRecord.offset());
            LOGGER.error("successfulOffset is invalid. seek to first of list, {}-{}-{}", new Object[]{str, Integer.valueOf(partition), Long.valueOf(consumerRecord.offset())});
            return;
        }
        long longValue = successfulOffset.longValue() + 1;
        if (successfulOffset.longValue() < consumerRecord2.offset()) {
            this.consumer.seek(kafkaRecord.getTopicPartition(), longValue);
            LOGGER.info("successfulOffset is not the last one. seek to next, {}-{}-{}", new Object[]{str, Integer.valueOf(partition), Long.valueOf(longValue)});
        }
        try {
            this.consumer.commitSync(Collections.singletonMap(kafkaRecord.getTopicPartition(), new OffsetAndMetadata(longValue)));
            LOGGER.info("commitSync, {}-{}-{}", new Object[]{str, Integer.valueOf(partition), Long.valueOf(longValue)});
        } catch (Exception e) {
            LOGGER.error("caught exception. commitSync, {}-{}-{}. exception: {}", new Object[]{str, Integer.valueOf(partition), Long.valueOf(longValue), e.toString()});
        }
    }
}
