package com.vortex.common.service.imp.kafka;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.vortex.common.AbstractKafkaMsgListener;
import com.vortex.common.service.ISubscribePublishService;
import com.vortex.util.kafka.ClientServiceConfig;
import com.vortex.util.kafka.IProducer;
import com.vortex.util.kafka.IService;
import com.vortex.util.kafka.IServiceManager;
import com.vortex.util.kafka.consumer.SimpleConsumer;
import com.vortex.util.kafka.consumer.SimpleConsumerConfig;
import com.vortex.util.kafka.manager.DefaultServiceManager;
import com.vortex.util.kafka.msg.IKafkaMsgListener;
import com.vortex.util.kafka.msg.KafkaMsg;
import com.vortex.util.kafka.producer.SimpleProcuder;
import com.vortex.util.kafka.producer.SimpleProducerConfig;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/vortex/common/service/imp/kafka/KafkaSubscribePublishServiceImpl.class */
public class KafkaSubscribePublishServiceImpl implements ISubscribePublishService {
    private static final Logger logger = LoggerFactory.getLogger(KafkaSubscribePublishServiceImpl.class);
    private final ClientServiceConfig serviceConfig;
    private final IServiceManager manager;
    private IProducer producer;

    public KafkaSubscribePublishServiceImpl(ClientServiceConfig clientServiceConfig) {
        this(clientServiceConfig, new DefaultServiceManager());
    }

    public KafkaSubscribePublishServiceImpl(ClientServiceConfig clientServiceConfig, IServiceManager iServiceManager) {
        this.serviceConfig = clientServiceConfig;
        this.manager = iServiceManager;
    }

    @PostConstruct
    public void kafkaInit() {
        if (!Strings.isNullOrEmpty(this.serviceConfig.getBootstrapServers()) && this.producer == null) {
            SimpleProducerConfig simpleProducerConfig = new SimpleProducerConfig(this.serviceConfig.getBootstrapServers(), "SPS");
            simpleProducerConfig.putAll(this.serviceConfig);
            this.producer = new SimpleProcuder(simpleProducerConfig);
            this.producer.register(this.manager);
            try {
                this.producer.start();
            } catch (Exception e) {
                logger.error(e.toString(), e);
            }
        }
    }

    @PreDestroy
    public void kafkaStop() {
        Iterator it = this.manager.getAllService().iterator();
        while (it.hasNext()) {
            try {
                ((IService) it.next()).stop();
            } catch (Exception e) {
                logger.error("service stop error", e);
            }
        }
    }

    @Override // com.vortex.common.service.ISubscribePublishService
    public void publishMessage(String str, Object obj) {
        publishMessage(str, null, obj);
    }

    @Override // com.vortex.common.service.ISubscribePublishService
    public void publishMessage(String str, String str2, Object obj) {
        Preconditions.checkNotNull(str, "topic is null");
        Preconditions.checkNotNull(obj, "msg is null");
        this.producer.send(KafkaMsg.buildMsg(str, str2, obj));
    }

    @Override // com.vortex.common.service.ISubscribePublishService
    public boolean syncPublishMessage(String str, String str2, Object obj) {
        long currentTimeMillis = System.currentTimeMillis();
        Preconditions.checkNotNull(str, "topic is null");
        Preconditions.checkNotNull(obj, "msg is null");
        try {
            this.producer.send(KafkaMsg.buildMsg(str, str2, obj)).get(3L, TimeUnit.SECONDS);
            logger.debug("syncPublishMessage sent, cost:{}. topic:{} key:{}", new Object[]{Long.valueOf(System.currentTimeMillis() - currentTimeMillis), str, str2});
            return true;
        } catch (Exception e) {
            logger.error("syncPublishMessage exception, cost:{}. topic:{} key:{}", new Object[]{Long.valueOf(System.currentTimeMillis() - currentTimeMillis), str, str2, e});
            return false;
        }
    }

    @Override // com.vortex.common.service.ISubscribePublishService
    public void subscribeMessage(IKafkaMsgListener iKafkaMsgListener, List<String> list) {
        subscribeMessage("SPS" + UUID.randomUUID().toString(), iKafkaMsgListener, list);
    }

    @Override // com.vortex.common.service.ISubscribePublishService
    public void subscribeMessage(String str, IKafkaMsgListener iKafkaMsgListener, List<String> list) {
        Preconditions.checkNotNull(iKafkaMsgListener, "messageListener is null");
        Preconditions.checkNotNull(list, "topics is null");
        Preconditions.checkState(list.size() > 0, "invalid topics");
        SimpleConsumerConfig simpleConsumerConfig = new SimpleConsumerConfig(this.serviceConfig.getBootstrapServers(), "SPS" + UUID.randomUUID().toString(), str);
        simpleConsumerConfig.putAll(this.serviceConfig);
        SimpleConsumer simpleConsumer = new SimpleConsumer(simpleConsumerConfig);
        try {
            simpleConsumer.start();
        } catch (Exception e) {
            logger.error("consumer start error", e);
        }
        StringBuilder sb = new StringBuilder();
        for (String str2 : list) {
            if (!Strings.isNullOrEmpty(str2)) {
                sb.append("(");
                sb.append(str2);
                sb.append(")");
                sb.append("|");
            }
        }
        sb.setLength(sb.length() - 1);
        String sb2 = sb.toString();
        simpleConsumer.subscribe(Pattern.compile(sb2), (AbstractKafkaMsgListener) iKafkaMsgListener);
        logger.info("subscribeMessage, groupId:{} topic:{}", str, sb2);
    }

    @Override // com.vortex.common.service.ISubscribePublishService
    public void unsubscribeMessage(IKafkaMsgListener iKafkaMsgListener, List<String> list) {
        throw new RuntimeException("取消订阅，一个消费者重新订阅");
    }
}
