package com.vortex.peiqi.data.service;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.mongodb.BasicDBObject;
import com.mongodb.client.DistinctIterable;
import com.vortex.common.protocol.BusinessDataEnum;
import com.vortex.das.msg.DeviceMsg;
import com.vortex.device.util.thread.NamedThreadFactory;
import com.vortex.dms.dto.DeviceStatus;
import com.vortex.dms.ui.IDmsFeignClient;
import com.vortex.mps.api.dto.MyMsg;
import com.vortex.mps.api.service.IMpsApiService;
import com.vortex.peiqi.data.dao.mongo.ICmdSendRecordDao;
import com.vortex.peiqi.data.dao.mongo.ISendRecordStatisticsDao;
import com.vortex.peiqi.data.model.mongo.CmdSendRecord;
import com.vortex.peiqi.data.model.mongo.SendRecordStatistics;
import com.vortex.util.redis.ICentralCacheService;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:com/vortex/peiqi/data/service/CacheMsgSendService.class */
public class CacheMsgSendService implements Runnable {
    private static final int QUEUE_SIZE = 1000000;

    @Autowired
    private MongoTemplate mongoTemplate;

    @Autowired
    private ICmdSendRecordDao cmdSendRecordDao;

    @Autowired
    private ISendRecordStatisticsDao sendRecordStatisticsDao;

    @Autowired
    private ICentralCacheService ccs;
    private ExecutorService executor;

    @Autowired
    private IDmsFeignClient dmsFeignClient;

    @Autowired
    private IMpsApiService mpsApiService;
    private static final Logger LOGGER = LoggerFactory.getLogger(CacheMsgSendService.class);
    private static final Long TIME_OUT_MESC = 259200000L;
    private static final Long FIRST_RETRAN_TIME = 40000L;
    private static final Long RETRAN_TIME = 20000L;
    private static final int THREAD_SIZE = Runtime.getRuntime().availableProcessors() * 2;
    private static AtomicInteger count = new AtomicInteger(0);
    private static Boolean flag = false;

    /* loaded from: input_file:com/vortex/peiqi/data/service/CacheMsgSendService$ResendThread.class */
    public class ResendThread implements Runnable {
        private String deviceCode;

        public ResendThread(String str) {
            this.deviceCode = str;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                try {
                    CacheMsgSendService.this.reSend(this.deviceCode);
                    int decrementAndGet = CacheMsgSendService.count.decrementAndGet();
                    CacheMsgSendService.LOGGER.info("[{}] resend finished, remain count:{}", this.deviceCode, Integer.valueOf(decrementAndGet));
                    if (decrementAndGet == 0) {
                        Boolean unused = CacheMsgSendService.flag = false;
                    }
                } catch (Exception e) {
                    CacheMsgSendService.LOGGER.error(e.toString(), e);
                    int decrementAndGet2 = CacheMsgSendService.count.decrementAndGet();
                    CacheMsgSendService.LOGGER.info("[{}] resend finished, remain count:{}", this.deviceCode, Integer.valueOf(decrementAndGet2));
                    if (decrementAndGet2 == 0) {
                        Boolean unused2 = CacheMsgSendService.flag = false;
                    }
                }
            } catch (Throwable th) {
                int decrementAndGet3 = CacheMsgSendService.count.decrementAndGet();
                CacheMsgSendService.LOGGER.info("[{}] resend finished, remain count:{}", this.deviceCode, Integer.valueOf(decrementAndGet3));
                if (decrementAndGet3 == 0) {
                    Boolean unused3 = CacheMsgSendService.flag = false;
                }
                throw th;
            }
        }
    }

    @PostConstruct
    private void init() {
        this.executor = new ThreadPoolExecutor(THREAD_SIZE, THREAD_SIZE, 120L, TimeUnit.SECONDS, new LinkedBlockingQueue(QUEUE_SIZE), new NamedThreadFactory("retran handler"), new RejectedExecutionHandler() { // from class: com.vortex.peiqi.data.service.CacheMsgSendService.1
            @Override // java.util.concurrent.RejectedExecutionHandler
            public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
                CacheMsgSendService.LOGGER.info("blocking queue full, drop execute task");
            }
        });
        new Thread(this).start();
    }

    @Override // java.lang.Runnable
    public void run() {
        while (true) {
            try {
                LOGGER.info("start CacheMsgSend");
                BasicDBObject append = new BasicDBObject().append("msgCode", "A206").append("isDeleted", false);
                Long valueOf = Long.valueOf(System.currentTimeMillis());
                DistinctIterable distinct = this.mongoTemplate.getCollection("cmd_send_record").distinct("deviceCode", append, String.class);
                LOGGER.info("LoadneedReSendDevice costs:" + (System.currentTimeMillis() - valueOf.longValue()));
                ArrayList newArrayList = Lists.newArrayList(distinct);
                LOGGER.info("needReSendDeviceCount:" + newArrayList.size());
                count = new AtomicInteger(newArrayList.size());
                if (newArrayList.size() > 0) {
                    flag = true;
                }
                while (newArrayList.iterator().hasNext()) {
                    this.executor.execute(new ResendThread(newArrayList.iterator().next().toString()));
                }
                while (flag.booleanValue()) {
                    try {
                        Thread.sleep(10L);
                    } catch (Exception e) {
                        LOGGER.error(e.toString(), e);
                    }
                }
                LOGGER.info("end CacheMsgSend costs:" + (System.currentTimeMillis() - valueOf.longValue()));
                Thread.sleep(500L);
            } catch (Exception e2) {
                LOGGER.error(e2.toString(), e2);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void reSend(String str) {
        try {
            DeviceStatus deviceStatus = (DeviceStatus) this.dmsFeignClient.getDeviceStatus(str).getRet();
            if (deviceStatus != null && deviceStatus.isConnected()) {
                if (this.ccs.containsKey("peiqi:cmd:speech:" + str)) {
                    Map map = (Map) this.ccs.getObject("peiqi:cmd:speech:" + str, Map.class);
                    String valueOf = String.valueOf(map.get("notificationId"));
                    Long valueOf2 = Long.valueOf(String.valueOf(map.get("lastSendTime")));
                    if (Integer.valueOf(Integer.parseInt(String.valueOf(map.get("sendTime").toString()))).intValue() == 1) {
                        if (System.currentTimeMillis() - valueOf2.longValue() < FIRST_RETRAN_TIME.longValue()) {
                            return;
                        }
                    } else if (System.currentTimeMillis() - valueOf2.longValue() < RETRAN_TIME.longValue()) {
                        return;
                    }
                    Long valueOf3 = Long.valueOf(System.currentTimeMillis());
                    Query query = Query.query(new Criteria("deviceCode").is(str));
                    query.addCriteria(new Criteria("notificationId").is(valueOf));
                    query.addCriteria(new Criteria("msgCode").is("A206"));
                    query.addCriteria(new Criteria("isDeleted").is(false));
                    query.with(Sort.by(Sort.Direction.ASC, new String[]{"createTime"}));
                    List content = this.cmdSendRecordDao.find(query, PageRequest.of(0, 1)).getContent();
                    LOGGER.info(str + " load first data costs:" + (System.currentTimeMillis() - valueOf3.longValue()));
                    if (content == null || content.size() == 0) {
                        this.ccs.removeObject("peiqi:cmd:speech:" + str);
                        return;
                    }
                    CmdSendRecord cmdSendRecord = (CmdSendRecord) content.get(0);
                    if (System.currentTimeMillis() - Long.valueOf(String.valueOf(map.get("createTime"))).longValue() > TIME_OUT_MESC.longValue()) {
                        LOGGER.info("deviceCode[{}] noticeId[{}] send time out", str, valueOf);
                        this.ccs.removeObject("peiqi:cmd:speech:" + str);
                        this.cmdSendRecordDao.delete(cmdSendRecord);
                        LOGGER.info("deviceCode[{}] send msg fail after 3 days msg[{}]", str, cmdSendRecord.toString());
                        SendRecordStatistics sendRecordStatistics = new SendRecordStatistics();
                        BeanUtils.copyProperties(cmdSendRecord, sendRecordStatistics);
                        sendRecordStatistics.setLastestSendTime(valueOf2);
                        sendRecordStatistics.setSendTime(Integer.valueOf(Integer.parseInt(String.valueOf(map.get("sendTime")))));
                        sendRecordStatistics.setDeleted(true);
                        sendRecordStatistics.setResponseTime(0L);
                        this.sendRecordStatisticsDao.save(sendRecordStatistics);
                        publishToKafka(cmdSendRecord);
                        return;
                    }
                    map.put("sendTime", Integer.valueOf(Integer.parseInt(String.valueOf(map.get("sendTime"))) + 1));
                    map.put("lastSendTime", Long.valueOf(System.currentTimeMillis()));
                    this.ccs.putObject("peiqi:cmd:speech:" + str, map);
                    sendToDms(cmdSendRecord);
                } else {
                    Long valueOf4 = Long.valueOf(System.currentTimeMillis());
                    Query query2 = Query.query(new Criteria("deviceCode").is(str));
                    query2.addCriteria(new Criteria("msgCode").is("A206"));
                    query2.addCriteria(new Criteria("isDeleted").is(false));
                    query2.with(Sort.by(Sort.Direction.ASC, new String[]{"createTime"}));
                    List content2 = this.cmdSendRecordDao.find(query2, PageRequest.of(0, 1)).getContent();
                    LOGGER.info(str + " load data costs:" + (System.currentTimeMillis() - valueOf4.longValue()));
                    if (content2 == null || content2.size() == 0) {
                        return;
                    }
                    CmdSendRecord cmdSendRecord2 = (CmdSendRecord) content2.get(0);
                    HashMap newHashMap = Maps.newHashMap();
                    newHashMap.put("notificationId", cmdSendRecord2.getNotificationId());
                    newHashMap.put("createTime", Long.valueOf(System.currentTimeMillis()));
                    newHashMap.put("lastSendTime", Long.valueOf(System.currentTimeMillis()));
                    newHashMap.put("sendTime", 1);
                    this.ccs.putObject("peiqi:cmd:speech:" + str, newHashMap);
                    sendToDms(cmdSendRecord2);
                }
            }
        } catch (Exception e) {
            LOGGER.error(e.toString(), e);
        }
    }

    private void sendToDms(CmdSendRecord cmdSendRecord) {
        String deviceCode = cmdSendRecord.getDeviceCode();
        DeviceMsg newMsgFromCloud = DeviceMsg.newMsgFromCloud("A206", deviceCode.substring(0, 5), deviceCode.substring(5));
        HashMap newHashMap = Maps.newHashMap();
        newHashMap.put("multiMediaDataFormat", cmdSendRecord.getMultiMediaDataFormat());
        newHashMap.put("noticeData", cmdSendRecord.getNoticeData());
        newHashMap.put("notificationId", cmdSendRecord.getNotificationId());
        newHashMap.put("senderLabel", cmdSendRecord.getSenderLabel());
        newHashMap.put("runningNum", cmdSendRecord.getRunningNo());
        newMsgFromCloud.setParams(newHashMap);
        LOGGER.info("reSend: targetDeviceType[{}] targetDeviceId[{}]  msgCode[{}]  runningNo[{}]", new Object[]{deviceCode.substring(0, 5), deviceCode.substring(5), "A206", String.valueOf(newHashMap.get("runningNum"))});
        this.dmsFeignClient.sendMsg(newMsgFromCloud);
    }

    private void publishToKafka(CmdSendRecord cmdSendRecord) {
        MyMsg myMsg = new MyMsg();
        myMsg.setMsgCode("0000");
        myMsg.setSourceDeviceType(cmdSendRecord.getDeviceCode().substring(0, 5));
        myMsg.setSourceDeviceId(cmdSendRecord.getDeviceCode().substring(5));
        myMsg.setTargetDeviceType("CLOUD");
        myMsg.setTargetDeviceId("VORTEX__PLAT");
        myMsg.setTag(BusinessDataEnum.PEIQI_GENERATE_RES);
        HashMap newHashMap = Maps.newHashMap();
        newHashMap.put("noticeId", cmdSendRecord.getNotificationId());
        newHashMap.put("ackPacketCode", "A206");
        newHashMap.put("resultCode", "99");
        myMsg.setParams(newHashMap);
        this.mpsApiService.putToQueue(myMsg);
    }
}
