package com.vortex.ai.base.service.impl;

import com.vortex.ai.base.dao.mongo.RePushDataRepository;
import com.vortex.ai.base.model.mongo.RepushData;
import com.vortex.ai.base.service.IRepushService;
import com.vortex.util.kafka.IProducer;
import com.vortex.util.kafka.msg.KafkaMsg;
import com.vortex.util.kafka.producer.SimpleProcuder;
import com.vortex.util.kafka.producer.SimpleProducerConfig;
import java.util.List;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.stereotype.Service;

/**
 * Re-publishes stored {@link RepushData} records to Kafka.
 *
 * <p>Records are read from {@link RePushDataRepository} in ascending {@code time} order and sent
 * through a shared {@link IProducer}. A record is deleted from the repository only after the
 * producer callback reports a successful send; on failure it is left in place.
 *
 * <p>The bean is only registered when {@code spring.application.name} equals {@code ai-base}.
 */
@Service
@ConditionalOnExpression("'${spring.application.name}'.equals('ai-base')")
public class RepublishServiceImpl implements IRepushService {
    private static final Logger log = LoggerFactory.getLogger(RepublishServiceImpl.class);

    /** Kafka broker list, injected from the {@code kafka.broker.list} property. */
    @Value("${kafka.broker.list}")
    private String brokerList;

    /** Producer shared by all instances; created lazily in {@link #init()}. */
    private static IProducer producer;

    /** Maximum number of records fetched from the repository per query. */
    private static final int PAGE_SIZE = 1000;

    @Autowired
    private RePushDataRepository repushDataRepository;

    /**
     * Send callback bound to one stored record: deletes the record once the send has succeeded.
     */
    public class MyProducerCallback implements Callback {
        /** Repository id of the record this callback belongs to. */
        private final String id;

        /**
         * @param str repository id of the record being published
         */
        public MyProducerCallback(String str) {
            this.id = str;
        }

        /**
         * Logs the outcome of the send and, on success, removes the record from the repository.
         *
         * @param recordMetadata metadata of the sent record (unused)
         * @param exc            the send failure, or {@code null} when the send succeeded
         */
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
            if (exc != null) {
                // Send failed: keep the record so a later repush() can pick it up again.
                RepublishServiceImpl.log.error(exc.getMessage(), exc);
                return;
            }
            RepublishServiceImpl.log.info("{} published", this.id);
            try {
                RepublishServiceImpl.this.repushDataRepository.deleteById(this.id);
            } catch (RuntimeException deleteFailure) {
                // Do not let a repository failure escape into the producer's callback thread
                // without any context; the record stays stored and may be published again.
                RepublishServiceImpl.log.error(
                        "{} published but could not be deleted", this.id, deleteFailure);
            }
        }
    }

    /**
     * Creates and starts the shared producer if it does not exist yet.
     *
     * <p>A start failure is logged (with stack trace) and not rethrown, so the bean still comes up.
     */
    @PostConstruct
    public void init() {
        if (producer != null) {
            return;
        }
        SimpleProducerConfig config = new SimpleProducerConfig(this.brokerList, "MPS");
        // NOTE(review): retries=3 with a single in-flight request presumably keeps per-partition
        // ordering when a send is retried -- confirm against SimpleProducerConfig.
        config.put("retries", 3);
        config.put("max.in.flight.requests.per.connection", 1);
        producer = new SimpleProcuder(config);
        try {
            producer.start();
        } catch (Exception e) {
            // Trailing throwable argument makes SLF4J print the stack trace as well.
            log.error("producer start error: {}", e.getMessage(), e);
        }
    }

    /**
     * Stops the shared producer, if one was created. Stop failures are logged and not rethrown.
     */
    @PreDestroy
    public void preDestroy() {
        if (producer == null) {
            return;
        }
        try {
            producer.stop();
        } catch (Exception e) {
            log.error("producer stop error: {}", e.getMessage(), e);
        }
    }

    /**
     * Publishes every stored record whose {@code time} is greater than zero, oldest first.
     *
     * <p>Each iteration queries the first {@link #PAGE_SIZE} records with {@code time} greater
     * than the last {@code time} already handled, and stops when a query returns nothing.
     */
    @Override // com.vortex.ai.base.service.IRepushService
    public void repush() {
        // NOTE(review): the cursor is `time` alone. If more records share the timestamp of the
        // last row in a page than fit in that page, the remainder is skipped by the next
        // `time > lastTime` query; records with time <= 0 are never selected. Fixing this needs a
        // tie-breaker on the id field -- confirm the field mapping before changing the query.
        long lastTime = 0;
        while (true) {
            Query query = new Query(Criteria.where("time").gt(lastTime));
            query.with(Sort.by(Sort.Direction.ASC, "time"));
            List<RepushData> batch =
                    this.repushDataRepository.find(query, PageRequest.of(0, PAGE_SIZE)).getContent();
            if (batch.isEmpty()) {
                return;
            }
            for (RepushData record : batch) {
                lastTime = record.getTime();
                publish(record);
            }
        }
    }

    /**
     * Builds a Kafka message from the record and hands it to the producer.
     *
     * <p>Any exception raised by the send call is logged and swallowed so that one bad record
     * does not abort the surrounding repush loop.
     *
     * @param repushData the stored record to publish
     */
    private void publish(RepushData repushData) {
        KafkaMsg message =
                KafkaMsg.buildMsg(repushData.getTopic(), repushData.getKey(), repushData.getData());
        log.info("publish {}", message);
        try {
            producer.send(message, new MyProducerCallback(repushData.getId()));
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }
}
