package com.shkpr.service.aimodelpower.components; import com.fasterxml.jackson.databind.ObjectMapper; import com.global.base.log.LogLevelFlag; import com.global.base.log.LogPrintMgr; import com.shkpr.service.aimodelpower.commproperties.PredictProperties; import com.shkpr.service.aimodelpower.commtools.AsyncRequestUtil; import com.shkpr.service.aimodelpower.commtools.ModelPredictorUtil; import com.shkpr.service.aimodelpower.constants.LogFlagBusiType; import com.shkpr.service.aimodelpower.constants.ModelPredictorTaskStatus; import com.shkpr.service.aimodelpower.dbdao.services.intef.OrgConfigService; import com.shkpr.service.aimodelpower.dbdao.services.intef.WaterVolumePredictService; import com.shkpr.service.aimodelpower.dto.ModelPredictorResult; import com.shkpr.service.aimodelpower.dto.ModelPredictorTask; import com.shkpr.service.aimodelpower.dto.OrgConfig; import com.shkpr.service.aimodelpower.dto.WaterVolumePredictHour; import com.shkpr.service.aimodelpower.jsonbean.ModelPredictorDay; import com.shkpr.service.aimodelpower.jsonbean.ModelPredictorHour; import com.shkpr.service.aimodelpower.jsonbean.ModelPredictorTrain; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import java.time.LocalDate; import java.util.List; import java.util.Objects; import java.util.stream.Collectors; /** * 模型预测器 * * @author 欧阳劲驰 * @serial 1.0.4 */ @Component public class ModelPredictor { /** * log */ private final static String mStrClassName = "ModelPredictor"; private final static String mBizType = LogFlagBusiType.BUSI_MODEL_PREDICT.toStrValue(); /** * 预测日水量路径 */ private final static String predictDayWaterUri = "PredictSupplyAsync"; /** * 预测小时水量路径 */ private final static String predictHourWaterUri = "PredictHourAsync"; /** * 训练日水量路径 */ private final static String trainDayWaterUri = "TrainSupplyAsync"; /** * 训练小时水量路径 */ private final static String trainHourWaterUri = "TrainHourSupplyAsync"; /** * 预测小时泵路径 */ private final static String predictHourPumpUri = "PumpSchedulingAsync"; /** * 异步任务进度 */ private final static String taskProgressUri = "task/progress/"; final ThreadPoolTaskExecutor taskScheduler; final ObjectMapper objectMapper; final PredictProperties predictProperties; final OrgConfigService orgConfigService; final WaterVolumePredictService waterVolumePredictService; final AsyncRequestUtil asyncRequestUtil; public ModelPredictor(@Qualifier("taskExecutor") ThreadPoolTaskExecutor taskScheduler, ObjectMapper objectMapper , PredictProperties predictProperties, OrgConfigService orgConfigService , WaterVolumePredictService waterVolumePredictService, AsyncRequestUtil asyncRequestUtil) { this.taskScheduler = taskScheduler; this.objectMapper = objectMapper; this.predictProperties = predictProperties; this.orgConfigService = orgConfigService; this.waterVolumePredictService = waterVolumePredictService; this.asyncRequestUtil = asyncRequestUtil; } /** * 预测日水量 * * @param previousDays 往前回溯天数 * @param nextDays 往后预测天数 * @param orgName 组织名称 */ public void predictDayWater(int previousDays, int nextDays, String orgName) { //计算开始和结束日期 LocalDate startTime = LocalDate.now().minusDays(previousDays); LocalDate endTime = LocalDate.now().plusDays(nextDays); //关闭的组织名称 List closeOrgNames = predictProperties.getCloseOrgNames(); //获取组织配置 List orgConfigs = orgConfigService.findAll().stream() //排除关闭组织 .filter(config -> !closeOrgNames.contains(config.getOrgName())) .collect(Collectors.toList()); //遍历组织配置 orgConfigs.forEach(orgConfig -> { if (orgName != null && !Objects.equals(orgConfig.getOrgName(), orgName)) return; long begin = System.currentTimeMillis(); //请求参数 ModelPredictorDay predictorDay = new ModelPredictorDay(orgConfig.getOrgId(), startTime.toString(), endTime.toString()); //发送请求 ModelPredictorResult startResult = asyncRequestUtil.start(predictProperties.getUrl() + predictDayWaterUri, predictorDay); if (startResult == null) return; //任务id String taskId = (String) startResult.getKeyValue(); if (StringUtils.isBlank(taskId)) return; LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName , String.format("开始执行预测日水量 组织名称:%s,开始日期:%s,结束日期:%s,任务id:%s", orgConfig.getOrgName(), startTime, endTime, taskId)); //等待结果 ModelPredictorResult taskResult = asyncRequestUtil.wait(predictProperties.getUrl() + taskProgressUri + taskId); if (taskResult == null || taskResult.getTask() == null) return; long end = System.currentTimeMillis(); //任务状态 ModelPredictorTask task = taskResult.getTask(); switch (task.getStatus()) { case ModelPredictorTaskStatus.COMPLETED: LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName , String.format("执行预测日水量成功,组织名称:%s,用时(毫秒):%d,返回:%s" , orgConfig.getOrgName(), (end - begin), taskResult)); break; case ModelPredictorTaskStatus.FAILED: case ModelPredictorTaskStatus.CANCELLED: LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR, mBizType, mStrClassName , String.format("执行预测日水量失败,组织名称:%s,用时(毫秒):%d,返回:%s" , orgConfig.getOrgName(), (end - begin), taskResult)); break; } }); } /** * 预测小时水量 * * @param previousDays 往前回溯天数 * @param nextDays 往后预测天数 * @param orgName 组织名称 */ public void predictHourWater(int previousDays, int nextDays, String orgName) { //计算开始日期 LocalDate startTime = LocalDate.now().minusDays(previousDays); //关闭的组织名称 List closeOrgNames = predictProperties.getCloseOrgNames(); //获取组织配置 List orgConfigs = orgConfigService.findAll().stream() //排除关闭组织 .filter(config -> !closeOrgNames.contains(config.getOrgName())) .collect(Collectors.toList()); //遍历组织配置 for (OrgConfig orgConfig : orgConfigs) { if (orgName != null && !Objects.equals(orgConfig.getOrgName(), orgName)) continue; //遍历预测天数 for (int i = 0; i <= nextDays; i++) { long begin = System.currentTimeMillis(); //计算预测日期,开始日期+i String time = startTime.plusDays(i).toString(); //请求参数 ModelPredictorHour predictorHour = new ModelPredictorHour(orgConfig.getOrgId(), time); //发送请求 ModelPredictorResult startResult = asyncRequestUtil.start(predictProperties.getUrl() + predictHourWaterUri, predictorHour); if (startResult == null) return; //任务id String taskId = (String) startResult.getKeyValue(); if (StringUtils.isBlank(taskId)) return; LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName , String.format("开始执行预测小时水量 组织名称:%s,日期:%s,任务id:%s", orgConfig.getOrgName(), time, taskId)); //等待结果 ModelPredictorResult taskResult = asyncRequestUtil.wait(predictProperties.getUrl() + taskProgressUri + taskId); if (taskResult == null || taskResult.getTask() == null) return; long end = System.currentTimeMillis(); //任务状态 ModelPredictorTask task = taskResult.getTask(); switch (task.getStatus()) { case ModelPredictorTaskStatus.COMPLETED: LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName , String.format("执行预测小时水量成功,组织名称:%s,用时(毫秒):%d,返回:%s" , orgConfig.getOrgName(), (end - begin), taskResult)); break; case ModelPredictorTaskStatus.FAILED: case ModelPredictorTaskStatus.CANCELLED: LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR, mBizType, mStrClassName , String.format("执行预测小时水量失败,组织名称:%s,用时(毫秒):%d,返回:%s" , orgConfig.getOrgName(), (end - begin), taskResult)); break; } } } } /** * 预测分钟水量 * * @param previousDays 往前回溯天数 * @param nextDays 往后预测天数 * @param orgName 组织名称 */ public void predictMinuteWater(int previousDays, int nextDays, String orgName) { //计算开始和结束日期 LocalDate startTime = LocalDate.now().minusDays(previousDays); LocalDate endTime = LocalDate.now().plusDays(nextDays); //关闭的组织名称 List closeOrgNames = predictProperties.getCloseOrgNames(); //获取组织配置 List orgConfigs = orgConfigService.findAll().stream() //排除关闭所组织 .filter(config -> !closeOrgNames.contains(config.getOrgName())) .collect(Collectors.toList()); //遍历组织配置 for (OrgConfig orgConfig : orgConfigs) { if (orgName != null && !Objects.equals(orgConfig.getOrgName(), orgName)) continue; LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName , String.format("开始执行预测分钟水量 组织名称:%s,开始日期:%s,结束日期:%s", orgConfig.getOrgName(), startTime, endTime)); long begin = System.currentTimeMillis(); //读取小时预测数据 List hours = waterVolumePredictService.findHour(startTime, endTime, orgConfig.getOrgId()); //计算分钟预测数据 List minutes = ModelPredictorUtil.calcMinuteWater(hours); //执行批量合并 Boolean upserted = waterVolumePredictService.upsertAllMinute(minutes); long end = System.currentTimeMillis(); LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName , String.format( "结束执行预测分钟水量,组织名称:%s,入库状态:%s,用时(毫秒):%d", orgConfig.getOrgName() , upserted , (end - begin) ) ); } } /** * 训练小时水量 */ public void trainDayWater() { //关闭的组织名称 List closeOrgNames = predictProperties.getCloseOrgNames(); //获取组织配置 List orgConfigs = orgConfigService.findAll().stream() //排除关闭组织 .filter(config -> !closeOrgNames.contains(config.getOrgName())) .collect(Collectors.toList()); //遍历组织配置 orgConfigs.forEach(orgConfig -> { long begin = System.currentTimeMillis(); //请求参数 ModelPredictorTrain predictorDay = new ModelPredictorTrain(orgConfig.getOrgId()); //发送请求 ModelPredictorResult startResult = asyncRequestUtil.start(predictProperties.getUrl() + trainDayWaterUri, predictorDay); if (startResult == null) return; //任务id String taskId = (String) startResult.getKeyValue(); if (StringUtils.isBlank(taskId)) return; LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName , String.format("开始执行训练日水量 组织名称:%s,任务id:%s", orgConfig.getOrgName(), taskId)); //等待结果 ModelPredictorResult taskResult = asyncRequestUtil.wait(predictProperties.getUrl() + taskProgressUri + taskId); if (taskResult == null || taskResult.getTask() == null) return; long end = System.currentTimeMillis(); //任务状态 ModelPredictorTask task = taskResult.getTask(); switch (task.getStatus()) { case ModelPredictorTaskStatus.COMPLETED: LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName , String.format("执行训练日水量成功,组织名称:%s,用时(毫秒):%d,返回:%s" , orgConfig.getOrgName(), (end - begin), taskResult)); break; case ModelPredictorTaskStatus.FAILED: case ModelPredictorTaskStatus.CANCELLED: LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR, mBizType, mStrClassName , String.format("执行训练日水量失败,组织名称:%s,用时(毫秒):%d,返回:%s" , orgConfig.getOrgName(), (end - begin), taskResult)); break; } }); } /** * 训练小时水量 */ public void trainHourWater() { //关闭的组织名称 List closeOrgNames = predictProperties.getCloseOrgNames(); //获取组织配置 List orgConfigs = orgConfigService.findAll().stream() //排除关闭组织 .filter(config -> !closeOrgNames.contains(config.getOrgName())) .collect(Collectors.toList()); //遍历组织配置 orgConfigs.forEach(orgConfig -> { long begin = System.currentTimeMillis(); //请求参数 ModelPredictorTrain predictorDay = new ModelPredictorTrain(orgConfig.getOrgId()); //发送请求 ModelPredictorResult startResult = asyncRequestUtil.start(predictProperties.getUrl() + trainHourWaterUri, predictorDay); if (startResult == null) return; //任务id String taskId = (String) startResult.getKeyValue(); if (StringUtils.isBlank(taskId)) return; LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName , String.format("开始执行训练小时水量 组织名称:%s,任务id:%s", orgConfig.getOrgName(), taskId)); //等待结果 ModelPredictorResult taskResult = asyncRequestUtil.wait(predictProperties.getUrl() + taskProgressUri + taskId); if (taskResult == null || taskResult.getTask() == null) return; long end = System.currentTimeMillis(); //任务状态 ModelPredictorTask task = taskResult.getTask(); switch (task.getStatus()) { case ModelPredictorTaskStatus.COMPLETED: LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName , String.format("执行训练小时水量成功,组织名称:%s,用时(毫秒):%d,返回:%s" , orgConfig.getOrgName(), (end - begin), taskResult)); break; case ModelPredictorTaskStatus.FAILED: case ModelPredictorTaskStatus.CANCELLED: LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR, mBizType, mStrClassName , String.format("执行训练小时水量失败,组织名称:%s,用时(毫秒):%d,返回:%s" , orgConfig.getOrgName(), (end - begin), taskResult)); break; } }); } /** * 预测小时泵 * * @param nextDays 往后预测天数 */ public void predictHourPump(int nextDays) { //计算开始日期 LocalDate startTime = LocalDate.now(); //关闭的组织名称 List closeOrgNames = predictProperties.getCloseOrgNames(); //获取组织配置 List orgConfigs = orgConfigService.findAll().stream() //排除关闭组织 .filter(config -> !closeOrgNames.contains(config.getOrgName())) .collect(Collectors.toList()); //遍历组织配置 for (OrgConfig orgConfig : orgConfigs) { //遍历预测天数 for (int i = 0; i <= nextDays; i++) { long begin = System.currentTimeMillis(); //计算预测日期,开始日期+i String time = startTime.plusDays(i).toString(); //请求参数 ModelPredictorHour predictorHour = new ModelPredictorHour(orgConfig.getOrgId(), time); //发送请求 ModelPredictorResult startResult = asyncRequestUtil.start(predictProperties.getUrl() + predictHourPumpUri, predictorHour); if (startResult == null) return; //任务id String taskId = (String) startResult.getKeyValue(); if (StringUtils.isBlank(taskId)) return; LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName , String.format("开始执行预测小时泵 组织名称:%s,日期:%s,任务id:%s", orgConfig.getOrgName(), time, taskId)); //等待结果 ModelPredictorResult taskResult = asyncRequestUtil.wait(predictProperties.getUrl() + taskProgressUri + taskId); if (taskResult == null || taskResult.getTask() == null) return; long end = System.currentTimeMillis(); //任务状态 ModelPredictorTask task = taskResult.getTask(); switch (task.getStatus()) { case ModelPredictorTaskStatus.COMPLETED: LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName , String.format("执行预测小时泵成功,组织名称:%s,用时(毫秒):%d,返回:%s" , orgConfig.getOrgName(), (end - begin), taskResult)); break; case ModelPredictorTaskStatus.FAILED: case ModelPredictorTaskStatus.CANCELLED: LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR, mBizType, mStrClassName , String.format("执行预测小时泵失败,组织名称:%s,用时(毫秒):%d,返回:%s" , orgConfig.getOrgName(), (end - begin), taskResult)); break; } } } } /** * 同步小时泵 * * @param nextDays 往后预测天数 */ public void syncHourPump(int nextDays) { //计算开始和结束日期 LocalDate startTime = LocalDate.now(); LocalDate endTime = LocalDate.now().plusDays(nextDays); LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName , String.format("开始执行同步小时泵 开始日期:%s,结束日期:%s", startTime, endTime)); long begin = System.currentTimeMillis(); //执行同步 Boolean synced = waterVolumePredictService.syncStatusAndEnergy(startTime, endTime); long end = System.currentTimeMillis(); LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName , String.format("执行预测小时泵成功 开始日期:%s,结束日期:%s,用时(毫秒):%d,同步状态:%s", startTime, endTime, (end - begin), synced)); } }