package com.vortex.mps;

import com.vortex.common.util.StringUtils;
import com.vortex.das.msg.IMsg;
import com.vortex.device.data.util.Utils;
import com.vortex.mps.config.MsgPubConfig;
import com.vortex.mps.service.IMsgPubService;
import com.vortex.mps.service.IPublishService;
import com.vortex.mps.service.kafka.PublishServiceImpl;
import com.vortex.mps.util.MyThreadFactory;
import com.vortex.mps.util.NameUtil;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service(MsgProvider.BEAN_NAME)
/* loaded from: input_file:com/vortex/mps/MsgProvider.class */
public class MsgProvider implements IMsgPubService, BeanFactoryAware {
    public static final String BEAN_NAME = "mps";
    private static Logger logger = LoggerFactory.getLogger(MsgProvider.class);
    private static final int QUEUE_SIZE = 100000;
    private static BlockingQueue<IMsg> queue = new ArrayBlockingQueue(QUEUE_SIZE);
    private static ExecutorService EXECUTOR = Executors.newSingleThreadExecutor();
    private List<IPublishService> publishServiceList = new ArrayList();
    private BeanFactory beanFactory;
    private static final int PUBLISH_SERVICE_NUMBER = 2;

    @Autowired
    private MsgPubConfig msgPubConfig;

    /* loaded from: input_file:com/vortex/mps/MsgProvider$MsgSender.class */
    private class MsgSender implements Runnable {
        private Logger logger;

        private MsgSender() {
            this.logger = LoggerFactory.getLogger(MsgProvider.class);
        }

        @Override // java.lang.Runnable
        public void run() {
            while (true) {
                try {
                    IMsg iMsg = (IMsg) MsgProvider.queue.poll(3L, TimeUnit.SECONDS);
                    if (iMsg == null) {
                        Thread.sleep(100L);
                    } else {
                        process(iMsg);
                    }
                } catch (Exception e) {
                    this.logger.error("run error: " + e.getMessage(), e);
                }
            }
        }

        private void process(IMsg iMsg) {
            this.logger.info("process msg: {}", iMsg);
            String businessType = getBusinessType(iMsg);
            if (StringUtils.isBlank(businessType)) {
                this.logger.warn("msg has no matched businessType");
                return;
            }
            try {
                publish(iMsg, businessType);
            } catch (Exception e) {
                this.logger.error(e.toString(), e);
            }
        }

        private String getBusinessType(IMsg iMsg) {
            Object tag = iMsg.getTag();
            if (tag == null) {
                return null;
            }
            return tag.toString();
        }

        private void publish(IMsg iMsg, String str) {
            String ownerIdByDeviceId = MsgProvider.this.msgPubConfig.getDms().getOwnerIdByDeviceId(iMsg.getSourceDeviceType() + iMsg.getSourceDeviceId());
            if (MsgProvider.this.msgPubConfig.isSend2Business()) {
                publishByBusinessType(iMsg, ownerIdByDeviceId, str);
            }
            if (MsgProvider.this.msgPubConfig.isSend2Owner()) {
                publishByOwner(iMsg, ownerIdByDeviceId, str);
            }
        }

        private void publishByBusinessType(IMsg iMsg, String str, String str2) {
            this.logger.info("publishByBusinessType msg: {}", iMsg);
            String str3 = iMsg.getSourceDeviceType() + iMsg.getSourceDeviceId();
            try {
                MyMsg myMsg = (MyMsg) Utils.copy(iMsg, MyMsg.class);
                myMsg.setOwnerId(str);
                myMsg.setTopic(str2);
                IPublishService publishService = getPublishService(str3);
                if (publishService == null) {
                    this.logger.error("no matched publish service for deviceId[{}]", str3);
                } else {
                    publishService.publish(myMsg);
                }
            } catch (Exception e) {
                this.logger.error("put to publish queue error:{}", e.getMessage());
            }
        }

        private void publishByOwner(IMsg iMsg, String str, String str2) {
            this.logger.info("publishByOwner msg: {}", iMsg);
            String str3 = iMsg.getSourceDeviceType() + iMsg.getSourceDeviceId();
            if (StringUtils.isBlank(str)) {
                this.logger.warn("no owner for device [{}]", str3);
                return;
            }
            try {
                MyMsg myMsg = (MyMsg) Utils.copy(iMsg, MyMsg.class);
                myMsg.setOwnerId(str);
                myMsg.setTopic(str2 + "_" + str);
                IPublishService publishService = getPublishService(str3);
                if (publishService == null) {
                    this.logger.error("no matched publish service for deviceId[{}]", str3);
                } else {
                    publishService.publish(myMsg);
                }
            } catch (Exception e) {
                this.logger.error("put to publish queue error:{}", e.getMessage());
            }
        }

        private IPublishService getPublishService(String str) {
            int calcSeed = NameUtil.calcSeed(str, MsgProvider.PUBLISH_SERVICE_NUMBER);
            if (calcSeed >= 0 && calcSeed < MsgProvider.PUBLISH_SERVICE_NUMBER) {
                return (IPublishService) MsgProvider.this.publishServiceList.get(calcSeed);
            }
            this.logger.error("seed is not valid: deviceId[{}] seed[{}]", str, Integer.valueOf(calcSeed));
            return null;
        }
    }

    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        this.beanFactory = beanFactory;
    }

    @PostConstruct
    public void init() {
        EXECUTOR.execute(new MsgSender());
        MyThreadFactory myThreadFactory = new MyThreadFactory("PubServ");
        for (int i = 0; i < PUBLISH_SERVICE_NUMBER; i++) {
            IPublishService iPublishService = (IPublishService) this.beanFactory.getBean(PublishServiceImpl.BEAN_NAME);
            this.publishServiceList.add(iPublishService);
            myThreadFactory.newThread(iPublishService).start();
        }
        logger.info("initPublishService [{}] threads of publish service[{}] ", Integer.valueOf(PUBLISH_SERVICE_NUMBER), PublishServiceImpl.BEAN_NAME);
    }

    public void dispose() {
    }

    public void putToQueue(IMsg iMsg) {
        logger.info("received msg: {}", iMsg);
        try {
            queue.add(iMsg);
        } catch (Exception e) {
            logger.error("put to msg queue error:{}", e.getMessage());
        }
    }
}
