package com.vortex.platform.gpsdata.core.another;

import com.vortex.platform.gpsdata.config.GpsConfig;
import com.vortex.platform.gpsdata.core.PositionControl;
import com.vortex.platform.gpsdata.core.ThreadGroupIdentity;
import com.vortex.platform.gpsdata.core.processor.ErrorConsumer;
import com.vortex.platform.gpsdata.core.processor.GpsDataBatchSaveProcessor;
import com.vortex.platform.gpsdata.core.processor.InnerKafkaRecordToGpsData;
import com.vortex.platform.gpsdata.service.GpsDataFullService;
import com.vortex.platform.gpsdata.service.MileageRecalService;
import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.SmartLifecycle;
import org.springframework.stereotype.Service;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;

/**
 * Spring lifecycle bean that subscribes to the Kafka receiver and pushes every
 * received GPS record through the "another flow" handler chain.
 *
 * <p>Records are converted by {@link InnerKafkaRecordToGpsData}, flattened, grouped by
 * the key computed by {@link ThreadGroupIdentity}, and each group is processed on its
 * own single-threaded scheduler through validation, deflection, dawn-position,
 * supplement, position-check, mileage and real-time handlers before being buffered
 * (300 items or 5 seconds) and handed to {@link GpsDataBatchSaveProcessor}.
 *
 * <p>Every subscription is registered in {@link #fluxDisposable} so that
 * {@link #dispose()} / {@link #stop()} can cancel the whole pipeline.
 */
@EnableConfigurationProperties({KafkaProperties.class})
@Service
public class AnotherFlowStart implements SmartLifecycle {
    private static final Logger logger = LoggerFactory.getLogger(AnotherFlowStart.class);

    /** Upper bound, in milliseconds, for the delay before a (re)start is attempted. */
    private static final int MAX_RESTART_WAIT_TIME_MS_ = 10000;

    /** Delay increment, in milliseconds, applied per previous start attempt. */
    private static final int WAIT_TIME_INTERVAL_MS = 10000;

    private final ReceiverOptions<String, String> kafkaReceiverOptions;
    private final ThreadGroupIdentity groupIdentity;
    private final GpsConfig gpsConfig;
    private final ValidateHandler validateHandler;
    private final SupplementHandler supplementHandler;
    private final PositionCheckHandler positionCheckHandler;
    private final MileageCalHandler mileageCalHandler;
    private final RealTimeHandler realTimeHandler;
    private final GpsDataBatchSaveProcessor gpsDataBatchSaveProcessor;
    private final PositionControl positionControl;

    /** Number of start attempts scheduled so far; drives the start delay. */
    private final AtomicLong counter = new AtomicLong(0);

    private final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1);

    /**
     * Holds the outer subscription and every per-group subscription.
     * Volatile because {@link #dispose()} swaps the instance while other threads read it.
     */
    private volatile Disposable.Composite fluxDisposable = Disposables.composite();

    private final AtomicBoolean running = new AtomicBoolean(false);
    private final DeflectionHandler deflectionHandler = new DeflectionHandler();

    /**
     * Scheduled task that builds and subscribes the Kafka processing pipeline on a
     * dedicated thread named {@code kafka-start}.
     */
    public class RestartTask implements Runnable {
        /** Error callback attached to both the outer and the per-group subscriptions. */
        ErrorConsumer errorConsumer;

        /**
         * @param errorConsumer consumer invoked when the outer flux or any group flux
         *                      terminates with an error
         */
        public RestartTask(ErrorConsumer errorConsumer) {
            this.errorConsumer = errorConsumer;
        }

        /** Spawns the {@code kafka-start} thread, which performs the actual subscription. */
        @Override
        public void run() {
            new Thread(this::subscribePipeline, "kafka-start").start();
        }

        /** Assembles the receive -> group -> handler-chain pipeline and subscribes to it. */
        private void subscribePipeline() {
            InnerKafkaRecordToGpsData recordConverter = new InnerKafkaRecordToGpsData();
            // Resolve the composite before subscribing, as the original call order did.
            Disposable.Composite registry = fluxDisposable;
            Disposable receiverSubscription = KafkaReceiver.create(kafkaReceiverOptions)
                    .receive()
                    .handle(recordConverter::handle)
                    .flatMap(Flux::fromIterable)
                    .groupBy(wrap -> groupIdentity.apply(wrap.getGuid()))
                    .map(group -> {
                        Disposable groupSubscription = group
                                // One dedicated single-threaded scheduler per group key.
                                .publishOn(Schedulers.newSingle((String) group.key()))
                                .map(GpsDataProxy::new)
                                .handle(validateHandler::handle)
                                .handle(deflectionHandler::handle)
                                .transform(flux -> new DawnPositionFlux(flux, positionControl, gpsConfig))
                                .handle(supplementHandler::handle)
                                .handle(positionCheckHandler::handle)
                                .handle(mileageCalHandler::handle)
                                .handle(realTimeHandler::handle)
                                .subscribeOn(Schedulers.elastic())
                                .bufferTimeout(300, Duration.ofSeconds(5L))
                                .handle(gpsDataBatchSaveProcessor::handle)
                                .subscribe(null, errorConsumer);
                        fluxDisposable.add(groupSubscription);
                        return groupSubscription;
                    })
                    .subscribe(null, errorConsumer);
            registry.add(receiverSubscription);
        }
    }

    /**
     * Wires the collaborators and creates the handlers used by the pipeline.
     *
     * @param receiverOptions     options used to create the Kafka receiver
     * @param threadGroupIdentity maps a record's guid to its group key
     * @param gpsConfig           configuration passed to the validate, position-check and
     *                            dawn-position stages
     * @param gpsDataFullService  service passed to the batch save processor
     * @param mileageRecalService service passed to the supplement handler
     * @param positionControl     position state shared by several handlers
     */
    @Autowired
    public AnotherFlowStart(ReceiverOptions<String, String> receiverOptions, ThreadGroupIdentity threadGroupIdentity, GpsConfig gpsConfig, GpsDataFullService gpsDataFullService, MileageRecalService mileageRecalService, PositionControl positionControl) {
        this.kafkaReceiverOptions = receiverOptions;
        this.groupIdentity = threadGroupIdentity;
        this.gpsConfig = gpsConfig;
        this.positionControl = positionControl;
        this.validateHandler = new ValidateHandler(gpsConfig);
        this.supplementHandler = new SupplementHandler(positionControl, mileageRecalService);
        this.positionCheckHandler = new PositionCheckHandler(positionControl, gpsConfig);
        this.mileageCalHandler = new MileageCalHandler(positionControl);
        this.realTimeHandler = new RealTimeHandler(positionControl);
        this.gpsDataBatchSaveProcessor = new GpsDataBatchSaveProcessor(gpsDataFullService);
    }

    /** Marks the component as running and schedules the pipeline subscription. */
    @Override
    public void start() {
        logger.debug("gps data processing start with Another Flow");
        // Without this flag isRunning() stays false and the container never calls stop.
        this.running.set(true);
        startInOtherThread();
    }

    /**
     * Cancels every registered subscription and installs a fresh, empty composite so
     * that a subsequent start can register new subscriptions.
     */
    public void dispose() {
        Disposable.Composite current = this.fluxDisposable;
        if (current != null) {
            current.dispose();
        }
        this.fluxDisposable = Disposables.composite();
    }

    /**
     * Schedules a {@link RestartTask}. The delay is the number of previous attempts
     * multiplied by {@link #WAIT_TIME_INTERVAL_MS}, capped at
     * {@link #MAX_RESTART_WAIT_TIME_MS_}: the first attempt runs immediately, later
     * ones after the cap.
     */
    private void startInOtherThread() {
        long previousAttempts = this.counter.getAndAdd(1L);
        logger.info("restart the kafka thead,already restart times:{}", previousAttempts);
        long delayMs = Math.min(previousAttempts * WAIT_TIME_INTERVAL_MS, MAX_RESTART_WAIT_TIME_MS_);
        // NOTE(review): ErrorConsumer receives this instance, presumably to trigger a restart on failure; confirm.
        this.scheduledExecutorService.schedule(new RestartTask(new ErrorConsumer(this)), delayMs, TimeUnit.MILLISECONDS);
    }

    /** Cancels every registered subscription and marks the component as stopped. */
    @Override
    public void stop() {
        Disposable.Composite current = this.fluxDisposable;
        if (current != null) {
            current.dispose();
        }
        this.running.set(false);
    }

    /** @return {@code true} between a call to {@link #start()} and a call to {@link #stop()} */
    @Override
    public boolean isRunning() {
        return this.running.get();
    }

    /** @return always {@code true}; the pipeline starts with the application context */
    @Override
    public boolean isAutoStartup() {
        return true;
    }

    /**
     * Stops the component and then notifies the container.
     *
     * @param runnable completion callback supplied by the container; it is always
     *                 invoked, even if stopping throws, so shutdown is not held up
     *                 until the lifecycle timeout
     */
    @Override
    public void stop(Runnable runnable) {
        try {
            stop();
        } finally {
            runnable.run();
        }
    }

    /** @return {@link Integer#MAX_VALUE}, the highest possible phase value */
    @Override
    public int getPhase() {
        return Integer.MAX_VALUE;
    }
}
