package com.vortex.rss.service;

import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import com.lmax.disruptor.EventHandler;
import com.vortex.common.AbstractKafkaMsgListener;
import com.vortex.das.DasTopic;
import com.vortex.das.msg.IMsg;
import com.vortex.das.pojo.CacheMsgWrap;
import com.vortex.dms.IDmsMsgProcessor;
import com.vortex.rss.cfg.RssConfig;
import com.vortex.util.disruptor.IMessaging;
import com.vortex.util.disruptor.LmaxDiscuptorMessaging;
import com.vortex.util.disruptor.ValueEvent;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:com/vortex/rss/service/RssKafkaCacheMsgHandler.class */
public class RssKafkaCacheMsgHandler extends AbstractKafkaMsgListener implements EventHandler<ValueEvent> {

    @Autowired
    private RssConfig rssConfig;

    @Autowired
    private IDmsMsgProcessor<IMsg> dmsMsgProcessor;
    private static final String RSS_GROUP_ID = "RSS";
    private final Logger LOG = LoggerFactory.getLogger(getClass());

    @Value("${disruptor.ringBufferSize:16384}")
    private int ringBufferSize;
    private IMessaging messagingService;

    @PostConstruct
    public void init() {
        this.messagingService = new LmaxDiscuptorMessaging(this.ringBufferSize, new EventHandler[]{this});
        this.rssConfig.getKafkaSps().subscribeMessage(RSS_GROUP_ID, this, Lists.newArrayList(new String[]{DasTopic.getDasDataTopic()}));
    }

    @PreDestroy
    private void onDestroy() {
        this.messagingService.stop();
    }

    public void onEvent(ValueEvent valueEvent, long j, boolean z) throws Exception {
        IMsg iMsg = null;
        try {
            iMsg = (IMsg) valueEvent.getValue();
            if (iMsg != null) {
                this.dmsMsgProcessor.processMsg(iMsg);
            }
        } catch (Exception e) {
            this.LOG.error("handleMsg error. \nmsg content: {}\nexception:{}", iMsg, e.getMessage());
        }
    }

    protected void handleMessage(String str, String str2) {
        CacheMsgWrap cacheMsgWrap = (CacheMsgWrap) JSON.parseObject(str2, CacheMsgWrap.class);
        if (cacheMsgWrap != null) {
            this.messagingService.publish(cacheMsgWrap.getMsg());
        }
    }
}
