package com.vortex.zhsw.xcgl.scheduler.kafka.consumer;

import cn.hutool.core.util.StrUtil;
import com.google.common.collect.Lists;
import com.vortex.util.kafka.consumer.SimpleConsumer;
import com.vortex.util.kafka.consumer.SimpleConsumerConfig;
import com.vortex.util.kafka.msg.IKafkaMsgListener;
import com.vortex.zhsw.xcgl.service.api.patrol.record.PatrolRecordService;
import java.util.Iterator;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:com/vortex/zhsw/xcgl/scheduler/kafka/consumer/StaffGpsKafkaConsumer.class */
public class StaffGpsKafkaConsumer implements ApplicationRunner {
    private static final Logger log = LoggerFactory.getLogger(StaffGpsKafkaConsumer.class);

    @Value("${kafka.staff.gps.bootstrapServers}")
    private String kafkaServer;

    @Value("${kafka.staff.gps.consumer.topic}")
    private String topic;

    @Value("${kafka.staff.gps.consumer.groupId}")
    private String groupId;

    @Value("${kafka.staff.gps.consumer.clientId}")
    private String clientId;

    @Autowired
    private PatrolRecordService patrolRecordService;

    public void run(ApplicationArguments applicationArguments) throws Exception {
        log.error("StaffGpsKafkaConsumer.run, {}, {}, {}, {}", new Object[]{this.kafkaServer, this.topic, this.clientId, this.groupId});
        SimpleConsumerConfig simpleConsumerConfig = new SimpleConsumerConfig(this.kafkaServer, this.clientId, this.groupId);
        simpleConsumerConfig.put("max.poll.records", 1);
        SimpleConsumer simpleConsumer = new SimpleConsumer(simpleConsumerConfig);
        simpleConsumer.start();
        simpleConsumer.subscribe(Lists.newArrayList(new String[]{this.topic}), new IKafkaMsgListener() { // from class: com.vortex.zhsw.xcgl.scheduler.kafka.consumer.StaffGpsKafkaConsumer.1
            public boolean onProcessAndConfirm(ConsumerRecords<String, String> consumerRecords) {
                long currentTimeMillis = System.currentTimeMillis();
                StaffGpsKafkaConsumer.log.error("Kafka---监听到数据时间, {}", Long.valueOf(currentTimeMillis));
                if (null == consumerRecords) {
                    return false;
                }
                Iterator it = consumerRecords.partitions().iterator();
                while (it.hasNext()) {
                    Iterator it2 = consumerRecords.records((TopicPartition) it.next()).iterator();
                    while (it2.hasNext()) {
                        String str = (String) ((ConsumerRecord) it2.next()).value();
                        StaffGpsKafkaConsumer.log.error("开始处理消息, {}", str);
                        try {
                            if (!StrUtil.isBlank(str)) {
                                StaffGpsKafkaConsumer.this.patrolRecordService.dealKafkaData(str);
                            }
                        } catch (Exception e) {
                            StaffGpsKafkaConsumer.log.error("Kafka消费失败, 消息: " + str, e);
                            throw e;
                        }
                    }
                }
                long currentTimeMillis2 = System.currentTimeMillis();
                StaffGpsKafkaConsumer.log.error("Kafka---处理完成时间, {} ,耗时：{}", Long.valueOf(currentTimeMillis2), Long.valueOf(currentTimeMillis2 - currentTimeMillis));
                return true;
            }

            public void onFaild(Throwable th) {
                StaffGpsKafkaConsumer.log.error(getClass().getSimpleName() + ".onFaild-----" + th.getMessage(), th);
            }
        });
    }
}
