ModelPredictor.java 21 KB


  1. package com.shkpr.service.aimodelpower.components;
  2. import com.fasterxml.jackson.databind.ObjectMapper;
  3. import com.global.base.log.LogLevelFlag;
  4. import com.global.base.log.LogPrintMgr;
  5. import com.shkpr.service.aimodelpower.commproperties.PredictProperties;
  6. import com.shkpr.service.aimodelpower.commtools.AsyncRequestUtil;
  7. import com.shkpr.service.aimodelpower.commtools.ModelPredictorUtil;
  8. import com.shkpr.service.aimodelpower.constants.LogFlagBusiType;
  9. import com.shkpr.service.aimodelpower.constants.ModelPredictorTaskStatus;
  10. import com.shkpr.service.aimodelpower.dbdao.services.intef.OrgConfigService;
  11. import com.shkpr.service.aimodelpower.dbdao.services.intef.WaterVolumePredictService;
  12. import com.shkpr.service.aimodelpower.dto.ModelPredictorResult;
  13. import com.shkpr.service.aimodelpower.dto.ModelPredictorTask;
  14. import com.shkpr.service.aimodelpower.dto.OrgConfig;
  15. import com.shkpr.service.aimodelpower.dto.WaterVolumePredictHour;
  16. import com.shkpr.service.aimodelpower.jsonbean.ModelPredictorDay;
  17. import com.shkpr.service.aimodelpower.jsonbean.ModelPredictorHour;
  18. import com.shkpr.service.aimodelpower.jsonbean.ModelPredictorTrain;
  19. import org.apache.commons.lang3.StringUtils;
  20. import org.springframework.beans.factory.annotation.Qualifier;
  21. import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
  22. import org.springframework.stereotype.Component;
  23. import java.time.LocalDate;
  24. import java.util.List;
  25. import java.util.Objects;
  26. import java.util.stream.Collectors;
  27. /**
  28. * 模型预测器
  29. *
  30. * @author 欧阳劲驰
  31. * @serial 1.0.4
  32. */
  33. @Component
  34. public class ModelPredictor {
  35. /**
  36. * log
  37. */
  38. private final static String mStrClassName = "ModelPredictor";
  39. private final static String mBizType = LogFlagBusiType.BUSI_MODEL_PREDICT.toStrValue();
  40. /**
  41. * 预测日水量路径
  42. */
  43. private final static String predictDayWaterUri = "PredictSupplyAsync";
  44. /**
  45. * 预测小时水量路径
  46. */
  47. private final static String predictHourWaterUri = "PredictHourAsync";
  48. /**
  49. * 训练日水量路径
  50. */
  51. private final static String trainDayWaterUri = "TrainSupplyAsync";
  52. /**
  53. * 训练小时水量路径
  54. */
  55. private final static String trainHourWaterUri = "TrainHourSupplyAsync";
  56. /**
  57. * 预测小时泵路径
  58. */
  59. private final static String predictHourPumpUri = "PumpSchedulingAsync";
  60. /**
  61. * 异步任务进度
  62. */
  63. private final static String taskProgressUri = "task/progress/";
  64. final
  65. ThreadPoolTaskExecutor taskScheduler;
  66. final
  67. ObjectMapper objectMapper;
  68. final
  69. PredictProperties predictProperties;
  70. final
  71. OrgConfigService orgConfigService;
  72. final
  73. WaterVolumePredictService waterVolumePredictService;
  74. final
  75. AsyncRequestUtil asyncRequestUtil;
  76. public ModelPredictor(@Qualifier("taskExecutor") ThreadPoolTaskExecutor taskScheduler, ObjectMapper objectMapper
  77. , PredictProperties predictProperties, OrgConfigService orgConfigService
  78. , WaterVolumePredictService waterVolumePredictService, AsyncRequestUtil asyncRequestUtil) {
  79. this.taskScheduler = taskScheduler;
  80. this.objectMapper = objectMapper;
  81. this.predictProperties = predictProperties;
  82. this.orgConfigService = orgConfigService;
  83. this.waterVolumePredictService = waterVolumePredictService;
  84. this.asyncRequestUtil = asyncRequestUtil;
  85. }
  86. /**
  87. * 预测日水量
  88. *
  89. * @param previousDays 往前回溯天数
  90. * @param nextDays 往后预测天数
  91. * @param orgName 组织名称
  92. */
  93. public void predictDayWater(int previousDays, int nextDays, String orgName) {
  94. //计算开始和结束日期
  95. LocalDate startTime = LocalDate.now().minusDays(previousDays);
  96. LocalDate endTime = LocalDate.now().plusDays(nextDays);
  97. //关闭的组织名称
  98. List<String> closeOrgNames = predictProperties.getCloseOrgNames();
  99. //获取组织配置
  100. List<OrgConfig> orgConfigs = orgConfigService.findAll().stream()
  101. //排除关闭组织
  102. .filter(config -> !closeOrgNames.contains(config.getOrgName()))
  103. .collect(Collectors.toList());
  104. //遍历组织配置
  105. orgConfigs.forEach(orgConfig -> {
  106. if (orgName != null && !Objects.equals(orgConfig.getOrgName(), orgName)) return;
  107. long begin = System.currentTimeMillis();
  108. //请求参数
  109. ModelPredictorDay predictorDay = new ModelPredictorDay(orgConfig.getOrgId(), startTime.toString(), endTime.toString());
  110. //发送请求
  111. ModelPredictorResult startResult = asyncRequestUtil.start(predictProperties.getUrl() + predictDayWaterUri, predictorDay);
  112. if (startResult == null) return;
  113. //任务id
  114. String taskId = (String) startResult.getKeyValue();
  115. if (StringUtils.isBlank(taskId)) return;
  116. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName
  117. , String.format("开始执行预测日水量 组织名称:%s,开始日期:%s,结束日期:%s,任务id:%s",
  118. orgConfig.getOrgName(), startTime, endTime, taskId));
  119. //等待结果
  120. ModelPredictorResult taskResult = asyncRequestUtil.wait(predictProperties.getUrl() + taskProgressUri + taskId);
  121. if (taskResult == null || taskResult.getTask() == null) return;
  122. long end = System.currentTimeMillis();
  123. //任务状态
  124. ModelPredictorTask task = taskResult.getTask();
  125. switch (task.getStatus()) {
  126. case ModelPredictorTaskStatus.COMPLETED:
  127. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName
  128. , String.format("执行预测日水量成功,组织名称:%s,用时(毫秒):%d,返回:%s"
  129. , orgConfig.getOrgName(), (end - begin), taskResult));
  130. break;
  131. case ModelPredictorTaskStatus.FAILED:
  132. case ModelPredictorTaskStatus.CANCELLED:
  133. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR, mBizType, mStrClassName
  134. , String.format("执行预测日水量失败,组织名称:%s,用时(毫秒):%d,返回:%s"
  135. , orgConfig.getOrgName(), (end - begin), taskResult));
  136. break;
  137. }
  138. });
  139. }
  140. /**
  141. * 预测小时水量
  142. *
  143. * @param previousDays 往前回溯天数
  144. * @param nextDays 往后预测天数
  145. * @param orgName 组织名称
  146. */
  147. public void predictHourWater(int previousDays, int nextDays, String orgName) {
  148. //计算开始日期
  149. LocalDate startTime = LocalDate.now().minusDays(previousDays);
  150. //关闭的组织名称
  151. List<String> closeOrgNames = predictProperties.getCloseOrgNames();
  152. //获取组织配置
  153. List<OrgConfig> orgConfigs = orgConfigService.findAll().stream()
  154. //排除关闭组织
  155. .filter(config -> !closeOrgNames.contains(config.getOrgName()))
  156. .collect(Collectors.toList());
  157. //遍历组织配置
  158. for (OrgConfig orgConfig : orgConfigs) {
  159. if (orgName != null && !Objects.equals(orgConfig.getOrgName(), orgName)) continue;
  160. //遍历预测天数
  161. for (int i = 0; i <= nextDays; i++) {
  162. long begin = System.currentTimeMillis();
  163. //计算预测日期,开始日期+i
  164. String time = startTime.plusDays(i).toString();
  165. //请求参数
  166. ModelPredictorHour predictorHour = new ModelPredictorHour(orgConfig.getOrgId(), time);
  167. //发送请求
  168. ModelPredictorResult startResult = asyncRequestUtil.start(predictProperties.getUrl() + predictHourWaterUri, predictorHour);
  169. if (startResult == null) return;
  170. //任务id
  171. String taskId = (String) startResult.getKeyValue();
  172. if (StringUtils.isBlank(taskId)) return;
  173. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName
  174. , String.format("开始执行预测小时水量 组织名称:%s,日期:%s,任务id:%s",
  175. orgConfig.getOrgName(), time, taskId));
  176. //等待结果
  177. ModelPredictorResult taskResult = asyncRequestUtil.wait(predictProperties.getUrl() + taskProgressUri + taskId);
  178. if (taskResult == null || taskResult.getTask() == null) return;
  179. long end = System.currentTimeMillis();
  180. //任务状态
  181. ModelPredictorTask task = taskResult.getTask();
  182. switch (task.getStatus()) {
  183. case ModelPredictorTaskStatus.COMPLETED:
  184. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName
  185. , String.format("执行预测小时水量成功,组织名称:%s,用时(毫秒):%d,返回:%s"
  186. , orgConfig.getOrgName(), (end - begin), taskResult));
  187. break;
  188. case ModelPredictorTaskStatus.FAILED:
  189. case ModelPredictorTaskStatus.CANCELLED:
  190. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR, mBizType, mStrClassName
  191. , String.format("执行预测小时水量失败,组织名称:%s,用时(毫秒):%d,返回:%s"
  192. , orgConfig.getOrgName(), (end - begin), taskResult));
  193. break;
  194. }
  195. }
  196. }
  197. }
  198. /**
  199. * 预测分钟水量
  200. *
  201. * @param previousDays 往前回溯天数
  202. * @param nextDays 往后预测天数
  203. * @param orgName 组织名称
  204. */
  205. public void predictMinuteWater(int previousDays, int nextDays, String orgName) {
  206. //计算开始和结束日期
  207. LocalDate startTime = LocalDate.now().minusDays(previousDays);
  208. LocalDate endTime = LocalDate.now().plusDays(nextDays);
  209. //关闭的组织名称
  210. List<String> closeOrgNames = predictProperties.getCloseOrgNames();
  211. //获取组织配置
  212. List<OrgConfig> orgConfigs = orgConfigService.findAll().stream()
  213. //排除关闭所组织
  214. .filter(config -> !closeOrgNames.contains(config.getOrgName()))
  215. .collect(Collectors.toList());
  216. //遍历组织配置
  217. for (OrgConfig orgConfig : orgConfigs) {
  218. if (orgName != null && !Objects.equals(orgConfig.getOrgName(), orgName)) continue;
  219. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName
  220. , String.format("开始执行预测分钟水量 组织名称:%s,开始日期:%s,结束日期:%s",
  221. orgConfig.getOrgName(), startTime, endTime));
  222. long begin = System.currentTimeMillis();
  223. //读取小时预测数据
  224. List<WaterVolumePredictHour> hours = waterVolumePredictService.findHour(startTime, endTime, orgConfig.getOrgId());
  225. //计算分钟预测数据
  226. List<WaterVolumePredictHour> minutes = ModelPredictorUtil.calcMinuteWater(hours);
  227. //执行批量合并
  228. Boolean upserted = waterVolumePredictService.upsertAllMinute(minutes);
  229. long end = System.currentTimeMillis();
  230. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName
  231. , String.format(
  232. "结束执行预测分钟水量,组织名称:%s,入库状态:%s,用时(毫秒):%d",
  233. orgConfig.getOrgName()
  234. , upserted
  235. , (end - begin)
  236. )
  237. );
  238. }
  239. }
  240. /**
  241. * 训练小时水量
  242. */
  243. public void trainDayWater() {
  244. //关闭的组织名称
  245. List<String> closeOrgNames = predictProperties.getCloseOrgNames();
  246. //获取组织配置
  247. List<OrgConfig> orgConfigs = orgConfigService.findAll().stream()
  248. //排除关闭组织
  249. .filter(config -> !closeOrgNames.contains(config.getOrgName()))
  250. .collect(Collectors.toList());
  251. //遍历组织配置
  252. orgConfigs.forEach(orgConfig -> {
  253. long begin = System.currentTimeMillis();
  254. //请求参数
  255. ModelPredictorTrain predictorDay = new ModelPredictorTrain(orgConfig.getOrgId());
  256. //发送请求
  257. ModelPredictorResult startResult = asyncRequestUtil.start(predictProperties.getUrl() + trainDayWaterUri, predictorDay);
  258. if (startResult == null) return;
  259. //任务id
  260. String taskId = (String) startResult.getKeyValue();
  261. if (StringUtils.isBlank(taskId)) return;
  262. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName
  263. , String.format("开始执行训练日水量 组织名称:%s,任务id:%s",
  264. orgConfig.getOrgName(), taskId));
  265. //等待结果
  266. ModelPredictorResult taskResult = asyncRequestUtil.wait(predictProperties.getUrl() + taskProgressUri + taskId);
  267. if (taskResult == null || taskResult.getTask() == null) return;
  268. long end = System.currentTimeMillis();
  269. //任务状态
  270. ModelPredictorTask task = taskResult.getTask();
  271. switch (task.getStatus()) {
  272. case ModelPredictorTaskStatus.COMPLETED:
  273. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName
  274. , String.format("执行训练日水量成功,组织名称:%s,用时(毫秒):%d,返回:%s"
  275. , orgConfig.getOrgName(), (end - begin), taskResult));
  276. break;
  277. case ModelPredictorTaskStatus.FAILED:
  278. case ModelPredictorTaskStatus.CANCELLED:
  279. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR, mBizType, mStrClassName
  280. , String.format("执行训练日水量失败,组织名称:%s,用时(毫秒):%d,返回:%s"
  281. , orgConfig.getOrgName(), (end - begin), taskResult));
  282. break;
  283. }
  284. });
  285. }
  286. /**
  287. * 训练小时水量
  288. */
  289. public void trainHourWater() {
  290. //关闭的组织名称
  291. List<String> closeOrgNames = predictProperties.getCloseOrgNames();
  292. //获取组织配置
  293. List<OrgConfig> orgConfigs = orgConfigService.findAll().stream()
  294. //排除关闭组织
  295. .filter(config -> !closeOrgNames.contains(config.getOrgName()))
  296. .collect(Collectors.toList());
  297. //遍历组织配置
  298. orgConfigs.forEach(orgConfig -> {
  299. long begin = System.currentTimeMillis();
  300. //请求参数
  301. ModelPredictorTrain predictorDay = new ModelPredictorTrain(orgConfig.getOrgId());
  302. //发送请求
  303. ModelPredictorResult startResult = asyncRequestUtil.start(predictProperties.getUrl() + trainHourWaterUri, predictorDay);
  304. if (startResult == null) return;
  305. //任务id
  306. String taskId = (String) startResult.getKeyValue();
  307. if (StringUtils.isBlank(taskId)) return;
  308. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName
  309. , String.format("开始执行训练小时水量 组织名称:%s,任务id:%s",
  310. orgConfig.getOrgName(), taskId));
  311. //等待结果
  312. ModelPredictorResult taskResult = asyncRequestUtil.wait(predictProperties.getUrl() + taskProgressUri + taskId);
  313. if (taskResult == null || taskResult.getTask() == null) return;
  314. long end = System.currentTimeMillis();
  315. //任务状态
  316. ModelPredictorTask task = taskResult.getTask();
  317. switch (task.getStatus()) {
  318. case ModelPredictorTaskStatus.COMPLETED:
  319. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName
  320. , String.format("执行训练小时水量成功,组织名称:%s,用时(毫秒):%d,返回:%s"
  321. , orgConfig.getOrgName(), (end - begin), taskResult));
  322. break;
  323. case ModelPredictorTaskStatus.FAILED:
  324. case ModelPredictorTaskStatus.CANCELLED:
  325. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR, mBizType, mStrClassName
  326. , String.format("执行训练小时水量失败,组织名称:%s,用时(毫秒):%d,返回:%s"
  327. , orgConfig.getOrgName(), (end - begin), taskResult));
  328. break;
  329. }
  330. });
  331. }
  332. /**
  333. * 预测小时泵
  334. *
  335. * @param nextDays 往后预测天数
  336. */
  337. public void predictHourPump(int nextDays) {
  338. //计算开始日期
  339. LocalDate startTime = LocalDate.now();
  340. //关闭的组织名称
  341. List<String> closeOrgNames = predictProperties.getCloseOrgNames();
  342. //获取组织配置
  343. List<OrgConfig> orgConfigs = orgConfigService.findAll().stream()
  344. //排除关闭组织
  345. .filter(config -> !closeOrgNames.contains(config.getOrgName()))
  346. .collect(Collectors.toList());
  347. //遍历组织配置
  348. for (OrgConfig orgConfig : orgConfigs) {
  349. //遍历预测天数
  350. for (int i = 0; i <= nextDays; i++) {
  351. long begin = System.currentTimeMillis();
  352. //计算预测日期,开始日期+i
  353. String time = startTime.plusDays(i).toString();
  354. //请求参数
  355. ModelPredictorHour predictorHour = new ModelPredictorHour(orgConfig.getOrgId(), time);
  356. //发送请求
  357. ModelPredictorResult startResult = asyncRequestUtil.start(predictProperties.getUrl() + predictHourPumpUri, predictorHour);
  358. if (startResult == null) return;
  359. //任务id
  360. String taskId = (String) startResult.getKeyValue();
  361. if (StringUtils.isBlank(taskId)) return;
  362. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName
  363. , String.format("开始执行预测小时泵 组织名称:%s,日期:%s,任务id:%s",
  364. orgConfig.getOrgName(), time, taskId));
  365. //等待结果
  366. ModelPredictorResult taskResult = asyncRequestUtil.wait(predictProperties.getUrl() + taskProgressUri + taskId);
  367. if (taskResult == null || taskResult.getTask() == null) return;
  368. long end = System.currentTimeMillis();
  369. //任务状态
  370. ModelPredictorTask task = taskResult.getTask();
  371. switch (task.getStatus()) {
  372. case ModelPredictorTaskStatus.COMPLETED:
  373. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName
  374. , String.format("执行预测小时泵成功,组织名称:%s,用时(毫秒):%d,返回:%s"
  375. , orgConfig.getOrgName(), (end - begin), taskResult));
  376. break;
  377. case ModelPredictorTaskStatus.FAILED:
  378. case ModelPredictorTaskStatus.CANCELLED:
  379. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR, mBizType, mStrClassName
  380. , String.format("执行预测小时泵失败,组织名称:%s,用时(毫秒):%d,返回:%s"
  381. , orgConfig.getOrgName(), (end - begin), taskResult));
  382. break;
  383. }
  384. }
  385. }
  386. }
  387. /**
  388. * 同步小时泵
  389. *
  390. * @param nextDays 往后预测天数
  391. */
  392. public void syncHourPump(int nextDays) {
  393. //计算开始和结束日期
  394. LocalDate startTime = LocalDate.now();
  395. LocalDate endTime = LocalDate.now().plusDays(nextDays);
  396. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName
  397. , String.format("开始执行同步小时泵 开始日期:%s,结束日期:%s", startTime, endTime));
  398. long begin = System.currentTimeMillis();
  399. //执行同步
  400. Boolean synced = waterVolumePredictService.syncStatusAndEnergy(startTime, endTime);
  401. long end = System.currentTimeMillis();
  402. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName
  403. , String.format("执行预测小时泵成功 开始日期:%s,结束日期:%s,用时(毫秒):%d,同步状态:%s",
  404. startTime, endTime, (end - begin), synced));
  405. }
  406. }