package com.vortex.das.mqtt.processor;

import com.google.common.base.Charsets;
import com.google.common.base.Strings;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import com.lmax.disruptor.EventHandler;
import com.vortex.common.lamx.IMessaging;
import com.vortex.common.lamx.LmaxDiscuptorMessaging;
import com.vortex.das.config.DasConfig;
import com.vortex.das.mqtt.MqttRemoteMessageHandler;
import com.vortex.das.mqtt.auth.IAuthenticator;
import com.vortex.das.mqtt.cache.MqttBrokerChannelCache;
import com.vortex.das.mqtt.cache.MqttTopicSubscriptionCache;
import com.vortex.das.mqtt.constant.MqttConstants;
import com.vortex.das.mqtt.event.LostConnectionEvent;
import com.vortex.das.mqtt.event.ProtocolMessageEvent;
import com.vortex.das.mqtt.message.AbstractMessage;
import com.vortex.das.mqtt.message.ConnAckMessage;
import com.vortex.das.mqtt.message.ConnectMessage;
import com.vortex.das.mqtt.message.DisconnectMessage;
import com.vortex.das.mqtt.message.PubAckMessage;
import com.vortex.das.mqtt.message.PubCompMessage;
import com.vortex.das.mqtt.message.PubRecMessage;
import com.vortex.das.mqtt.message.PubRelMessage;
import com.vortex.das.mqtt.message.PublishMessage;
import com.vortex.das.mqtt.message.SubAckMessage;
import com.vortex.das.mqtt.message.SubscribeMessage;
import com.vortex.das.mqtt.message.UnsubAckMessage;
import com.vortex.das.mqtt.message.UnsubscribeMessage;
import com.vortex.das.mqtt.message.WillMessage;
import com.vortex.das.mqtt.subscriptions.Subscription;
import com.vortex.das.mqtt.util.MqttUtil;
import com.vortex.das.util.CentralCacheKeyUtil;
import io.netty.channel.Channel;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;

@Service("mqttProtocolProcessor")
/* loaded from: input_file:com/vortex/das/mqtt/processor/MqttProtocolProcessor.class */
public class MqttProtocolProcessor extends MqttAbstractProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(MqttProtocolProcessor.class);

    @Autowired
    private DasConfig dasConfig;

    @Autowired
    @Qualifier("mqttSimpleAuthenticator")
    private IAuthenticator iAuthenticator;

    @Autowired
    @Qualifier("mqttBrokerChannelCache")
    private MqttBrokerChannelCache serverChannelCache;

    @Autowired
    @Qualifier("mqttTopicSubscriptionCache")
    private MqttTopicSubscriptionCache topicSubscriptionCache;

    @Autowired
    @Qualifier("mqttDeviceProcessor")
    private MqttDeviceProcessor mqttDeviceProcessor;

    @Autowired
    @Qualifier("mqttRemoteMessageHandler")
    private MqttRemoteMessageHandler mqttRemoteMessageHandler;
    private IMessaging messagingService;
    private IMessaging remoteMessagingService;
    private Cache<String, WillMessage> willMessageCache = CacheBuilder.newBuilder().maximumSize(1000000).concurrencyLevel(16).build();

    @PostConstruct
    private void init() {
        this.messagingService = new LmaxDiscuptorMessaging(new EventHandler[]{this.mqttDeviceProcessor});
        this.remoteMessagingService = new LmaxDiscuptorMessaging(new EventHandler[]{this.mqttRemoteMessageHandler});
    }

    @Override // com.vortex.das.mqtt.processor.MqttAbstractProcessor
    public void processConnectionLost(Channel channel, String str) {
        Channel channel2 = this.serverChannelCache.get(str);
        if (channel2 == null) {
            LOG.warn("lost connection with clientId: {} which channel not exist in serverChannelCache", str);
            return;
        }
        if (!channel2.equals(channel)) {
            LOG.debug("lost connection with clientId: {} which has already reconnected", str);
            return;
        }
        this.serverChannelCache.remove(str);
        if (((Boolean) channel.attr(MqttConstants.ATTR_CLEAN_SESSION).get()).booleanValue()) {
            cleanSession(str);
        }
        WillMessage willMessage = (WillMessage) this.willMessageCache.getIfPresent(str);
        if (willMessage != null) {
            dealMessagePublish(willMessage);
            this.willMessageCache.invalidate(str);
            LOG.debug("process will message: {} for clientId: {}", willMessage, str);
        }
        this.messagingService.publish(new LostConnectionEvent(channel, str));
        LOG.info("lost connection with clientId: {}", str);
    }

    @Override // com.vortex.das.mqtt.processor.MqttAbstractProcessor
    public void processConnectMessage(Channel channel, ConnectMessage connectMessage) {
        String clientID = connectMessage.getClientID();
        if (checkConnectValid(channel, connectMessage)) {
            Channel channel2 = this.serverChannelCache.get(clientID);
            this.serverChannelCache.put(clientID, channel);
            if (channel2 != null) {
                MqttUtil.closeChannel(channel2, true);
                LOG.info("close an existing connection with same clientId:{}", clientID);
            }
            if (connectMessage.isWillFlag()) {
                this.willMessageCache.put(clientID, new WillMessage(connectMessage.getWillTopic(), ByteBuffer.wrap(connectMessage.getWillMessage().getBytes(Charsets.UTF_8)), connectMessage.isWillRetain(), AbstractMessage.QOSType.values()[connectMessage.getWillQos()]));
                LOG.debug("save will message for clientId: {}", clientID);
            }
            boolean isCleanSession = connectMessage.isCleanSession();
            if (isCleanSession) {
                cleanSession(clientID);
            }
            int keepAlive = connectMessage.getKeepAlive();
            MqttUtil.setChannelIdleTime(channel, Math.round(keepAlive * 2.5f));
            channel.attr(MqttConstants.ATTR_KEEP_ALIVE).set(Integer.valueOf(keepAlive));
            channel.attr(MqttConstants.ATTR_CLEAN_SESSION).set(Boolean.valueOf(isCleanSession));
            channel.attr(MqttConstants.ATTR_CLIENTID).set(clientID);
            ConnAckMessage connAckMessage = new ConnAckMessage();
            connAckMessage.setReturnCode((byte) 0);
            MqttUtil.writeData(channel, connAckMessage);
            LOG.info("channle connected for clientId:{} with protocolVersion:{} and keepAlive:{} and cleanSession:{}", new Object[]{clientID, Byte.valueOf(connectMessage.getProcotolVersion()), Integer.valueOf(keepAlive), Boolean.valueOf(isCleanSession)});
            this.messagingService.publish(new ProtocolMessageEvent(channel, connectMessage));
        }
    }

    @Override // com.vortex.das.mqtt.processor.MqttAbstractProcessor
    public void processDisconnectMessage(Channel channel, DisconnectMessage disconnectMessage) {
        String str = (String) channel.attr(MqttConstants.ATTR_CLIENTID).get();
        if (Strings.isNullOrEmpty(str)) {
            MqttUtil.closeChannel(channel, true);
            LOG.warn("receive disconnect message before connect message for channel:{}", channel);
        } else {
            this.willMessageCache.invalidate(str);
            MqttUtil.closeChannel(channel, true);
            LOG.info("process disconnect message for client: {} ", str);
            this.messagingService.publish(new ProtocolMessageEvent(channel, disconnectMessage));
        }
    }

    @Override // com.vortex.das.mqtt.processor.MqttAbstractProcessor
    public void processSubscribeMessage(Channel channel, SubscribeMessage subscribeMessage) {
        String str = (String) channel.attr(MqttConstants.ATTR_CLIENTID).get();
        if (Strings.isNullOrEmpty(str)) {
            MqttUtil.closeChannel(channel, true);
            LOG.warn("receive subscribe message before connect message for channel:{}", channel);
            return;
        }
        boolean booleanValue = ((Boolean) channel.attr(MqttConstants.ATTR_CLEAN_SESSION).get()).booleanValue();
        ArrayList newArrayList = Lists.newArrayList();
        for (SubscribeMessage.Couple couple : subscribeMessage.subscriptions()) {
            newArrayList.add(new Subscription(str, couple.getTopicFilter(), AbstractMessage.QOSType.values()[couple.getQos()], booleanValue));
        }
        addSubscriptionsToLocalAndCCS(newArrayList);
        SubAckMessage subAckMessage = new SubAckMessage();
        subAckMessage.setMessageID(subscribeMessage.getMessageID());
        Iterator<SubscribeMessage.Couple> it = subscribeMessage.subscriptions().iterator();
        while (it.hasNext()) {
            subAckMessage.addType(AbstractMessage.QOSType.values()[it.next().getQos()]);
        }
        MqttUtil.writeData(channel, subAckMessage);
        LOG.debug("process subscribe message from clientId: {} with messageId: {}", str, subscribeMessage.getMessageID());
        this.messagingService.publish(new ProtocolMessageEvent(channel, subscribeMessage));
    }

    private synchronized void addSubscriptionsToLocalAndCCS(List<Subscription> list) {
        Iterator<String> it = this.topicSubscriptionCache.addSubscriptions(list).iterator();
        while (it.hasNext()) {
            this.dasConfig.getCcs().putObjectToSet(CentralCacheKeyUtil.getTopicKey(it.next()), String.valueOf(this.dasConfig.getAcsNum()));
        }
    }

    @Override // com.vortex.das.mqtt.processor.MqttAbstractProcessor
    public void processUnsubscribeMessage(Channel channel, UnsubscribeMessage unsubscribeMessage) {
        String str = (String) channel.attr(MqttConstants.ATTR_CLIENTID).get();
        if (Strings.isNullOrEmpty(str)) {
            MqttUtil.closeChannel(channel, true);
            LOG.warn("receive unsubscribe message before connect message for channel:{}", channel);
            return;
        }
        removeSubscriptionsFromLocalAndCCS(str, unsubscribeMessage.topicFilters());
        UnsubAckMessage unsubAckMessage = new UnsubAckMessage();
        unsubAckMessage.setMessageID(unsubscribeMessage.getMessageID());
        MqttUtil.writeData(channel, unsubAckMessage);
        LOG.debug("process unsubscribe message removing subscription on topics: {}, for clientId: {}", unsubscribeMessage.topicFilters(), str);
        this.messagingService.publish(new ProtocolMessageEvent(channel, unsubscribeMessage));
    }

    private synchronized void removeSubscriptionsFromLocalAndCCS(String str, List<String> list) {
        Iterator<String> it = this.topicSubscriptionCache.removeSubscriptions(str, list).iterator();
        while (it.hasNext()) {
            this.dasConfig.getCcs().removeObjectFromSet(CentralCacheKeyUtil.getTopicKey(it.next()), String.valueOf(this.dasConfig.getAcsNum()));
        }
    }

    @Override // com.vortex.das.mqtt.processor.MqttAbstractProcessor
    public void processPublishMessage(Channel channel, PublishMessage publishMessage) {
        String str = (String) channel.attr(MqttConstants.ATTR_CLIENTID).get();
        if (Strings.isNullOrEmpty(str)) {
            MqttUtil.closeChannel(channel, true);
            LOG.warn("receive publish message before connect message for channel:{}", channel);
        } else {
            dealMessagePublish(publishMessage);
            LOG.debug("process publish message: {} for clientId: {}", publishMessage, str);
            this.remoteMessagingService.publish(publishMessage);
            this.messagingService.publish(new ProtocolMessageEvent(channel, publishMessage));
        }
    }

    @Override // com.vortex.das.mqtt.processor.MqttAbstractProcessor
    public void processPubRecMessage(Channel channel, PubRecMessage pubRecMessage) {
        String str = (String) channel.attr(MqttConstants.ATTR_CLIENTID).get();
        if (Strings.isNullOrEmpty(str)) {
            MqttUtil.closeChannel(channel, true);
            LOG.warn("receive pubrec message before connect message for channel:{}", channel);
            return;
        }
        int intValue = pubRecMessage.getMessageID().intValue();
        PubRelMessage pubRelMessage = new PubRelMessage();
        pubRelMessage.setMessageID(Integer.valueOf(intValue));
        pubRelMessage.setQos(AbstractMessage.QOSType.LEAST_ONE);
        MqttUtil.writeData(channel, pubRelMessage);
        LOG.debug("send pubrec message for clientId: {} ad messageId: {}", str, Integer.valueOf(intValue));
        this.messagingService.publish(new ProtocolMessageEvent(channel, pubRecMessage));
    }

    @Override // com.vortex.das.mqtt.processor.MqttAbstractProcessor
    public void processPubAckMessage(Channel channel, PubAckMessage pubAckMessage) {
    }

    @Override // com.vortex.das.mqtt.processor.MqttAbstractProcessor
    public void processPubRelMessage(Channel channel, PubRelMessage pubRelMessage) {
    }

    @Override // com.vortex.das.mqtt.processor.MqttAbstractProcessor
    public void processPubCompMessage(Channel channel, PubCompMessage pubCompMessage) {
    }

    public long getWillMessageCacheSize() {
        return this.willMessageCache.size();
    }

    public long getMessagingRemainBufferSize() {
        return this.messagingService.getRemainBufferSize();
    }

    private boolean checkConnectValid(Channel channel, ConnectMessage connectMessage) {
        String clientID = connectMessage.getClientID();
        byte procotolVersion = connectMessage.getProcotolVersion();
        if (procotolVersion != 3 && procotolVersion != 4) {
            ConnAckMessage connAckMessage = new ConnAckMessage();
            connAckMessage.setReturnCode((byte) 1);
            MqttUtil.writeDataThenClose(channel, connAckMessage);
            LOG.warn("close channel for connect message with wrong protocol version:{} for clientId:{}", Integer.valueOf(procotolVersion), clientID);
            return false;
        }
        if (Strings.isNullOrEmpty(clientID)) {
            ConnAckMessage connAckMessage2 = new ConnAckMessage();
            connAckMessage2.setReturnCode((byte) 2);
            MqttUtil.writeDataThenClose(channel, connAckMessage2);
            LOG.warn("close channel for connect message with null or empty clientId");
            return false;
        }
        if (connectMessage.isUserFlag() && connectMessage.isPasswordFlag() && this.iAuthenticator.checkValid(connectMessage.getUsername(), connectMessage.getPassword())) {
            return true;
        }
        ConnAckMessage connAckMessage3 = new ConnAckMessage();
        connAckMessage3.setReturnCode((byte) 4);
        MqttUtil.writeDataThenClose(channel, connAckMessage3);
        LOG.warn("close channel for connect message with wrong userName:{} or password:{}", connectMessage.getUsername(), connectMessage.getPassword());
        return false;
    }

    private synchronized void cleanSession(String str) {
        Iterator<String> it = this.topicSubscriptionCache.removeForClientId(str).iterator();
        while (it.hasNext()) {
            this.dasConfig.getCcs().removeObjectFromSet(CentralCacheKeyUtil.getTopicKey(it.next()), String.valueOf(this.dasConfig.getAcsNum()));
        }
        LOG.debug("clean saved subscriptions for clientId:{}", str);
    }

    public void dealMessagePublish(PublishMessage publishMessage) {
        AbstractMessage.QOSType qos = publishMessage.getQos();
        if (AbstractMessage.QOSType.MOST_ONE == qos) {
            publish2Subscribers(publishMessage);
        } else {
            if (AbstractMessage.QOSType.LEAST_ONE != qos && AbstractMessage.QOSType.EXACTLY_ONCE == qos) {
            }
        }
    }

    private void publish2Subscribers(PublishMessage publishMessage) {
        String topicName = publishMessage.getTopicName();
        AbstractMessage.QOSType qos = publishMessage.getQos();
        for (Subscription subscription : this.topicSubscriptionCache.getSubscriptions(topicName).values()) {
            if (qos.ordinal() > subscription.getRequestedQos().ordinal()) {
                publishMessage.setQos(subscription.getRequestedQos());
            }
            String clientId = subscription.getClientId();
            AbstractMessage.QOSType qos2 = publishMessage.getQos();
            if (AbstractMessage.QOSType.MOST_ONE == qos2 && subscription.isActive()) {
                sendPublishMessage(clientId, publishMessage);
            } else if (AbstractMessage.QOSType.LEAST_ONE != qos2 && AbstractMessage.QOSType.EXACTLY_ONCE == qos2) {
            }
            LOG.debug("publish to subscriber for clientId:{}, topic:{}, qos:{}, active:{}", new Object[]{clientId, subscription.getTopicFilter(), qos2, Boolean.valueOf(subscription.isActive())});
        }
    }

    private void sendPublishMessage(String str, PublishMessage publishMessage) {
        Channel channel = this.serverChannelCache.get(str);
        if (channel == null) {
            LOG.warn("send publish message can't find channel for clientId: {}", str);
        } else {
            MqttUtil.writeData(channel, publishMessage);
            LOG.debug("send publish message for clientId: {} on message: {}", str, publishMessage);
        }
    }
}
