package com.vortex.mps.service.kafka;

import com.vortex.mps.MyMsg;
import com.vortex.mps.service.IPublishService;
import com.vortex.util.kafka.IProducer;
import com.vortex.util.kafka.msg.KafkaMsg;
import com.vortex.util.kafka.producer.SimpleProcuder;
import com.vortex.util.kafka.producer.SimpleProducerConfig;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Service;

@Scope("prototype")
@Service(PublishServiceImpl.BEAN_NAME)
/* loaded from: input_file:com/vortex/mps/service/kafka/PublishServiceImpl.class */
public class PublishServiceImpl implements IPublishService {
    public static final String BEAN_NAME = "PublishServiceImpl";
    private static final Logger logger = LoggerFactory.getLogger(PublishServiceImpl.class);
    private static final int QUEUE_SIZE = 1048576;
    private BlockingQueue<MyMsg> queue = new ArrayBlockingQueue(QUEUE_SIZE);

    @Autowired
    private KafkaConfig kafkaConfig;
    private static IProducer producer;

    @PostConstruct
    public void postConstruct() {
        if (producer == null) {
            producer = new SimpleProcuder(new SimpleProducerConfig(this.kafkaConfig.getBrokerList(), "MPS"));
            try {
                producer.start();
            } catch (Exception e) {
                logger.error("producer start error: {}", e.getMessage());
            }
        }
    }

    @PreDestroy
    public void preDestroy() {
        if (producer != null) {
            try {
                producer.stop();
            } catch (Exception e) {
                logger.error("producer stop error: {}", e.getMessage());
            }
        }
    }

    @Override // com.vortex.mps.service.IPublishService
    public void publish(MyMsg myMsg) {
        logger.info("receive msg: {}", myMsg);
        try {
            this.queue.put(myMsg);
        } catch (Exception e) {
            logger.error("put to publish queue error:{}", e.getMessage());
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        while (true) {
            try {
                MyMsg poll = this.queue.poll(3L, TimeUnit.SECONDS);
                if (poll == null) {
                    Thread.sleep(100L);
                } else {
                    String str = poll.getSourceDeviceType() + poll.getSourceDeviceId();
                    RecordMetadata recordMetadata = (RecordMetadata) producer.send(KafkaMsg.buildMsg(poll.getTopic(), poll)).get();
                    Thread.sleep(10 + new Random().nextInt(10));
                    logger.info("deviceId [{}]: publish result [{}]", str, recordMetadata.toString());
                }
            } catch (Throwable th) {
                logger.error("run error: " + th.getMessage(), th);
            }
        }
    }
}
