package com.vortex.vehicle.data.consumer;

import com.vortex.device.util.thread.NamedThreadFactory;
import com.vortex.vehicle.data.config.VehicleGpsKafkaConfig;
import java.lang.Thread;
import java.time.Duration;
import java.util.Iterator;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.SmartLifecycle;

/* loaded from: input_file:com/vortex/vehicle/data/consumer/AbstractKafkaReceiver.class */
public abstract class AbstractKafkaReceiver implements Runnable, SmartLifecycle {
    private KafkaConsumer<String, String> consumer;

    @Autowired
    private VehicleGpsKafkaConfig vehicleGpsKafkaConfig;
    private final Logger logger = LoggerFactory.getLogger(getClass());
    private AtomicBoolean isRunning = new AtomicBoolean(false);

    private void init() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.vehicleGpsKafkaConfig.getBootstrapServers());
        properties.put("max.partition.fetch.bytes", this.vehicleGpsKafkaConfig.getMaxPartitionFetchBytes());
        properties.put("max.poll.records", this.vehicleGpsKafkaConfig.getMaxPollRecords());
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("group.id", this.vehicleGpsKafkaConfig.getGroupId());
        properties.put("client.id", UUID.randomUUID().toString());
        properties.put("enable.auto.commit", this.vehicleGpsKafkaConfig.getEnableAutoCommit());
        properties.put("auto.commit.interval.ms", this.vehicleGpsKafkaConfig.getAutoCommitIntervalMs());
        properties.put("session.timeout.ms", this.vehicleGpsKafkaConfig.getSessionTimeoutMs());
        properties.put("auto.offset.reset", this.vehicleGpsKafkaConfig.getReset());
        this.consumer = new KafkaConsumer<>(properties);
        subscribe(this.consumer);
        new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(1024), new NamedThreadFactory("kafka receiver") { // from class: com.vortex.vehicle.data.consumer.AbstractKafkaReceiver.1
            public Thread newThread(Runnable runnable) {
                Thread newThread = super.newThread(runnable);
                newThread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { // from class: com.vortex.vehicle.data.consumer.AbstractKafkaReceiver.1.1
                    @Override // java.lang.Thread.UncaughtExceptionHandler
                    public void uncaughtException(Thread thread, Throwable th) {
                        AbstractKafkaReceiver.this.logger.error("thread {} exception: {}", thread, th);
                    }
                });
                return newThread;
            }
        }, new ThreadPoolExecutor.AbortPolicy()).execute(this);
    }

    private void destroy() {
        if (this.consumer != null) {
            this.consumer.close();
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        while (this.isRunning.get()) {
            long currentTimeMillis = System.currentTimeMillis();
            ConsumerRecords poll = this.consumer.poll(Duration.ofMillis(this.vehicleGpsKafkaConfig.getPollTimeoutMs()));
            if (poll != null && !poll.isEmpty()) {
                this.logger.info("pull from kafka cost:{} ms, pull count:{}", Long.valueOf(System.currentTimeMillis() - currentTimeMillis), Integer.valueOf(poll.count()));
                Iterator it = poll.iterator();
                while (it.hasNext()) {
                    try {
                        process((ConsumerRecord) it.next());
                    } catch (Exception e) {
                        this.logger.error(e.toString(), e);
                    }
                }
                this.logger.info("deal kafka msg cost:{} ms, pull count:{}", Long.valueOf(System.currentTimeMillis() - currentTimeMillis), Integer.valueOf(poll.count()));
            }
        }
    }

    protected abstract void process(ConsumerRecord<String, String> consumerRecord);

    protected abstract void subscribe(KafkaConsumer<String, String> kafkaConsumer);

    public void start() {
        this.logger.info("smartLifecycle.start()");
        if (this.isRunning.compareAndSet(false, true)) {
            init();
        }
    }

    public void stop() {
        this.logger.info("smartLifecycle.stop()");
        if (this.isRunning.compareAndSet(true, false)) {
            destroy();
        }
    }

    public void stop(Runnable runnable) {
        this.logger.info("smartLifecycle.stop(Runnable callback)");
    }

    public boolean isRunning() {
        this.logger.info("smartLifecycle.isRunning()");
        return this.isRunning.get();
    }
}
