| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231 |
- package com.shkpr.service.aimodelpower.components;
- import com.global.base.log.LogLevelFlag;
- import com.global.base.log.LogPrintMgr;
- import com.shkpr.service.aimodelpower.constants.LogFlagBusiType;
- import com.shkpr.service.aimodelpower.constants.WaterPumpStandard;
- import com.shkpr.service.aimodelpower.dbdao.services.intef.CollectHistoryService;
- import com.shkpr.service.aimodelpower.dbdao.services.intef.WaterPumpCollectionConfigService;
- import com.shkpr.service.aimodelpower.dbdao.services.intef.WaterPumpCollectionRecordService;
- import com.shkpr.service.aimodelpower.dto.CollectHistory;
- import com.shkpr.service.aimodelpower.dto.WaterPumpCollectionConfig;
- import com.shkpr.service.aimodelpower.dto.WaterPumpCollectionRecord;
- import com.shkpr.service.aimodelpower.dto.WaterPumpEnergyConfig;
- import org.apache.commons.collections4.CollectionUtils;
- import org.apache.commons.lang3.RandomUtils;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
- import org.springframework.stereotype.Component;
- import java.math.BigDecimal;
- import java.math.RoundingMode;
- import java.time.LocalDateTime;
- import java.time.temporal.ChronoUnit;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.Map;
- import java.util.Objects;
- import java.util.stream.Collectors;
- /**
- * 泵采集器
- *
- * @author 欧阳劲驰
- * @serial 1.0.4
- */
- @Component
- public class WaterPumpCollector {
- /**
- * log
- */
- private final static String mStrClassName = "PumpDataCollector";
- private final static String mBizType = LogFlagBusiType.BUSI_DATA_COLLECT.toStrValue();
- /**
- * 小时单位
- */
- private final static String HOUR_UNIT = "hour";
- /**
- * 小时对齐单位
- */
- private final static String HOUR_ALIGN_UNIT = "hh";
- final
- ThreadPoolTaskExecutor taskScheduler;
- final
- WaterPumpCollectionConfigService waterPumpCollectionConfigService;
- final
- CollectHistoryService collectHistoryService;
- final
- WaterPumpCollectionRecordService waterPumpCollectionRecordService;
- public WaterPumpCollector(@Qualifier("taskExecutor") ThreadPoolTaskExecutor taskScheduler, WaterPumpCollectionConfigService waterPumpCollectionConfigService, CollectHistoryService collectHistoryService, WaterPumpCollectionRecordService waterPumpCollectionRecordService) {
- this.taskScheduler = taskScheduler;
- this.waterPumpCollectionConfigService = waterPumpCollectionConfigService;
- this.collectHistoryService = collectHistoryService;
- this.waterPumpCollectionRecordService = waterPumpCollectionRecordService;
- }
- /**
- * 采集数据
- *
- * @param previousHours 往前回溯小时数
- */
- public void collectData(int previousHours) {
- //计算开始和结束时间,开始时间整点回溯小时
- final LocalDateTime startTime = LocalDateTime.now().truncatedTo(ChronoUnit.HOURS).minusHours(previousHours);
- final LocalDateTime endTime = LocalDateTime.now();
- //获取采集配置
- Map<String, List<WaterPumpCollectionConfig>> configs = waterPumpCollectionConfigService.findAll().stream()
- .collect(Collectors.groupingBy(WaterPumpCollectionConfig::getDeviceCode));
- //遍历采集配置
- for (Map.Entry<String, List<WaterPumpCollectionConfig>> configEntry : configs.entrySet()) {
- taskScheduler.execute(() -> {
- //采集标签映射
- Map<String, WaterPumpCollectionConfig> tagMap = configEntry.getValue().stream()
- .collect(Collectors.toMap(
- WaterPumpCollectionConfig::getCollcationTag,
- it -> it,
- (a, b) -> b
- ));
- //采集标签
- List<String> tags = new ArrayList<>(tagMap.keySet());
- //查询总数,排除总数小于等于0的数据
- Long count = collectHistoryService.findCount(startTime, endTime, tags);
- if (CollectionUtils.isEmpty(tags) || count <= 0) return;
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName
- , String.format("开始执行采集泵数据 设备编码:%s", configEntry.getKey()));
- long begin = System.currentTimeMillis();
- //读取数据
- List<CollectHistory> collectHistories = collectHistoryService.findAlign(startTime, endTime
- , 1, HOUR_UNIT, HOUR_ALIGN_UNIT, tags);
- //遍历原始数据,构建采集数据
- List<WaterPumpCollectionRecord> dates = collectHistories.stream()
- //根据时间分组
- .collect(Collectors.groupingBy(CollectHistory::getTime))
- .entrySet().stream()
- .map(it -> {
- WaterPumpCollectionRecord data = new WaterPumpCollectionRecord();
- data.setDeviceCode(configEntry.getKey());
- data.setTime(it.getKey());
- //遍历同一台设备原始数据
- for (CollectHistory collectHistory : it.getValue()) {
- WaterPumpCollectionConfig config = tagMap.get(collectHistory.getTagCode());
- //根据标准模式设置对应字段
- if (config != null) switch (config.getStandardCode()) {
- case WaterPumpStandard.ACTIVE_ENERGY:
- data.setActiveEnergy(collectHistory.getVal());
- break;
- case WaterPumpStandard.STARTUP_STATE:
- data.setStartupState(collectHistory.getVal() != null ? collectHistory.getVal().shortValue() : null);
- break;
- case WaterPumpStandard.PHASE_A_CURRENT:
- data.setStartupState((short) (collectHistory.getVal() == null || collectHistory.getVal() == 0 ? 0 : 1));
- break;
- }
- }
- return data;
- }).collect(Collectors.toList());
- //执行批量合并
- Boolean upserted = waterPumpCollectionRecordService.upsertAll(dates);
- long end = System.currentTimeMillis();
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName
- , String.format(
- "结束执行采集水量原始数据,设备编码:%s,入库状态:%s,用时(毫秒):%d",
- configEntry.getKey()
- , upserted
- , (end - begin)
- )
- );
- });
- }
- }
- /**
- * 计算电量
- *
- * @param previousHours 往前回溯小时数
- */
- public void calcPower(int previousHours) {
- //计算开始和结束时间,开始时间整点回溯小时
- final LocalDateTime startTime = LocalDateTime.now().truncatedTo(ChronoUnit.HOURS).minusHours(previousHours + 1);
- final LocalDateTime endTime = LocalDateTime.now();
- //采集配置
- List<WaterPumpCollectionConfig> collectionConfigs = waterPumpCollectionConfigService.findAll();
- //流量配置
- List<WaterPumpEnergyConfig> energyConfigs = waterPumpCollectionConfigService.findEnergy();
- //设备编码
- List<String> deviceCodes = collectionConfigs.stream()
- .map(WaterPumpCollectionConfig::getDeviceCode)
- .distinct().collect(Collectors.toList());
- //遍历设备
- for (String deviceCode : deviceCodes) {
- taskScheduler.execute(() -> {
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName
- , String.format("开始执行计算泵电量 设备编码:%s", deviceCode));
- long begin = System.currentTimeMillis();
- //当前设备流量配置
- WaterPumpEnergyConfig energyConfig = energyConfigs.stream()
- .filter(map -> Objects.equals(deviceCode, map.getPumpID()))
- .findFirst().orElse(null);
- //读取数据
- List<WaterPumpCollectionRecord> dates = waterPumpCollectionRecordService.findAll(
- startTime, endTime, deviceCode);
- if (CollectionUtils.isEmpty(dates) || dates.size() < 2)
- return;
- //遍历数据,跳过第一条
- for (int i = 1; i < dates.size(); i++) {
- //当前记录
- WaterPumpCollectionRecord currentRecord = dates.get(i);
- //上一条记录
- WaterPumpCollectionRecord previousRecord = dates.get(i - 1);
- //获取电量
- Double currentEnergy = currentRecord.getActiveEnergy();
- Double previousEnergy = previousRecord.getActiveEnergy();
- double power;
- //使用计算电量
- if (currentEnergy != null && previousEnergy != null)
- power = currentEnergy - previousEnergy;
- //使用额度功率
- else if (energyConfig != null)
- power = energyConfig.getPower() != null ? energyConfig.getPower() : 0;
- //使用随机数
- else power = BigDecimal.valueOf(RandomUtils.nextDouble(400, 600))
- .setScale(2, RoundingMode.HALF_UP).doubleValue();
- //更新电量
- currentRecord.setPowerCons(power);
- }
- //移除第一条数据
- dates.remove(0);
- //执行批量合并
- Boolean upserted = waterPumpCollectionRecordService.upsertAll(dates);
- long end = System.currentTimeMillis();
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName
- , String.format(
- "结束执行计算泵电量,设备编码:%s,入库状态:%s,用时(毫秒):%d",
- deviceCode
- , upserted
- , (end - begin)
- )
- );
- });
- }
- }
- }
|