package com.vortex.mps.service.kafka;

import com.vortex.mps.api.dto.MyMsg;
import com.vortex.mps.bean.ProducerBean;
import com.vortex.mps.service.IPublishService;
import com.vortex.util.kafka.IProducer;
import com.vortex.util.kafka.msg.KafkaMsg;
import java.util.concurrent.TimeUnit;
import javax.annotation.PreDestroy;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;

/**
 * {@link IPublishService} implementation that converts a {@link MyMsg} into a
 * {@link KafkaMsg} and hands it to the injected {@link IProducer}.
 *
 * <p>Two flavours are offered: {@link #publish(MyMsg)} never propagates a failure
 * (it only logs), while {@link #syncPublish(MyMsg)} waits for the send result and
 * rethrows any failure to the caller.
 */
@Service
public class PublishServiceImpl implements IPublishService {
    private static final Logger logger = LoggerFactory.getLogger(PublishServiceImpl.class);

    /** Maximum time, in milliseconds, that {@link #syncPublish(MyMsg)} waits for the send result. */
    private static final long SYNC_SEND_TIMEOUT_MS = 5000L;

    @Autowired
    @Qualifier(ProducerBean.KAFKA_PRODUCER_BEAN_NAME)
    private IProducer producer;

    /** Callback passed with every synchronous send; it only logs failures. */
    private final Callback defaultCallback = new ExceptionLogCallback();

    /**
     * Send callback that logs the exception (if any) reported on completion.
     * Successful completions are ignored.
     */
    static class ExceptionLogCallback implements Callback {

        /**
         * Logs {@code exc} together with the record metadata when the send failed.
         *
         * @param recordMetadata metadata supplied by the producer on completion
         * @param exc            the failure, or {@code null} when the send succeeded
         */
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
            if (exc != null) {
                PublishServiceImpl.logger.error("syncPublish - kafka send msg exception. metadata:{}", recordMetadata, exc);
            }
        }
    }

    /**
     * Stops the producer when the bean is destroyed. A failure to stop is logged
     * (with its stack trace) and otherwise ignored so that shutdown can continue.
     */
    @PreDestroy
    public void preDestroy() {
        if (this.producer == null) {
            return;
        }
        try {
            this.producer.stop();
        } catch (Exception e) {
            // Pass the exception itself as the trailing argument so the stack trace is not lost.
            logger.error("producer stop error: {}", e.getMessage(), e);
        }
    }

    /**
     * Builds a Kafka message from {@code myMsg} and hands it to the producer
     * without waiting on a result. This method is best-effort: any {@link Throwable}
     * raised while building or sending is logged and NOT propagated.
     *
     * @param myMsg the message to publish; its topic, source device type and
     *              source device id are read to build the Kafka message
     */
    @Override // com.vortex.mps.service.IPublishService
    public void publish(MyMsg myMsg) {
        final long startMillis = System.currentTimeMillis();
        // Declared outside the try so the catch block can log whatever was resolved before the failure.
        String deviceId = null;
        String topic = null;
        try {
            deviceId = myMsg.getSourceDeviceType() + myMsg.getSourceDeviceId();
            topic = myMsg.getTopic();
            KafkaMsg kafkaMsg = KafkaMsg.buildMsg(topic, deviceId, myMsg);
            logger.info("publish - start topic:{} deviceId:{}", topic, deviceId);
            this.producer.send(kafkaMsg);
            logger.info("publish - end cost:{} topic:{} deviceId:{}. msg:{}",
                    System.currentTimeMillis() - startMillis, topic, deviceId, kafkaMsg);
        } catch (Throwable th) {
            // Deliberately swallowed: callers of publish() rely on it never throwing.
            logger.error("publish - exception cost:{} topic:{} deviceId:{}. msg:{}",
                    System.currentTimeMillis() - startMillis, topic, deviceId, myMsg, th);
        }
    }

    /**
     * Builds a Kafka message from {@code myMsg}, sends it with the default logging
     * callback and blocks for up to {@value #SYNC_SEND_TIMEOUT_MS} ms for the
     * resulting {@link RecordMetadata}.
     *
     * @param myMsg the message to publish; its topic, source device type and
     *              source device id are read to build the Kafka message
     * @throws Exception wrapping whatever {@link Throwable} occurred while building,
     *                   sending or waiting (including a timeout); the original is
     *                   available via {@link Exception#getCause()}
     */
    @Override // com.vortex.mps.service.IPublishService
    public void syncPublish(MyMsg myMsg) throws Exception {
        final long startMillis = System.currentTimeMillis();
        // Declared outside the try so the catch block can log whatever was resolved before the failure.
        String deviceId = null;
        String topic = null;
        try {
            deviceId = myMsg.getSourceDeviceType() + myMsg.getSourceDeviceId();
            topic = myMsg.getTopic();
            KafkaMsg kafkaMsg = KafkaMsg.buildMsg(topic, deviceId, myMsg);
            logger.info("syncPublish - start topic:{} deviceId:{}", topic, deviceId);
            RecordMetadata recordMetadata = (RecordMetadata) this.producer
                    .send(kafkaMsg, this.defaultCallback)
                    .get(SYNC_SEND_TIMEOUT_MS, TimeUnit.MILLISECONDS);
            logger.info("syncPublish - end cost:{} topic:{} deviceId:{} metadata: {}-{}-{}-{}. msg:{}",
                    System.currentTimeMillis() - startMillis, topic, deviceId,
                    recordMetadata.topic(), recordMetadata.partition(),
                    recordMetadata.offset(), recordMetadata.timestamp(), kafkaMsg);
        } catch (Throwable th) {
            if (th instanceof InterruptedException) {
                // The blocking wait cleared the interrupt flag; restore it so code
                // further up the stack can still observe the interruption.
                Thread.currentThread().interrupt();
            }
            logger.error("syncPublish - exception cost:{} topic:{} deviceId:{}. myMsg:{}",
                    System.currentTimeMillis() - startMillis, topic, deviceId, myMsg, th);
            throw new Exception(th);
        }
    }
}
