package com.vortex.sds.job.transfer;

import com.alibaba.fastjson.JSON;
import com.vortex.sds.config.TsdbProperties;
import com.vortex.sds.dao.mongo.IDeviceFactorDataRepository;
import com.vortex.sds.dao.tsdb.TsdbDeviceFactorDataRepository;
import com.vortex.sds.dto.DeviceFactorData;
import com.vortex.sds.model.mongo.DeviceDataModel;
import java.time.Duration;
import java.util.Collections;
import java.util.Date;
import java.util.concurrent.atomic.AtomicBoolean;
import org.joda.time.LocalDateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.mongodb.core.mapping.event.AbstractMongoEventListener;
import org.springframework.data.mongodb.core.mapping.event.AfterSaveEvent;
import reactor.core.Disposable;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;

/* loaded from: input_file:com/vortex/sds/job/transfer/ToTsdbMongoEventListener.class */
public class ToTsdbMongoEventListener extends AbstractMongoEventListener<DeviceDataModel> {
    private static final Logger log;
    private MongoToTsdbTransformer mongoToTsdbTransformer;
    private TsdbDeviceFactorDataRepository tsdbDeviceFactorDataRepository;
    private IDeviceFactorDataRepository mongoDataRepository;
    private final AtomicBoolean firstDataSaved = new AtomicBoolean(false);
    private Disposable fluxDisposable;
    private EmitterProcessor<DeviceDataModel> emitter;
    private Integer backwardCount;
    static final /* synthetic */ boolean $assertionsDisabled;

    public ToTsdbMongoEventListener(MongoToTsdbTransformer mongoToTsdbTransformer, TsdbDeviceFactorDataRepository tsdbDeviceFactorDataRepository, IDeviceFactorDataRepository iDeviceFactorDataRepository, TsdbProperties tsdbProperties) {
        this.mongoToTsdbTransformer = mongoToTsdbTransformer;
        this.tsdbDeviceFactorDataRepository = tsdbDeviceFactorDataRepository;
        this.mongoDataRepository = iDeviceFactorDataRepository;
        int intValue = tsdbProperties.getListenerBatchSize().intValue() == 0 ? 1000 : tsdbProperties.getListenerBatchSize().intValue();
        this.backwardCount = Integer.valueOf(tsdbProperties.getBackwardCount() == null ? 1000 : tsdbProperties.getBackwardCount().intValue());
        Long listenerTimeoutDurationMillis = tsdbProperties.getListenerTimeoutDurationMillis();
        Duration ofSeconds = listenerTimeoutDurationMillis == null ? Duration.ofSeconds(5L) : Duration.ofMillis(listenerTimeoutDurationMillis.longValue());
        log.info("To TSDB process pool size is {}", Integer.valueOf(intValue));
        this.emitter = EmitterProcessor.create();
        Flux doOnDiscard = Flux.from(this.emitter).map(mongoToTsdbTransformer).bufferTimeout(intValue, ofSeconds).doOnDiscard(DeviceFactorData.class, deviceFactorData -> {
            log.info("Shutdown saving: {}", JSON.toJSONString(deviceFactorData));
            tsdbDeviceFactorDataRepository.saveDataFactorData(Collections.singletonList(deviceFactorData));
        });
        tsdbDeviceFactorDataRepository.getClass();
        this.fluxDisposable = doOnDiscard.subscribe(tsdbDeviceFactorDataRepository::saveDataFactorData);
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            log.warn("Try to save data to TSDB in shutdown hook");
            this.fluxDisposable.dispose();
        }));
    }

    public void onAfterSave(AfterSaveEvent<DeviceDataModel> afterSaveEvent) {
        DeviceDataModel deviceDataModel = (DeviceDataModel) afterSaveEvent.getSource();
        if (!$assertionsDisabled && deviceDataModel.getId() == null) {
            throw new AssertionError();
        }
        if (!this.firstDataSaved.get()) {
            synchronized (this) {
                if (!this.firstDataSaved.get()) {
                    this.firstDataSaved.set(true);
                    Long createDatetime = deviceDataModel.getCreateDatetime();
                    String id = deviceDataModel.getId();
                    log.info("Get first data saved to mongo, and extract its timestamp {}, ie {}", createDatetime, LocalDateTime.fromDateFields(new Date(createDatetime.longValue())));
                    new Thread(new BackwardMongoDataToTsdbAction(this.backwardCount, createDatetime, id, this.mongoToTsdbTransformer, this.tsdbDeviceFactorDataRepository, this.mongoDataRepository)).start();
                }
            }
        }
        this.emitter.onNext(afterSaveEvent.getSource());
    }

    static {
        $assertionsDisabled = !ToTsdbMongoEventListener.class.desiredAssertionStatus();
        log = LoggerFactory.getLogger(ToTsdbMongoEventListener.class);
    }
}
