package com.vortex.util.kafka.consumer;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.vortex.util.kafka.AbsService;
import com.vortex.util.kafka.IConsumer;
import com.vortex.util.kafka.Util;
import com.vortex.util.kafka.consumer.loop.PullLoop;
import com.vortex.util.kafka.msg.IKafkaMsgListener;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

/* loaded from: input_file:BOOT-INF/lib/util-kafka-2.1.0-SNAPSHOT.jar:com/vortex/util/kafka/consumer/SimpleConsumer.class */
/**
 * Kafka consumer with String keys and values that, on every successful
 * subscribe call, starts a {@link PullLoop} built from the underlying
 * {@link KafkaConsumer} and the caller's {@link IKafkaMsgListener}.
 *
 * <p>The Kafka client is created in {@link #onStart()}; calling any
 * {@code subscribe} overload before that reports a failure to the listener.
 * Failures raised while subscribing are never thrown to the caller; they are
 * delivered through {@link IKafkaMsgListener#onFaild}.
 */
public class SimpleConsumer extends AbsService implements IConsumer {
    /** Consumer settings; also copied wholesale into the client properties in {@link #onStart()}. */
    protected IConsumerConfig config;
    /** Underlying Kafka client; {@code null} until {@link #onStart()} has run. */
    protected KafkaConsumer<String, String> consumer;
    /** Loop created by the most recent successful subscribe call, or {@code null}. */
    private PullLoop loop;

    /**
     * Stores the configuration after validating the mandatory settings.
     *
     * @param iConsumerConfig consumer settings; the bootstrap server list is checked by
     *                        {@link Util#checkBrokerListConfig} and the group id must be non-null
     * @throws NullPointerException if the config or its group id is {@code null}
     */
    public SimpleConsumer(IConsumerConfig iConsumerConfig) {
        Preconditions.checkNotNull(iConsumerConfig, "consumer config can not be null");
        this.config = iConsumerConfig;
        Util.checkBrokerListConfig(iConsumerConfig.getBootstrapServers());
        Preconditions.checkNotNull(iConsumerConfig.getGroupId(), "consumer's group.id can not be null");
    }

    /** @return the configuration passed to the constructor */
    public IConsumerConfig getConfig() {
        return this.config;
    }

    /** @return the underlying Kafka client, or {@code null} if {@link #onStart()} has not run yet */
    public KafkaConsumer<String, String> getConsumer() {
        return this.consumer;
    }

    /**
     * Builds the Kafka client. Every entry of the config is copied first, then the
     * deserializers, bootstrap servers, group id and auto-commit flag are set on top,
     * so those keys cannot be overridden by the config entries.
     */
    @Override // com.vortex.util.kafka.AbsService
    protected void onStart() {
        Properties properties = new Properties();
        properties.putAll(this.config);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.config.getBootstrapServers());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, this.config.getGroupId());
        // Auto-commit is always disabled here, regardless of what the config contains.
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        if (!Strings.isNullOrEmpty(this.config.getClientId())) {
            properties.put(ConsumerConfig.CLIENT_ID_CONFIG, this.config.getClientId());
        }
        this.consumer = new KafkaConsumer<>(properties);
    }

    /**
     * Stops the current pull loop, if any, and forgets it.
     *
     * <p>NOTE(review): the Kafka client itself is not closed here. Presumably
     * {@link PullLoop} closes it when its loop ends -- confirm, otherwise the client
     * is leaked (in particular when no subscribe call was ever made).
     */
    @Override // com.vortex.util.kafka.AbsService
    protected void onStop() {
        if (this.loop != null) {
            this.loop.stop();
            this.loop = null;
        }
    }

    /**
     * Subscribes to the given topics and starts pulling.
     *
     * @param collection        topics to subscribe to; must be non-null and non-empty
     * @param iKafkaMsgListener listener handed to the pull loop; also receives any failure
     * @throws NullPointerException if the listener is {@code null}
     */
    @Override // com.vortex.util.kafka.IConsumer
    public void subscribe(Collection<String> collection, IKafkaMsgListener iKafkaMsgListener) {
        Preconditions.checkNotNull(iKafkaMsgListener, "listener is null");
        try {
            requireConsumer();
            // Validate before checkBeforeSubscribe() so that a bad call does not
            // drop the subscription that is currently in place.
            Preconditions.checkNotNull(collection, "topics is null");
            Preconditions.checkState(!collection.isEmpty(), "topics is empty");
            checkBeforeSubscribe();
            this.consumer.subscribe(collection);
            startLoop(iKafkaMsgListener);
        } catch (Exception e) {
            iKafkaMsgListener.onFaild(e);
        }
    }

    /**
     * Subscribes to every topic matching the pattern and starts pulling.
     *
     * @param pattern           topic name pattern; must be non-null
     * @param iKafkaMsgListener listener handed to the pull loop; also receives any failure
     * @throws NullPointerException if the listener is {@code null}
     */
    @Override // com.vortex.util.kafka.IConsumer
    public void subscribe(Pattern pattern, IKafkaMsgListener iKafkaMsgListener) {
        Preconditions.checkNotNull(iKafkaMsgListener, "listener is null");
        try {
            requireConsumer();
            // Validate before checkBeforeSubscribe() so that a bad call does not
            // drop the subscription that is currently in place.
            Preconditions.checkNotNull(pattern, "pattern is null");
            checkBeforeSubscribe();
            // Rebalance callbacks are intentionally no-ops; the listener argument is
            // passed only because this subscribe overload takes one.
            this.consumer.subscribe(pattern, new ConsumerRebalanceListener() {
                @Override // org.apache.kafka.clients.consumer.ConsumerRebalanceListener
                public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                }

                @Override // org.apache.kafka.clients.consumer.ConsumerRebalanceListener
                public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                }
            });
            startLoop(iKafkaMsgListener);
        } catch (Exception e) {
            iKafkaMsgListener.onFaild(e);
        }
    }

    /**
     * Assigns a single partition, positions the consumer and starts pulling.
     *
     * @param str               topic name; must be non-null
     * @param num               partition number; {@code null} means partition 0
     * @param l                 start offset: {@code null} seeks to the end, {@code 0} seeks to
     *                          the beginning, any other value must be below the partition's
     *                          current end offset
     * @param iKafkaMsgListener listener handed to the pull loop; also receives any failure
     * @throws NullPointerException if the listener is {@code null}
     */
    @Override // com.vortex.util.kafka.IConsumer
    public void subscribe(String str, Integer num, Long l, IKafkaMsgListener iKafkaMsgListener) {
        Preconditions.checkNotNull(iKafkaMsgListener, "listener is null");
        try {
            requireConsumer();
            Preconditions.checkNotNull(str, "topics is null");
            if (num == null) {
                num = 0;
            }
            Preconditions.checkState(checkPartitionValid(str, num), "invalid partition. topic:%s  partition:%s", str, num);
            TopicPartition topicPartition = new TopicPartition(str, num.intValue());
            List<TopicPartition> target = Collections.singletonList(topicPartition);

            // An explicit non-zero offset is checked against the end offset up front,
            // so a rejected request leaves the current subscription untouched.
            boolean explicitOffset = l != null && l.longValue() != 0;
            if (explicitOffset) {
                Long endOffset = this.consumer.endOffsets(target).get(topicPartition);
                Preconditions.checkState(endOffset != null && endOffset.longValue() > l.longValue(),
                        "invalid offset,topic:%s  partition:%s offset:%s", str, num, l);
            }

            checkBeforeSubscribe();
            this.consumer.assign(target);
            if (l == null) {
                this.consumer.seekToEnd(target);
            } else if (explicitOffset) {
                this.consumer.seek(topicPartition, l.longValue());
            } else {
                this.consumer.seekToBeginning(target);
            }
            startLoop(iKafkaMsgListener);
        } catch (Exception e) {
            iKafkaMsgListener.onFaild(e);
        }
    }

    /**
     * Verifies the Kafka client exists and clears its current subscription/assignment.
     *
     * @throws NullPointerException if {@link #onStart()} has not created the client yet
     */
    void checkBeforeSubscribe() {
        requireConsumer();
        this.consumer.unsubscribe();
    }

    /**
     * Tells whether the topic has a partition with the given number, according to
     * the metadata returned by the client.
     *
     * @param str topic name
     * @param num partition number
     * @return {@code true} if the partition is listed for the topic; {@code false} if it is
     *         not, if {@code num} is {@code null}, or if no metadata is available
     */
    boolean checkPartitionValid(String str, Integer num) {
        if (num == null) {
            return false;
        }
        // Some client versions return null instead of an empty list for an unknown topic.
        List<PartitionInfo> partitions = this.consumer.partitionsFor(str);
        if (partitions == null) {
            return false;
        }
        int wanted = num.intValue();
        for (PartitionInfo info : partitions) {
            if (info.partition() == wanted) {
                return true;
            }
        }
        return false;
    }

    /** Fails with the subscribe-time message when the Kafka client has not been created. */
    private void requireConsumer() {
        Preconditions.checkNotNull(this.consumer, "consumer is null, can not subscribe");
    }

    /**
     * Creates a pull loop for the client and listener, remembers it and starts it.
     *
     * <p>NOTE(review): a loop left over from an earlier subscribe call is replaced
     * without being stopped -- confirm whether callers may subscribe more than once.
     */
    private void startLoop(IKafkaMsgListener listener) {
        this.loop = new PullLoop(this.consumer, listener);
        this.loop.start();
    }
}
