package com.vortex.platform.dsms.stream;

import com.vortex.platform.dsms.constant.TimeIntervalType;
import com.vortex.platform.dsms.dao.SummaryRepository;
import com.vortex.platform.dsms.dto.FactorSummaryData;
import java.util.HashMap;
import java.util.Properties;
import javax.annotation.PostConstruct;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:com/vortex/platform/dsms/stream/StreamStoreProcessor.class */
public class StreamStoreProcessor {
    private Logger logger = LoggerFactory.getLogger(StreamStoreProcessor.class);
    private Serde<FactorSummaryData> summaryDataSerde;

    @Autowired
    private SummaryRepository summaryRepository;

    @Value("${dsms.kafka.brokers}")
    private String brokers;

    @Value("${stream.store.id}")
    private String applicationId;

    @PostConstruct
    public void init() {
        Properties initProperties = initProperties();
        initSerde();
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        buildSaveStream(streamsBuilder);
        Topology build = streamsBuilder.build();
        System.out.println(build.describe());
        final KafkaStreams kafkaStreams = new KafkaStreams(build, initProperties);
        Runtime.getRuntime().addShutdownHook(new Thread("streams-store-shutdown-hook") { // from class: com.vortex.platform.dsms.stream.StreamStoreProcessor.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                kafkaStreams.close();
            }
        });
        kafkaStreams.start();
    }

    private void buildSaveStream(StreamsBuilder streamsBuilder) {
        streamsBuilder.stream("SUMMARY_MIN_DATA", Consumed.with(Serdes.String(), this.summaryDataSerde)).foreach(new ForeachAction<String, FactorSummaryData>() { // from class: com.vortex.platform.dsms.stream.StreamStoreProcessor.2
            public void apply(String str, FactorSummaryData factorSummaryData) {
                StreamStoreProcessor.this.logger.info("sava min data:{}", factorSummaryData);
                StreamStoreProcessor.this.summaryRepository.save(factorSummaryData, Integer.valueOf(TimeIntervalType.MIN.getValue()));
            }
        });
        streamsBuilder.stream("SUMMARY_MIN10_DATA", Consumed.with(Serdes.String(), this.summaryDataSerde)).foreach(new ForeachAction<String, FactorSummaryData>() { // from class: com.vortex.platform.dsms.stream.StreamStoreProcessor.3
            public void apply(String str, FactorSummaryData factorSummaryData) {
                StreamStoreProcessor.this.logger.info("sava min10 data:{}", factorSummaryData);
                StreamStoreProcessor.this.summaryRepository.save(factorSummaryData, Integer.valueOf(TimeIntervalType.MIN10.getValue()));
            }
        });
        streamsBuilder.stream("SUMMARY_MIN30_DATA", Consumed.with(Serdes.String(), this.summaryDataSerde)).foreach(new ForeachAction<String, FactorSummaryData>() { // from class: com.vortex.platform.dsms.stream.StreamStoreProcessor.4
            public void apply(String str, FactorSummaryData factorSummaryData) {
                StreamStoreProcessor.this.logger.info("sava min30 data:{}", factorSummaryData);
                StreamStoreProcessor.this.summaryRepository.save(factorSummaryData, Integer.valueOf(TimeIntervalType.MIN30.getValue()));
            }
        });
        streamsBuilder.stream("SUMMARY_HOUR_DATA", Consumed.with(Serdes.String(), this.summaryDataSerde)).foreach(new ForeachAction<String, FactorSummaryData>() { // from class: com.vortex.platform.dsms.stream.StreamStoreProcessor.5
            public void apply(String str, FactorSummaryData factorSummaryData) {
                StreamStoreProcessor.this.logger.info("sava hour data:{}", factorSummaryData);
                StreamStoreProcessor.this.summaryRepository.save(factorSummaryData, Integer.valueOf(TimeIntervalType.HOUR.getValue()));
            }
        });
    }

    private Properties initProperties() {
        Properties properties = new Properties();
        properties.put("application.id", this.applicationId);
        properties.put("bootstrap.servers", this.brokers);
        properties.put("default.timestamp.extractor", JsonTimestampExtractor.class);
        properties.put("num.stream.threads", Integer.valueOf(Runtime.getRuntime().availableProcessors()));
        return properties;
    }

    private void initSerde() {
        HashMap hashMap = new HashMap();
        JsonSerializer jsonSerializer = new JsonSerializer();
        JsonDeserializer jsonDeserializer = new JsonDeserializer();
        hashMap.put(JsonDeserializer.CLASS_KEY, FactorSummaryData.class);
        jsonDeserializer.configure(hashMap, false);
        this.summaryDataSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
    }
}
