package com.vortex.das.mqtt.server;

import com.lmax.disruptor.EventHandler;
import com.vortex.common.lamx.IMessaging;
import com.vortex.common.lamx.LmaxDiscuptorMessaging;
import com.vortex.das.config.ServerProperties;
import com.vortex.das.core.AbsTcpServer;
import com.vortex.das.mqtt.MqttChannelHandler;
import com.vortex.das.mqtt.codec.MQTTDecoder;
import com.vortex.das.mqtt.codec.MQTTEncoder;
import com.vortex.das.mqtt.processor.MqttProtocolProcessor;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;

/* loaded from: input_file:com/vortex/das/mqtt/server/MqttServer.class */
public class MqttServer extends AbsTcpServer {
    private static final Logger LOG = LoggerFactory.getLogger(MqttServer.class);

    @Autowired
    @Qualifier("mqttProtocolProcessor")
    private MqttProtocolProcessor protocolProcessor;
    private ChannelInitializer<SocketChannel> channelInitializer;
    private IMessaging messagingService;

    public void start() {
        int port = this.serverProperties.getPort();
        if (port == 0) {
            LOG.info("mqtt broker tcp transport is disabled");
        } else if (isRunning()) {
            LOG.warn("mqtt broker tcp transport is already running for port:" + port);
        } else {
            LOG.info("mqtt broker tcp server is starting");
            super.start();
        }
    }

    public void stop() {
        LOG.info("mqtt broker tcp server is stopping...");
        this.messagingService.stop();
        super.stop();
        LOG.info("mqtt broker tcp server stopped");
    }

    public ChannelInitializer<SocketChannel> getChannelInitializer(final ServerProperties serverProperties) {
        if (this.channelInitializer == null) {
            this.messagingService = new LmaxDiscuptorMessaging(new EventHandler[]{this.protocolProcessor});
            this.channelInitializer = new ChannelInitializer<SocketChannel>() { // from class: com.vortex.das.mqtt.server.MqttServer.1
                public void initChannel(SocketChannel socketChannel) throws Exception {
                    socketChannel.pipeline().addFirst("idleStateHandler", new IdleStateHandler(serverProperties.getIdleTime(), 0, 0)).addLast("loggingHandler", new LoggingHandler(LogLevel.DEBUG)).addLast("mqttDecoder", new MQTTDecoder()).addLast("mqttEncoder", new MQTTEncoder()).addLast("mqttHandler", new MqttChannelHandler(MqttServer.this.messagingService));
                }
            };
        }
        return this.channelInitializer;
    }

    public long getMessagingRemainBufferSize() {
        if (this.messagingService != null) {
            return this.messagingService.getRemainBufferSize();
        }
        return 0L;
    }
}
