| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460 |
- package com.shkpr.service.aimodelpower.components;
- import com.fasterxml.jackson.databind.ObjectMapper;
- import com.global.base.log.LogLevelFlag;
- import com.global.base.log.LogPrintMgr;
- import com.shkpr.service.aimodelpower.commproperties.PredictProperties;
- import com.shkpr.service.aimodelpower.commtools.AsyncRequestUtil;
- import com.shkpr.service.aimodelpower.commtools.ModelPredictorUtil;
- import com.shkpr.service.aimodelpower.constants.LogFlagBusiType;
- import com.shkpr.service.aimodelpower.constants.ModelPredictorTaskStatus;
- import com.shkpr.service.aimodelpower.dbdao.services.intef.OrgConfigService;
- import com.shkpr.service.aimodelpower.dbdao.services.intef.WaterVolumePredictService;
- import com.shkpr.service.aimodelpower.dto.ModelPredictorResult;
- import com.shkpr.service.aimodelpower.dto.ModelPredictorTask;
- import com.shkpr.service.aimodelpower.dto.OrgConfig;
- import com.shkpr.service.aimodelpower.dto.WaterVolumePredictHour;
- import com.shkpr.service.aimodelpower.jsonbean.ModelPredictorDay;
- import com.shkpr.service.aimodelpower.jsonbean.ModelPredictorHour;
- import com.shkpr.service.aimodelpower.jsonbean.ModelPredictorTrain;
- import org.apache.commons.lang3.StringUtils;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
- import org.springframework.stereotype.Component;
- import java.time.LocalDate;
- import java.util.List;
- import java.util.Objects;
- import java.util.stream.Collectors;
- /**
- * 模型预测器
- *
- * @author 欧阳劲驰
- * @serial 1.0.4
- */
- @Component
- public class ModelPredictor {
- /**
- * log
- */
- private final static String mStrClassName = "ModelPredictor";
- private final static String mBizType = LogFlagBusiType.BUSI_MODEL_PREDICT.toStrValue();
- /**
- * 预测日水量路径
- */
- private final static String predictDayWaterUri = "PredictSupplyAsync";
- /**
- * 预测小时水量路径
- */
- private final static String predictHourWaterUri = "PredictHourAsync";
- /**
- * 训练日水量路径
- */
- private final static String trainDayWaterUri = "TrainSupplyAsync";
- /**
- * 训练小时水量路径
- */
- private final static String trainHourWaterUri = "TrainHourSupplyAsync";
- /**
- * 预测小时泵路径
- */
- private final static String predictHourPumpUri = "PumpSchedulingAsync";
- /**
- * 异步任务进度
- */
- private final static String taskProgressUri = "task/progress/";
- final
- ThreadPoolTaskExecutor taskScheduler;
- final
- ObjectMapper objectMapper;
- final
- PredictProperties predictProperties;
- final
- OrgConfigService orgConfigService;
- final
- WaterVolumePredictService waterVolumePredictService;
- final
- AsyncRequestUtil asyncRequestUtil;
- public ModelPredictor(@Qualifier("taskExecutor") ThreadPoolTaskExecutor taskScheduler, ObjectMapper objectMapper
- , PredictProperties predictProperties, OrgConfigService orgConfigService
- , WaterVolumePredictService waterVolumePredictService, AsyncRequestUtil asyncRequestUtil) {
- this.taskScheduler = taskScheduler;
- this.objectMapper = objectMapper;
- this.predictProperties = predictProperties;
- this.orgConfigService = orgConfigService;
- this.waterVolumePredictService = waterVolumePredictService;
- this.asyncRequestUtil = asyncRequestUtil;
- }
- /**
- * 预测日水量
- *
- * @param previousDays 往前回溯天数
- * @param nextDays 往后预测天数
- * @param orgName 组织名称
- */
- public void predictDayWater(int previousDays, int nextDays, String orgName) {
- //计算开始和结束日期
- LocalDate startTime = LocalDate.now().minusDays(previousDays);
- LocalDate endTime = LocalDate.now().plusDays(nextDays);
- //关闭的组织名称
- List<String> closeOrgNames = predictProperties.getCloseOrgNames();
- //获取组织配置
- List<OrgConfig> orgConfigs = orgConfigService.findAll().stream()
- //排除关闭组织
- .filter(config -> !closeOrgNames.contains(config.getOrgName()))
- .collect(Collectors.toList());
- //遍历组织配置
- orgConfigs.forEach(orgConfig -> {
- if (orgName != null && !Objects.equals(orgConfig.getOrgName(), orgName)) return;
- long begin = System.currentTimeMillis();
- //请求参数
- ModelPredictorDay predictorDay = new ModelPredictorDay(orgConfig.getOrgId(), startTime.toString(), endTime.toString());
- //发送请求
- ModelPredictorResult startResult = asyncRequestUtil.start(predictProperties.getUrl() + predictDayWaterUri, predictorDay);
- if (startResult == null) return;
- //任务id
- String taskId = (String) startResult.getKeyValue();
- if (StringUtils.isBlank(taskId)) return;
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName
- , String.format("开始执行预测日水量 组织名称:%s,开始日期:%s,结束日期:%s,任务id:%s",
- orgConfig.getOrgName(), startTime, endTime, taskId));
- //等待结果
- ModelPredictorResult taskResult = asyncRequestUtil.wait(predictProperties.getUrl() + taskProgressUri + taskId);
- if (taskResult == null || taskResult.getTask() == null) return;
- long end = System.currentTimeMillis();
- //任务状态
- ModelPredictorTask task = taskResult.getTask();
- switch (task.getStatus()) {
- case ModelPredictorTaskStatus.COMPLETED:
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName
- , String.format("执行预测日水量成功,组织名称:%s,用时(毫秒):%d,返回:%s"
- , orgConfig.getOrgName(), (end - begin), taskResult));
- break;
- case ModelPredictorTaskStatus.FAILED:
- case ModelPredictorTaskStatus.CANCELLED:
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR, mBizType, mStrClassName
- , String.format("执行预测日水量失败,组织名称:%s,用时(毫秒):%d,返回:%s"
- , orgConfig.getOrgName(), (end - begin), taskResult));
- break;
- }
- });
- }
- /**
- * 预测小时水量
- *
- * @param previousDays 往前回溯天数
- * @param nextDays 往后预测天数
- * @param orgName 组织名称
- */
- public void predictHourWater(int previousDays, int nextDays, String orgName) {
- //计算开始日期
- LocalDate startTime = LocalDate.now().minusDays(previousDays);
- //关闭的组织名称
- List<String> closeOrgNames = predictProperties.getCloseOrgNames();
- //获取组织配置
- List<OrgConfig> orgConfigs = orgConfigService.findAll().stream()
- //排除关闭组织
- .filter(config -> !closeOrgNames.contains(config.getOrgName()))
- .collect(Collectors.toList());
- //遍历组织配置
- for (OrgConfig orgConfig : orgConfigs) {
- if (orgName != null && !Objects.equals(orgConfig.getOrgName(), orgName)) continue;
- //遍历预测天数
- for (int i = 0; i <= nextDays; i++) {
- long begin = System.currentTimeMillis();
- //计算预测日期,开始日期+i
- String time = startTime.plusDays(i).toString();
- //请求参数
- ModelPredictorHour predictorHour = new ModelPredictorHour(orgConfig.getOrgId(), time);
- //发送请求
- ModelPredictorResult startResult = asyncRequestUtil.start(predictProperties.getUrl() + predictHourWaterUri, predictorHour);
- if (startResult == null) return;
- //任务id
- String taskId = (String) startResult.getKeyValue();
- if (StringUtils.isBlank(taskId)) return;
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName
- , String.format("开始执行预测小时水量 组织名称:%s,日期:%s,任务id:%s",
- orgConfig.getOrgName(), time, taskId));
- //等待结果
- ModelPredictorResult taskResult = asyncRequestUtil.wait(predictProperties.getUrl() + taskProgressUri + taskId);
- if (taskResult == null || taskResult.getTask() == null) return;
- long end = System.currentTimeMillis();
- //任务状态
- ModelPredictorTask task = taskResult.getTask();
- switch (task.getStatus()) {
- case ModelPredictorTaskStatus.COMPLETED:
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName
- , String.format("执行预测小时水量成功,组织名称:%s,用时(毫秒):%d,返回:%s"
- , orgConfig.getOrgName(), (end - begin), taskResult));
- break;
- case ModelPredictorTaskStatus.FAILED:
- case ModelPredictorTaskStatus.CANCELLED:
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR, mBizType, mStrClassName
- , String.format("执行预测小时水量失败,组织名称:%s,用时(毫秒):%d,返回:%s"
- , orgConfig.getOrgName(), (end - begin), taskResult));
- break;
- }
- }
- }
- }
- /**
- * 预测分钟水量
- *
- * @param previousDays 往前回溯天数
- * @param nextDays 往后预测天数
- * @param orgName 组织名称
- */
- public void predictMinuteWater(int previousDays, int nextDays, String orgName) {
- //计算开始和结束日期
- LocalDate startTime = LocalDate.now().minusDays(previousDays);
- LocalDate endTime = LocalDate.now().plusDays(nextDays);
- //关闭的组织名称
- List<String> closeOrgNames = predictProperties.getCloseOrgNames();
- //获取组织配置
- List<OrgConfig> orgConfigs = orgConfigService.findAll().stream()
- //排除关闭所组织
- .filter(config -> !closeOrgNames.contains(config.getOrgName()))
- .collect(Collectors.toList());
- //遍历组织配置
- for (OrgConfig orgConfig : orgConfigs) {
- if (orgName != null && !Objects.equals(orgConfig.getOrgName(), orgName)) continue;
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName
- , String.format("开始执行预测分钟水量 组织名称:%s,开始日期:%s,结束日期:%s",
- orgConfig.getOrgName(), startTime, endTime));
- long begin = System.currentTimeMillis();
- //读取小时预测数据
- List<WaterVolumePredictHour> hours = waterVolumePredictService.findHour(startTime, endTime, orgConfig.getOrgId());
- //计算分钟预测数据
- List<WaterVolumePredictHour> minutes = ModelPredictorUtil.calcMinuteWater(hours);
- //执行批量合并
- Boolean upserted = waterVolumePredictService.upsertAllMinute(minutes);
- long end = System.currentTimeMillis();
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName
- , String.format(
- "结束执行预测分钟水量,组织名称:%s,入库状态:%s,用时(毫秒):%d",
- orgConfig.getOrgName()
- , upserted
- , (end - begin)
- )
- );
- }
- }
- /**
- * 训练小时水量
- */
- public void trainDayWater() {
- //关闭的组织名称
- List<String> closeOrgNames = predictProperties.getCloseOrgNames();
- //获取组织配置
- List<OrgConfig> orgConfigs = orgConfigService.findAll().stream()
- //排除关闭组织
- .filter(config -> !closeOrgNames.contains(config.getOrgName()))
- .collect(Collectors.toList());
- //遍历组织配置
- orgConfigs.forEach(orgConfig -> {
- long begin = System.currentTimeMillis();
- //请求参数
- ModelPredictorTrain predictorDay = new ModelPredictorTrain(orgConfig.getOrgId());
- //发送请求
- ModelPredictorResult startResult = asyncRequestUtil.start(predictProperties.getUrl() + trainDayWaterUri, predictorDay);
- if (startResult == null) return;
- //任务id
- String taskId = (String) startResult.getKeyValue();
- if (StringUtils.isBlank(taskId)) return;
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName
- , String.format("开始执行训练日水量 组织名称:%s,任务id:%s",
- orgConfig.getOrgName(), taskId));
- //等待结果
- ModelPredictorResult taskResult = asyncRequestUtil.wait(predictProperties.getUrl() + taskProgressUri + taskId);
- if (taskResult == null || taskResult.getTask() == null) return;
- long end = System.currentTimeMillis();
- //任务状态
- ModelPredictorTask task = taskResult.getTask();
- switch (task.getStatus()) {
- case ModelPredictorTaskStatus.COMPLETED:
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName
- , String.format("执行训练日水量成功,组织名称:%s,用时(毫秒):%d,返回:%s"
- , orgConfig.getOrgName(), (end - begin), taskResult));
- break;
- case ModelPredictorTaskStatus.FAILED:
- case ModelPredictorTaskStatus.CANCELLED:
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR, mBizType, mStrClassName
- , String.format("执行训练日水量失败,组织名称:%s,用时(毫秒):%d,返回:%s"
- , orgConfig.getOrgName(), (end - begin), taskResult));
- break;
- }
- });
- }
- /**
- * 训练小时水量
- */
- public void trainHourWater() {
- //关闭的组织名称
- List<String> closeOrgNames = predictProperties.getCloseOrgNames();
- //获取组织配置
- List<OrgConfig> orgConfigs = orgConfigService.findAll().stream()
- //排除关闭组织
- .filter(config -> !closeOrgNames.contains(config.getOrgName()))
- .collect(Collectors.toList());
- //遍历组织配置
- orgConfigs.forEach(orgConfig -> {
- long begin = System.currentTimeMillis();
- //请求参数
- ModelPredictorTrain predictorDay = new ModelPredictorTrain(orgConfig.getOrgId());
- //发送请求
- ModelPredictorResult startResult = asyncRequestUtil.start(predictProperties.getUrl() + trainHourWaterUri, predictorDay);
- if (startResult == null) return;
- //任务id
- String taskId = (String) startResult.getKeyValue();
- if (StringUtils.isBlank(taskId)) return;
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName
- , String.format("开始执行训练小时水量 组织名称:%s,任务id:%s",
- orgConfig.getOrgName(), taskId));
- //等待结果
- ModelPredictorResult taskResult = asyncRequestUtil.wait(predictProperties.getUrl() + taskProgressUri + taskId);
- if (taskResult == null || taskResult.getTask() == null) return;
- long end = System.currentTimeMillis();
- //任务状态
- ModelPredictorTask task = taskResult.getTask();
- switch (task.getStatus()) {
- case ModelPredictorTaskStatus.COMPLETED:
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName
- , String.format("执行训练小时水量成功,组织名称:%s,用时(毫秒):%d,返回:%s"
- , orgConfig.getOrgName(), (end - begin), taskResult));
- break;
- case ModelPredictorTaskStatus.FAILED:
- case ModelPredictorTaskStatus.CANCELLED:
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR, mBizType, mStrClassName
- , String.format("执行训练小时水量失败,组织名称:%s,用时(毫秒):%d,返回:%s"
- , orgConfig.getOrgName(), (end - begin), taskResult));
- break;
- }
- });
- }
- /**
- * 预测小时泵
- *
- * @param nextDays 往后预测天数
- */
- public void predictHourPump(int nextDays) {
- //计算开始日期
- LocalDate startTime = LocalDate.now();
- //关闭的组织名称
- List<String> closeOrgNames = predictProperties.getCloseOrgNames();
- //获取组织配置
- List<OrgConfig> orgConfigs = orgConfigService.findAll().stream()
- //排除关闭组织
- .filter(config -> !closeOrgNames.contains(config.getOrgName()))
- .collect(Collectors.toList());
- //遍历组织配置
- for (OrgConfig orgConfig : orgConfigs) {
- //遍历预测天数
- for (int i = 0; i <= nextDays; i++) {
- long begin = System.currentTimeMillis();
- //计算预测日期,开始日期+i
- String time = startTime.plusDays(i).toString();
- //请求参数
- ModelPredictorHour predictorHour = new ModelPredictorHour(orgConfig.getOrgId(), time);
- //发送请求
- ModelPredictorResult startResult = asyncRequestUtil.start(predictProperties.getUrl() + predictHourPumpUri, predictorHour);
- if (startResult == null) return;
- //任务id
- String taskId = (String) startResult.getKeyValue();
- if (StringUtils.isBlank(taskId)) return;
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName
- , String.format("开始执行预测小时泵 组织名称:%s,日期:%s,任务id:%s",
- orgConfig.getOrgName(), time, taskId));
- //等待结果
- ModelPredictorResult taskResult = asyncRequestUtil.wait(predictProperties.getUrl() + taskProgressUri + taskId);
- if (taskResult == null || taskResult.getTask() == null) return;
- long end = System.currentTimeMillis();
- //任务状态
- ModelPredictorTask task = taskResult.getTask();
- switch (task.getStatus()) {
- case ModelPredictorTaskStatus.COMPLETED:
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName
- , String.format("执行预测小时泵成功,组织名称:%s,用时(毫秒):%d,返回:%s"
- , orgConfig.getOrgName(), (end - begin), taskResult));
- break;
- case ModelPredictorTaskStatus.FAILED:
- case ModelPredictorTaskStatus.CANCELLED:
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR, mBizType, mStrClassName
- , String.format("执行预测小时泵失败,组织名称:%s,用时(毫秒):%d,返回:%s"
- , orgConfig.getOrgName(), (end - begin), taskResult));
- break;
- }
- }
- }
- }
- /**
- * 同步小时泵
- *
- * @param nextDays 往后预测天数
- */
- public void syncHourPump(int nextDays) {
- //计算开始和结束日期
- LocalDate startTime = LocalDate.now();
- LocalDate endTime = LocalDate.now().plusDays(nextDays);
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName
- , String.format("开始执行同步小时泵 开始日期:%s,结束日期:%s", startTime, endTime));
- long begin = System.currentTimeMillis();
- //执行同步
- Boolean synced = waterVolumePredictService.syncStatusAndEnergy(startTime, endTime);
- long end = System.currentTimeMillis();
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName
- , String.format("执行预测小时泵成功 开始日期:%s,结束日期:%s,用时(毫秒):%d,同步状态:%s",
- startTime, endTime, (end - begin), synced));
- }
- }
|