package com.vortex.platform.dsms.stream;

import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.vortex.platform.dsms.dto.FactorSummaryData;
import com.vortex.platform.dsms.service.DeviceFactorService;
import com.vortex.platform.dsms.util.NumberUtil;
import com.vortex.platform.dss.dto.FactorValue;
import com.vortex.platform.dss.dto.FactorsData;
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Stream processor that turns real-time factor readings into per-factor summary records.
 *
 * <p>Records are consumed from the {@code DSS_REAL_TIME_DATA} topic. Every numeric factor value
 * whose summary mode is not {@code NO_INTERVAL} becomes one {@link FactorSummaryData} with a count
 * of 1 and {@code min == max == value}. Each summary is re-keyed as
 * {@code <deviceCode>_<factorCode>} and written to the {@code SUMMARY_RAW_DATA} topic.
 */
@Component
public class RealTimeStreamProcessor extends AbstractStreamProcessor {
    private static final Logger LOGGER = LoggerFactory.getLogger(RealTimeStreamProcessor.class);

    /** Topic the real-time factor readings are consumed from. */
    private static final String REAL_TIME_TOPIC = "DSS_REAL_TIME_DATA";

    /** Topic the single-reading summary records are produced to. */
    private static final String SUMMARY_RAW_TOPIC = "SUMMARY_RAW_DATA";

    /** Summary mode for which a factor is excluded from this stream. */
    private static final String NO_INTERVAL_MODE = "NO_INTERVAL";

    /** Separator between device code and factor code in the output record key. */
    private static final String KEY_SEPARATOR = "_";

    private Serde<FactorsData> factorsDataSerde;

    @Autowired
    private DeviceFactorService deviceFactorService;

    /** Creates the serde used to read {@link FactorsData} values from the input topic. */
    private void initFactorDataSerde() {
        this.factorsDataSerde = SerdFactory.create(FactorsData.class);
    }

    /**
     * Runs the parent initialisation and then creates the {@link FactorsData} serde.
     */
    @Override
    public void init() {
        // NOTE(review): the serde is created only after super.init() returns. If the parent
        // already builds the topology inside init(), factorsDataSerde is still null at that
        // point - confirm the ordering against AbstractStreamProcessor.
        super.init();
        initFactorDataSerde();
    }

    /**
     * Adds the real-time branch of the topology: read {@link FactorsData}, fan each record out
     * into one summary per eligible factor value, re-key it and forward it to the raw summary
     * topic.
     *
     * @param streamsBuilder builder the branch is registered on
     */
    @Override
    protected void addOtherStreamBeforeSummaryStream(StreamsBuilder streamsBuilder) {
        streamsBuilder
                .stream(REAL_TIME_TOPIC, Consumed.with(Serdes.String(), this.factorsDataSerde))
                .flatMapValues(this::toSummaryData)
                // The incoming key is discarded; the output is keyed by device and factor.
                .map((ignoredKey, summary) -> new KeyValue<>(summaryKey(summary), summary))
                .to(SUMMARY_RAW_TOPIC, Produced.with(Serdes.String(), this.summaryDataSerde));
    }

    /**
     * Converts one real-time record into zero or more single-reading summaries.
     *
     * <p>A factor value is skipped when its summary mode is {@code NO_INTERVAL}, when its value
     * cannot be converted to a number, or when it carries no timestamp.
     *
     * @param factorsData record value read from the input topic; may be {@code null}
     * @return the summaries derived from the record, empty when there is nothing to forward
     */
    private List<FactorSummaryData> toSummaryData(FactorsData factorsData) {
        List<FactorSummaryData> summaries = new ArrayList<>();
        // A null value (e.g. a tombstone) would otherwise throw inside the stream thread.
        if (factorsData == null
                || factorsData.getFactorValues() == null
                || factorsData.getFactorValues().isEmpty()) {
            return summaries;
        }
        LOGGER.info("received realtime data:{} ", factorsData);
        for (FactorValue factorValue : factorsData.getFactorValues()) {
            String summaryMode = this.deviceFactorService.getFactorSummaryMode(
                    factorsData.getDeviceCode(), factorValue.getFactorCode());
            if (NO_INTERVAL_MODE.equals(summaryMode)) {
                continue;
            }
            Double numericValue = NumberUtil.doubleConvert(factorValue.getValue());
            if (numericValue == null) {
                continue;
            }
            if (factorValue.getDatetime() == null) {
                // Unboxing a missing timestamp would throw; drop the reading instead.
                LOGGER.warn("skipping factor value without datetime, device:{} factor:{}",
                        factorsData.getDeviceCode(), factorValue.getFactorCode());
                continue;
            }
            summaries.add(buildSummary(factorsData, factorValue, numericValue));
        }
        return summaries;
    }

    /** Builds the summary for a single reading: count 1, with min, max and value all equal. */
    private FactorSummaryData buildSummary(
            FactorsData factorsData, FactorValue factorValue, Double numericValue) {
        FactorSummaryData summary = new FactorSummaryData();
        summary.setFactorCode(factorValue.getFactorCode());
        summary.setCount(1);
        summary.setDeviceCode(factorsData.getDeviceCode());
        summary.setDeviceType(factorsData.getDeviceType());
        summary.setMax(numericValue);
        summary.setMin(numericValue);
        summary.setValue(numericValue);
        summary.setDatetime(factorValue.getDatetime().longValue());
        return summary;
    }

    /** Returns the output record key, {@code <deviceCode>_<factorCode>}. */
    private static String summaryKey(FactorSummaryData summary) {
        return Joiner.on(KEY_SEPARATOR).join(summary.getDeviceCode(), summary.getFactorCode());
    }
}
