package com.vortex.sds.job.transfer;

import com.mongodb.MongoException;
import com.vortex.sds.api.dto.DeviceFactorData;
import com.vortex.sds.dao.mongo.IDeviceFactorDataRepository;
import com.vortex.sds.dao.tsdb.TsdbDeviceFactorDataRepository;
import com.vortex.sds.model.mongo.DeviceDataModel;
import java.text.NumberFormat;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalUnit;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.validation.constraints.NotNull;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.bson.Document;
import org.glassfish.jersey.internal.guava.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.DataAccessException;
import org.springframework.data.mongodb.core.DocumentCallbackHandler;
import org.springframework.data.mongodb.core.convert.MongoConverter;
import reactor.core.publisher.Flux;
import reactor.core.publisher.TopicProcessor;
import reactor.util.concurrent.WaitStrategy;

/* loaded from: input_file:com/vortex/sds/job/transfer/FullMongoDataToTsdbAction.class */
class FullMongoDataToTsdbAction implements Runnable {
    private TsdbDeviceFactorDataRepository tsdbDeviceFactorDataRepository;
    private IDeviceFactorDataRepository mongoDataRepository;
    private MongoConverter mongoConverter;
    private LocalDateTime startDate;
    private LocalDateTime endDate;
    private long startMillis;
    private static final int MAX = 1024;
    private MongoToTsdbTransformer mongoToTsdbTransformer;
    private Set<String> reDumpFactorCodes;
    private boolean reDump;
    private static final Logger log = LoggerFactory.getLogger(FullMongoDataToTsdbAction.class);
    private static NumberFormat NUMBER_FORMAT = NumberFormat.getNumberInstance();
    private int amount = 10;
    private TemporalUnit unit = ChronoUnit.MINUTES;
    private AtomicLong totalFetchCount = new AtomicLong();
    private AtomicLong taskCount = new AtomicLong();
    private long totalCount = 0;
    private AtomicLong savingTotalMillis = new AtomicLong();
    private AtomicLong storedCount = new AtomicLong();
    private int executorsCount = 10;
    private Semaphore threadSemaphore = new Semaphore(this.executorsCount);
    private ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("thread-call-runner-%d").build();
    private ThreadPoolExecutor executor = new ThreadPoolExecutor(this.executorsCount, Integer.MAX_VALUE, 1, TimeUnit.MINUTES, new LinkedBlockingQueue(), this.namedThreadFactory);
    private TopicProcessor<DeviceFactorData> processor = TopicProcessor.builder().autoCancel(false).bufferSize(8192).waitStrategy(WaitStrategy.blocking()).build();

    /* loaded from: input_file:com/vortex/sds/job/transfer/FullMongoDataToTsdbAction$ActionThreadFactory.class */
    public static class ActionThreadFactory implements ThreadFactory {
        private static final AtomicInteger THREAD_COUNT = new AtomicInteger();

        @Override // java.util.concurrent.ThreadFactory
        public Thread newThread(@NotNull Runnable runnable) {
            return new Thread(runnable, "from-mongo-" + THREAD_COUNT.incrementAndGet());
        }
    }

    /* loaded from: input_file:com/vortex/sds/job/transfer/FullMongoDataToTsdbAction$PutRun.class */
    class PutRun implements Runnable {
        Semaphore semaphore;
        LocalDateTime start;
        LocalDateTime end;

        PutRun(Semaphore semaphore, LocalDateTime localDateTime, LocalDateTime localDateTime2) {
            this.semaphore = semaphore;
            this.start = localDateTime;
            this.end = localDateTime2;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                FullMongoDataToTsdbAction.log.info("Finding task count: {}", Long.valueOf(FullMongoDataToTsdbAction.this.taskCount.incrementAndGet()));
                long currentTimeMillis = System.currentTimeMillis();
                long millis = FullMongoDataToTsdbAction.toMillis(this.start);
                long millis2 = FullMongoDataToTsdbAction.toMillis(this.end);
                final AtomicInteger atomicInteger = new AtomicInteger();
                FullMongoDataToTsdbAction.this.mongoDataRepository.findByObjectIdTime(millis, millis2, new DocumentCallbackHandler() { // from class: com.vortex.sds.job.transfer.FullMongoDataToTsdbAction.PutRun.1
                    public void processDocument(Document document) throws MongoException, DataAccessException {
                        DeviceFactorData apply = FullMongoDataToTsdbAction.this.mongoToTsdbTransformer.apply((DeviceDataModel) FullMongoDataToTsdbAction.this.mongoConverter.read(DeviceDataModel.class, document));
                        try {
                            if (!FullMongoDataToTsdbAction.this.reDump) {
                                FullMongoDataToTsdbAction.this.processor.onNext(apply);
                            } else if (FullMongoDataToTsdbAction.this.reDumpFactorCodes.contains(apply.getDeviceFactorCode())) {
                                FullMongoDataToTsdbAction.this.processor.onNext(apply);
                            }
                            atomicInteger.incrementAndGet();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                });
                long currentTimeMillis2 = System.currentTimeMillis();
                long j = currentTimeMillis2 - currentTimeMillis;
                long j2 = currentTimeMillis2 - FullMongoDataToTsdbAction.this.startMillis;
                int i = atomicInteger.get();
                long addAndGet = FullMongoDataToTsdbAction.this.totalFetchCount.addAndGet(i);
                FullMongoDataToTsdbAction.log.info(">>>> {} - {} ended, count {}/{}, rate {}/{}, cost {}/{} ms", new Object[]{this.start, this.end, Integer.valueOf(i), Long.valueOf(addAndGet), FullMongoDataToTsdbAction.NUMBER_FORMAT.format((i * 1000.0d) / j), FullMongoDataToTsdbAction.NUMBER_FORMAT.format((addAndGet * 1000.0d) / j2), Long.valueOf(j), Long.valueOf(j2)});
                this.semaphore.release();
            } catch (Throwable th) {
                this.semaphore.release();
                throw th;
            }
        }
    }

    public FullMongoDataToTsdbAction(MongoToTsdbTransformer mongoToTsdbTransformer, TsdbDeviceFactorDataRepository tsdbDeviceFactorDataRepository, IDeviceFactorDataRepository iDeviceFactorDataRepository, MongoConverter mongoConverter, LocalDateTime localDateTime, LocalDateTime localDateTime2, Set<String> set) {
        this.startMillis = 0L;
        this.tsdbDeviceFactorDataRepository = tsdbDeviceFactorDataRepository;
        this.mongoDataRepository = iDeviceFactorDataRepository;
        this.mongoConverter = mongoConverter;
        this.startDate = localDateTime;
        this.endDate = localDateTime2;
        this.startMillis = System.currentTimeMillis();
        this.mongoToTsdbTransformer = mongoToTsdbTransformer;
        this.reDumpFactorCodes = set;
        this.reDump = CollectionUtils.isNotEmpty(set);
        Flux.from(this.processor).bufferTimeout(MAX, Duration.ofSeconds(1L), () -> {
            return new ArrayList(MAX);
        }).subscribe((v1) -> {
            saveAndSummary(v1);
        });
    }

    @Override // java.lang.Runnable
    public void run() {
        LocalDateTime localDateTime = this.endDate;
        LocalDateTime localDateTime2 = this.startDate;
        Semaphore semaphore = new Semaphore(6);
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool(new ActionThreadFactory());
        while (localDateTime.isAfter(localDateTime2)) {
            LocalDateTime localDateTime3 = localDateTime2;
            LocalDateTime plus = localDateTime3.plus(this.amount, this.unit);
            try {
                semaphore.acquire();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            newCachedThreadPool.submit(new PutRun(semaphore, localDateTime3, plus));
            log.info("<<<<  {} - {}, fetching started", localDateTime3, plus);
            localDateTime2 = plus;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.time.ZonedDateTime] */
    public static long toMillis(LocalDateTime localDateTime) {
        return localDateTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
    }

    private void saveAndSummary(List<DeviceFactorData> list) {
        int size = list.size();
        if (size == 0) {
            return;
        }
        try {
            this.threadSemaphore.acquire();
            this.executor.submit(() -> {
                long currentTimeMillis = System.currentTimeMillis();
                try {
                    try {
                        this.tsdbDeviceFactorDataRepository.saveDataFactorData(list);
                        this.threadSemaphore.release();
                        long currentTimeMillis2 = System.currentTimeMillis();
                        long j = currentTimeMillis2 - currentTimeMillis;
                        long j2 = currentTimeMillis2 - this.startMillis;
                        log.debug("Size: {} stored: {}, time: {}/{}/{}, speed: {} per/s, last date: {}", new Object[]{Integer.valueOf(size), Long.valueOf(this.storedCount.addAndGet(size)), Duration.ofMillis(j), Duration.ofMillis(this.savingTotalMillis.addAndGet(j)), Duration.ofMillis(j2), Double.valueOf((this.storedCount.get() * 1000.0d) / j2), DateFormatUtils.ISO_DATETIME_FORMAT.format(((DeviceFactorData) list.get(list.size() - 1)).getAcquisitionDatetime())});
                    } catch (Exception e) {
                        log.error("Transfer batch saving error", e);
                        Iterator it = list.iterator();
                        while (it.hasNext()) {
                            this.processor.onNext((DeviceFactorData) it.next());
                        }
                        this.threadSemaphore.release();
                    }
                } catch (Throwable th) {
                    this.threadSemaphore.release();
                    throw th;
                }
            });
            this.totalCount += size;
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    static {
        NUMBER_FORMAT.setMaximumFractionDigits(2);
        NUMBER_FORMAT.setGroupingUsed(false);
    }
}
