package com.vortex.platform.mns.service.impl;

import com.google.common.base.Joiner;
import com.vortex.platform.mns.common.MessageType;
import com.vortex.platform.mns.dto.ListenerDto;
import com.vortex.platform.mns.selector.Selectors;
import com.vortex.platform.mns.service.MnsService;
import com.vortex.platform.mns.service.MnsServiceRegistry;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.SmartLifecycle;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.channel.MessageChannels;
import org.springframework.integration.dsl.context.IntegrationFlowContext;
import org.springframework.integration.dsl.context.IntegrationFlowRegistration;
import org.springframework.integration.dsl.core.MessageProcessorSpec;
import org.springframework.integration.dsl.kafka.Kafka;
import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;

/**
 * {@link MnsService} implementation that wires listener definitions to Kafka topics
 * through dynamically registered Spring Integration flows.
 *
 * <p>For every topic a single "source" flow is registered on demand (see
 * {@link #resolveKafkaInputChannel(String)}): a Kafka message-driven adapter feeding a
 * publish-subscribe channel. Every listener then gets its own flow, registered under the
 * listener id, that reads from that channel, filters, adds a JSON content-type header and
 * hands the message to the handler chosen by {@link Selectors}.
 *
 * <p>The bean also takes part in the Spring lifecycle: on {@link #start()} it registers all
 * listeners returned by {@code ListenerServiceImpl#findAll()} and publishes itself in
 * {@link MnsServiceRegistry}.
 */
@Service
public class MnsServiceImpl implements MnsService, SmartLifecycle {
    private static final Logger log = LoggerFactory.getLogger(MnsServiceImpl.class);
    private static final String SERVICE_NAME = MnsServiceImpl.class.getSimpleName();

    /** Lifecycle flag; volatile because {@link #isRunning()} may be polled from other threads. */
    private volatile boolean isRunning = false;

    @Autowired
    private IntegrationFlowContext flowContext;

    @Autowired
    private ListenerServiceImpl listenerService;

    @Autowired
    private KafkaProperties kafkaProperties;

    /**
     * Removes the integration flow registered under the listener's id, if there is one.
     *
     * <p>Only the listener's own flow is touched. The per-topic Kafka source flow is neither
     * created nor removed here (previously this method resolved the topic channel first, which
     * had the side effect of registering a brand-new Kafka consumer for the topic).
     *
     * @param listenerDto listener whose flow should be removed; its id must not be null
     */
    @Override
    public void removeListener(ListenerDto listenerDto) {
        String listenerFlowId = listenerDto.getId().toString();
        if (this.flowContext.getRegistrationById(listenerFlowId) == null) {
            // Nothing registered for this listener: removal is a no-op.
            return;
        }
        this.flowContext.remove(listenerFlowId);
    }

    /**
     * Replaces the listener's flow: removes the current registration (if any) and registers
     * a new one built from the given definition.
     *
     * @param listenerDto the updated listener definition
     */
    @Override
    public void updateListener(ListenerDto listenerDto) {
        removeListener(listenerDto);
        addListener(listenerDto);
    }

    /**
     * Registers an integration flow for the listener under the listener's id.
     *
     * <p>All inputs are validated before anything is registered, so an invalid definition does
     * not leave a Kafka source flow behind.
     *
     * @param listenerDto listener definition; topic and destination must contain text, message
     *                    type and id must not be null
     * @throws IllegalArgumentException if any of the required fields is missing
     */
    @Override
    public void addListener(ListenerDto listenerDto) {
        String topic = listenerDto.getTopic();
        Assert.hasText(topic, "Topic should not be empty");
        Assert.hasText(listenerDto.getDestination(), "Destination should not be empty");
        MessageType messageType = listenerDto.getMessageType();
        Assert.notNull(messageType, "Message type should not be empty");
        Assert.notNull(listenerDto.getId(), "Listener id should not be empty");
        String listenerFlowId = listenerDto.getId().toString();

        // May register the per-topic Kafka source flow as a side effect.
        String topicChannelName = resolveKafkaInputChannel(topic);

        Selectors selector = Selectors.resolveSelector(messageType);
        MessageProcessorSpec<?> processorSpec = selector.processorSpec(listenerDto);
        MessageHandler messageHandler = selector.messageHandler(listenerDto);
        log.info("Creating a message handler, info: {}", listenerDto);

        this.flowContext
                .registration(IntegrationFlows.from(topicChannelName)
                        .filter(processorSpec)
                        .enrichHeaders(headerEnricherSpec -> {
                            headerEnricherSpec.header("Content-Type", "application/json");
                        })
                        .handle(messageHandler)
                        .get())
                .id(listenerFlowId)
                .register();
    }

    /**
     * Returns the name of the publish-subscribe channel that carries messages of the given
     * topic, registering the Kafka source flow for that topic first if it does not exist yet.
     *
     * @param topic Kafka topic name
     * @return the publish-subscribe channel id produced by {@code KafkaDslIdBuilder#pubSubId()}
     */
    private String resolveKafkaInputChannel(String topic) {
        KafkaDslIdBuilder ids = new KafkaDslIdBuilder(topic);
        String flowId = ids.flowId();

        IntegrationFlowRegistration existing = this.flowContext.getRegistrationById(flowId);
        if (existing != null) {
            MessageChannel inputChannel = existing.getInputChannel();
            assert inputChannel != null : "Registered flow has no input channel: " + flowId;
            return ids.pubSubId();
        }

        KafkaMessageDrivenChannelAdapter<?, ?> kafkaAdapter =
                (KafkaMessageDrivenChannelAdapter<?, ?>) Kafka
                        .messageDrivenChannelAdapter(kafkaMessageListenerContainer(Collections.singletonList(topic)))
                        .id(ids.sourceId())
                        .outputChannel((DirectChannel) MessageChannels.direct(ids.outputChannelId()).get())
                        .get();

        // Pool with no core threads, no upper bound, direct hand-off and a one hour keep-alive.
        // NOTE(review): nothing in this class shuts this executor down or removes the topic
        // flow once its last listener is gone — confirm whether that is intended.
        ThreadPoolExecutor fanOutExecutor =
                new ThreadPoolExecutor(0, Integer.MAX_VALUE, 1L, TimeUnit.HOURS, new SynchronousQueue<>());

        this.flowContext
                .registration(IntegrationFlows.from(kafkaAdapter)
                        .channel(MessageChannels.publishSubscribe(ids.pubSubId(), fanOutExecutor))
                        .get())
                .id(flowId)
                .register();
        return ids.pubSubId();
    }

    /**
     * Builds a listener container subscribed to the given topics.
     *
     * @param list topic names
     * @return a new, not yet started container
     */
    private KafkaMessageListenerContainer<String, String> kafkaMessageListenerContainer(List<String> list) {
        ContainerProperties containerProperties = new ContainerProperties(list.toArray(new String[0]));
        return new KafkaMessageListenerContainer<>(kafkaConsumerFactory(list), containerProperties);
    }

    /**
     * Builds a consumer factory from the configured Kafka properties, overriding
     * {@code group.id} and {@code client.id} with values derived from the topic names
     * ({@code mns-g-<topics>} and {@code mns-c-<topics>}, topics concatenated without separator).
     *
     * @param list topic names
     * @return a consumer factory for string keys and values
     */
    private ConsumerFactory<String, String> kafkaConsumerFactory(List<String> list) {
        Map<String, Object> consumerProperties = this.kafkaProperties.buildConsumerProperties();
        String topicSuffix = Joiner.on("").join(list);
        consumerProperties.put("group.id", "mns-g-" + topicSuffix);
        consumerProperties.put("client.id", "mns-c-" + topicSuffix);
        return new DefaultKafkaConsumerFactory<>(consumerProperties);
    }

    /**
     * Registers every stored listener, publishes this service in {@link MnsServiceRegistry}
     * and marks the component as running.
     */
    @Override
    public void start() {
        log.info("{} is start to run", SERVICE_NAME);
        this.listenerService.findAll().forEach(this::addListener);
        MnsServiceRegistry.register(SERVICE_NAME, this);
        this.isRunning = true;
    }

    /**
     * Removes this service from {@link MnsServiceRegistry} and marks it as stopped.
     *
     * <p>NOTE(review): flows registered by {@link #start()} are left in the flow context —
     * confirm whether a later restart is expected to work.
     */
    @Override
    public void stop() {
        MnsServiceRegistry.remove(SERVICE_NAME);
        this.isRunning = false;
    }

    /** @return {@code true} between a completed {@link #start()} and the next {@link #stop()} */
    @Override
    public boolean isRunning() {
        return this.isRunning;
    }

    /** @return always {@code true}: the service starts with the application context */
    @Override
    public boolean isAutoStartup() {
        return true;
    }

    /**
     * Stops the service and then notifies the container through the callback.
     *
     * <p>The callback is invoked on the calling thread once {@link #stop()} has returned;
     * no extra thread is spawned for it.
     *
     * @param runnable completion callback supplied by the lifecycle processor
     */
    @Override
    public void stop(Runnable runnable) {
        stop();
        runnable.run();
    }

    /** @return {@link Integer#MAX_VALUE}, i.e. the last start-up phase and the first shutdown phase */
    @Override
    public int getPhase() {
        return Integer.MAX_VALUE;
    }
}
