package com.shkpr.service.aimodelpower.components; import com.global.base.log.LogLevelFlag; import com.global.base.log.LogPrintMgr; import com.shkpr.service.aimodelpower.constants.LogFlagBusiType; import com.shkpr.service.aimodelpower.constants.WaterPumpStandard; import com.shkpr.service.aimodelpower.dbdao.services.intef.CollectHistoryService; import com.shkpr.service.aimodelpower.dbdao.services.intef.WaterPumpCollectionConfigService; import com.shkpr.service.aimodelpower.dbdao.services.intef.WaterPumpCollectionRecordService; import com.shkpr.service.aimodelpower.dto.CollectHistory; import com.shkpr.service.aimodelpower.dto.WaterPumpCollectionConfig; import com.shkpr.service.aimodelpower.dto.WaterPumpCollectionRecord; import com.shkpr.service.aimodelpower.dto.WaterPumpEnergyConfig; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.RandomUtils; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import java.math.BigDecimal; import java.math.RoundingMode; import java.time.LocalDateTime; import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.stream.Collectors; /** * 泵采集器 * * @author 欧阳劲驰 * @serial 1.0.4 */ @Component public class WaterPumpCollector { /** * log */ private final static String mStrClassName = "PumpDataCollector"; private final static String mBizType = LogFlagBusiType.BUSI_DATA_COLLECT.toStrValue(); /** * 小时单位 */ private final static String HOUR_UNIT = "hour"; /** * 小时对齐单位 */ private final static String HOUR_ALIGN_UNIT = "hh"; final ThreadPoolTaskExecutor taskScheduler; final WaterPumpCollectionConfigService waterPumpCollectionConfigService; final CollectHistoryService collectHistoryService; final WaterPumpCollectionRecordService waterPumpCollectionRecordService; public WaterPumpCollector(@Qualifier("taskExecutor") ThreadPoolTaskExecutor taskScheduler, WaterPumpCollectionConfigService waterPumpCollectionConfigService, CollectHistoryService collectHistoryService, WaterPumpCollectionRecordService waterPumpCollectionRecordService) { this.taskScheduler = taskScheduler; this.waterPumpCollectionConfigService = waterPumpCollectionConfigService; this.collectHistoryService = collectHistoryService; this.waterPumpCollectionRecordService = waterPumpCollectionRecordService; } /** * 采集数据 * * @param previousHours 往前回溯小时数 */ public void collectData(int previousHours) { //计算开始和结束时间,开始时间整点回溯小时 final LocalDateTime startTime = LocalDateTime.now().truncatedTo(ChronoUnit.HOURS).minusHours(previousHours); final LocalDateTime endTime = LocalDateTime.now(); //获取采集配置 Map> configs = waterPumpCollectionConfigService.findAll().stream() .collect(Collectors.groupingBy(WaterPumpCollectionConfig::getDeviceCode)); //遍历采集配置 for (Map.Entry> configEntry : configs.entrySet()) { taskScheduler.execute(() -> { //采集标签映射 Map tagMap = configEntry.getValue().stream() .collect(Collectors.toMap( WaterPumpCollectionConfig::getCollcationTag, it -> it, (a, b) -> b )); //采集标签 List tags = new ArrayList<>(tagMap.keySet()); //查询总数,排除总数小于等于0的数据 Long count = collectHistoryService.findCount(startTime, endTime, tags); if (CollectionUtils.isEmpty(tags) || count <= 0) return; LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName , String.format("开始执行采集泵数据 设备编码:%s", configEntry.getKey())); long begin = System.currentTimeMillis(); //读取数据 List collectHistories = collectHistoryService.findAlign(startTime, endTime , 1, HOUR_UNIT, HOUR_ALIGN_UNIT, tags); //遍历原始数据,构建采集数据 List dates = collectHistories.stream() //根据时间分组 .collect(Collectors.groupingBy(CollectHistory::getTime)) .entrySet().stream() .map(it -> { WaterPumpCollectionRecord data = new WaterPumpCollectionRecord(); data.setDeviceCode(configEntry.getKey()); data.setTime(it.getKey()); //遍历同一台设备原始数据 for (CollectHistory collectHistory : it.getValue()) { WaterPumpCollectionConfig config = tagMap.get(collectHistory.getTagCode()); //根据标准模式设置对应字段 if (config != null) switch (config.getStandardCode()) { case WaterPumpStandard.ACTIVE_ENERGY: data.setActiveEnergy(collectHistory.getVal()); break; case WaterPumpStandard.STARTUP_STATE: data.setStartupState(collectHistory.getVal() != null ? collectHistory.getVal().shortValue() : null); break; case WaterPumpStandard.PHASE_A_CURRENT: data.setStartupState((short) (collectHistory.getVal() == null || collectHistory.getVal() == 0 ? 0 : 1)); break; } } return data; }).collect(Collectors.toList()); //执行批量合并 Boolean upserted = waterPumpCollectionRecordService.upsertAll(dates); long end = System.currentTimeMillis(); LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName , String.format( "结束执行采集水量原始数据,设备编码:%s,入库状态:%s,用时(毫秒):%d", configEntry.getKey() , upserted , (end - begin) ) ); }); } } /** * 计算电量 * * @param previousHours 往前回溯小时数 */ public void calcPower(int previousHours) { //计算开始和结束时间,开始时间整点回溯小时 final LocalDateTime startTime = LocalDateTime.now().truncatedTo(ChronoUnit.HOURS).minusHours(previousHours + 1); final LocalDateTime endTime = LocalDateTime.now(); //采集配置 List collectionConfigs = waterPumpCollectionConfigService.findAll(); //流量配置 List energyConfigs = waterPumpCollectionConfigService.findEnergy(); //设备编码 List deviceCodes = collectionConfigs.stream() .map(WaterPumpCollectionConfig::getDeviceCode) .distinct().collect(Collectors.toList()); //遍历设备 for (String deviceCode : deviceCodes) { taskScheduler.execute(() -> { LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName , String.format("开始执行计算泵电量 设备编码:%s", deviceCode)); long begin = System.currentTimeMillis(); //当前设备流量配置 WaterPumpEnergyConfig energyConfig = energyConfigs.stream() .filter(map -> Objects.equals(deviceCode, map.getPumpID())) .findFirst().orElse(null); //读取数据 List dates = waterPumpCollectionRecordService.findAll( startTime, endTime, deviceCode); if (CollectionUtils.isEmpty(dates) || dates.size() < 2) return; //遍历数据,跳过第一条 for (int i = 1; i < dates.size(); i++) { //当前记录 WaterPumpCollectionRecord currentRecord = dates.get(i); //上一条记录 WaterPumpCollectionRecord previousRecord = dates.get(i - 1); //获取电量 Double currentEnergy = currentRecord.getActiveEnergy(); Double previousEnergy = previousRecord.getActiveEnergy(); double power; //使用计算电量 if (currentEnergy != null && previousEnergy != null) power = currentEnergy - previousEnergy; //使用额度功率 else if (energyConfig != null) power = energyConfig.getPower() != null ? energyConfig.getPower() : 0; //使用随机数 else power = BigDecimal.valueOf(RandomUtils.nextDouble(400, 600)) .setScale(2, RoundingMode.HALF_UP).doubleValue(); //更新电量 currentRecord.setPowerCons(power); } //移除第一条数据 dates.remove(0); //执行批量合并 Boolean upserted = waterPumpCollectionRecordService.upsertAll(dates); long end = System.currentTimeMillis(); LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName , String.format( "结束执行计算泵电量,设备编码:%s,入库状态:%s,用时(毫秒):%d", deviceCode , upserted , (end - begin) ) ); }); } } }