package com.vortex.ai.base.service.impl;

import com.alibaba.fastjson.JSON;
import com.vortex.ai.base.dao.mongo.RePushDataRepository;
import com.vortex.ai.base.model.mongo.RepushData;
import com.vortex.ai.base.service.IPublishService;
import com.vortex.common.util.StringUtils;
import com.vortex.device.util.thread.NamedThreadFactory;
import com.vortex.util.kafka.IProducer;
import com.vortex.util.kafka.Util;
import com.vortex.util.kafka.msg.KafkaMsg;
import com.vortex.util.kafka.producer.SimpleProcuder;
import com.vortex.util.kafka.producer.SimpleProducerConfig;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:com/vortex/ai/base/service/impl/PublishServiceImpl.class */
public class PublishServiceImpl implements IPublishService {
    private static final Logger log = LoggerFactory.getLogger(PublishServiceImpl.class);
    private final int cpuSize = Runtime.getRuntime().availableProcessors();

    @Value("${kafka.broker.list}")
    private String brokerList;
    private static IProducer producer;

    @Autowired
    private RePushDataRepository repushDataRepository;
    private static ExecutorService executor;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/vortex/ai/base/service/impl/PublishServiceImpl$MyProducerCallback.class */
    public class MyProducerCallback implements Callback {
        private Object data;
        private String topic;
        private String key;

        public MyProducerCallback(Object obj, String str, String str2) {
            this.data = obj;
            this.topic = str2;
            this.key = str;
        }

        public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
            if (exc == null) {
                PublishServiceImpl.log.info("msg published: msg[{}] topic[{}]", this.data.toString(), this.topic);
                return;
            }
            RepushData repushData = new RepushData();
            repushData.setKey(this.key);
            repushData.setTime(System.currentTimeMillis());
            repushData.setTopic(this.topic);
            repushData.setData(Util.pojo2String(this.data));
            PublishServiceImpl.this.repushDataRepository.save(repushData);
        }
    }

    @PostConstruct
    public void init() {
        executor = new ThreadPoolExecutor(this.cpuSize, this.cpuSize, 120L, TimeUnit.SECONDS, new LinkedBlockingQueue(1000), new NamedThreadFactory("publish Worker"), new ThreadPoolExecutor.CallerRunsPolicy());
        if (producer == null) {
            SimpleProducerConfig simpleProducerConfig = new SimpleProducerConfig(this.brokerList, "mts");
            simpleProducerConfig.put("retries", 3);
            simpleProducerConfig.put("max.in.flight.requests.per.connection", 1);
            simpleProducerConfig.put("max.request.size", 8388608);
            producer = new SimpleProcuder(simpleProducerConfig);
            try {
                producer.start();
            } catch (Exception e) {
                log.error("producer start error: {}", e.getMessage());
            }
        }
    }

    @PreDestroy
    public void preDestroy() {
        if (producer != null) {
            try {
                producer.stop();
            } catch (Exception e) {
                log.error("producer stop error: {}", e.getMessage());
            }
        }
    }

    @Override // com.vortex.ai.base.service.IPublishService
    public void pushToQueue(Object obj, String str, String str2) {
        executor.submit(() -> {
            try {
                log.info("publish key: {} topic：{}", str, str2);
                if (StringUtils.isBlank(str2)) {
                    log.warn("topic is blank");
                    return;
                }
                long currentTimeMillis = System.currentTimeMillis();
                publish(obj, str, str2);
                log.info("publish key: {} topic：{} cost:{}", new Object[]{str, str2, Long.valueOf(System.currentTimeMillis() - currentTimeMillis)});
            } catch (Exception e) {
                log.error(e.toString(), e);
            }
        });
    }

    private void publish(Object obj, String str, String str2) {
        KafkaMsg buildMsg = KafkaMsg.buildMsg(str2, str, obj);
        try {
            log.debug(">>>>> publish {}:{} begin", str, str2);
            log.debug(">>>>> publish {}:{} end: {}", new Object[]{str, str2, JSON.toJSONString((RecordMetadata) producer.send(buildMsg, new MyProducerCallback(obj, str, str2)).get())});
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }
}
