package com.vortex.das.mqtt.core;

import com.google.common.base.Charsets;
import com.google.common.base.Strings;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import com.lmax.disruptor.EventHandler;
import com.vortex.das.DasConfig;
import com.vortex.das.NettyUtil;
import com.vortex.das.bean.ChannelCache;
import com.vortex.das.mqtt.MqttCacheKeys;
import com.vortex.das.mqtt.authenticator.IMqttAuthenticator;
import com.vortex.das.mqtt.bean.MqttMsgSender;
import com.vortex.das.mqtt.bean.MqttTopicCache;
import com.vortex.das.mqtt.event.MqttConnectionLostEvent;
import com.vortex.das.mqtt.event.MqttEvent;
import com.vortex.das.mqtt.event.MqttProtocolEvent;
import com.vortex.das.mqtt.protocol.message.AbstractMessage;
import com.vortex.das.mqtt.protocol.message.ConnAckMessage;
import com.vortex.das.mqtt.protocol.message.ConnectMessage;
import com.vortex.das.mqtt.protocol.message.DisconnectMessage;
import com.vortex.das.mqtt.protocol.message.PingReqMessage;
import com.vortex.das.mqtt.protocol.message.PingRespMessage;
import com.vortex.das.mqtt.protocol.message.PubAckMessage;
import com.vortex.das.mqtt.protocol.message.PubCompMessage;
import com.vortex.das.mqtt.protocol.message.PubRecMessage;
import com.vortex.das.mqtt.protocol.message.PubRelMessage;
import com.vortex.das.mqtt.protocol.message.PublishMessage;
import com.vortex.das.mqtt.protocol.message.SubAckMessage;
import com.vortex.das.mqtt.protocol.message.SubscribeMessage;
import com.vortex.das.mqtt.protocol.message.UnsubAckMessage;
import com.vortex.das.mqtt.protocol.message.UnsubscribeMessage;
import com.vortex.das.mqtt.protocol.message.WillMessage;
import com.vortex.das.mqtt.protocol.subscriptions.Subscription;
import com.vortex.util.disruptor.IMessaging;
import com.vortex.util.disruptor.LmaxDiscuptorMessaging;
import com.vortex.util.disruptor.ValueEvent;
import io.netty.channel.Channel;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;

/**
 * Central handler for MQTT events raised by the transport layer.
 *
 * <p>Spring application events of type {@link MqttEvent} are received in
 * {@link #onApplicationEvent(MqttEvent)} and handed to an {@link IMessaging} instance that is
 * created with this object as its only {@link EventHandler}. The actual protocol handling
 * (CONNECT, PUBLISH, SUBSCRIBE, ...) then happens in {@link #onEvent(ValueEvent, long, boolean)}.
 *
 * <p>Exceptions thrown while handling a single event are caught and logged in {@code onEvent}
 * so that one bad message does not propagate out of the handler.
 */
@Component
public class MqttEventProcessor implements ApplicationListener<MqttEvent>, EventHandler<ValueEvent> {
    public static final Logger LOG = LoggerFactory.getLogger(MqttEventProcessor.class);

    /** CONNACK return code: connection accepted. */
    private static final byte CONNACK_ACCEPTED = 0;

    /** CONNACK return code: unacceptable protocol version. */
    private static final byte CONNACK_UNACCEPTABLE_PROTOCOL_VERSION = 1;

    /** CONNACK return code: client identifier rejected. */
    private static final byte CONNACK_IDENTIFIER_REJECTED = 2;

    /** CONNACK return code: bad user name or password. */
    private static final byte CONNACK_BAD_USERNAME_OR_PASSWORD = 4;

    @Autowired
    DasConfig dasConfig;

    @Autowired
    private IMqttAuthenticator authenticator;

    @Autowired
    MqttMsgSender mqttMsgSender;

    @Autowired
    ClusterMsgProcessor clusterMsgProcessor;

    @Autowired
    ChannelCache channelCache;

    @Autowired
    MqttTopicCache topicSubscriptionCache;

    /** Will messages keyed by clientId, stored at CONNECT time and published on connection loss. */
    private final Cache<String, WillMessage> willMessageCache =
            CacheBuilder.newBuilder().maximumSize(1000000).concurrencyLevel(16).build();

    private IMessaging messaging;

    /** Creates the messaging instance with this processor registered as its sole handler. */
    @PostConstruct
    private void init() {
        this.messaging = new LmaxDiscuptorMessaging(new EventHandler[]{this});
    }

    /**
     * Forwards a Spring-published MQTT event to the internal messaging instance.
     *
     * @param mqttEvent the event to enqueue
     */
    @Override
    public void onApplicationEvent(MqttEvent mqttEvent) {
        this.messaging.publish(mqttEvent);
    }

    /**
     * Handles one queued event: either a lost connection or an incoming protocol message.
     * Events of any other type are ignored. Exceptions raised by the handlers are logged and
     * swallowed here.
     *
     * @param valueEvent wrapper whose value is the event to process
     * @param j          unused by this handler
     * @param z          unused by this handler
     */
    @Override
    public void onEvent(ValueEvent valueEvent, long j, boolean z) throws Exception {
        Object value = valueEvent.getValue();
        if (value instanceof MqttConnectionLostEvent) {
            MqttConnectionLostEvent lostEvent = (MqttConnectionLostEvent) value;
            try {
                processConnectionLost(lostEvent.getChannel(), lostEvent.getClientId());
            } catch (Exception e) {
                LOG.error("exception in processConnectionLost for cause: {}", e.getCause(), e);
            }
            return;
        }
        if (value instanceof MqttProtocolEvent) {
            MqttProtocolEvent protocolEvent = (MqttProtocolEvent) value;
            Channel channel = protocolEvent.getChannel();
            AbstractMessage message = protocolEvent.getMessage();
            if (message == null) {
                // Without this guard the error path below would itself throw while
                // reading the message type for the log line.
                LOG.warn("ignore protocol event without message for channel:{}", channel);
                return;
            }
            try {
                dispatchMessage(channel, message);
            } catch (Exception e) {
                LOG.error("exception in dispathMessage for messageType:{} and cause: {}", new Object[]{Byte.valueOf(message.getMessageType()), e.getCause(), e});
            }
        }
    }

    /**
     * Routes a message to its handler based on the message type. Message types that have no
     * handler here (2, SUBACK, UNSUBACK, PINGRESP and anything unknown) are ignored.
     */
    private void dispatchMessage(Channel channel, AbstractMessage abstractMessage) {
        switch (abstractMessage.getMessageType()) {
            case 1: // CONNECT in the MQTT control packet numbering
                processConnectMessage(channel, (ConnectMessage) abstractMessage);
                break;
            case 3: // PUBLISH
                processPublishMessage(channel, (PublishMessage) abstractMessage);
                break;
            case 4: // PUBACK
                processPubAckMessage(channel, (PubAckMessage) abstractMessage);
                break;
            case 5: // PUBREC
                processPubRecMessage(channel, (PubRecMessage) abstractMessage);
                break;
            case AbstractMessage.PUBREL /* 6 */:
                processPubRelMessage(channel, (PubRelMessage) abstractMessage);
                break;
            case AbstractMessage.PUBCOMP /* 7 */:
                processPubCompMessage(channel, (PubCompMessage) abstractMessage);
                break;
            case AbstractMessage.SUBSCRIBE /* 8 */:
                processSubscribeMessage(channel, (SubscribeMessage) abstractMessage);
                break;
            case AbstractMessage.UNSUBSCRIBE /* 10 */:
                processUnsubscribeMessage(channel, (UnsubscribeMessage) abstractMessage);
                break;
            case AbstractMessage.PINGREQ /* 12 */:
                processPingReqMessage(channel, (PingReqMessage) abstractMessage);
                break;
            case AbstractMessage.DISCONNECT /* 14 */:
                processDisconnectMessage(channel, (DisconnectMessage) abstractMessage);
                break;
            default:
                // Server-to-client message types and unknown types: nothing to do.
                break;
        }
    }

    /**
     * Handles PUBLISH: forwards the message to the local sender and the cluster processor and
     * acknowledges it with PUBACK when the QoS is {@code LEAST_ONE}. The channel is closed if no
     * clientId has been bound to it yet.
     */
    private void processPublishMessage(Channel channel, PublishMessage publishMessage) {
        String clientId = NettyUtil.getClientId(channel);
        if (Strings.isNullOrEmpty(clientId)) {
            NettyUtil.closeChannel(channel, true);
            LOG.warn("receive publish message before connect message for channel:{}", channel);
            return;
        }
        LOG.debug("process publish message: {} for clientId: {}", publishMessage, clientId);
        this.mqttMsgSender.send(publishMessage);
        this.clusterMsgProcessor.processMsg(publishMessage);

        // NOTE(review): only QoS 1 is acknowledged. A QoS 2 (EXACTLY_ONCE) publish receives no
        // PUBREC from this method - confirm whether that is intentional.
        if (AbstractMessage.QOSType.LEAST_ONE == publishMessage.getQos()) {
            PubAckMessage pubAckMessage = new PubAckMessage();
            pubAckMessage.setMessageId(publishMessage.getMessageId());
            NettyUtil.writeData(channel, pubAckMessage);
        }
    }

    /**
     * Handles the loss of a connection. Nothing is done unless {@code channel} is still the
     * channel registered for {@code clientId}; in that case the registration is removed, the
     * session is cleaned if the channel was flagged clean-session, and a stored will message
     * (if any) is sent and discarded.
     */
    private void processConnectionLost(Channel channel, String clientId) {
        Channel registered = this.channelCache.get(clientId);
        if (registered == null) {
            LOG.warn("lost connection with clientId: {} which channel not exist in channelCache", clientId);
            return;
        }
        if (!registered.equals(channel)) {
            // A newer channel has replaced this one; its state must not be torn down.
            LOG.debug("lost connection with clientId: {} which has already reconnected", clientId);
            return;
        }
        this.channelCache.remove(clientId);
        if (NettyUtil.isCleanSession(channel)) {
            cleanSession(clientId);
        }
        WillMessage willMessage = this.willMessageCache.getIfPresent(clientId);
        if (willMessage != null) {
            this.mqttMsgSender.send(willMessage);
            this.willMessageCache.invalidate(clientId);
            LOG.debug("process will message: {} for clientId: {}", willMessage, clientId);
        }
        LOG.info("lost connection with clientId: {}", clientId);
    }

    /** Answers PINGREQ with PINGRESP. */
    private void processPingReqMessage(Channel channel, PingReqMessage pingReqMessage) {
        channel.writeAndFlush(new PingRespMessage());
    }

    /**
     * Handles CONNECT. After validation the channel is registered for the clientId (closing any
     * channel previously registered under the same id), the will message is stored or cleared,
     * the session is cleaned when requested, the channel attributes are set and a CONNACK with
     * return code 0 is written.
     */
    private void processConnectMessage(Channel channel, ConnectMessage connectMessage) {
        if (!checkConnectValid(channel, connectMessage)) {
            return;
        }
        String clientId = connectMessage.getClientId();

        Channel previous = this.channelCache.get(clientId);
        this.channelCache.put(clientId, channel);
        if (previous != null) {
            NettyUtil.closeChannel(previous, true);
            LOG.info("close an existing connection with same clientId:{}", clientId);
        }

        if (connectMessage.isWillFlag()) {
            ByteBuffer willPayload = ByteBuffer.wrap(connectMessage.getWillMessage().getBytes(Charsets.UTF_8));
            this.willMessageCache.put(clientId, new WillMessage(connectMessage.getWillTopic(), willPayload, connectMessage.isWillRetain(), qosOf(connectMessage.getWillQos())));
            LOG.debug("save will message for clientId: {}", clientId);
        } else {
            // Drop a will left behind by an earlier connection of the same clientId; otherwise
            // it would be published when this will-less connection is lost.
            this.willMessageCache.invalidate(clientId);
        }

        boolean isCleanSession = connectMessage.isCleanSession();
        if (isCleanSession) {
            cleanSession(clientId);
        }

        int keepAlive = connectMessage.getKeepAlive();
        // Idle timeout is set to 2.5 times the client's keep-alive value.
        NettyUtil.setChannelIdleTime(channel, Math.round(keepAlive * 2.5f));
        NettyUtil.setKeepAlive(channel, keepAlive);
        NettyUtil.setCleanSession(channel, isCleanSession);
        NettyUtil.setClientId(channel, clientId);

        ConnAckMessage connAckMessage = new ConnAckMessage();
        connAckMessage.setReturnCode(CONNACK_ACCEPTED);
        NettyUtil.writeData(channel, connAckMessage);
        LOG.info("channel connected for clientId:{} with protocolVersion:{} and keepAlive:{} and cleanSession:{}", new Object[]{clientId, Byte.valueOf(connectMessage.getProcotolVersion()), Integer.valueOf(keepAlive), Boolean.valueOf(isCleanSession)});
    }

    /**
     * Handles DISCONNECT by closing the channel. For a channel with a bound clientId the stored
     * will message is discarded first, so it is not sent by the connection-lost handling.
     */
    private void processDisconnectMessage(Channel channel, DisconnectMessage disconnectMessage) {
        String clientId = NettyUtil.getClientId(channel);
        if (Strings.isNullOrEmpty(clientId)) {
            NettyUtil.closeChannel(channel, true);
            LOG.warn("receive disconnect message before connect message for channel:{}", channel);
            return;
        }
        this.willMessageCache.invalidate(clientId);
        NettyUtil.closeChannel(channel, true);
        LOG.info("process disconnect message for client: {} ", clientId);
    }

    /**
     * Handles SUBSCRIBE: registers every requested topic filter for the client and answers with
     * a SUBACK that echoes the requested QoS of each filter. The channel is closed if no
     * clientId has been bound to it yet.
     */
    private void processSubscribeMessage(Channel channel, SubscribeMessage subscribeMessage) {
        String clientId = NettyUtil.getClientId(channel);
        if (Strings.isNullOrEmpty(clientId)) {
            NettyUtil.closeChannel(channel, true);
            LOG.warn("receive subscribe message before connect message for channel:{}", channel);
            return;
        }
        boolean isCleanSession = NettyUtil.isCleanSession(channel);

        List<Subscription> subscriptions = Lists.newArrayList();
        SubAckMessage subAckMessage = new SubAckMessage();
        subAckMessage.setMessageId(subscribeMessage.getMessageId());
        for (SubscribeMessage.Couple couple : subscribeMessage.subscriptions()) {
            AbstractMessage.QOSType qos = qosOf(couple.getQos());
            subscriptions.add(new Subscription(clientId, couple.getTopicFilter(), qos, isCleanSession));
            subAckMessage.addType(qos);
        }

        addSubscriptionsToLocalAndCcs(subscriptions);
        NettyUtil.writeData(channel, subAckMessage);
        LOG.debug("process subscribe message from clientId: {} with messageId: {}", clientId, subscribeMessage.getMessageId());
    }

    /**
     * Handles UNSUBSCRIBE: removes the listed topic filters for the client and answers with
     * UNSUBACK. The channel is closed if no clientId has been bound to it yet.
     */
    private void processUnsubscribeMessage(Channel channel, UnsubscribeMessage unsubscribeMessage) {
        String clientId = NettyUtil.getClientId(channel);
        if (Strings.isNullOrEmpty(clientId)) {
            NettyUtil.closeChannel(channel, true);
            LOG.warn("receive unsubscribe message before connect message for channel:{}", channel);
            return;
        }
        removeSubscriptionsFromLocalAndCcs(clientId, unsubscribeMessage.topicFilters());
        UnsubAckMessage unsubAckMessage = new UnsubAckMessage();
        unsubAckMessage.setMessageId(unsubscribeMessage.getMessageId());
        NettyUtil.writeData(channel, unsubAckMessage);
        LOG.debug("process unsubscribe message removing subscription on topics: {}, for clientId: {}", unsubscribeMessage.topicFilters(), clientId);
    }

    /** PUBACK is accepted but not processed. */
    private void processPubAckMessage(Channel channel, PubAckMessage pubAckMessage) {
    }

    /** PUBREL is accepted but not processed. */
    private void processPubRelMessage(Channel channel, PubRelMessage pubRelMessage) {
    }

    /** PUBCOMP is accepted but not processed. */
    private void processPubCompMessage(Channel channel, PubCompMessage pubCompMessage) {
    }

    /**
     * Handles PUBREC by replying with a PUBREL that carries the same message id. The channel is
     * closed if no clientId has been bound to it yet.
     */
    private void processPubRecMessage(Channel channel, PubRecMessage pubRecMessage) {
        String clientId = NettyUtil.getClientId(channel);
        if (Strings.isNullOrEmpty(clientId)) {
            NettyUtil.closeChannel(channel, true);
            LOG.warn("receive pubrec message before connect message for channel:{}", channel);
            return;
        }
        int messageId = pubRecMessage.getMessageId().intValue();
        PubRelMessage pubRelMessage = new PubRelMessage();
        pubRelMessage.setMessageId(Integer.valueOf(messageId));
        pubRelMessage.setQos(AbstractMessage.QOSType.LEAST_ONE);
        NettyUtil.writeData(channel, pubRelMessage);
        LOG.debug("send pubrel message for clientId: {} and messageId: {}", clientId, Integer.valueOf(messageId));
    }

    /**
     * Adds the subscriptions to the local topic cache and, for every topic returned by the
     * cache, adds this node's id to the CCS set of nodes for that topic.
     */
    private synchronized void addSubscriptionsToLocalAndCcs(List<Subscription> list) {
        String nodeId = String.valueOf(this.dasConfig.getDasNodeId());
        for (String topic : this.topicSubscriptionCache.addSubscriptions(list)) {
            this.dasConfig.getCcs().putObjectToSet(MqttCacheKeys.getCcsKeyForNodesByTopic(topic), nodeId);
        }
    }

    /**
     * Removes the client's subscriptions for the given topic filters from the local topic cache
     * and removes this node's id from the CCS node set of every topic returned by the cache.
     */
    private synchronized void removeSubscriptionsFromLocalAndCcs(String clientId, List<String> list) {
        removeNodeFromCcsTopics(this.topicSubscriptionCache.removeSubscriptions(clientId, list));
    }

    /**
     * Validates a CONNECT message. On failure a CONNACK with the matching return code is written,
     * the channel is closed and {@code false} is returned.
     *
     * <p>Checks, in order: protocol version must be 3 or 4; clientId must be non-empty; if the
     * user or password flag is set, the authenticator must accept the credentials.
     */
    private boolean checkConnectValid(Channel channel, ConnectMessage connectMessage) {
        String clientId = connectMessage.getClientId();
        byte procotolVersion = connectMessage.getProcotolVersion();
        if (procotolVersion != 3 && procotolVersion != 4) {
            rejectConnect(channel, CONNACK_UNACCEPTABLE_PROTOCOL_VERSION);
            LOG.warn("close channel for connect message with wrong protocol version:{} for clientId:{}", Integer.valueOf(procotolVersion), clientId);
            return false;
        }
        if (Strings.isNullOrEmpty(clientId)) {
            rejectConnect(channel, CONNACK_IDENTIFIER_REJECTED);
            LOG.warn("close channel for connect message with null or empty clientId");
            return false;
        }
        boolean hasCredentials = connectMessage.isUserFlag() || connectMessage.isPasswordFlag();
        if (hasCredentials && !this.authenticator.checkValid(connectMessage.getUsername(), connectMessage.getPassword())) {
            rejectConnect(channel, CONNACK_BAD_USERNAME_OR_PASSWORD);
            // The password is deliberately not logged: credentials must not end up in log files.
            LOG.warn("close channel for connect message with wrong userName:{} or password", connectMessage.getUsername());
            return false;
        }
        return true;
    }

    /**
     * Removes all subscriptions of the client from the local topic cache and removes this node's
     * id from the CCS node set of every topic returned by the cache.
     */
    private synchronized void cleanSession(String clientId) {
        removeNodeFromCcsTopics(this.topicSubscriptionCache.removeForClientId(clientId));
        LOG.debug("clean saved subscriptions for clientId:{}", clientId);
    }

    /** Removes this node's id from the CCS node set of each given topic. */
    private void removeNodeFromCcsTopics(Iterable<String> topics) {
        String nodeId = String.valueOf(this.dasConfig.getDasNodeId());
        for (String topic : topics) {
            this.dasConfig.getCcs().removeObjectFromSet(MqttCacheKeys.getCcsKeyForNodesByTopic(topic), nodeId);
        }
    }

    /** Writes a CONNACK with the given return code and then closes the channel. */
    private void rejectConnect(Channel channel, byte returnCode) {
        ConnAckMessage connAckMessage = new ConnAckMessage();
        connAckMessage.setReturnCode(returnCode);
        NettyUtil.writeDataThenClose(channel, connAckMessage);
    }

    /**
     * Maps a QoS value received from a client to the enum constant at that position. A value
     * outside the enum's range throws {@link ArrayIndexOutOfBoundsException}.
     */
    private static AbstractMessage.QOSType qosOf(int value) {
        return AbstractMessage.QOSType.values()[value];
    }
}
