package com.vortex.cloud.kafka.api.impl;

import com.alibaba.fastjson.JSON;
import com.vortex.cloud.kafka.api.IProductToKafkaService;
import com.vortex.cloud.kafka.api.config.ProducerConfig;
import com.vortex.cloud.kafka.api.dto.CommonBaseDTO;
import com.vortex.cloud.kafka.api.dto.DataChangeNotifyDTO;
import com.vortex.cloud.kafka.api.dto.KafkaPushDTO;
import com.vortex.cloud.kafka.api.enums.PushDataTypeEnum;
import com.vortex.util.kafka.IProducer;
import com.vortex.util.kafka.msg.KafkaMsg;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;

/**
 * {@link IProductToKafkaService} implementation that hands messages to the
 * {@link IProducer} bean qualified by {@link ProducerConfig#PRODUCER_BEAN_NAME}.
 */
@Service
public class ProductToKafkaServiceImpl implements IProductToKafkaService {
    public static final Logger log = LoggerFactory.getLogger(ProductToKafkaServiceImpl.class);

    /** Producer used for every send issued by this service; injected by Spring. */
    @Autowired
    @Qualifier(ProducerConfig.PRODUCER_BEAN_NAME)
    private IProducer producer;

    /**
     * Wraps {@code commonBaseDTO} in a {@link KafkaPushDTO} tagged with
     * {@code pushDataTypeEnum.getKey()} and sends it to {@link ProducerConfig#KAFKA_TOPIC}.
     *
     * <p>No completion callback is registered here, so this method does not log the
     * outcome of the send.
     *
     * @param pushDataTypeEnum type of the pushed data; its key is stored in the envelope
     * @param commonBaseDTO    payload carried by the envelope
     * @throws Exception whatever building the message or the producer's {@code send} throws
     */
    @Override
    public void sendProductInfoToKafka(PushDataTypeEnum pushDataTypeEnum, CommonBaseDTO commonBaseDTO) throws Exception {
        KafkaPushDTO envelope = new KafkaPushDTO(pushDataTypeEnum.getKey(), commonBaseDTO);
        this.producer.send(KafkaMsg.buildMsg(ProducerConfig.KAFKA_TOPIC, envelope));
    }

    /**
     * Sends {@code dataChangeNotifyDTO} to {@link DataChangeNotifyDTO#KAFKA_TOPIC} and
     * registers a completion callback that logs the result: an error entry (including the
     * exception) when the callback receives a non-null exception, an info entry otherwise.
     *
     * @param dataChangeNotifyDTO change notification to publish; also read by the callback
     *                            to build the log entry
     * @throws Exception whatever building the message or the producer's {@code send} throws
     */
    @Override
    public void sendDataChangeToKafka(DataChangeNotifyDTO dataChangeNotifyDTO) throws Exception {
        this.producer.send(KafkaMsg.buildMsg(DataChangeNotifyDTO.KAFKA_TOPIC, dataChangeNotifyDTO), (recordMetadata, exc) -> {
            if (exc != null) {
                // The exception is passed after the six placeholder arguments so that
                // SLF4J treats it as the throwable and prints its stack trace.
                log.error("数据推送失败！ topic = {}, source = {}, tenantId = {}, dataKey = {}, event = {}, content = {}",
                        DataChangeNotifyDTO.KAFKA_TOPIC,
                        dataChangeNotifyDTO.getSource(),
                        dataChangeNotifyDTO.getTenantId(),
                        dataChangeNotifyDTO.getDataKey(),
                        dataChangeNotifyDTO.getEvent(),
                        JSON.toJSONString(dataChangeNotifyDTO.getContent()),
                        exc);
            } else if (log.isInfoEnabled()) {
                // Guarded: serializing the content to JSON is wasted work on every
                // successful send when INFO logging is switched off.
                log.info("数据推送成功！ topic = {}, source = {}, tenantId = {}, dataKey = {}, event = {}, content = {}",
                        DataChangeNotifyDTO.KAFKA_TOPIC,
                        dataChangeNotifyDTO.getSource(),
                        dataChangeNotifyDTO.getTenantId(),
                        dataChangeNotifyDTO.getDataKey(),
                        dataChangeNotifyDTO.getEvent(),
                        JSON.toJSONString(dataChangeNotifyDTO.getContent()));
            }
        });
    }
}
