package org.wso2.extension.siddhi.io.mqtt.source;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.log4j.Logger;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.wso2.siddhi.core.stream.input.source.SourceEventListener;

/* loaded from: input_file:org/wso2/extension/siddhi/io/mqtt/source/MqttConsumer.class */
public class MqttConsumer {
    private static final Logger log = Logger.getLogger(MqttConsumer.class);
    public SourceEventListener sourceEventListener;
    private boolean isPaused;
    private ReentrantLock lock = new ReentrantLock();
    private Condition condition = this.lock.newCondition();

    /* loaded from: input_file:org/wso2/extension/siddhi/io/mqtt/source/MqttConsumer$MqttSourceCallBack.class */
    public class MqttSourceCallBack implements MqttCallback {
        public MqttSourceCallBack() {
        }

        @Override // org.eclipse.paho.client.mqttv3.MqttCallback
        public void connectionLost(Throwable th) {
            MqttConsumer.log.debug("MQTT connection not reachable");
        }

        @Override // org.eclipse.paho.client.mqttv3.MqttCallback
        public void messageArrived(String str, MqttMessage mqttMessage) throws Exception {
            if (MqttConsumer.this.isPaused) {
                MqttConsumer.this.lock.lock();
                while (!MqttConsumer.this.isPaused) {
                    try {
                        try {
                            MqttConsumer.this.condition.await();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            MqttConsumer.this.lock.unlock();
                        }
                    } finally {
                        MqttConsumer.this.lock.unlock();
                    }
                }
            }
            MqttConsumer.this.sourceEventListener.onEvent(new String(mqttMessage.getPayload(), "UTF-8"), (String[]) null);
        }

        @Override // org.eclipse.paho.client.mqttv3.MqttCallback
        public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        }
    }

    public MqttConsumer(SourceEventListener sourceEventListener) {
        this.sourceEventListener = sourceEventListener;
    }

    public void subscribe(String str, int i, MqttClient mqttClient) throws MqttException {
        mqttClient.setCallback(new MqttSourceCallBack());
        mqttClient.subscribe(str, i);
    }

    public void pause() {
        this.isPaused = true;
    }

    public void resume() {
        this.isPaused = false;
        try {
            this.lock.lock();
            this.condition.signalAll();
        } finally {
            this.lock.unlock();
        }
    }
}
