package com.vortex.mps.service.ons;

import com.vortex.das.msg.IMsg;
import com.vortex.mps.service.IPublishService;
import com.vortex.util.rocketmq.IFactory;
import com.vortex.util.rocketmq.IProducer;
import com.vortex.util.rocketmq.IProducerConfig;
import com.vortex.util.rocketmq.msg.RocketMsg;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

/* loaded from: input_file:com/vortex/mps/service/ons/OnsAbstractPublishServiceImpl.class */
public abstract class OnsAbstractPublishServiceImpl implements IPublishService {

    @Autowired
    private OnsConfig onsConfig;
    private IProducer producer;
    private static final int QUEUE_SIZE = 1048576;
    private Logger logger = LoggerFactory.getLogger(getClass());
    private BlockingQueue<IMsg> queue = new ArrayBlockingQueue(QUEUE_SIZE);

    @PostConstruct
    public void init() {
        this.producer = getFactory().createProducer(new IProducerConfig() { // from class: com.vortex.mps.service.ons.OnsAbstractPublishServiceImpl.1
            public String getProducerId() {
                return OnsAbstractPublishServiceImpl.this.onsConfig.getProducerId();
            }
        });
        this.logger.info("init completed");
    }

    protected abstract IFactory getFactory();

    @Override // com.vortex.mps.service.IPublishService
    public void publish(IMsg iMsg, String str) {
        this.logger.info("put to publish queue start");
        try {
            this.queue.put(iMsg);
        } catch (InterruptedException e) {
            this.logger.error("put to publish queue error:{}", e.getMessage());
        }
        this.logger.info("put to publish queue end");
    }

    @Override // java.lang.Runnable
    public void run() {
        IMsg poll;
        while (true) {
            this.logger.info("run start: remaining capacity[{}]", Integer.valueOf(this.queue.remainingCapacity()));
            try {
                poll = this.queue.poll(3L, TimeUnit.SECONDS);
            } catch (Throwable th) {
                this.logger.error("run error: " + th.getMessage(), th);
            }
            if (poll != null) {
                String valueOf = String.valueOf(poll.getTag());
                String str = poll.getSourceDeviceType() + poll.getSourceDeviceId();
                this.producer.syncSend(getRocketMsg(poll, this.onsConfig.getTopic(), valueOf, str));
                this.logger.info("publish end: key[{}] tag[{}]", str, valueOf);
                this.logger.info("run end: remaining capacity[{}]", Integer.valueOf(this.queue.remainingCapacity()));
            }
        }
    }

    protected abstract RocketMsg getRocketMsg(IMsg iMsg, String str, String str2, String str3);
}
