WaterPumpCollector.java 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. package com.shkpr.service.aimodelpower.components;
  2. import com.global.base.log.LogLevelFlag;
  3. import com.global.base.log.LogPrintMgr;
  4. import com.shkpr.service.aimodelpower.constants.LogFlagBusiType;
  5. import com.shkpr.service.aimodelpower.constants.WaterPumpStandard;
  6. import com.shkpr.service.aimodelpower.dbdao.services.intef.CollectHistoryService;
  7. import com.shkpr.service.aimodelpower.dbdao.services.intef.WaterPumpCollectionConfigService;
  8. import com.shkpr.service.aimodelpower.dbdao.services.intef.WaterPumpCollectionRecordService;
  9. import com.shkpr.service.aimodelpower.dto.CollectHistory;
  10. import com.shkpr.service.aimodelpower.dto.WaterPumpCollectionConfig;
  11. import com.shkpr.service.aimodelpower.dto.WaterPumpCollectionRecord;
  12. import com.shkpr.service.aimodelpower.dto.WaterPumpEnergyConfig;
  13. import org.apache.commons.collections4.CollectionUtils;
  14. import org.apache.commons.lang3.RandomUtils;
  15. import org.springframework.beans.factory.annotation.Qualifier;
  16. import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
  17. import org.springframework.stereotype.Component;
  18. import java.math.BigDecimal;
  19. import java.math.RoundingMode;
  20. import java.time.LocalDateTime;
  21. import java.time.temporal.ChronoUnit;
  22. import java.util.ArrayList;
  23. import java.util.List;
  24. import java.util.Map;
  25. import java.util.Objects;
  26. import java.util.stream.Collectors;
  27. /**
  28. * 泵采集器
  29. *
  30. * @author 欧阳劲驰
  31. * @serial 1.0.4
  32. */
  33. @Component
  34. public class WaterPumpCollector {
  35. /**
  36. * log
  37. */
  38. private final static String mStrClassName = "PumpDataCollector";
  39. private final static String mBizType = LogFlagBusiType.BUSI_DATA_COLLECT.toStrValue();
  40. /**
  41. * 小时单位
  42. */
  43. private final static String HOUR_UNIT = "hour";
  44. /**
  45. * 小时对齐单位
  46. */
  47. private final static String HOUR_ALIGN_UNIT = "hh";
  48. final
  49. ThreadPoolTaskExecutor taskScheduler;
  50. final
  51. WaterPumpCollectionConfigService waterPumpCollectionConfigService;
  52. final
  53. CollectHistoryService collectHistoryService;
  54. final
  55. WaterPumpCollectionRecordService waterPumpCollectionRecordService;
  56. public WaterPumpCollector(@Qualifier("taskExecutor") ThreadPoolTaskExecutor taskScheduler, WaterPumpCollectionConfigService waterPumpCollectionConfigService, CollectHistoryService collectHistoryService, WaterPumpCollectionRecordService waterPumpCollectionRecordService) {
  57. this.taskScheduler = taskScheduler;
  58. this.waterPumpCollectionConfigService = waterPumpCollectionConfigService;
  59. this.collectHistoryService = collectHistoryService;
  60. this.waterPumpCollectionRecordService = waterPumpCollectionRecordService;
  61. }
  62. /**
  63. * 采集数据
  64. *
  65. * @param previousHours 往前回溯小时数
  66. */
  67. public void collectData(int previousHours) {
  68. //计算开始和结束时间,开始时间整点回溯小时
  69. final LocalDateTime startTime = LocalDateTime.now().truncatedTo(ChronoUnit.HOURS).minusHours(previousHours);
  70. final LocalDateTime endTime = LocalDateTime.now();
  71. //获取采集配置
  72. Map<String, List<WaterPumpCollectionConfig>> configs = waterPumpCollectionConfigService.findAll().stream()
  73. .collect(Collectors.groupingBy(WaterPumpCollectionConfig::getDeviceCode));
  74. //遍历采集配置
  75. for (Map.Entry<String, List<WaterPumpCollectionConfig>> configEntry : configs.entrySet()) {
  76. taskScheduler.execute(() -> {
  77. //采集标签映射
  78. Map<String, WaterPumpCollectionConfig> tagMap = configEntry.getValue().stream()
  79. .collect(Collectors.toMap(
  80. WaterPumpCollectionConfig::getCollcationTag,
  81. it -> it,
  82. (a, b) -> b
  83. ));
  84. //采集标签
  85. List<String> tags = new ArrayList<>(tagMap.keySet());
  86. //查询总数,排除总数小于等于0的数据
  87. Long count = collectHistoryService.findCount(startTime, endTime, tags);
  88. if (CollectionUtils.isEmpty(tags) || count <= 0) return;
  89. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName
  90. , String.format("开始执行采集泵数据 设备编码:%s", configEntry.getKey()));
  91. long begin = System.currentTimeMillis();
  92. //读取数据
  93. List<CollectHistory> collectHistories = collectHistoryService.findAlign(startTime, endTime
  94. , 1, HOUR_UNIT, HOUR_ALIGN_UNIT, tags);
  95. //遍历原始数据,构建采集数据
  96. List<WaterPumpCollectionRecord> dates = collectHistories.stream()
  97. //根据时间分组
  98. .collect(Collectors.groupingBy(CollectHistory::getTime))
  99. .entrySet().stream()
  100. .map(it -> {
  101. WaterPumpCollectionRecord data = new WaterPumpCollectionRecord();
  102. data.setDeviceCode(configEntry.getKey());
  103. data.setTime(it.getKey());
  104. //遍历同一台设备原始数据
  105. for (CollectHistory collectHistory : it.getValue()) {
  106. WaterPumpCollectionConfig config = tagMap.get(collectHistory.getTagCode());
  107. //根据标准模式设置对应字段
  108. if (config != null) switch (config.getStandardCode()) {
  109. case WaterPumpStandard.ACTIVE_ENERGY:
  110. data.setActiveEnergy(collectHistory.getVal());
  111. break;
  112. case WaterPumpStandard.STARTUP_STATE:
  113. data.setStartupState(collectHistory.getVal() != null ? collectHistory.getVal().shortValue() : null);
  114. break;
  115. case WaterPumpStandard.PHASE_A_CURRENT:
  116. data.setStartupState((short) (collectHistory.getVal() == null || collectHistory.getVal() == 0 ? 0 : 1));
  117. break;
  118. }
  119. }
  120. return data;
  121. }).collect(Collectors.toList());
  122. //执行批量合并
  123. Boolean upserted = waterPumpCollectionRecordService.upsertAll(dates);
  124. long end = System.currentTimeMillis();
  125. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName
  126. , String.format(
  127. "结束执行采集水量原始数据,设备编码:%s,入库状态:%s,用时(毫秒):%d",
  128. configEntry.getKey()
  129. , upserted
  130. , (end - begin)
  131. )
  132. );
  133. });
  134. }
  135. }
  136. /**
  137. * 计算电量
  138. *
  139. * @param previousHours 往前回溯小时数
  140. */
  141. public void calcPower(int previousHours) {
  142. //计算开始和结束时间,开始时间整点回溯小时
  143. final LocalDateTime startTime = LocalDateTime.now().truncatedTo(ChronoUnit.HOURS).minusHours(previousHours + 1);
  144. final LocalDateTime endTime = LocalDateTime.now();
  145. //采集配置
  146. List<WaterPumpCollectionConfig> collectionConfigs = waterPumpCollectionConfigService.findAll();
  147. //流量配置
  148. List<WaterPumpEnergyConfig> energyConfigs = waterPumpCollectionConfigService.findEnergy();
  149. //设备编码
  150. List<String> deviceCodes = collectionConfigs.stream()
  151. .map(WaterPumpCollectionConfig::getDeviceCode)
  152. .distinct().collect(Collectors.toList());
  153. //遍历设备
  154. for (String deviceCode : deviceCodes) {
  155. taskScheduler.execute(() -> {
  156. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName
  157. , String.format("开始执行计算泵电量 设备编码:%s", deviceCode));
  158. long begin = System.currentTimeMillis();
  159. //当前设备流量配置
  160. WaterPumpEnergyConfig energyConfig = energyConfigs.stream()
  161. .filter(map -> Objects.equals(deviceCode, map.getPumpID()))
  162. .findFirst().orElse(null);
  163. //读取数据
  164. List<WaterPumpCollectionRecord> dates = waterPumpCollectionRecordService.findAll(
  165. startTime, endTime, deviceCode);
  166. if (CollectionUtils.isEmpty(dates) || dates.size() < 2)
  167. return;
  168. //遍历数据,跳过第一条
  169. for (int i = 1; i < dates.size(); i++) {
  170. //当前记录
  171. WaterPumpCollectionRecord currentRecord = dates.get(i);
  172. //上一条记录
  173. WaterPumpCollectionRecord previousRecord = dates.get(i - 1);
  174. //获取电量
  175. Double currentEnergy = currentRecord.getActiveEnergy();
  176. Double previousEnergy = previousRecord.getActiveEnergy();
  177. double power;
  178. //使用计算电量
  179. if (currentEnergy != null && previousEnergy != null)
  180. power = currentEnergy - previousEnergy;
  181. //使用额度功率
  182. else if (energyConfig != null)
  183. power = energyConfig.getPower() != null ? energyConfig.getPower() : 0;
  184. //使用随机数
  185. else power = BigDecimal.valueOf(RandomUtils.nextDouble(400, 600))
  186. .setScale(2, RoundingMode.HALF_UP).doubleValue();
  187. //更新电量
  188. currentRecord.setPowerCons(power);
  189. }
  190. //移除第一条数据
  191. dates.remove(0);
  192. //执行批量合并
  193. Boolean upserted = waterPumpCollectionRecordService.upsertAll(dates);
  194. long end = System.currentTimeMillis();
  195. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName
  196. , String.format(
  197. "结束执行计算泵电量,设备编码:%s,入库状态:%s,用时(毫秒):%d",
  198. deviceCode
  199. , upserted
  200. , (end - begin)
  201. )
  202. );
  203. });
  204. }
  205. }
  206. }