package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.slf4j.Logger;

/* loaded from: input_file:BOOT-INF/lib/kafka-streams-1.0.0.jar:org/apache/kafka/streams/processor/internals/ProcessorStateManager.class */
/**
 * Keeps track of the state stores registered for a single task, the changelog
 * partitions that back them, and the offsets that are persisted to the task's
 * {@code .checkpoint} file.
 *
 * <p>The manager works in one of two modes, fixed at construction time:
 * <ul>
 *   <li>non-standby: every changelog-backed store is handed to the
 *       {@link ChangelogReader} for restoration when it is registered;</li>
 *   <li>standby: the restore callbacks of persistent changelog-backed stores
 *       are kept locally and fed through {@link #updateStandbyStates}.</li>
 * </ul>
 */
public class ProcessorStateManager implements StateManager {
    private static final String STATE_CHANGELOG_TOPIC_SUFFIX = "-changelog";
    static final String CHECKPOINT_FILE_NAME = ".checkpoint";

    private final Logger log;
    private final File baseDir;
    private final TaskId taskId;
    private final String logPrefix;
    private final boolean isStandby;
    private final ChangelogReader changelogReader;
    /** Registered stores by name; insertion-ordered so flush/close follow registration order. */
    private final Map<String, StateStore> stores;
    private final Map<String, StateStore> globalStores;
    private final Map<TopicPartition, Long> offsetLimits;
    /** Next offset to restore per changelog partition, maintained by {@link #updateStandbyStates}. */
    private final Map<TopicPartition, Long> restoredOffsets;
    /** Offsets loaded from the checkpoint file and updated by {@link #checkpoint(Map)}. */
    private final Map<TopicPartition, Long> checkpointedOffsets;
    /** Restore callbacks keyed by changelog topic; {@code null} unless this manager is a standby. */
    private final Map<String, StateRestoreCallback> restoreCallbacks;
    private final Map<String, String> storeToChangelogTopic;
    private final List<TopicPartition> changelogPartitions = new ArrayList<>();
    /** Partitions handed to the constructor, indexed by their topic name. */
    private final Map<String, TopicPartition> partitionForTopic = new HashMap<>();
    /** Handle on the checkpoint file; may be {@code null} until the next {@link #checkpoint(Map)}. */
    private OffsetCheckpoint checkpoint;

    /**
     * Creates the manager and loads any previously checkpointed offsets from
     * the task's state directory.
     *
     * @param taskId          the task this manager belongs to
     * @param collection      partitions to index by topic; used to resolve the partition of a changelog topic
     * @param z               {@code true} if this manager serves a standby replica
     * @param stateDirectory  provides the task's base directory
     * @param map             store name to changelog topic name
     * @param changelogReader receives the restorers of non-standby stores
     * @param z2              when {@code true}, the checkpoint file is deleted right after its
     *                        offsets have been read (NOTE(review): presumably set for exactly-once
     *                        processing - confirm against the caller)
     * @param logContext      factory for this instance's logger
     * @throws IOException if the task directory or the checkpoint file cannot be accessed
     */
    public ProcessorStateManager(TaskId taskId, Collection<TopicPartition> collection, boolean z, StateDirectory stateDirectory, Map<String, String> map, ChangelogReader changelogReader, boolean z2, LogContext logContext) throws IOException {
        this.taskId = taskId;
        this.changelogReader = changelogReader;
        this.logPrefix = String.format("task [%s] ", taskId);
        this.log = logContext.logger(getClass());
        for (TopicPartition partition : collection) {
            this.partitionForTopic.put(partition.topic(), partition);
        }
        this.stores = new LinkedHashMap<>();
        this.globalStores = new HashMap<>();
        this.offsetLimits = new HashMap<>();
        this.restoredOffsets = new HashMap<>();
        this.isStandby = z;
        this.restoreCallbacks = z ? new HashMap<String, StateRestoreCallback>() : null;
        this.storeToChangelogTopic = map;
        this.baseDir = stateDirectory.directoryForTask(taskId);
        this.checkpoint = new OffsetCheckpoint(new File(this.baseDir, CHECKPOINT_FILE_NAME));
        this.checkpointedOffsets = new HashMap<>(this.checkpoint.read());
        if (z2) {
            // The offsets are already in memory; drop the file and recreate the handle lazily in checkpoint().
            this.checkpoint.delete();
            this.checkpoint = null;
        }
        this.log.debug("Created state store manager for task {} with the acquired state dir lock", taskId);
    }

    /**
     * Builds a changelog topic name of the form {@code <str>-<str2>-changelog}.
     *
     * @param str  first name component (NOTE(review): presumably the application id - confirm)
     * @param str2 second name component (NOTE(review): presumably the store name - confirm)
     * @return the composed changelog topic name
     */
    public static String storeChangelogTopic(String str, String str2) {
        return str + "-" + str2 + STATE_CHANGELOG_TOPIC_SUFFIX;
    }

    /** @return the task's state directory as obtained from the {@code StateDirectory} at construction */
    @Override // org.apache.kafka.streams.processor.internals.StateManager
    public File baseDir() {
        return baseDir;
    }

    /**
     * Registers a state store with this manager.
     *
     * <p>A store without a changelog topic is only recorded. Otherwise its
     * changelog partition is tracked and, depending on the mode, either a
     * {@code StateRestorer} is registered with the changelog reader
     * (non-standby) or the callback is retained for standby updates
     * (standby, persistent stores only).
     *
     * @param stateStore           the store to register
     * @param stateRestoreCallback callback used to replay changelog records into the store
     * @throws IllegalArgumentException if the store is named like the checkpoint file or
     *                                  a store with the same name is already registered
     */
    @Override // org.apache.kafka.streams.processor.internals.StateManager
    public void register(StateStore stateStore, StateRestoreCallback stateRestoreCallback) {
        final String storeName = stateStore.name();
        log.debug("Registering state store {} to its state manager", storeName);
        if (CHECKPOINT_FILE_NAME.equals(storeName)) {
            throw new IllegalArgumentException(String.format("%sIllegal store name: %s", logPrefix, CHECKPOINT_FILE_NAME));
        }
        if (stores.containsKey(storeName)) {
            throw new IllegalArgumentException(String.format("%sStore %s has already been registered.", logPrefix, storeName));
        }

        final String changelogTopic = storeToChangelogTopic.get(storeName);
        if (changelogTopic == null) {
            // No changelog configured for this store: nothing to restore or track.
            stores.put(storeName, stateStore);
            return;
        }

        final TopicPartition changelogPartition = new TopicPartition(changelogTopic, getPartition(changelogTopic));
        if (!isStandby) {
            log.trace("Restoring state store {} from changelog topic {}", storeName, changelogTopic);
            changelogReader.register(new StateRestorer(changelogPartition, new CompositeRestoreListener(stateRestoreCallback), checkpointedOffsets.get(changelogPartition), offsetLimit(changelogPartition), stateStore.persistent(), storeName));
        } else if (stateStore.persistent()) {
            log.trace("Preparing standby replica of persistent state store {} with changelog topic {}", storeName, changelogTopic);
            restoreCallbacks.put(changelogTopic, stateRestoreCallback);
        }
        changelogPartitions.add(changelogPartition);
        stores.put(storeName, stateStore);
    }

    /**
     * Returns the checkpointed offset of every changelog partition that has a
     * standby restore callback, using {@code -1} where no offset is known.
     *
     * @return a new map; empty when this manager holds no restore callbacks
     *         (always the case for a non-standby manager)
     */
    @Override // org.apache.kafka.streams.processor.internals.Checkpointable
    public Map<TopicPartition, Long> checkpointed() {
        final Map<TopicPartition, Long> offsets = new HashMap<>();
        if (restoreCallbacks == null) {
            // Non-standby managers never populate callbacks; avoid the NullPointerException.
            return offsets;
        }
        for (String changelogTopic : restoreCallbacks.keySet()) {
            final TopicPartition partition = new TopicPartition(changelogTopic, getPartition(changelogTopic));
            if (checkpointedOffsets.containsKey(partition)) {
                offsets.put(partition, checkpointedOffsets.get(partition));
            } else {
                offsets.put(partition, -1L);
            }
        }
        return offsets;
    }

    /**
     * Applies changelog records to a standby store through its restore callback.
     *
     * <p>Records whose offset is below the partition's offset limit are
     * restored in one batch; all other records are handed back to the caller.
     * The restored offset of the partition is advanced to one past the last
     * applied record. If no record was applied, the previously recorded
     * offset is left untouched.
     *
     * @param topicPartition the changelog partition the records were read from
     * @param list           the records to apply
     * @return the records at or beyond the offset limit, or {@code null} if there are none
     * @throws ProcessorStateException if the restore callback fails
     */
    public List<ConsumerRecord<byte[], byte[]>> updateStandbyStates(TopicPartition topicPartition, List<ConsumerRecord<byte[], byte[]>> list) {
        final long limit = offsetLimit(topicPartition);
        final BatchingStateRestoreCallback restoreCallback = getBatchingRestoreCallback(restoreCallbacks.get(topicPartition.topic()));
        final List<KeyValue<byte[], byte[]>> restoreRecords = new ArrayList<>();
        List<ConsumerRecord<byte[], byte[]>> remainingRecords = null;
        long lastOffset = -1L;
        int processed = 0;
        for (ConsumerRecord<byte[], byte[]> record : list) {
            if (record.offset() < limit) {
                restoreRecords.add(KeyValue.pair(record.key(), record.value()));
                lastOffset = record.offset();
            } else {
                if (remainingRecords == null) {
                    // Allocated lazily, sized for the records not yet looked at.
                    remainingRecords = new ArrayList<>(list.size() - processed);
                }
                remainingRecords.add(record);
            }
            processed++;
        }

        if (!restoreRecords.isEmpty()) {
            try {
                restoreCallback.restoreAll(restoreRecords);
            } catch (Exception e) {
                throw new ProcessorStateException(String.format("%sException caught while trying to restore state from %s", logPrefix, topicPartition), e);
            }
        }

        // Only advance when a record was actually applied; otherwise a batch that is empty or lies
        // entirely beyond the limit would reset the restored offset to 0 and lose progress.
        if (lastOffset >= 0) {
            restoredOffsets.put(topicPartition, lastOffset + 1);
        }
        return remainingRecords;
    }

    /**
     * Sets the exclusive upper bound on offsets that may be restored for a partition.
     *
     * @param topicPartition the partition to limit
     * @param j              the offset limit
     */
    public void putOffsetLimit(TopicPartition topicPartition, long j) {
        log.trace("Updating store offset limit for partition {} to {}", topicPartition, j);
        offsetLimits.put(topicPartition, j);
    }

    /** Returns the configured offset limit of the partition, or {@code Long.MAX_VALUE} if none was set. */
    private long offsetLimit(TopicPartition topicPartition) {
        final Long limit = offsetLimits.get(topicPartition);
        return limit != null ? limit : Long.MAX_VALUE;
    }

    /** @return the registered store with the given name, or {@code null} if there is none */
    @Override // org.apache.kafka.streams.processor.internals.StateManager
    public StateStore getStore(String str) {
        return stores.get(str);
    }

    /**
     * Flushes every registered store. A failing store does not prevent the
     * remaining stores from being flushed.
     *
     * @throws ProcessorStateException wrapping the first failure, after all stores were attempted
     */
    @Override // org.apache.kafka.streams.processor.internals.StateManager
    public void flush() {
        ProcessorStateException firstException = null;
        if (!stores.isEmpty()) {
            log.debug("Flushing all stores registered in the state manager");
            for (StateStore store : stores.values()) {
                log.trace("Flushing store {}", store.name());
                try {
                    store.flush();
                } catch (Exception e) {
                    if (firstException == null) {
                        firstException = new ProcessorStateException(String.format("%sFailed to flush state store %s", logPrefix, store.name()), e);
                    }
                    log.error("Failed to flush state store {}: ", store.name(), e);
                }
            }
        }
        if (firstException != null) {
            throw firstException;
        }
    }

    /**
     * Closes every registered store and then, if offsets are supplied, writes
     * a checkpoint. Nothing is done when no store is registered. A failing
     * store does not prevent the remaining stores from being closed, nor the
     * checkpoint from being written.
     *
     * @param map offsets to checkpoint, or {@code null} to skip checkpointing
     * @throws ProcessorStateException wrapping the first close failure
     */
    @Override // org.apache.kafka.streams.processor.internals.StateManager
    public void close(Map<TopicPartition, Long> map) throws ProcessorStateException {
        ProcessorStateException firstException = null;
        if (!stores.isEmpty()) {
            log.debug("Closing its state manager and all the registered state stores");
            for (StateStore store : stores.values()) {
                log.debug("Closing storage engine {}", store.name());
                try {
                    store.close();
                } catch (Exception e) {
                    if (firstException == null) {
                        firstException = new ProcessorStateException(String.format("%sFailed to close state store %s", logPrefix, store.name()), e);
                    }
                    log.error("Failed to close state store {}: ", store.name(), e);
                }
            }
            if (map != null) {
                checkpoint(map);
            }
        }
        if (firstException != null) {
            throw firstException;
        }
    }

    /**
     * Merges the latest known offsets and writes them to the checkpoint file.
     *
     * <p>Starts from the offsets reported by the changelog reader. Then, for
     * each persistent store with a changelog topic, the supplied offset plus
     * one is used if present; otherwise the offset recorded by
     * {@link #updateStandbyStates} is used if present. An {@link IOException}
     * while writing is logged at warn level and not propagated.
     *
     * @param map offsets per changelog partition; the checkpointed value is this offset plus one
     */
    @Override // org.apache.kafka.streams.processor.internals.Checkpointable
    public void checkpoint(Map<TopicPartition, Long> map) {
        log.trace("Writing checkpoint: {}", map);
        checkpointedOffsets.putAll(changelogReader.restoredOffsets());
        for (StateStore store : stores.values()) {
            final String storeName = store.name();
            if (!store.persistent() || !storeToChangelogTopic.containsKey(storeName)) {
                continue;
            }
            final String changelogTopic = storeToChangelogTopic.get(storeName);
            // Resolve the partition by topic, exactly as register() does, so both agree on the key.
            final TopicPartition partition = new TopicPartition(changelogTopic, getPartition(changelogTopic));
            if (map.containsKey(partition)) {
                checkpointedOffsets.put(partition, map.get(partition) + 1);
            } else if (restoredOffsets.containsKey(partition)) {
                checkpointedOffsets.put(partition, restoredOffsets.get(partition));
            }
        }
        try {
            if (checkpoint == null) {
                checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME));
            }
            checkpoint.write(checkpointedOffsets);
        } catch (IOException e) {
            log.warn("Failed to write checkpoint file to {}:", new File(baseDir, CHECKPOINT_FILE_NAME), e);
        }
    }

    /**
     * Resolves the partition number for a topic: the partition handed to the
     * constructor for that topic if there is one, otherwise the task's own partition.
     */
    private int getPartition(String str) {
        final TopicPartition known = partitionForTopic.get(str);
        return known == null ? taskId.partition : known.partition();
    }

    /**
     * Makes the given stores available through {@link #getGlobalStore(String)}.
     *
     * @param list the global stores to record, keyed by their name
     */
    public void registerGlobalStateStores(List<StateStore> list) {
        log.debug("Register global stores {}", list);
        for (StateStore store : list) {
            globalStores.put(store.name(), store);
        }
    }

    /** @return the global store with the given name, or {@code null} if there is none */
    @Override // org.apache.kafka.streams.processor.internals.StateManager
    public StateStore getGlobalStore(String str) {
        return globalStores.get(str);
    }

    /** Returns the callback itself if it already supports batching, otherwise a batching wrapper around it. */
    private BatchingStateRestoreCallback getBatchingRestoreCallback(StateRestoreCallback stateRestoreCallback) {
        if (stateRestoreCallback instanceof BatchingStateRestoreCallback) {
            return (BatchingStateRestoreCallback) stateRestoreCallback;
        }
        return new WrappedBatchingStateRestoreCallback(stateRestoreCallback);
    }

    /** @return the changelog partitions of all changelog-backed stores registered so far (live internal list) */
    public Collection<TopicPartition> changelogPartitions() {
        return changelogPartitions;
    }
}
