package com.vortex.sds.util;

import com.google.common.base.Strings;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.vortex.sds.constant.SdsTopic;
import com.vortex.sds.dto.DeviceFactorData;
import com.vortex.sds.dto.DeviceFactorsData;
import com.vortex.sds.dto.HistoryDeviceFactorData;
import com.vortex.util.kafka.msg.KafkaMsg;
import com.vortex.util.kafka.producer.SimpleProcuder;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.Transaction;
import org.apache.flume.channel.file.FileChannel;
import org.apache.flume.conf.Configurables;
import org.apache.flume.event.EventBuilder;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.SmartLifecycle;

/* loaded from: input_file:com/vortex/sds/util/FlumeFileChannelWorker.class */
public class FlumeFileChannelWorker implements SmartLifecycle, Runnable {
    private final int dataDirCount;
    private FileChannel channel;
    private File baseDir;
    private File checkpointDir;
    private File[] dataDirs;
    private String dataDir;
    private File backupDir;
    private File uncompressedBackupCheckpoint;
    private File compressedBackupCheckpoint;
    private SimpleProcuder realTimeProducer;
    private SimpleProcuder historyProducer;
    private long tlogLinger;
    private static final Logger logger = LoggerFactory.getLogger(FlumeFileChannelWorker.class);
    private final ExecutorService executorService = Executors.newFixedThreadPool(20);
    private final Gson gson = new GsonBuilder().create();

    public FlumeFileChannelWorker(int i, String str, long j, SimpleProcuder simpleProcuder, SimpleProcuder simpleProcuder2) {
        this.dataDirCount = i <= 0 ? 3 : i;
        this.realTimeProducer = simpleProcuder;
        this.historyProducer = simpleProcuder2;
        this.tlogLinger = j;
        setup(str);
    }

    private void setup(String str) {
        if (Strings.isNullOrEmpty(str)) {
            this.baseDir = new File(new File(System.getProperty("java.io.tmpdir")), "sdslog");
        } else {
            this.baseDir = new File(str);
        }
        if (this.baseDir.mkdir()) {
            logger.info("No tlog home directory exists, create a new directory");
        }
        this.checkpointDir = new File(this.baseDir, "chkpt");
        this.backupDir = new File(this.baseDir, "backup");
        this.uncompressedBackupCheckpoint = new File(this.backupDir, "checkpoint");
        this.compressedBackupCheckpoint = new File(this.backupDir, "checkpoint.snappy");
        Assert.assertTrue(this.checkpointDir.mkdirs() || this.checkpointDir.isDirectory());
        Assert.assertTrue(this.backupDir.mkdirs() || this.backupDir.isDirectory());
        this.dataDirs = new File[this.dataDirCount];
        this.dataDir = "";
        for (int i = 0; i < this.dataDirs.length; i++) {
            this.dataDirs[i] = new File(this.baseDir, "data" + (i + 1));
            Assert.assertTrue(this.dataDirs[i].mkdirs() || this.dataDirs[i].isDirectory());
            this.dataDir += this.dataDirs[i].getAbsolutePath() + ",";
        }
        this.dataDir = this.dataDir.substring(0, this.dataDir.length() - 1);
        this.channel = createFileChannel();
    }

    protected Context createContext() {
        Context context = new Context();
        context.put("checkpointDir", this.checkpointDir.getAbsolutePath());
        if (this.backupDir != null) {
            context.put("backupCheckpointDir", this.backupDir.getAbsolutePath());
        }
        context.put("dataDirs", this.dataDir);
        context.put("keep-alive", String.valueOf(1));
        context.put("capacity", String.valueOf(10000));
        return context;
    }

    protected FileChannel createFileChannel() {
        FileChannel fileChannel = new FileChannel();
        fileChannel.setName("FileChannel-kafka");
        Configurables.configure(fileChannel, createContext());
        return fileChannel;
    }

    public void put(List<DeviceFactorsData> list) {
        FileChannel fileChannel = this.channel;
        Transaction transaction = fileChannel.getTransaction();
        transaction.begin();
        list.forEach(deviceFactorsData -> {
            fileChannel.put(EventBuilder.withBody(this.gson.toJson(deviceFactorsData), StandardCharsets.UTF_8));
        });
        transaction.commit();
        transaction.close();
    }

    @Override // java.lang.Runnable
    public void run() {
        FileChannel fileChannel = this.channel;
        while (true) {
            Transaction transaction = fileChannel.getTransaction();
            transaction.begin();
            Event take = fileChannel.take();
            if (take != null) {
                String str = new String(take.getBody(), StandardCharsets.UTF_8);
                DeviceFactorsData deviceFactorsData = (DeviceFactorsData) this.gson.fromJson(str, DeviceFactorsData.class);
                this.executorService.execute(() -> {
                    try {
                        logger.trace("Get metadata {}. ", ((RecordMetadata) this.realTimeProducer.send(KafkaMsg.buildMsg("SDS_REAL_TIME_DATA", deviceFactorsData.getDeviceCode(), deviceFactorsData)).get()).toString());
                        for (DeviceFactorData deviceFactorData : deviceFactorsData.getDeviceFactorDataList()) {
                            try {
                                Long acquisitionDatetime = deviceFactorData.getAcquisitionDatetime();
                                if (acquisitionDatetime != null && !SdsTopic.isToday(acquisitionDatetime) && acquisitionDatetime.longValue() < System.currentTimeMillis()) {
                                    this.historyProducer.send(KafkaMsg.buildMsg("SDS_HISTORY_TIME_DATA", deviceFactorData.getDeviceId(), new HistoryDeviceFactorData(deviceFactorData.getDeviceId(), acquisitionDatetime.longValue())));
                                    logger.info("history data, device id [{}], date [{}] ", deviceFactorData.getDeviceId(), acquisitionDatetime);
                                }
                            } catch (Exception e) {
                                logger.error("fail to send history data to kafka, device id: " + deviceFactorData.getDeviceId(), e);
                            }
                        }
                    } catch (Exception e2) {
                        logger.error("fail to save real time data to kafka, data: " + str, e2);
                    }
                });
            } else {
                logger.trace("No event found currently, sleep awhile");
                try {
                    if (this.tlogLinger > 0) {
                        TimeUnit.MILLISECONDS.sleep(this.tlogLinger);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            transaction.commit();
            transaction.close();
        }
    }

    public boolean isAutoStartup() {
        return false;
    }

    public void stop(Runnable runnable) {
        stop();
    }

    public void start() {
        FileChannel fileChannel = this.channel;
        if (fileChannel == null) {
            throw new IllegalStateException("No file channel found");
        }
        fileChannel.start();
        new Thread(this, FlumeFileChannelWorker.class.getSimpleName()).start();
        logger.info("Flume worker started");
    }

    public void stop() {
        FileChannel fileChannel = this.channel;
        if (fileChannel != null) {
            fileChannel.stop();
            logger.info("Flume File Channel stopped");
        }
    }

    public boolean isRunning() {
        if (this.channel == null) {
            return false;
        }
        return this.channel.isOpen();
    }

    public int getPhase() {
        return Integer.MAX_VALUE;
    }
}
