package com.vortex.sds.job.transfer;

import com.vortex.sds.config.TsdbProperties;
import com.vortex.sds.dao.tsdb.TsdbDeviceFactorDataRepository;
import com.vortex.sds.dto.DeviceFactorsData;
import com.vortex.util.kafka.msg.KafkaMsg;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.SmartLifecycle;
import reactor.core.Disposable;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;

/* loaded from: input_file:com/vortex/sds/job/transfer/ToTsdbKafkaListener.class */
public class ToTsdbKafkaListener implements SmartLifecycle, Runnable {
    private static final Logger log = LoggerFactory.getLogger(ToTsdbKafkaListener.class);
    private TsdbProperties tsdbProperties;
    private DeviceFactorDataToModelTransformer deviceFactorDataToModelTransformer;
    private MongoToTsdbTransformer mongoToTsdbTransformer;
    private TsdbDeviceFactorDataRepository tsdbDeviceFactorDataRepository;
    private Disposable disposable;
    private AtomicBoolean running = new AtomicBoolean();
    private AtomicLong dataCounter = new AtomicLong();

    /* loaded from: input_file:com/vortex/sds/job/transfer/ToTsdbKafkaListener$RecordsConsumer.class */
    class RecordsConsumer<T extends ConsumerRecord<String, String>> implements Consumer<List<T>> {
        RecordsConsumer() {
        }

        @Override // java.util.function.Consumer
        public void accept(List<T> list) {
            LinkedList linkedList = new LinkedList();
            list.forEach(consumerRecord -> {
                try {
                    ToTsdbKafkaListener.this.deviceFactorDataToModelTransformer.apply((DeviceFactorsData) KafkaMsg.recordToMsg(consumerRecord).getPojo(DeviceFactorsData.class)).forEach(deviceDataModel -> {
                        linkedList.add(ToTsdbKafkaListener.this.mongoToTsdbTransformer.apply(deviceDataModel));
                    });
                } catch (Throwable th) {
                    th.printStackTrace();
                }
            });
            int size = linkedList.size();
            ToTsdbKafkaListener.log.info("record size: {}, factors size: {}", Integer.valueOf(list.size()), Integer.valueOf(size));
            if (size > 0) {
                long addAndGet = ToTsdbKafkaListener.this.dataCounter.addAndGet(size);
                try {
                    ToTsdbKafkaListener.this.tsdbDeviceFactorDataRepository.saveDataFactorData(linkedList);
                } catch (Throwable th) {
                    th.printStackTrace();
                    ToTsdbKafkaListener.log.error("Device factor save error, caused by [{}]", th.getMessage());
                }
                ToTsdbKafkaListener.log.info("Device factor data saved, about [{}/{}] saved to TSDB", Integer.valueOf(size), Long.valueOf(addAndGet));
            }
        }
    }

    public ToTsdbKafkaListener(TsdbProperties tsdbProperties, DeviceFactorDataToModelTransformer deviceFactorDataToModelTransformer, MongoToTsdbTransformer mongoToTsdbTransformer, TsdbDeviceFactorDataRepository tsdbDeviceFactorDataRepository) {
        this.tsdbProperties = tsdbProperties;
        this.deviceFactorDataToModelTransformer = deviceFactorDataToModelTransformer;
        this.mongoToTsdbTransformer = mongoToTsdbTransformer;
        this.tsdbDeviceFactorDataRepository = tsdbDeviceFactorDataRepository;
    }

    public boolean isAutoStartup() {
        return false;
    }

    public void stop(Runnable runnable) {
        stop();
        if (runnable != null) {
            runnable.run();
        }
    }

    public void start() {
        new Thread(this).start();
        this.running.set(true);
    }

    public void stop() {
        this.running.set(false);
        if (this.disposable != null) {
            this.disposable.dispose();
        }
    }

    public boolean isRunning() {
        return this.running.get();
    }

    public int getPhase() {
        return Integer.MAX_VALUE;
    }

    @Override // java.lang.Runnable
    public void run() {
        log.info("\n<<<<<<\n Starting kafka listener flux\n<<<<<<");
        KafkaReceiver.create(ReceiverOptions.create(this.tsdbProperties.getKafka().buildConsumerProperties()).withKeyDeserializer(new StringDeserializer()).withValueDeserializer(new StringDeserializer()).subscription(Collections.singletonList("SDS_REAL_TIME_DATA")).schedulerSupplier(() -> {
            return Schedulers.newElastic("kafka-receiver-scheduler");
        })).receiveAutoAck().flatMap((v0) -> {
            return v0.collectList();
        }).subscribe(new RecordsConsumer());
        log.info("\n<<<<<<\nKafka listener flux started\n<<<<<<");
    }
}
