package com.vortex.platform.dsms.stream;

import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.vortex.dss.dto.FactorValue;
import com.vortex.dss.dto.FactorsData;
import com.vortex.platform.dsms.constant.TimeIntervalType;
import com.vortex.platform.dsms.dao.SummaryRepository;
import com.vortex.platform.dsms.dto.FactorSummaryData;
import com.vortex.platform.dsms.util.NumberUtil;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.WindowStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * Builds and starts the Kafka Streams topology that turns real-time factor data into
 * per-window summaries.
 *
 * <p>The topology has two stages:
 * <ol>
 *   <li>{@code DSS_REAL_TIME_DATA} is read as {@link FactorsData}, every numeric factor value is
 *       converted into a single-sample {@link FactorSummaryData}, re-keyed as
 *       {@code deviceCode_factorCode} and written to {@code TEST_SUMMARY_RAW_DATA}.</li>
 *   <li>{@code TEST_SUMMARY_RAW_DATA} is read back and aggregated over fixed-size time windows
 *       (1 min, 10 min, 30 min, 1 hour). Every aggregate update is saved through
 *       {@link SummaryRepository} and forwarded to a per-interval output topic.</li>
 * </ol>
 *
 * <p>The topology is started from {@link #process()}, which runs as a {@code @PostConstruct}
 * callback.
 */
@Component
public class StreamProcessor {
    private static final Logger LOGGER = LoggerFactory.getLogger(StreamProcessor.class);

    /** Topic carrying the incoming {@link FactorsData} records. */
    private static final String REAL_TIME_TOPIC = "DSS_REAL_TIME_DATA";

    /** Intermediate topic: written by the raw-data stage and read back by the windowed stages. */
    private static final String RAW_SUMMARY_TOPIC = "TEST_SUMMARY_RAW_DATA";

    private Serde<FactorSummaryData> summaryDataSerde;
    private Serde<FactorsData> factorsDataSerde;

    @Autowired
    private DataAggregator dataAggregator;

    @Autowired
    private SummaryRepository summaryRepository;

    @Value("${dsms.kafka.brokers}")
    private String brokers;

    @Value("${stream.application.id}")
    private String applicationId;

    /**
     * Assembles the complete topology, registers a JVM shutdown hook that closes the
     * {@link KafkaStreams} instance, and starts it.
     */
    @PostConstruct
    public void process() {
        LOGGER.info("execute method process()");
        Properties streamsConfig = initProperties();
        initSerde();

        StreamsBuilder streamsBuilder = new StreamsBuilder();
        buildRawDataStream(streamsBuilder);

        KStream<String, FactorSummaryData> rawSummaries =
                streamsBuilder.stream(RAW_SUMMARY_TOPIC, Consumed.with(Serdes.String(), this.summaryDataSerde));
        buildMinStream(rawSummaries);
        buildMin10Stream(rawSummaries);
        buildMin30Stream(rawSummaries);
        buildHourStream(rawSummaries);

        Topology topology = streamsBuilder.build();
        // Goes through the logger rather than stdout so it lands with the rest of the output.
        LOGGER.info("stream topology: {}", topology.describe());

        KafkaStreams kafkaStreams = new KafkaStreams(topology, streamsConfig);
        Runtime.getRuntime().addShutdownHook(new Thread(() -> kafkaStreams.close(), "streams-shutdown-hook"));
        kafkaStreams.start();
    }

    /**
     * Creates the Kafka Streams configuration from the injected application id and broker list.
     * String serdes are the defaults and {@code JsonTimestampExtractor} is the default timestamp
     * extractor.
     *
     * @return the configuration passed to {@link KafkaStreams}
     */
    private Properties initProperties() {
        Properties properties = new Properties();
        properties.put("application.id", this.applicationId);
        properties.put("bootstrap.servers", this.brokers);
        properties.put("default.key.serde", Serdes.String().getClass());
        properties.put("default.value.serde", Serdes.String().getClass());
        properties.put("default.timestamp.extractor", JsonTimestampExtractor.class);
        return properties;
    }

    /** Initialises the JSON serdes for the two value types used by the topology. */
    private void initSerde() {
        this.factorsDataSerde = jsonSerde(FactorsData.class);
        this.summaryDataSerde = jsonSerde(FactorSummaryData.class);
    }

    /**
     * Creates a JSON serde whose deserializer is configured with {@code targetType} under
     * {@code JsonDeserializer.CLASS_KEY}.
     *
     * <p>Each serde gets its own configuration map, so a deserializer that keeps a reference to
     * the map cannot be affected by the configuration of another serde.
     *
     * <p>NOTE(review): {@code JsonSerializer} and {@code JsonDeserializer} are declared outside
     * this file and are used without type arguments, as in the original code; tighten the types
     * and drop the suppression once their declarations are confirmed.
     *
     * @param targetType class stored under {@code JsonDeserializer.CLASS_KEY}
     * @param <T> value type of the resulting serde
     * @return a serde built from a fresh serializer/deserializer pair
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private <T> Serde<T> jsonSerde(Class<T> targetType) {
        Map<String, Object> deserializerConfig = new HashMap<>();
        deserializerConfig.put(JsonDeserializer.CLASS_KEY, targetType);

        JsonSerializer serializer = new JsonSerializer();
        JsonDeserializer deserializer = new JsonDeserializer();
        deserializer.configure(deserializerConfig, false);
        return Serdes.serdeFrom(serializer, deserializer);
    }

    /**
     * Adds the raw-data stage: {@code DSS_REAL_TIME_DATA} -> one summary per numeric factor value
     * -> re-key by device and factor -> {@code TEST_SUMMARY_RAW_DATA}.
     *
     * @param streamsBuilder builder the stage is added to
     */
    private void buildRawDataStream(StreamsBuilder streamsBuilder) {
        streamsBuilder
                .stream(REAL_TIME_TOPIC, Consumed.with(Serdes.String(), this.factorsDataSerde))
                .flatMapValues(StreamProcessor::toSummaryData)
                .map(StreamProcessor::keyByDeviceAndFactor)
                .to(RAW_SUMMARY_TOPIC, Produced.with(Serdes.String(), this.summaryDataSerde));
    }

    /**
     * Converts one {@link FactorsData} record into single-sample summaries (count 1, and
     * min, max and value all equal to the converted value).
     *
     * <p>Factor values for which {@code NumberUtil.doubleConvert} returns {@code null} are
     * skipped. A {@code null} record, a {@code null} factor list, or a factor value without a
     * datetime produces no summary instead of failing the stream thread with a
     * {@link NullPointerException}.
     *
     * @param factorsData the incoming record value, may be {@code null}
     * @return the summaries derived from the record, possibly empty, never {@code null}
     */
    private static List<FactorSummaryData> toSummaryData(FactorsData factorsData) {
        List<FactorSummaryData> summaries = new ArrayList<>();
        if (factorsData == null || factorsData.getFactorValues() == null) {
            return summaries;
        }
        for (FactorValue factorValue : factorsData.getFactorValues()) {
            Double numericValue = NumberUtil.doubleConvert(factorValue.getValue());
            if (numericValue == null) {
                continue;
            }
            if (factorValue.getDatetime() == null) {
                LOGGER.warn("skip factor value without datetime, deviceCode={}, factorCode={}",
                        factorsData.getDeviceCode(), factorValue.getFactorCode());
                continue;
            }
            FactorSummaryData summary = new FactorSummaryData();
            summary.setFactorCode(factorValue.getFactorCode());
            summary.setCount(1);
            summary.setDeviceCode(factorsData.getDeviceCode());
            summary.setDeviceType(factorsData.getDeviceType());
            summary.setMax(numericValue);
            summary.setMin(numericValue);
            summary.setValue(numericValue);
            summary.setDatetime(factorValue.getDatetime().longValue());
            summaries.add(summary);
        }
        return summaries;
    }

    /**
     * Replaces the record key with {@code deviceCode_factorCode}; the incoming key is ignored.
     *
     * @param ignoredKey key of the incoming record, unused
     * @param summary record value that supplies the device and factor codes
     * @return the re-keyed record
     */
    private static KeyValue<String, FactorSummaryData> keyByDeviceAndFactor(
            String ignoredKey, FactorSummaryData summary) {
        String deviceFactorKey = Joiner.on("_").join(summary.getDeviceCode(), summary.getFactorCode());
        return new KeyValue<>(deviceFactorKey, summary);
    }

    /** Adds the 1-minute summary stage, written to {@code TEST_SUMMARY_MIN_DATA}. */
    private void buildMinStream(KStream<String, FactorSummaryData> kStream) {
        LOGGER.info("min stream summary");
        buildWindowedStream(kStream, TimeUnit.MINUTES.toMillis(1L), "aggregated-stream-store-min",
                Integer.valueOf(TimeIntervalType.MIN.getValue()), "TEST_SUMMARY_MIN_DATA");
    }

    /** Adds the 10-minute summary stage, written to {@code TEST_SUMMARY_MIN10_DATA}. */
    private void buildMin10Stream(KStream<String, FactorSummaryData> kStream) {
        LOGGER.info("10 min stream summary");
        buildWindowedStream(kStream, TimeUnit.MINUTES.toMillis(10L), "aggregated-stream-store-min10",
                Integer.valueOf(TimeIntervalType.MIN10.getValue()), "TEST_SUMMARY_MIN10_DATA");
    }

    /** Adds the 30-minute summary stage, written to {@code TEST_SUMMARY_MIN30_DATA}. */
    private void buildMin30Stream(KStream<String, FactorSummaryData> kStream) {
        LOGGER.info("30 min stream summary");
        buildWindowedStream(kStream, TimeUnit.MINUTES.toMillis(30L), "aggregated-stream-store-min30",
                Integer.valueOf(TimeIntervalType.MIN30.getValue()), "TEST_SUMMARY_MIN30_DATA");
    }

    /** Adds the 1-hour summary stage, written to {@code TEST_SUMMARY_HOUR_DATA}. */
    private void buildHourStream(KStream<String, FactorSummaryData> kStream) {
        LOGGER.info("hour stream summary");
        buildWindowedStream(kStream, TimeUnit.HOURS.toMillis(1L), "aggregated-stream-store-hour",
                Integer.valueOf(TimeIntervalType.HOUR.getValue()), "TEST_SUMMARY_HOUR_DATA");
    }

    /**
     * Adds the 1-day summary stage, written to {@code TEST_SUMMARY_DAY_DATA}.
     *
     * <p>This stage is not wired into {@link #process()}; it is kept as it was so that it can
     * be enabled by adding a single call there.
     */
    private void buildDayStream(KStream<String, FactorSummaryData> kStream) {
        LOGGER.info("day stream summary");
        buildWindowedStream(kStream, TimeUnit.DAYS.toMillis(1L), "aggregated-stream-store-day",
                Integer.valueOf(TimeIntervalType.DAY.getValue()), "TEST_SUMMARY_DAY_DATA");
    }

    /**
     * Adds one windowed aggregation stage shared by all interval-specific builders.
     *
     * <p>Records are grouped by their existing key, windowed with {@code TimeWindows.of},
     * folded with {@code DataAggregator.aggregate(value, aggregate)}, and each resulting update
     * is saved and forwarded by {@link #saveWindowSummary}.
     *
     * <p>The initial aggregate is {@code null}, so the first call for a window passes
     * {@code null} as the second argument to {@code DataAggregator.aggregate}
     * (NOTE(review): presumably handled there; confirm against {@code DataAggregator}).
     *
     * @param source stream of raw summaries keyed by {@code deviceCode_factorCode}
     * @param windowSizeMs window size in milliseconds
     * @param storeName name of the state store backing the aggregation
     * @param intervalType interval identifier handed to {@code SummaryRepository.save}
     * @param outputTopic topic the per-window summaries are written to
     */
    private void buildWindowedStream(
            KStream<String, FactorSummaryData> source,
            long windowSizeMs,
            String storeName,
            Integer intervalType,
            String outputTopic) {
        source
                // Explicit serdes in every stage: the configured default value serde is String.
                .groupByKey(Serialized.with(Serdes.String(), this.summaryDataSerde))
                .windowedBy(TimeWindows.of(windowSizeMs))
                .aggregate(
                        () -> null,
                        (key, value, aggregate) -> this.dataAggregator.aggregate(value, aggregate),
                        // The explicit type arguments are required: without them the chained
                        // withValueSerde call is typed against Materialized<Object, Object, ...>.
                        Materialized.<String, FactorSummaryData, WindowStore<Bytes, byte[]>>as(storeName)
                                .withValueSerde(this.summaryDataSerde))
                .toStream()
                .map((windowedKey, summary) -> saveWindowSummary(windowedKey, summary, intervalType))
                .to(outputTopic, Produced.with(Serdes.String(), this.summaryDataSerde));
    }

    /**
     * Stamps the summary with the start of its window, saves it, and re-keys it by the plain
     * (un-windowed) key.
     *
     * @param windowedKey windowed key of the aggregate update
     * @param summary current aggregate of the window
     * @param intervalType interval identifier handed to {@code SummaryRepository.save}
     * @return the summary keyed by the original string key
     */
    private KeyValue<String, FactorSummaryData> saveWindowSummary(
            Windowed<String> windowedKey, FactorSummaryData summary, Integer intervalType) {
        summary.setDatetime(windowedKey.window().start());
        this.summaryRepository.save(summary, intervalType);
        return new KeyValue<>(windowedKey.key(), summary);
    }
}
