package com.vortex.sds.service.impl;

import com.alibaba.dubbo.common.utils.NamedThreadFactory;
import com.vortex.sds.dao.mongo.IDeviceFactorDataRepository;
import com.vortex.sds.model.mongo.DeviceDataModel;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Buffers device data handed in through {@link #addToQueue(List)} and persists it in batches
 * through {@code IDeviceFactorDataRepository.saveFactorData}.
 *
 * <p>A single dispatcher thread (this object's {@link #run()}) drains the bounded queue and
 * accumulates entries. A batch is handed to the worker pool once it holds at least
 * {@code batchSize} entries or once {@code linerMs} milliseconds have elapsed since the previous
 * hand-off. The worker pool uses a {@link SynchronousQueue}; when every worker is busy the
 * dispatcher blocks until one becomes free, which in turn lets the bounded queue fill up.
 *
 * <p>Nothing happens until {@link #start()} is called. {@code queueSize} and {@code workerSize}
 * are read once in {@link #start()}; {@code batchSize} and {@code linerMs} are read on every loop
 * iteration.
 */
@Component
public class SimpleDataWorker implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(SimpleDataWorker.class);

    private static final int DEFAULT_QUEUE_SIZE = 10000;
    private static final int DEFAULT_BATCH_SIZE = 300;
    private static final int DEFAULT_LINER_MS = 1000;
    private static final int DEFAULT_WORKER_SIZE = Runtime.getRuntime().availableProcessors() * 2;

    /** Backlog (number of queued lists) above which the queue size is logged. */
    private static final int QUEUE_SIZE_LOG_THRESHOLD = 1000;
    /** Longest time the dispatcher waits on an empty queue before looping again. */
    private static final long POLL_TIMEOUT_MS = 3000L;
    /** Interval at which a blocked hand-off re-checks whether the worker pool was shut down. */
    private static final long HANDOFF_RETRY_MS = 1000L;
    /** Upper bound on how long {@link #onDestroy()} waits for each executor to finish. */
    private static final long SHUTDOWN_WAIT_SECONDS = 10L;

    private int queueSize = DEFAULT_QUEUE_SIZE;
    private int batchSize = DEFAULT_BATCH_SIZE;
    private int linerMs = DEFAULT_LINER_MS;
    private int workerSize = DEFAULT_WORKER_SIZE;
    private BlockingQueue<List<DeviceDataModel>> blockingQueue;
    private ExecutorService bossExecutor;
    private ExecutorService workerExecutor;

    @Autowired
    private IDeviceFactorDataRepository deviceFactorDataRepository;

    /**
     * Creates the bounded queue, the single dispatcher thread and the worker pool, then starts the
     * dispatcher loop. Uses the {@code queueSize} and {@code workerSize} values current at the
     * time of the call.
     */
    public void start() {
        this.blockingQueue = new LinkedBlockingQueue<List<DeviceDataModel>>(this.queueSize);
        this.bossExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory("simpleDataThread"));
        this.workerExecutor = new ThreadPoolExecutor(
                this.workerSize,
                this.workerSize,
                120L,
                TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new NamedThreadFactory("simpleDataWorker"),
                new BlockingHandoffPolicy());
        this.bossExecutor.execute(this);
    }

    /**
     * Stops the dispatcher and the worker pool when the bean is destroyed.
     *
     * <p>The dispatcher is interrupted ({@code shutdownNow}) because a plain {@code shutdown}
     * does not interrupt an already running task and the loop would never end. It is given a
     * bounded time to perform its final flush before the worker pool is asked to finish the
     * batches it has already accepted.
     */
    @PreDestroy
    private void onDestroy() {
        ExecutorService boss = this.bossExecutor;
        this.bossExecutor = null;
        if (boss != null) {
            boss.shutdownNow();
            awaitQuietly(boss);
        }
        ExecutorService workers = this.workerExecutor;
        if (workers != null) {
            workers.shutdown();
            awaitQuietly(workers);
        }
    }

    /**
     * Enqueues one list of device data for asynchronous persistence.
     *
     * @param list the data to persist; must not be {@code null} (the queue rejects null elements)
     * @throws IllegalStateException if {@link #start()} has not been called yet, or if the
     *     bounded queue is full
     */
    public void addToQueue(List<DeviceDataModel> list) {
        BlockingQueue<List<DeviceDataModel>> queue = this.blockingQueue;
        if (queue == null) {
            throw new IllegalStateException("SimpleDataWorker has not been started");
        }
        queue.add(list);
    }

    /**
     * Post-construction hook; intentionally does nothing.
     * NOTE(review): this class never calls {@link #start()} itself - confirm it is invoked
     * elsewhere.
     */
    @PostConstruct
    private void init() {
    }

    /**
     * Dispatcher loop: accumulates queued data and hands batches to the worker pool until the
     * thread is interrupted, then performs one final flush of whatever is still pending.
     */
    @Override
    public void run() {
        List<DeviceDataModel> pending = new ArrayList<DeviceDataModel>(this.batchSize);
        long lastFlush = System.currentTimeMillis();
        while (!Thread.currentThread().isInterrupted()) {
            try {
                int backlog = this.blockingQueue.size();
                if (backlog > QUEUE_SIZE_LOG_THRESHOLD) {
                    logger.info("simple data worker queue size:{}", backlog);
                }

                // With data already pending, wait no longer than the remaining linger time so a
                // partial batch is not held back by the full poll timeout.
                long waitMs = POLL_TIMEOUT_MS;
                if (!pending.isEmpty()) {
                    long remaining = this.linerMs - (System.currentTimeMillis() - lastFlush);
                    waitMs = Math.max(0L, Math.min(POLL_TIMEOUT_MS, remaining));
                }

                List<DeviceDataModel> polled = this.blockingQueue.poll(waitMs, TimeUnit.MILLISECONDS);
                if (polled != null) {
                    pending.addAll(polled);
                }
                if (pending.isEmpty()) {
                    continue;
                }

                long now = System.currentTimeMillis();
                boolean batchFull = pending.size() >= this.batchSize;
                boolean lingerElapsed = now - lastFlush >= this.linerMs;
                if (batchFull || lingerElapsed) {
                    List<DeviceDataModel> batch = pending;
                    pending = new ArrayList<DeviceDataModel>(this.batchSize);
                    lastFlush = now;
                    submitBatch(batch);
                }
            } catch (InterruptedException e) {
                // Restore the flag so the loop condition sees it and the loop terminates.
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                logger.error("save device factor data error", e);
            }
        }
        flushOnShutdown(pending);
    }

    /**
     * Hands one batch to the worker pool; the save itself runs on a worker thread.
     * NOTE(review): assumes {@code saveFactorData} accepts a {@code List<DeviceDataModel>} -
     * confirm against the repository interface.
     */
    private void submitBatch(final List<DeviceDataModel> batch) {
        this.workerExecutor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    SimpleDataWorker.this.deviceFactorDataRepository.saveFactorData(batch);
                } catch (Exception e) {
                    logger.error("保存数据出错", e);
                }
            }
        });
    }

    /** Saves the pending batch plus anything still queued, directly on the dispatcher thread. */
    private void flushOnShutdown(List<DeviceDataModel> pending) {
        // Clear the interrupt flag for the duration of the save so a blocking call in the
        // repository is not aborted by it; the flag is restored afterwards.
        boolean wasInterrupted = Thread.interrupted();
        try {
            List<List<DeviceDataModel>> remaining = new ArrayList<List<DeviceDataModel>>();
            this.blockingQueue.drainTo(remaining);
            for (List<DeviceDataModel> chunk : remaining) {
                pending.addAll(chunk);
            }
            if (!pending.isEmpty()) {
                this.deviceFactorDataRepository.saveFactorData(pending);
            }
        } catch (Exception e) {
            logger.error("保存数据出错", e);
        } finally {
            if (wasInterrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Waits a bounded time for the executor to terminate, preserving the interrupt flag. */
    private static void awaitQuietly(ExecutorService executor) {
        try {
            if (!executor.awaitTermination(SHUTDOWN_WAIT_SECONDS, TimeUnit.SECONDS)) {
                logger.warn("simple data worker executor did not terminate within {}s", SHUTDOWN_WAIT_SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Rejection policy that makes the submitting thread wait until a worker takes the task,
     * instead of failing. The wait is done in timed steps so it ends once the pool has been shut
     * down rather than blocking forever on a queue nobody reads any more.
     */
    private static final class BlockingHandoffPolicy implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
            try {
                while (!threadPoolExecutor.isShutdown()) {
                    if (threadPoolExecutor.getQueue().offer(runnable, HANDOFF_RETRY_MS, TimeUnit.MILLISECONDS)) {
                        return;
                    }
                }
                logger.warn("simple data worker pool is shut down, batch discarded");
            } catch (InterruptedException e) {
                // The task is dropped; keep the interrupt visible to the submitting thread.
                Thread.currentThread().interrupt();
                logger.error("等待执行异常", e);
            }
        }
    }

    /** @return the capacity used for the queue created by {@link #start()} */
    public int getQueueSize() {
        return this.queueSize;
    }

    /** @param i queue capacity; only takes effect if set before {@link #start()} */
    public void setQueueSize(int i) {
        this.queueSize = i;
    }

    /** @return the number of entries at which a pending batch is handed to the workers */
    public int getBatchSize() {
        return this.batchSize;
    }

    /** @param i number of entries at which a pending batch is handed to the workers */
    public void setBatchSize(int i) {
        this.batchSize = i;
    }

    /** @return the maximum time in milliseconds between two batch hand-offs while data is pending */
    public int getLinerMs() {
        return this.linerMs;
    }

    /** @param i maximum time in milliseconds between two batch hand-offs while data is pending */
    public void setLinerMs(int i) {
        this.linerMs = i;
    }

    /** @return the number of worker threads used by the pool created in {@link #start()} */
    public int getWorkerSize() {
        return this.workerSize;
    }

    /** @param i worker thread count; only takes effect if set before {@link #start()} */
    public void setWorkerSize(int i) {
        this.workerSize = i;
    }
}
