package com.vortex.sds.util;

import com.alibaba.fastjson.JSON;
import com.google.common.base.Strings;
import com.vortex.sds.api.dto.DeviceFactorsData;
import com.vortex.sds.dto.DeviceFactorKey;
import com.vortex.sds.filtering.FilterChecker;
import com.vortex.sds.listener.DeviceFactorsEvent;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.Transaction;
import org.apache.flume.channel.file.FileChannel;
import org.apache.flume.conf.Configurables;
import org.apache.flume.event.EventBuilder;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.context.SmartLifecycle;

/* loaded from: input_file:com/vortex/sds/util/FlumeFileChannelWorker.class */
public class FlumeFileChannelWorker implements SmartLifecycle, Runnable, ApplicationEventPublisherAware {
    private final int dataDirCount;
    private FileChannel channel;
    private File baseDir;
    private File checkpointDir;
    private File[] dataDirs;
    private String dataDir;
    private File backupDir;
    private File uncompressedBackupCheckpoint;
    private File compressedBackupCheckpoint;
    private boolean enable;
    private long tlogLinger;
    private int bufferSize;
    private ApplicationEventPublisher eventPublisher;
    private static final Logger logger = LoggerFactory.getLogger(FlumeFileChannelWorker.class);
    private int emptyMaxAttempts = 20;
    private AtomicBoolean running = new AtomicBoolean(false);
    private Lock lock = new ReentrantLock();

    public FlumeFileChannelWorker(boolean z, int i, String str, long j, int i2) {
        this.enable = z;
        if (!z) {
            logger.warn("Flume tlog disabled, NO DATA will be stored in database, such situation mainly used in dump data");
        }
        this.dataDirCount = i <= 0 ? 3 : i;
        this.tlogLinger = j;
        this.bufferSize = i2;
        setup(str);
    }

    private void setup(String str) {
        if (Strings.isNullOrEmpty(str)) {
            this.baseDir = new File(new File(System.getProperty("java.io.tmpdir")), "sdslog");
        } else {
            this.baseDir = new File(str);
        }
        if (this.baseDir.mkdir()) {
            logger.info("No tlog home directory exists, create a new directory");
        }
        this.checkpointDir = new File(this.baseDir, "chkpt");
        this.backupDir = new File(this.baseDir, "backup");
        this.uncompressedBackupCheckpoint = new File(this.backupDir, "checkpoint");
        this.compressedBackupCheckpoint = new File(this.backupDir, "checkpoint.snappy");
        Assert.assertTrue(this.checkpointDir.mkdirs() || this.checkpointDir.isDirectory());
        Assert.assertTrue(this.backupDir.mkdirs() || this.backupDir.isDirectory());
        this.dataDirs = new File[this.dataDirCount];
        this.dataDir = "";
        for (int i = 0; i < this.dataDirs.length; i++) {
            this.dataDirs[i] = new File(this.baseDir, DeviceFactorKey.DATA_TYPE + (i + 1));
            Assert.assertTrue(this.dataDirs[i].mkdirs() || this.dataDirs[i].isDirectory());
            this.dataDir += this.dataDirs[i].getAbsolutePath() + FilterChecker.SPLIT;
        }
        this.dataDir = this.dataDir.substring(0, this.dataDir.length() - 1);
        this.channel = createFileChannel();
    }

    private Context createContext() {
        Context context = new Context();
        context.put("checkpointDir", this.checkpointDir.getAbsolutePath());
        if (this.backupDir != null) {
            context.put("backupCheckpointDir", this.backupDir.getAbsolutePath());
        }
        context.put("dataDirs", this.dataDir);
        context.put("keep-alive", String.valueOf(1));
        context.put("capacity", String.valueOf(10000));
        return context;
    }

    private FileChannel createFileChannel() {
        FileChannel fileChannel = new FileChannel();
        fileChannel.setName("FileChannel-kafka");
        Configurables.configure(fileChannel, createContext());
        return fileChannel;
    }

    public void put(List<DeviceFactorsData> list) {
        if (this.enable) {
            FileChannel fileChannel = this.channel;
            if (!isRunning()) {
                logger.warn("Flume work is not running");
            }
            if (!fileChannel.isOpen()) {
                logger.warn("Channel is not open");
            }
            Transaction transaction = fileChannel.getTransaction();
            try {
                try {
                    transaction.begin();
                    Iterator<DeviceFactorsData> it = list.iterator();
                    while (it.hasNext()) {
                        fileChannel.put(EventBuilder.withBody(JSON.toJSONString(it.next()), StandardCharsets.UTF_8));
                    }
                    transaction.commit();
                    transaction.close();
                } catch (Exception e) {
                    logger.error("put data error", e);
                    transaction.rollback();
                    transaction.close();
                }
            } catch (Throwable th) {
                transaction.close();
                throw th;
            }
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        FileChannel fileChannel = this.channel;
        try {
            this.lock.lock();
            if (!isRunning()) {
                logger.warn("Flume worker not running");
                return;
            }
            while (isRunning()) {
                Transaction transaction = fileChannel.getTransaction();
                try {
                    transaction.begin();
                    long j = this.tlogLinger > 0 ? this.tlogLinger : 1000L;
                    int i = this.bufferSize > 0 ? this.bufferSize : 60;
                    ArrayList arrayList = null;
                    long currentTimeMillis = System.currentTimeMillis();
                    int i2 = 0;
                    while (true) {
                        Event take = fileChannel.take();
                        if (take != null) {
                            if (arrayList == null) {
                                arrayList = new ArrayList(i);
                            }
                            arrayList.add(extractData(take));
                        }
                        if (take == null && arrayList == null && this.tlogLinger > 0) {
                            try {
                                TimeUnit.MILLISECONDS.sleep(this.tlogLinger);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                        if (take == null) {
                            int i3 = i2;
                            i2++;
                            if (i3 > this.emptyMaxAttempts) {
                                logger.trace("Null event reached max attempts");
                                break;
                            }
                        }
                        if (!((currentTimeMillis + j) - System.currentTimeMillis() < 0)) {
                            if (arrayList != null && arrayList.size() >= i) {
                                logger.trace("Buffer size reached for take event");
                                break;
                            }
                        } else {
                            logger.trace("Time out for take event");
                            break;
                        }
                    }
                    if (arrayList != null) {
                        this.eventPublisher.publishEvent(new DeviceFactorsEvent(arrayList));
                    }
                    transaction.commit();
                    transaction.close();
                } catch (Throwable th) {
                    transaction.close();
                    throw th;
                }
            }
        } finally {
            this.lock.unlock();
        }
    }

    private static DeviceFactorsData extractData(Event event) {
        return (DeviceFactorsData) JSON.parseObject(new String(event.getBody(), StandardCharsets.UTF_8), DeviceFactorsData.class);
    }

    public boolean isAutoStartup() {
        return true;
    }

    public void stop(Runnable runnable) {
        stop();
        if (runnable != null) {
            runnable.run();
        }
    }

    public void start() {
        if (isRunning()) {
            logger.info("Flume worker already running");
            return;
        }
        try {
            this.lock.lock();
            FileChannel fileChannel = this.channel;
            if (fileChannel == null) {
                throw new IllegalStateException("No file channel found");
            }
            this.running.set(true);
            fileChannel.start();
            new Thread(this, "flume-inner").start();
            logger.info("Flume worker started");
        } finally {
            this.lock.unlock();
        }
    }

    public void stop() {
        try {
            this.lock.lock();
            this.running.set(false);
            FileChannel fileChannel = this.channel;
            if (fileChannel != null) {
                fileChannel.stop();
                logger.info("Flume File Channel stopped");
            }
        } finally {
            this.lock.unlock();
        }
    }

    public boolean isRunning() {
        return this.running.get();
    }

    public int getPhase() {
        return Integer.MAX_VALUE;
    }

    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.eventPublisher = applicationEventPublisher;
    }
}
