package com.vortex.base.kafka.consumer.kafka;

import com.google.common.base.Preconditions;
import com.vortex.base.kafka.consumer.cfg.ConsumerCfg;
import com.vortex.base.kafka.consumer.cfg.KafkaConfiguration;
import com.vortex.base.kafka.consumer.processor.IKafkaMsgProcessor;
import com.vortex.base.kafka.consumer.util.NamedThreadFactory;
import java.time.Duration;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.SmartLifecycle;

/* loaded from: input_file:com/vortex/base/kafka/consumer/kafka/AbstractKafkaConsumer.class */
public abstract class AbstractKafkaConsumer implements Runnable, SmartLifecycle {

    @Autowired
    protected KafkaConfiguration kafkaConfiguration;

    @Autowired
    protected KafkaConsumer consumer;

    @Autowired
    private ConsumerCfg consumerCfg;

    @Autowired
    private IConsumerOffsetService consumerOffsetService;
    private ThreadFactory threadFactory;

    @Autowired
    private IKafkaMsgProcessor msgProcessor;
    protected final Logger logger = LoggerFactory.getLogger(getClass());
    private AtomicBoolean isRunning = new AtomicBoolean(false);

    private void init() {
        this.logger.info("init ...");
        subscribe(this.consumer);
        this.threadFactory = new NamedThreadFactory("kafka consumer", (thread, th) -> {
            this.logger.error("uncaughtException, thread[{}] state[{}] Throwable: {}", new Object[]{thread, thread.getState(), th});
        });
        new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(1), this.threadFactory).execute(this);
    }

    private void subscribe(KafkaConsumer kafkaConsumer) {
        Preconditions.checkNotNull(this.consumerCfg.getTopicRegex(), "topic regex is null");
        kafkaConsumer.subscribe(Pattern.compile(this.consumerCfg.getTopicRegex()), getConsumerRebalanceListener(kafkaConsumer));
    }

    protected ConsumerRebalanceListener getConsumerRebalanceListener(KafkaConsumer kafkaConsumer) {
        return new SimpleConsumerRebalanceListener(this.kafkaConfiguration.getGroupId(), kafkaConsumer, this.consumerCfg.isSeekToEndOnNoNextOffset(), this.consumerOffsetService);
    }

    private void destroy() {
        this.logger.info("destroy ...");
        if (this.consumer != null) {
            this.consumer.close();
        }
        this.threadFactory = null;
    }

    @Override // java.lang.Runnable
    public void run() {
        this.logger.info("run ...");
        while (this.isRunning.get()) {
            long currentTimeMillis = System.currentTimeMillis();
            try {
                ConsumerRecords<String, String> poll = this.consumer.poll(Duration.ofMillis(this.consumerCfg.getPollTimeoutMs()));
                if (poll != null && !poll.isEmpty()) {
                    this.logger.info("pulled kafka msg, count:{}, cost:{} ms", Integer.valueOf(poll.count()), Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
                    long currentTimeMillis2 = System.currentTimeMillis();
                    this.msgProcessor.process(poll);
                    commitOffset();
                    this.logger.info("processed kafka msg, count:{}, cost:{} ms\n\n", Integer.valueOf(poll.count()), Long.valueOf(System.currentTimeMillis() - currentTimeMillis2));
                }
            } catch (Exception e) {
                this.logger.error(e.toString(), e);
            }
        }
    }

    protected void commitOffset() {
    }

    public boolean isAutoStartup() {
        return true;
    }

    public int getPhase() {
        return 0;
    }

    public boolean isRunning() {
        return this.isRunning.get();
    }

    public void start() {
        if (this.isRunning.get()) {
            return;
        }
        init();
        this.isRunning.set(true);
    }

    public void stop() {
    }

    public void stop(Runnable runnable) {
        destroy();
        this.isRunning.set(false);
    }
}
