package com.vortex.platform.dsms.stream;

import com.vortex.platform.dsms.dto.FactorSummaryData;
import java.lang.Thread;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.WindowStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;

/* loaded from: input_file:com/vortex/platform/dsms/stream/AbstractStreamProcessor.class */
/**
 * Base class for Kafka Streams processors that roll {@link FactorSummaryData} records up into
 * fixed-size time windows.
 *
 * <p>Records are read from the {@code SUMMARY_RAW_DATA} topic, grouped by their existing key and
 * aggregated through the injected {@code DataAggregator} in windows of 1 minute, 10 minutes,
 * 30 minutes and 1 hour. Each window size has its own state store and its own sink topic
 * ({@code SUMMARY_MIN_DATA}, {@code SUMMARY_MIN10_DATA}, {@code SUMMARY_MIN30_DATA},
 * {@code SUMMARY_HOUR_DATA}).
 *
 * <p>Subclasses can contribute additional streams through
 * {@link #addOtherStreamBeforeSummaryStream(StreamsBuilder)} and
 * {@link #addOtherStreamAfterSummaryStream(StreamsBuilder)}, or override any of the protected
 * {@code build*Stream} / {@code init*} hooks.
 */
public abstract class AbstractStreamProcessor {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractStreamProcessor.class);

    /** Serde used for every {@link FactorSummaryData} value; assigned by {@link #initSummarySerde()}. */
    protected Serde<FactorSummaryData> summaryDataSerde;

    /** Kafka Streams configuration; assigned by {@link #initProperties()}. */
    protected Properties props;

    @Autowired
    private DataAggregator dataAggregator;

    @Value("${dsms.kafka.brokers}")
    private String brokers;

    @Value("${stream.application.id}")
    private String applicationId;

    /**
     * Builds the topology and starts the Kafka Streams application.
     *
     * <p>Invoked once after dependency injection. The steps are: initialise the serde and the
     * configuration, let the subclass add streams, build the summary streams, log the topology
     * description, install an uncaught-exception handler that logs the failure, register a JVM
     * shutdown hook that closes the streams instance, and finally start it.
     */
    @PostConstruct
    public void process() {
        LOGGER.info("execute method process()");
        init();

        StreamsBuilder streamsBuilder = new StreamsBuilder();
        addOtherStreamBeforeSummaryStream(streamsBuilder);
        buildSummaryStream(streamsBuilder);
        addOtherStreamAfterSummaryStream(streamsBuilder);

        Topology topology = streamsBuilder.build();
        // Routed through the logger instead of stdout so it lands with the rest of the app logs.
        LOGGER.info("stream topology:\n{}", topology.describe());

        final KafkaStreams kafkaStreams = new KafkaStreams(topology, this.props);
        kafkaStreams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(Thread thread, Throwable th) {
                LOGGER.error("线程" + thread.getName() + "发生异常", th);
            }
        });
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                kafkaStreams.close();
            }
        });
        kafkaStreams.start();
    }

    /**
     * Prepares the state {@link #process()} needs before the topology is built: the summary serde
     * first, then the streams configuration.
     *
     * <p>Kept {@code public} for backward compatibility with existing subclasses and callers.
     */
    public void init() {
        initSummarySerde();
        initProperties();
    }

    /**
     * Extension hook called before the summary streams are added; does nothing by default.
     *
     * @param streamsBuilder builder the subclass may add streams to
     */
    protected void addOtherStreamBeforeSummaryStream(StreamsBuilder streamsBuilder) {
    }

    /**
     * Extension hook called after the summary streams are added; does nothing by default.
     *
     * @param streamsBuilder builder the subclass may add streams to
     */
    protected void addOtherStreamAfterSummaryStream(StreamsBuilder streamsBuilder) {
    }

    /**
     * Consumes {@code SUMMARY_RAW_DATA} and attaches the 1 min, 10 min, 30 min and 1 hour
     * aggregations to that single source stream.
     *
     * @param streamsBuilder builder the source stream is created on
     */
    protected void buildSummaryStream(StreamsBuilder streamsBuilder) {
        KStream<String, FactorSummaryData> rawStream =
                streamsBuilder.stream("SUMMARY_RAW_DATA", Consumed.with(Serdes.String(), this.summaryDataSerde));
        buildMinStream(rawStream);
        buildMin10Stream(rawStream);
        buildMin30Stream(rawStream);
        buildHourStream(rawStream);
    }

    /**
     * Creates the Kafka Streams configuration and stores it in {@link #props}.
     *
     * <p>Sets the application id and bootstrap servers from the injected values, String serdes as
     * the default key/value serdes, {@code JsonTimestampExtractor} as the default timestamp
     * extractor, and twice the number of available processors as the stream thread count.
     *
     * @return the newly created configuration (the same instance as {@link #props})
     */
    protected Properties initProperties() {
        this.props = new Properties();
        this.props.put("application.id", this.applicationId);
        this.props.put("bootstrap.servers", this.brokers);
        this.props.put("default.key.serde", Serdes.String().getClass());
        this.props.put("default.value.serde", Serdes.String().getClass());
        this.props.put("default.timestamp.extractor", JsonTimestampExtractor.class);
        this.props.put("num.stream.threads", Integer.valueOf(Runtime.getRuntime().availableProcessors() * 2));
        return this.props;
    }

    /** Creates the {@link FactorSummaryData} serde through {@code SerdFactory}. */
    protected void initSummarySerde() {
        this.summaryDataSerde = SerdFactory.create(FactorSummaryData.class);
    }

    /**
     * Aggregates the stream in 1-minute windows into {@code SUMMARY_MIN_DATA}.
     *
     * @param kStream source stream of raw summary records
     */
    protected void buildMinStream(KStream<String, FactorSummaryData> kStream) {
        LOGGER.info("min stream summary");
        buildWindowedSummaryStream(
                kStream, TimeUnit.MINUTES.toMillis(1L), "aggregated-stream-store-min", "SUMMARY_MIN_DATA");
    }

    /**
     * Aggregates the stream in 10-minute windows into {@code SUMMARY_MIN10_DATA}.
     *
     * @param kStream source stream of raw summary records
     */
    protected void buildMin10Stream(KStream<String, FactorSummaryData> kStream) {
        LOGGER.info("10 min stream summary");
        buildWindowedSummaryStream(
                kStream, TimeUnit.MINUTES.toMillis(10L), "aggregated-stream-store-min10", "SUMMARY_MIN10_DATA");
    }

    /**
     * Aggregates the stream in 30-minute windows into {@code SUMMARY_MIN30_DATA}.
     *
     * @param kStream source stream of raw summary records
     */
    protected void buildMin30Stream(KStream<String, FactorSummaryData> kStream) {
        LOGGER.info("30 min stream summary");
        buildWindowedSummaryStream(
                kStream, TimeUnit.MINUTES.toMillis(30L), "aggregated-stream-store-min30", "SUMMARY_MIN30_DATA");
    }

    /**
     * Aggregates the stream in 1-hour windows into {@code SUMMARY_HOUR_DATA}.
     *
     * @param kStream source stream of raw summary records
     */
    protected void buildHourStream(KStream<String, FactorSummaryData> kStream) {
        LOGGER.info("hour stream summary");
        buildWindowedSummaryStream(
                kStream, TimeUnit.HOURS.toMillis(1L), "aggregated-stream-store-hour", "SUMMARY_HOUR_DATA");
    }

    /**
     * Shared pipeline behind the four {@code build*Stream} methods: group by key, window,
     * aggregate, stamp each result with its window start and publish it.
     *
     * @param source       stream of raw summary records keyed by String
     * @param windowSizeMs window size in milliseconds
     * @param storeName    name of the state store backing the aggregation
     * @param sinkTopic    topic the windowed aggregates are written to
     */
    private void buildWindowedSummaryStream(
            KStream<String, FactorSummaryData> source, long windowSizeMs, String storeName, String sinkTopic) {
        // Explicit serdes on the grouping: the configured default value serde is the String serde,
        // which cannot handle FactorSummaryData.
        source.groupByKey(Serialized.with(Serdes.String(), this.summaryDataSerde))
                .windowedBy(TimeWindows.of(windowSizeMs))
                .aggregate(
                        new Initializer<FactorSummaryData>() {
                            @Override
                            public FactorSummaryData apply() {
                                // A window starts with no aggregate; the aggregator below receives this null.
                                // NOTE(review): assumes DataAggregator.aggregate tolerates a null second
                                // argument - confirm against its implementation.
                                return null;
                            }
                        },
                        new Aggregator<String, FactorSummaryData, FactorSummaryData>() {
                            @Override
                            public FactorSummaryData apply(
                                    String key, FactorSummaryData value, FactorSummaryData aggregate) {
                                // Argument order is (incoming record, running aggregate).
                                return AbstractStreamProcessor.this.dataAggregator.aggregate(value, aggregate);
                            }
                        },
                        // The type witness is required: without it V is inferred as Object and
                        // withValueSerde(Serde<FactorSummaryData>) does not type-check.
                        Materialized.<String, FactorSummaryData, WindowStore<Bytes, byte[]>>as(storeName)
                                .withValueSerde(this.summaryDataSerde))
                .toStream()
                .map(new KeyValueMapper<Windowed<String>, FactorSummaryData, KeyValue<String, FactorSummaryData>>() {
                    @Override
                    public KeyValue<String, FactorSummaryData> apply(
                            Windowed<String> windowedKey, FactorSummaryData summary) {
                        // Replace the windowed key with the plain key and record the window start on the value.
                        summary.setDatetime(windowedKey.window().start());
                        return new KeyValue<>(windowedKey.key(), summary);
                    }
                })
                .to(sinkTopic, Produced.with(Serdes.String(), this.summaryDataSerde));
    }
}
