package org.apache.skywalking.apm.commons.datacarrier.consumer;

import java.util.ArrayList;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.skywalking.apm.commons.datacarrier.buffer.Buffer;
import org.apache.skywalking.apm.commons.datacarrier.buffer.Channels;

/* loaded from: input_file:org/apache/skywalking/apm/commons/datacarrier/consumer/ConsumerPool.class */
public class ConsumerPool<T> {
    private boolean running;
    private ConsumerThread[] consumerThreads;
    private Channels<T> channels;
    private ReentrantLock lock;

    public ConsumerPool(String str, Channels<T> channels, Class<? extends IConsumer<T>> cls, int i, long j) {
        this(channels, i);
        for (int i2 = 0; i2 < i; i2++) {
            this.consumerThreads[i2] = new ConsumerThread("DataCarrier." + str + ".Consumser." + i2 + ".Thread", getNewConsumerInstance(cls), j);
            this.consumerThreads[i2].setDaemon(true);
        }
    }

    public ConsumerPool(String str, Channels<T> channels, IConsumer<T> iConsumer, int i, long j) {
        this(channels, i);
        iConsumer.init();
        for (int i2 = 0; i2 < i; i2++) {
            this.consumerThreads[i2] = new ConsumerThread("DataCarrier." + str + ".Consumser." + i2 + ".Thread", iConsumer, j);
            this.consumerThreads[i2].setDaemon(true);
        }
    }

    private ConsumerPool(Channels<T> channels, int i) {
        this.running = false;
        this.channels = channels;
        this.consumerThreads = new ConsumerThread[i];
        this.lock = new ReentrantLock();
    }

    private IConsumer<T> getNewConsumerInstance(Class<? extends IConsumer<T>> cls) {
        try {
            IConsumer<T> newInstance = cls.newInstance();
            newInstance.init();
            return newInstance;
        } catch (IllegalAccessException e) {
            throw new ConsumerCannotBeCreatedException(e);
        } catch (InstantiationException e2) {
            throw new ConsumerCannotBeCreatedException(e2);
        }
    }

    public void begin() {
        if (this.running) {
            return;
        }
        try {
            this.lock.lock();
            allocateBuffer2Thread();
            for (ConsumerThread consumerThread : this.consumerThreads) {
                consumerThread.start();
            }
            this.running = true;
            this.lock.unlock();
        } catch (Throwable th) {
            this.lock.unlock();
            throw th;
        }
    }

    public boolean isRunning() {
        return this.running;
    }

    private void allocateBuffer2Thread() {
        int channelSize = this.channels.getChannelSize();
        if (channelSize >= this.consumerThreads.length) {
            for (int i = 0; i < channelSize; i++) {
                this.consumerThreads[i % this.consumerThreads.length].addDataSource(this.channels.getBuffer(i));
            }
            return;
        }
        ArrayList[] arrayListArr = new ArrayList[channelSize];
        for (int i2 = 0; i2 < this.consumerThreads.length; i2++) {
            int i3 = i2 % channelSize;
            if (arrayListArr[i3] == null) {
                arrayListArr[i3] = new ArrayList();
            }
            arrayListArr[i3].add(Integer.valueOf(i2));
        }
        for (int i4 = 0; i4 < channelSize; i4++) {
            ArrayList arrayList = arrayListArr[i4];
            Buffer<T> buffer = this.channels.getBuffer(i4);
            int bufferSize = buffer.getBufferSize();
            int size = bufferSize / arrayList.size();
            int i5 = 0;
            while (i5 < arrayList.size()) {
                this.consumerThreads[((Integer) arrayList.get(i5)).intValue()].addDataSource(buffer, i5 * size, i5 == arrayList.size() - 1 ? bufferSize : (i5 + 1) * size);
                i5++;
            }
        }
    }

    public void close() {
        try {
            this.lock.lock();
            this.running = false;
            for (ConsumerThread consumerThread : this.consumerThreads) {
                consumerThread.shutdown();
            }
        } finally {
            this.lock.unlock();
        }
    }
}
