package org.apache.kafka.streams.processor.internals;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.TaskAssignmentException;
import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.processor.internals.assignment.ClientState;
import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
import org.apache.kafka.streams.state.HostInfo;
import org.slf4j.Logger;

/* loaded from: input_file:BOOT-INF/lib/kafka-streams-1.0.1.jar:org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.class */
public class StreamPartitionAssignor implements PartitionAssignor, Configurable, ThreadMetadataProvider {
    // Clock handed to the InternalTopicManager created in configure(); replaceable through time(Time).
    private Time time = Time.SYSTEM;
    // NOTE(review): never referenced by name in this decompiled source; presumably the literal -1
    // used for InternalTopicMetadata.numPartitions stands for this constant - confirm.
    private static final int UNKNOWN = -1;
    // NOTE(review): never referenced by name in this decompiled source; presumably the literal -2
    // written to numPartitions in assign() and skipped in prepareTopic() stands for this constant - confirm.
    public static final int NOT_AVAILABLE = -2;
    // Logger built in configure() from a LogContext that carries logPrefix.
    private Logger log;
    // "stream-thread [<client.id>] " prefix built in configure(); also prepended to exception messages.
    private String logPrefix;
    /**
     * Orders topic partitions by topic name first and, for equal topics, by ascending partition
     * number. Ties on the partition number yield 0, otherwise -1 or 1.
     */
    private static final Comparator<TopicPartition> PARTITION_COMPARATOR = new Comparator<TopicPartition>() {
        @Override
        public int compare(TopicPartition left, TopicPartition right) {
            int byTopic = left.topic().compareTo(right.topic());
            if (byTopic != 0) {
                return byTopic;
            }
            int leftPartition = left.partition();
            int rightPartition = right.partition();
            if (leftPartition == rightPartition) {
                return 0;
            }
            return leftPartition < rightPartition ? -1 : 1;
        }
    };
    // Taken from the STREAM_THREAD_INSTANCE config entry in configure(); source of builder, tasks, process id, etc.
    private ThreadDataProvider threadDataProvider;
    // APPLICATION_SERVER_CONFIG value, stored by configure() only after it parsed as host:port; sent in every SubscriptionInfo.
    private String userEndPoint;
    // Read from NUM_STANDBY_REPLICAS_CONFIG in configure(); passed to StickyTaskAssignor.assign().
    private int numStandbyReplicas;
    // Written by assign() (cluster plus repartition-topic partitions) and by onAssignment() (rebuilt from partitionsByHost).
    private Cluster metadataWithInternalTopics;
    // Host -> partitions of its active tasks; written by assign() and overwritten from the decoded AssignmentInfo in onAssignment().
    private Map<HostInfo, Set<TopicPartition>> partitionsByHostState;
    // Standby tasks taken from the decoded AssignmentInfo in onAssignment().
    private Map<TaskId, Set<TopicPartition>> standbyTasks;
    // Active task -> partitions, rebuilt in onAssignment() by pairing sorted partitions with the decoded task ids.
    private Map<TaskId, Set<TopicPartition>> activeTasks;
    // Created in configure(); replaceable through setInternalTopicManager(); closed in close().
    private InternalTopicManager internalTopicManager;
    // Created in configure(); used by ensureCopartitioning().
    private CopartitionedTopicsValidator copartitionedTopicsValidator;

    /* loaded from: input_file:BOOT-INF/lib/kafka-streams-1.0.1.jar:org/apache/kafka/streams/processor/internals/StreamPartitionAssignor$AssignedPartition.class */
    /**
     * Pairs a topic partition with the id of the task it belongs to. Ordering, equality and
     * hashing all look at the partition only; the task id does not take part in any of them.
     */
    private static class AssignedPartition implements Comparable<AssignedPartition> {
        public final TaskId taskId;
        public final TopicPartition partition;

        AssignedPartition(TaskId taskId, TopicPartition topicPartition) {
            this.partition = topicPartition;
            this.taskId = taskId;
        }

        /** Compares by topic name, then by partition number. */
        @Override
        public int compareTo(AssignedPartition other) {
            return StreamPartitionAssignor.PARTITION_COMPARATOR.compare(partition, other.partition);
        }

        /** Two instances are equal when their partitions compare as equal. */
        @Override
        public boolean equals(Object obj) {
            if (!(obj instanceof AssignedPartition)) {
                return false;
            }
            AssignedPartition other = (AssignedPartition) obj;
            return compareTo(other) == 0;
        }

        @Override
        public int hashCode() {
            return partition.hashCode();
        }
    }

    /* loaded from: input_file:BOOT-INF/lib/kafka-streams-1.0.1.jar:org/apache/kafka/streams/processor/internals/StreamPartitionAssignor$ClientMetadata.class */
    /**
     * Per-process bookkeeping built while assigning: the optional user endpoint, the names of the
     * consumers that reported this process id, and the task state handed to the task assignor.
     */
    private static class ClientMetadata {
        final HostInfo hostInfo;
        final Set<String> consumers;
        final ClientState state;

        /**
         * @param str the user endpoint as "host:port", or {@code null} when none was advertised
         * @throws ConfigException if a non-null endpoint cannot be split into host and port
         */
        ClientMetadata(String str) {
            if (str == null) {
                this.hostInfo = null;
            } else {
                String host = Utils.getHost(str);
                Integer port = Utils.getPort(str);
                if (host == null || port == null) {
                    throw new ConfigException(String.format("Error parsing host address %s. Expected format host:port.", str));
                }
                this.hostInfo = new HostInfo(host, port.intValue());
            }
            this.consumers = new HashSet<>();
            this.state = new ClientState();
        }

        /**
         * Registers one consumer of this client: records its name, merges its previous active and
         * standby tasks into the client state and raises the client's capacity by one.
         */
        void addConsumer(String str, SubscriptionInfo subscriptionInfo) {
            consumers.add(str);
            state.addPreviousActiveTasks(subscriptionInfo.prevTasks);
            state.addPreviousStandbyTasks(subscriptionInfo.standbyTasks);
            state.incrementCapacity();
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("ClientMetadata{hostInfo=");
            text.append(hostInfo);
            text.append(", consumers=").append(consumers);
            text.append(", state=").append(state);
            text.append('}');
            return text.toString();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:BOOT-INF/lib/kafka-streams-1.0.1.jar:org/apache/kafka/streams/processor/internals/StreamPartitionAssignor$CopartitionedTopicsValidator.class */
    /**
     * Validator invoked once per co-partition group by ensureCopartitioning().
     *
     * WARNING: the body of {@link #validate} was NOT recovered by the decompiler. As written here
     * the method unconditionally throws UnsupportedOperationException, so any call to it fails.
     */
    public static class CopartitionedTopicsValidator {
        // Note: formatted without a trailing space, unlike the outer class's "stream-thread [%s] " prefix.
        private final String logPrefix;

        CopartitionedTopicsValidator(String str) {
            this.logPrefix = String.format("stream-thread [%s]", str);
        }

        /*
         * Decompiler residue (JADX "Code restructure failed"). Only the tail of the original method
         * is visible in the recovered fragments; r10 is the first parameter (set of topic names),
         * r11 the second (topic name -> InternalTopicMetadata) and r13 a local int:
         *
         *   if (r13 == -1) {
         *       for each entry of r11.entrySet():
         *           if (r10.contains(entry.getKey())) {
         *               n = entry.getValue().numPartitions;
         *               if (n > r13) r13 = n;
         *           }
         *   }
         *   for each entry of r11.entrySet():
         *       if (r10.contains(entry.getKey()))
         *           entry.getValue().numPartitions = r13;
         *   return;
         *
         * NOTE(review): how r13 is computed before this point (and how the Cluster argument r12 is
         * used) is not shown by the fragments - restore the method from the original sources or a
         * bytecode dump before relying on it.
         */
        void validate(java.util.Set<java.lang.String> r10, java.util.Map<java.lang.String, org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.InternalTopicMetadata> r11, org.apache.kafka.common.Cluster r12) {
            // Placeholder emitted by the decompiler: always throws.
            throw new UnsupportedOperationException("Method not decompiled: org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.CopartitionedTopicsValidator.validate(java.util.Set, java.util.Map, org.apache.kafka.common.Cluster):void");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:BOOT-INF/lib/kafka-streams-1.0.1.jar:org/apache/kafka/streams/processor/internals/StreamPartitionAssignor$InternalTopicMetadata.class */
    /**
     * Mutable holder that couples an internal topic's configuration with the number of partitions
     * computed for it so far. The count starts at -1 and is updated in place by the assignor.
     */
    public static class InternalTopicMetadata {
        public final InternalTopicConfig config;
        public int numPartitions;

        InternalTopicMetadata(InternalTopicConfig internalTopicConfig) {
            this.numPartitions = -1;
            this.config = internalTopicConfig;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("InternalTopicMetadata(config=");
            text.append(config);
            text.append(", numPartitions=").append(numPartitions);
            text.append(")");
            return text.toString();
        }
    }

    /* loaded from: input_file:BOOT-INF/lib/kafka-streams-1.0.1.jar:org/apache/kafka/streams/processor/internals/StreamPartitionAssignor$SubscriptionUpdates.class */
    /**
     * Holds the most recent set of topic names a subscription was updated with.
     */
    public static class SubscriptionUpdates {
        private final Set<String> updatedTopicSubscriptions = new HashSet<>();

        /**
         * Replaces the stored topic names with the given ones (duplicates collapse).
         *
         * @param collection the new topic names
         */
        public void updateTopics(Collection<String> collection) {
            updatedTopicSubscriptions.clear();
            updatedTopicSubscriptions.addAll(collection);
        }

        /**
         * @return an unmodifiable snapshot of the stored topic names; later calls to
         *         {@link #updateTopics} do not affect a snapshot already returned
         */
        public Collection<String> getUpdates() {
            Set<String> snapshot = new HashSet<>(updatedTopicSubscriptions);
            return Collections.unmodifiableSet(snapshot);
        }

        /** @return {@code true} when at least one topic name is stored */
        public boolean hasUpdates() {
            return updatedTopicSubscriptions.size() > 0;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("SubscriptionUpdates{updatedTopicSubscriptions=");
            text.append(updatedTopicSubscriptions).append('}');
            return text.toString();
        }
    }

    /**
     * Replaces the clock used by this assignor. The value is only read in configure(), where it is
     * passed to the InternalTopicManager constructor, so it has to be set before configure() runs
     * to have any effect on that manager.
     */
    void time(Time time) {
        this.time = time;
    }

    /**
     * Initialises the assignor from the consumer configuration.
     *
     * Reads the number of standby replicas, builds the log prefix and logger from "client.id",
     * takes the owning thread from the STREAM_THREAD_INSTANCE entry (and registers this assignor
     * with it as its thread metadata provider), validates the optional application server
     * endpoint, and finally creates the internal topic manager and the co-partitioning validator.
     *
     * @param map the configuration entries
     * @throws KafkaException  if the stream thread entry is missing or is not a ThreadDataProvider
     * @throws ConfigException if the application server endpoint is not a valid host:port pair;
     *                         when the port failed to parse, the NumberFormatException is attached
     *                         as the cause
     */
    @Override // org.apache.kafka.common.Configurable
    public void configure(Map<String, ?> map) {
        this.numStandbyReplicas = ((Integer) map.get(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG)).intValue();
        this.logPrefix = String.format("stream-thread [%s] ", map.get("client.id"));
        this.log = new LogContext(this.logPrefix).logger(getClass());

        Object obj = map.get(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE);
        if (obj == null) {
            KafkaException missingThread = new KafkaException("StreamThread is not specified");
            this.log.error(missingThread.getMessage(), missingThread);
            throw missingThread;
        }
        if (!(obj instanceof ThreadDataProvider)) {
            KafkaException wrongType = new KafkaException(String.format("%s is not an instance of %s", obj.getClass().getName(), ThreadDataProvider.class.getName()));
            this.log.error(wrongType.getMessage(), wrongType);
            throw wrongType;
        }
        this.threadDataProvider = (ThreadDataProvider) obj;
        this.threadDataProvider.setThreadMetadataProvider(this);

        String str = (String) map.get(StreamsConfig.APPLICATION_SERVER_CONFIG);
        if (str != null && !str.isEmpty()) {
            try {
                String host = Utils.getHost(str);
                Integer port = Utils.getPort(str);
                if (host == null || port == null) {
                    throw new ConfigException(String.format("%s Config %s isn't in the correct format. Expected a host:port pair but received %s", this.logPrefix, StreamsConfig.APPLICATION_SERVER_CONFIG, str));
                }
                // Only remember the endpoint once it has been shown to parse.
                this.userEndPoint = str;
            } catch (NumberFormatException e) {
                ConfigException invalidPort = new ConfigException(String.format("%s Invalid port supplied in %s for config %s", this.logPrefix, str, StreamsConfig.APPLICATION_SERVER_CONFIG));
                // Keep the parse failure attached so the root cause is not lost.
                invalidPort.initCause(e);
                throw invalidPort;
            }
        }

        // Replication factor defaults to 1 and the changelog retention to the manager's default
        // when the corresponding entries are absent from the configuration.
        this.internalTopicManager = new InternalTopicManager(StreamsKafkaClient.create(this.threadDataProvider.config()), map.containsKey(StreamsConfig.REPLICATION_FACTOR_CONFIG) ? ((Integer) map.get(StreamsConfig.REPLICATION_FACTOR_CONFIG)).intValue() : 1, (map.containsKey(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG) ? (Long) map.get(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG) : InternalTopicManager.WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT).longValue(), this.time);
        this.copartitionedTopicsValidator = new CopartitionedTopicsValidator(this.threadDataProvider.name());
    }

    /**
     * @return the fixed name of this assignor, {@code "stream"}
     */
    @Override // org.apache.kafka.clients.consumer.internals.PartitionAssignor
    public String name() {
        return "stream";
    }

    /**
     * Builds the subscription sent to the group coordinator: the given topics plus an encoded
     * SubscriptionInfo carrying the process id, the previously active tasks, the cached tasks that
     * were not active, and the user endpoint.
     *
     * Note that the previously active tasks are removed from the set returned by
     * {@code cachedTasks()} in place. When the topology has a source topic pattern and the topics
     * differ from the last recorded subscription updates, the builder's subscriptions are updated.
     *
     * @param set the topics to subscribe to
     * @return the subscription for this member
     */
    @Override // org.apache.kafka.clients.consumer.internals.PartitionAssignor
    public PartitionAssignor.Subscription subscription(Set<String> set) {
        Set<TaskId> previouslyActive = this.threadDataProvider.prevActiveTasks();
        Set<TaskId> standbyCandidates = this.threadDataProvider.cachedTasks();
        standbyCandidates.removeAll(previouslyActive);
        SubscriptionInfo info = new SubscriptionInfo(this.threadDataProvider.processId(), previouslyActive, standbyCandidates, this.userEndPoint);

        if (this.threadDataProvider.builder().sourceTopicPattern() != null) {
            boolean unchanged = this.threadDataProvider.builder().subscriptionUpdates().getUpdates().equals(set);
            if (!unchanged) {
                updateSubscribedTopics(set);
            }
        }

        List<String> topics = new ArrayList<>(set);
        return new PartitionAssignor.Subscription(topics, info.encode());
    }

    /**
     * Wraps the given topics in a fresh SubscriptionUpdates and hands it, together with the
     * thread name, to the topology builder.
     *
     * @param set the topics to record as the current subscription
     */
    private void updateSubscribedTopics(Set<String> set) {
        this.log.debug("found {} topics possibly matching regex", set);
        SubscriptionUpdates updates = new SubscriptionUpdates();
        updates.updateTopics(set);
        this.threadDataProvider.builder().updateSubscriptions(updates, this.threadDataProvider.name());
    }

    /**
     * Computes the assignment for every group member.
     *
     * Steps, in order: group the members by process id; work out the partition count of every
     * repartition topic; run the co-partitioning check and make the repartition topics ready;
     * let the partition grouper form tasks; size and make ready the state changelog topics;
     * distribute tasks over clients with the StickyTaskAssignor; finally split each client's
     * tasks across its consumers and encode one Assignment per consumer.
     *
     * Side effects: overwrites metadataWithInternalTopics and partitionsByHostState, and may
     * block inside prepareTopic() until the internal topics report the expected partition counts.
     *
     * @param cluster the cluster metadata used to look up partition counts of existing topics
     * @param map     member id -> subscription (its user data must decode as SubscriptionInfo)
     * @return member id -> assignment
     */
    @Override // org.apache.kafka.clients.consumer.internals.PartitionAssignor
    public Map<String, PartitionAssignor.Assignment> assign(Cluster cluster, Map<String, PartitionAssignor.Subscription> map) {
        boolean z;
        Integer partitionCountForTopic;
        // Step 1: one ClientMetadata per process id; every member of that process is added as a consumer.
        HashMap hashMap = new HashMap();
        for (Map.Entry<String, PartitionAssignor.Subscription> entry : map.entrySet()) {
            String key = entry.getKey();
            SubscriptionInfo decode = SubscriptionInfo.decode(entry.getValue().userData());
            ClientMetadata clientMetadata = (ClientMetadata) hashMap.get(decode.processId);
            if (clientMetadata == null) {
                clientMetadata = new ClientMetadata(decode.userEndPoint);
                hashMap.put(decode.processId, clientMetadata);
            }
            clientMetadata.addConsumer(key, decode);
        }
        this.log.debug("Constructed client metadata {} from the member subscriptions.", hashMap);
        // Step 2: register every repartition source topic with an initial partition count of -1.
        Map<Integer, InternalTopologyBuilder.TopicsInfo> map2 = this.threadDataProvider.builder().topicGroups();
        HashMap hashMap2 = new HashMap();
        Iterator<InternalTopologyBuilder.TopicsInfo> it = map2.values().iterator();
        while (it.hasNext()) {
            for (InternalTopicConfig internalTopicConfig : it.next().repartitionSourceTopics.values()) {
                hashMap2.put(internalTopicConfig.name(), new InternalTopicMetadata(internalTopicConfig));
            }
        }
        // Step 3: repeat until no repartition topic is left at -1. A topic's count becomes the
        // maximum partition count over the source topics of every topic group that writes to it;
        // a source that is itself a repartition topic contributes its current (possibly still -1) count.
        do {
            z = false;
            Iterator<InternalTopologyBuilder.TopicsInfo> it2 = map2.values().iterator();
            while (it2.hasNext()) {
                for (String str : it2.next().repartitionSourceTopics.keySet()) {
                    int i = hashMap2.get(str).numPartitions;
                    if (i == -1) {
                        for (InternalTopologyBuilder.TopicsInfo topicsInfo : map2.values()) {
                            if (topicsInfo.sinkTopics.contains(str)) {
                                for (String str2 : topicsInfo.sourceTopics) {
                                    if (hashMap2.containsKey(str2)) {
                                        partitionCountForTopic = Integer.valueOf(hashMap2.get(str2).numPartitions);
                                    } else {
                                        partitionCountForTopic = cluster.partitionCountForTopic(str2);
                                        if (partitionCountForTopic == null) {
                                            // The cluster has no count for this source topic: mark the repartition topic with -2.
                                            // This mark is overwritten below if another source supplied a count.
                                            hashMap2.get(str).numPartitions = -2;
                                        }
                                    }
                                    if (partitionCountForTopic != null && partitionCountForTopic.intValue() > i) {
                                        i = partitionCountForTopic.intValue();
                                    }
                                }
                            }
                        }
                        if (i == -1) {
                            // Nothing resolved in this pass; request another pass.
                            z = true;
                        } else {
                            hashMap2.get(str).numPartitions = i;
                        }
                    }
                }
            }
        } while (z);
        // Step 4: synthesize PartitionInfo entries (no leader, no replicas) for partitions
        // 0..numPartitions-1 of every repartition topic; negative counts produce no entries.
        HashMap hashMap3 = new HashMap();
        for (Map.Entry<String, InternalTopicMetadata> entry2 : hashMap2.entrySet()) {
            String key2 = entry2.getKey();
            Integer valueOf = Integer.valueOf(entry2.getValue().numPartitions);
            for (int i2 = 0; i2 < valueOf.intValue(); i2++) {
                hashMap3.put(new TopicPartition(key2, i2), new PartitionInfo(key2, i2, null, new Node[0], new Node[0]));
            }
        }
        // NOTE(review): the partition infos above are built before the co-partitioning step runs;
        // if that step changes numPartitions, hashMap3 would not reflect the change - confirm intent.
        ensureCopartitioning(this.threadDataProvider.builder().copartitionGroups(), hashMap2, cluster);
        prepareTopic(hashMap2);
        this.metadataWithInternalTopics = cluster.withPartitions(hashMap3);
        this.log.debug("Created repartition topics {} from the parsed topology.", hashMap3.values());
        // Step 5: collect all source topics and the per-group source topics, then form tasks.
        HashSet<String> hashSet = new HashSet();
        HashMap hashMap4 = new HashMap();
        for (Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> entry3 : map2.entrySet()) {
            hashSet.addAll(entry3.getValue().sourceTopics);
            hashMap4.put(entry3.getKey(), entry3.getValue().sourceTopics);
        }
        Map<TaskId, Set<TopicPartition>> partitionGroups = this.threadDataProvider.partitionGrouper().partitionGroups(hashMap4, this.metadataWithInternalTopics);
        // Sanity pass over the tasks: warn about partitions that appear in more than one task and
        // index the task ids by topic group id (hashMap5).
        HashSet hashSet2 = new HashSet();
        HashMap hashMap5 = new HashMap();
        for (Map.Entry<TaskId, Set<TopicPartition>> entry4 : partitionGroups.entrySet()) {
            Set<TopicPartition> value = entry4.getValue();
            for (TopicPartition topicPartition : value) {
                if (hashSet2.contains(topicPartition)) {
                    this.log.warn("Partition {} is assigned to more than one tasks: {}", topicPartition, partitionGroups);
                }
            }
            hashSet2.addAll(value);
            TaskId key3 = entry4.getKey();
            Set set = (Set) hashMap5.get(Integer.valueOf(key3.topicGroupId));
            if (set == null) {
                set = new HashSet();
                hashMap5.put(Integer.valueOf(key3.topicGroupId), set);
            }
            set.add(key3);
        }
        // Warn about source topics without partitions and about partitions that belong to no task.
        for (String str3 : hashSet) {
            List<PartitionInfo> partitionsForTopic = this.metadataWithInternalTopics.partitionsForTopic(str3);
            if (partitionsForTopic.isEmpty()) {
                this.log.warn("No partitions found for topic {}", str3);
            } else {
                for (PartitionInfo partitionInfo : partitionsForTopic) {
                    TopicPartition topicPartition2 = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
                    if (!hashSet2.contains(topicPartition2)) {
                        this.log.warn("Partition {} is not assigned to any tasks: {}", topicPartition2, partitionGroups);
                    }
                }
            }
        }
        // Step 6: each state changelog topic gets (largest task partition in its topic group) + 1
        // partitions; topic groups without tasks are skipped.
        HashMap hashMap6 = new HashMap();
        for (Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> entry5 : map2.entrySet()) {
            int intValue = entry5.getKey().intValue();
            for (InternalTopicConfig internalTopicConfig2 : entry5.getValue().stateChangelogTopics.values()) {
                int i3 = -1;
                if (hashMap5.get(Integer.valueOf(intValue)) != null) {
                    for (TaskId taskId : (Set) hashMap5.get(Integer.valueOf(intValue))) {
                        if (i3 < taskId.partition + 1) {
                            i3 = taskId.partition + 1;
                        }
                    }
                    InternalTopicMetadata internalTopicMetadata = new InternalTopicMetadata(internalTopicConfig2);
                    internalTopicMetadata.numPartitions = i3;
                    hashMap6.put(internalTopicConfig2.name(), internalTopicMetadata);
                } else {
                    this.log.debug("No tasks found for topic group {}", Integer.valueOf(intValue));
                }
            }
        }
        prepareTopic(hashMap6);
        this.log.debug("Created state changelog topics {} from the parsed topology.", hashMap6.values());
        // Step 7: hand the per-process ClientState objects to the task assignor, which fills them in place.
        HashMap hashMap7 = new HashMap();
        for (Map.Entry entry6 : hashMap.entrySet()) {
            hashMap7.put(entry6.getKey(), ((ClientMetadata) entry6.getValue()).state);
        }
        this.log.debug("Assigning tasks {} to clients {} with number of replicas {}", partitionGroups.keySet(), hashMap7, Integer.valueOf(this.numStandbyReplicas));
        new StickyTaskAssignor(hashMap7, partitionGroups.keySet()).assign(this.numStandbyReplicas);
        this.log.info("Assigned tasks to clients as {}.", hashMap7);
        // Step 8: for every client that advertised an endpoint, record the partitions of its active tasks.
        this.partitionsByHostState = new HashMap();
        for (Map.Entry entry7 : hashMap.entrySet()) {
            HostInfo hostInfo = ((ClientMetadata) entry7.getValue()).hostInfo;
            if (hostInfo != null) {
                HashSet hashSet3 = new HashSet();
                Iterator<TaskId> it3 = ((ClientMetadata) entry7.getValue()).state.activeTasks().iterator();
                while (it3.hasNext()) {
                    hashSet3.addAll(partitionGroups.get(it3.next()));
                }
                this.partitionsByHostState.put(hostInfo, hashSet3);
            }
        }
        // Step 9: split each client's active and standby tasks across its consumers and encode
        // one Assignment per consumer.
        HashMap hashMap8 = new HashMap();
        for (Map.Entry entry8 : hashMap.entrySet()) {
            Set<String> set2 = ((ClientMetadata) entry8.getValue()).consumers;
            ClientState clientState = ((ClientMetadata) entry8.getValue()).state;
            List<List<TaskId>> interleaveTasksByGroupId = interleaveTasksByGroupId(clientState.activeTasks(), set2.size());
            List<List<TaskId>> interleaveTasksByGroupId2 = interleaveTasksByGroupId(clientState.standbyTasks(), set2.size());
            // i4 is the index of the current consumer within the interleaved lists.
            int i4 = 0;
            for (String str4 : set2) {
                HashMap hashMap9 = new HashMap();
                ArrayList arrayList = new ArrayList();
                for (TaskId taskId2 : interleaveTasksByGroupId.get(i4)) {
                    Iterator<TopicPartition> it4 = partitionGroups.get(taskId2).iterator();
                    while (it4.hasNext()) {
                        arrayList.add(new AssignedPartition(taskId2, it4.next()));
                    }
                }
                if (!clientState.standbyTasks().isEmpty()) {
                    for (TaskId taskId3 : interleaveTasksByGroupId2.get(i4)) {
                        Set set3 = (Set) hashMap9.get(taskId3);
                        if (set3 == null) {
                            set3 = new HashSet();
                            hashMap9.put(taskId3, set3);
                        }
                        set3.addAll(partitionGroups.get(taskId3));
                    }
                }
                i4++;
                // Sort by partition so that the task-id list and the partition list below stay
                // index-aligned; onAssignment() sorts the received partitions with the same
                // ordering and pairs them with the task ids by index.
                Collections.sort(arrayList);
                ArrayList arrayList2 = new ArrayList();
                ArrayList arrayList3 = new ArrayList();
                Iterator it5 = arrayList.iterator();
                while (it5.hasNext()) {
                    AssignedPartition assignedPartition = (AssignedPartition) it5.next();
                    arrayList2.add(assignedPartition.taskId);
                    arrayList3.add(assignedPartition.partition);
                }
                hashMap8.put(str4, new PartitionAssignor.Assignment(arrayList3, new AssignmentInfo(arrayList2, hashMap9, this.partitionsByHostState).encode()));
            }
        }
        return hashMap8;
    }

    /**
     * Distributes the given tasks round-robin over {@code i} lists.
     *
     * The tasks are sorted by their natural order first; the task at sorted position {@code k}
     * is placed into list {@code k % i}, so consecutive tasks end up in different lists.
     *
     * @param collection the tasks to distribute
     * @param i          the number of lists to produce
     * @return {@code i} lists (possibly empty) that together contain every task exactly once
     * @throws IllegalArgumentException if {@code i} is negative, or if {@code i} is zero while
     *                                  there are tasks to distribute (this previously looped forever)
     */
    List<List<TaskId>> interleaveTasksByGroupId(Collection<TaskId> collection, int i) {
        if (i == 0 && !collection.isEmpty()) {
            throw new IllegalArgumentException("Cannot distribute " + collection.size() + " task(s) across zero consumers");
        }
        List<TaskId> sortedTasks = new ArrayList<>(collection);
        Collections.sort(sortedTasks);

        // A negative size is rejected here by the ArrayList constructor.
        List<List<TaskId>> interleaved = new ArrayList<>(i);
        for (int bucket = 0; bucket < i; bucket++) {
            interleaved.add(new ArrayList<TaskId>());
        }
        for (int position = 0; position < sortedTasks.size(); position++) {
            interleaved.get(position % i).add(sortedTasks.get(position));
        }
        return interleaved;
    }

    /**
     * Applies a received assignment to this member.
     *
     * The assigned partitions are sorted with PARTITION_COMPARATOR and paired by index with the
     * active task ids decoded from the user data, which rebuilds the active task map. Standby
     * tasks and the host-to-partitions map are taken from the decoded info as they are, and the
     * cluster metadata is rebuilt from the partitions listed per host. Finally the topology
     * builder is told about topics that were assigned but not yet subscribed.
     *
     * @param assignment the assignment received for this member
     * @throws TaskAssignmentException if the number of partitions differs from the number of
     *                                 active task ids
     */
    @Override // org.apache.kafka.clients.consumer.internals.PartitionAssignor
    public void onAssignment(PartitionAssignor.Assignment assignment) {
        List<TopicPartition> sortedPartitions = new ArrayList<>(assignment.partitions());
        Collections.sort(sortedPartitions, PARTITION_COMPARATOR);

        AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
        this.standbyTasks = info.standbyTasks;
        this.activeTasks = new HashMap<>();

        int partitionCount = sortedPartitions.size();
        if (partitionCount != info.activeTasks.size()) {
            throw new TaskAssignmentException(String.format("%sNumber of assigned partitions %d is not equal to the number of active taskIds %d, assignmentInfo=%s", this.logPrefix, Integer.valueOf(partitionCount), Integer.valueOf(info.activeTasks.size()), info.toString()));
        }

        // The k-th sorted partition belongs to the k-th task id.
        int position = 0;
        for (TopicPartition assigned : sortedPartitions) {
            TaskId owner = info.activeTasks.get(position);
            position++;
            Set<TopicPartition> owned = this.activeTasks.get(owner);
            if (owned == null) {
                owned = new HashSet<>();
                this.activeTasks.put(owner, owned);
            }
            owned.add(assigned);
        }

        this.partitionsByHostState = info.partitionsByHost;
        Map<TopicPartition, PartitionInfo> partitionInfos = new HashMap<>();
        for (Set<TopicPartition> hostPartitions : this.partitionsByHostState.values()) {
            for (TopicPartition hosted : hostPartitions) {
                PartitionInfo placeholder = new PartitionInfo(hosted.topic(), hosted.partition(), null, new Node[0], new Node[0]);
                partitionInfos.put(hosted, placeholder);
            }
        }
        this.metadataWithInternalTopics = Cluster.empty().withPartitions(partitionInfos);

        checkForNewTopicAssignments(assignment);
    }

    /**
     * When the topology subscribes by pattern, makes sure every topic that appears in the
     * assignment is part of the recorded subscription updates; if any is missing, the union of
     * the assigned topics and the recorded ones is pushed to the topology builder.
     *
     * @param assignment the assignment whose partitions are inspected
     */
    private void checkForNewTopicAssignments(PartitionAssignor.Assignment assignment) {
        if (this.threadDataProvider.builder().sourceTopicPattern() == null) {
            return;
        }
        Set<String> assignedTopics = new HashSet<>();
        for (TopicPartition assigned : assignment.partitions()) {
            assignedTopics.add(assigned.topic());
        }
        boolean allKnown = this.threadDataProvider.builder().subscriptionUpdates().getUpdates().containsAll(assignedTopics);
        if (!allKnown) {
            assignedTopics.addAll(this.threadDataProvider.builder().subscriptionUpdates().getUpdates());
            updateSubscribedTopics(assignedTopics);
        }
    }

    /**
     * Makes the given internal topics ready and blocks until all of them report the expected
     * number of partitions.
     *
     * Topics whose partition count is -2 (not available) are skipped. The wait polls every 50 ms
     * and is not cut short by an interrupt, but the interrupt is no longer lost: if the thread
     * is interrupted while waiting, its interrupt status is restored before this method exits.
     *
     * @param map topic name -> metadata of the internal topics to prepare
     * @throws TopologyBuilderException if a topic has a negative partition count other than -2
     */
    private void prepareTopic(Map<String, InternalTopicMetadata> map) {
        this.log.debug("Starting to validate internal topics in partition assignor.");
        Map<InternalTopicConfig, Integer> topicsToCreate = new HashMap<>();
        Set<String> topicNames = new HashSet<>();
        for (InternalTopicMetadata metadata : map.values()) {
            InternalTopicConfig topicConfig = metadata.config;
            int numPartitions = metadata.numPartitions;
            if (numPartitions == -2) {
                // Partition count not available: leave this topic alone.
                continue;
            }
            if (numPartitions < 0) {
                throw new TopologyBuilderException(String.format("%sTopic [%s] number of partitions not defined", this.logPrefix, topicConfig.name()));
            }
            topicsToCreate.put(topicConfig, Integer.valueOf(numPartitions));
            topicNames.add(topicConfig.name());
        }
        if (!topicsToCreate.isEmpty()) {
            this.internalTopicManager.makeReady(topicsToCreate);
            boolean interrupted = false;
            try {
                while (!allTopicsCreated(topicNames, topicsToCreate)) {
                    try {
                        Thread.sleep(50L);
                    } catch (InterruptedException e) {
                        // Keep waiting, but remember the interrupt. Re-asserting it right away
                        // would make every following sleep fail immediately and spin the loop.
                        interrupted = true;
                    }
                }
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
        this.log.debug("Completed validating internal topics in partition assignor.");
    }

    /**
     * Asks the internal topic manager for the current partition counts of the named topics and
     * checks them against the expected counts.
     *
     * @param set the names of the topics to look up
     * @param map topic config -> expected number of partitions
     * @return {@code true} only if every expected topic is reported with exactly the expected count
     */
    private boolean allTopicsCreated(Set<String> set, Map<InternalTopicConfig, Integer> map) {
        Map<String, Integer> reportedCounts = this.internalTopicManager.getNumPartitions(set);
        for (Map.Entry<InternalTopicConfig, Integer> expected : map.entrySet()) {
            String topicName = expected.getKey().name();
            Integer reported = reportedCounts.get(topicName);
            boolean matches = reported != null && reported.equals(expected.getValue());
            if (!matches) {
                return false;
            }
        }
        return true;
    }

    /**
     * Runs the co-partitioning validator once for every co-partition group, in iteration order.
     *
     * @param collection the co-partition groups (each a set of topic names)
     * @param map        topic name -> metadata of the repartition topics
     * @param cluster    the cluster metadata passed through to the validator
     */
    private void ensureCopartitioning(Collection<Set<String>> collection, Map<String, InternalTopicMetadata> map, Cluster cluster) {
        for (Set<String> copartitionGroup : collection) {
            this.copartitionedTopicsValidator.validate(copartitionGroup, map, cluster);
        }
    }

    /**
     * @return an unmodifiable view of the host-to-partitions map, or an empty map when none has
     *         been set yet
     */
    @Override // org.apache.kafka.streams.processor.internals.ThreadMetadataProvider
    public Map<HostInfo, Set<TopicPartition>> getPartitionsByHostState() {
        if (this.partitionsByHostState == null) {
            return Collections.emptyMap();
        }
        return Collections.unmodifiableMap(this.partitionsByHostState);
    }

    /**
     * @return the cluster metadata last stored by assign() or onAssignment(), or an empty
     *         cluster when none has been stored yet
     */
    @Override // org.apache.kafka.streams.processor.internals.ThreadMetadataProvider
    public Cluster clusterMetadata() {
        if (this.metadataWithInternalTopics == null) {
            return Cluster.empty();
        }
        return this.metadataWithInternalTopics;
    }

    /**
     * @return an unmodifiable view of the active tasks and their partitions, or an empty map
     *         when no assignment has been received yet
     */
    @Override // org.apache.kafka.streams.processor.internals.ThreadMetadataProvider
    public Map<TaskId, Set<TopicPartition>> activeTasks() {
        if (this.activeTasks == null) {
            return Collections.emptyMap();
        }
        return Collections.unmodifiableMap(this.activeTasks);
    }

    /**
     * @return an unmodifiable view of the standby tasks and their partitions, or an empty map
     *         when no assignment has been received yet
     */
    @Override // org.apache.kafka.streams.processor.internals.ThreadMetadataProvider
    public Map<TaskId, Set<TopicPartition>> standbyTasks() {
        if (this.standbyTasks == null) {
            return Collections.emptyMap();
        }
        return Collections.unmodifiableMap(this.standbyTasks);
    }

    /**
     * Replaces the internal topic manager, overriding the one created in configure().
     */
    void setInternalTopicManager(InternalTopicManager internalTopicManager) {
        this.internalTopicManager = internalTopicManager;
    }

    /**
     * Closes the internal topic manager. There is no null check, so calling this before a
     * manager has been set (via configure() or setInternalTopicManager()) fails with a
     * NullPointerException.
     */
    @Override // org.apache.kafka.streams.processor.internals.ThreadMetadataProvider
    public void close() {
        this.internalTopicManager.close();
    }
}
