123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651 |
- package io.github.pnoker.gateway.bizmgr;
- import com.alibaba.fastjson.JSONArray;
- import com.alibaba.fastjson.JSONObject;
- import io.github.pnoker.gateway.comtool.TimeTool;
- import io.github.pnoker.gateway.dbdao.DBMgrProxy;
- import io.github.pnoker.gateway.dbdao.services.intef.WaterTapWaterService;
- import org.influxdb.dto.QueryResult;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.util.CollectionUtils;
- import org.springframework.util.ObjectUtils;
- import org.springframework.util.StringUtils;
- import java.time.Duration;
- import java.time.LocalDateTime;
- import java.time.LocalTime;
- import java.time.YearMonth;
- import java.time.format.DateTimeFormatter;
- import java.time.temporal.TemporalAdjusters;
- import java.util.*;
- import java.util.concurrent.CountDownLatch;
- import java.util.concurrent.ThreadLocalRandom;
- import java.util.stream.Collectors;
- /**
- * @ClassName KprJiangjinAimWaterBizFun
- * @Description: TODO 江津水量预测相关
- * @Author LX
- * @Date 2024/12/5
- * @Version V1.0
- **/
- public class KprJiangjinAimWaterBizFun {
- private static final Logger log = LoggerFactory.getLogger(KprJiangjinAimWaterBizFun.class);
- private final static String mStrClassName = "KprJiangjinAimWaterBizFun";
- private final static String EMPTY_NULL = "NULL";
- public static WaterTapWaterService getWaterTapWaterApi(){
- return DBMgrProxy.getInstance().applyWaterTapWaterService();
- }
- static DateTimeFormatter formater = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
- //TODO 初始化添加对比远通数据
- public static void initTapWaterData(String startDate){
- //TODO ① 首先查询水厂设备配置信息
- try {
- List<Map<String, Object>> configList = getWaterTapWaterApi().getWaterCollectionConfigList(null);
- if(!CollectionUtils.isEmpty(configList)){
- //声明总数据的数据数组
- List<Map<String,Object>> newRecordAll = Collections.synchronizedList(new LinkedList<Map<String,Object>>());
- //TODO ②开启多线程并发处理各水厂设备的数据
- System.out.println("原始数据采集开始时间:"+ TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
- System.out.println("原始数据采集进行中,起始时间("+startDate+"):......");
- final CountDownLatch latch = new CountDownLatch(configList.size());
- for(Map<String,Object> item:configList){
- try{
- new Thread(() -> {
- //TODO 根据当前配置信息item 查询远通数据中的历史数据
- //TODO 首先查询当前水厂设备的2023-11-01之后到最新数据的数据总数,然后分页形式获取数据
- Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount(" WHERE TAG_CODE = '"+item.get("collcation_tag")+"' and QCQUISITION_TIME >= to_date('"+startDate+"', 'yyyy-mm-dd hh24:mi:ss')");
- if(itemCount!=null&&itemCount>0) {
- //TODO 优化 以分页方式查询所有,初始分页行数定为2000查询速率较好
- int pageNum = itemCount%2000==0?itemCount/2000:(itemCount/2000)+1;//总页数
- Integer limit = 2000;
- if(pageNum<=1){
- limit = itemCount;//说明总数比第一页小
- }
- for (int i = 0; i < pageNum; i++) {
- Integer offset = i*limit;
- //tapWaterHistoryList 为远通水量数据源
- //TODO 因为是orcale数据库 分页方式以伪列rownum为分页条件,因此limit实际上永远比offset大2000,因此只要不是第一页,limit=offset+2000
- Integer limitNew = 0;
- if(i>0){
- limitNew = offset+2000;
- }else{
- limitNew = 2000;
- }
- List<Map<String,Object>> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(limitNew,offset," AND TAG_CODE = '"+item.get("collcation_tag")+"' and QCQUISITION_TIME >= to_date('"+startDate+"', 'yyyy-mm-dd hh24:mi:ss')");
- if(!CollectionUtils.isEmpty(tapWaterHistoryList)){
- //TODO 循环远通水量数据列表,查询数据不存在的话就插入
- for (int j = 0; j < tapWaterHistoryList.size(); j++) {
- List<Map<String,Object>> queryWaterRecord = getWaterTapWaterApi().getWaterCollectionRecordList(1,0,
- " WHERE collcation_tag = '"+tapWaterHistoryList.get(j).get("TAG_CODE")
- +"' AND zone_name = '"+tapWaterHistoryList.get(j).get("ORG_NAME")+"' AND time = '"+TimeTool.convertDateStr2UTC(tapWaterHistoryList.get(j).get("QCQUISITION_TIME").toString())+"'");
- if(CollectionUtils.isEmpty(queryWaterRecord)){
- //TODO 说明没插入过本系统,执行插入
- String extend = " ('"+tapWaterHistoryList.get(j).get("TAG_CODE")+"','"+tapWaterHistoryList.get(j).get("ORG_NAME")+"',"
- + ""+TimeTool.convertDateStr2UTC(tapWaterHistoryList.get(j).get("QCQUISITION_TIME").toString())+","
- + "'"+tapWaterHistoryList.get(j).get("VAL")+"',"
- + TimeTool.convertDateStr2UTC(tapWaterHistoryList.get(j).get("UPDATE_TIME").toString())
- +") ";
- int insertCode = getWaterTapWaterApi().insertWaterCollectionRecord(extend);
- if(insertCode<=0){
- log.error(String.format("Batch initTapWaterDataThread 未成功:{%s} ",
- extend));
- }
- }
- }
- }
- }
- }
- latch.countDown();
- }).start();
- }catch(Exception ex){
- log.error(String.format("Batch initTapWaterDataThread 水厂:{%s},标识:{%s} ERROR:{%s} ",
- item.get("zone_name"),
- item.get("collcation_tag")
- , ex.getLocalizedMessage()));
- }
- }
- latch.await();
- System.out.println("原始数据采集检查机制("+startDate+")结束时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
- }
- }catch(Exception ex){
- log.error(String.format("Batch initTapWaterData ERROR:{%s} "
- , ex.getLocalizedMessage()));
- }
- }
- //TODO 初始化添加计算水厂所有设备每日的每小时用水量计算
- public static void initWaterCollecationReacordAll(String startFindTime){
- //TODO ① 首先查询水厂设备配置信息
- try {
- System.out.println("计算小时用水量开始时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
- System.out.println("计算小时用水量起始时间:"+startFindTime+"进行中:......");
- List<Map<String, Object>> configList = getWaterTapWaterApi().getWaterCollectionConfigList(null);
- if (!CollectionUtils.isEmpty(configList)) {
- //TODO 按照组织机构分组
- Map<Object, List<Map<String, Object>>> groupedData =
- configList.stream().collect(Collectors.groupingBy(item -> item.get("zone_name")));
- final CountDownLatch latch = new CountDownLatch(groupedData.keySet().size());
- //TODO 外层循环组织机构
- for (Object key:groupedData.keySet()){
- try{
- new Thread(() -> {
- //TODO 根据当前配置信息item 查询远通数据中的历史数据
- //TODO 首先查询当前水厂设备的从2023-11-01之后到得到数据
- LocalDateTime startDateTime = LocalDateTime.parse(startFindTime,DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
- //TODO 需计算的循环天数
- Long days = 0L;
- days = Duration.between(startDateTime, LocalDateTime.now()).toDays();
- days+=1;
- //TODO 此循环天数每一天所查的是所有设备每小时数据合
- final CountDownLatch latch2 = new CountDownLatch(days.intValue());
- for(Long k = 0L;k<days;k++) {
- LocalDateTime newStartDateTime = startDateTime.minusDays(-k.intValue());
- String startDate = newStartDateTime.format(formater);
- String endDate = newStartDateTime.minusDays(-1).format(formater);
- try {
- new Thread(() -> {
- List<Map<String, Object>> deviceList = groupedData.get(key);
- //TODO 循环获取该天该水厂每个设备数据
- //TODO 查询当前天日期内每小时的设备数据
- for (int i = 0; i < 24; i++) {
- String startTime = newStartDateTime.withHour(i).format(formater);
- String endTime = newStartDateTime.minusHours(-(i + 1)).withMinute(0).withSecond(0).format(formater);
- Map<String, Object> recordAllEntity = new HashMap<>();//需要添加的实体数据
- recordAllEntity.put("zone_name", key.toString());//水厂
- recordAllEntity.put("time", endTime);//采集时间(小时的最后时间)
- recordAllEntity.put("value", null);
- recordAllEntity.put("value_tag", "water");
- recordAllEntity.put("collcation_tag_array", JSONObject.toJSON(deviceList));
- Double value = null;
- //TODO 此循环计算该小时所有设备的用水量
- for (Map<String, Object> item : deviceList) {
- // 定义字符串日期时间的格式
- DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
- // 解析字符串以创建 LocalDateTime 实例
- LocalDateTime dateTime = LocalDateTime.parse(endTime, formatter);
- Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount(
- " WHERE TAG_CODE = '" + item.get("collcation_tag") + "' " +
- " and QCQUISITION_TIME >= to_date('" + startDate + "', 'yyyy-mm-dd hh24:mi:ss')" +
- " and QCQUISITION_TIME <= to_date('" + endDate + "', 'yyyy-mm-dd hh24:mi:ss')");
- if (itemCount != null && itemCount > 0) {
- List<Map<String, Object>> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(itemCount, 0,
- " AND TAG_CODE = '" + item.get("collcation_tag") + "' " +
- " and QCQUISITION_TIME >= to_date('" + startTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
- " and QCQUISITION_TIME <= to_date('" + endTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
- " order by QCQUISITION_TIME");
- //TODO tapWaterHistoryList此集合为该设备该小时内的数据,取第一条和最后一条的相差值作为用水量
- if (!CollectionUtils.isEmpty(tapWaterHistoryList) && tapWaterHistoryList.size() > 1) {
- Double firstValue = tapWaterHistoryList.get(0).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get(0).get("val").toString()) : null;
- Double lastValue = tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("val").toString()) : null;
- if (firstValue != null && lastValue != null) {
- //到此处是该小时一个设备的用水量已加上
- if (value == null) {
- value = 0.00;
- }
- value += Math.abs(lastValue - firstValue);
- }
- }
- }else {
- //TODO 查不到就说明采集平台数据库没有,就从infulxdb iot查询数据
- QueryResult queryResult = KprJiangjinWaterBizfun.infulxJiangjinDbUtil.queryWithCondition("WaterMeter",
- " dev_id='"+item.get("device_code")+"' " +
- " and time >= '"+startTime+"' and time <='"+endTime+"'");
- if(queryResult.hasError()) {
- log.error(String.format("Batch" +
- " queryIot ERROR:{%s} "
- , "数据未查询到"));
- }else{
- for (QueryResult.Result result : queryResult.getResults()) {
- if (result.getSeries() != null) {
- for (QueryResult.Series series : result.getSeries()) {
- List<String> columns = series.getColumns();
- int flowTotalPosIndex = columns.indexOf("flow_total_pos");
- if (flowTotalPosIndex != -1) {
- Double firstValue = findFirstNonNullValue(series.getValues(), flowTotalPosIndex);
- Double lastValue = findLastNonNullValue(series.getValues(), flowTotalPosIndex);
- if (firstValue != null && lastValue != null) {
- //到此处是该小时一个设备的用水量已加上
- if (value == null) {
- value = 0.00;
- }
- value += Math.abs(lastValue - firstValue);
- }
- } else {
- System.out.println("'flow_total_pos' column not found in the series.");
- }
- }
- } else {
- // System.out.println("No series found in the result.");
- }
- }
- }
- }
- }
- recordAllEntity.put("value", value);
- List<Map<String, Object>> queryWaterRecord = getWaterTapWaterApi().getWaterCollectionRecordAllList(1, 0,
- " WHERE zone_name = '" + recordAllEntity.get("zone_name")
- + "' AND time = '" + recordAllEntity.get("time") + "' AND value_tag = '" + recordAllEntity.get("value_tag") + "'");
- if (CollectionUtils.isEmpty(queryWaterRecord)) {
- //TODO 说明不存在,进行插入
- if(!ObjectUtils.isEmpty(recordAllEntity.get("value"))) {
- int insertCode = getWaterTapWaterApi().insertWaterCollectionRecordAll(" (" +
- "'" + recordAllEntity.get("zone_name") + "'," +
- "'" + recordAllEntity.get("time") + "'," +
- "'" + recordAllEntity.get("value") + "'," +
- "'" + recordAllEntity.get("value_tag") + "'," +
- "'" + recordAllEntity.get("collcation_tag_array") + "'" +
- ") ");
- if (insertCode < 0) {
- log.error(String.format("Batch initTapWaterDataThread 未成功:{%s} ",
- JSONObject.toJSON(recordAllEntity)));
- }
- }
- }else{
- //TODO 说明存在,进行修改
- if(!ObjectUtils.isEmpty(recordAllEntity.get("value"))) {
- int updateCode = getWaterTapWaterApi().updateWaterCollectionRecordAll(String.valueOf(value)," WHERE " +
- "(" +
- " zone_name = '" + recordAllEntity.get("zone_name") + "' and" +
- " \"time\" = '" + recordAllEntity.get("time") + "' and" +
- " value_tag = '" + recordAllEntity.get("value_tag") + "'" +
- ") ");
- if (updateCode < 0) {
- log.error(String.format("Batch updateWaterCollectionRecordAll 未成功:{%s} ",
- JSONObject.toJSON(recordAllEntity)));
- }
- }
- }
- }
- latch2.countDown();
- }).start();
- }catch(Exception ex){
- }
- }
- try {
- latch2.await();
- }catch(Exception ex){
- }
- latch.countDown();
- }).start();
- }catch(Exception ex){
- log.error(String.format("Batch" +
- " initWaterReacordAll ERROR:{%s} "
- , ex.getLocalizedMessage()));
- }
- }
- latch.await();
- System.out.println("计算小时用水量检查机制("+startFindTime+")结束时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
- }
- }catch(Exception ex){
- log.error(String.format("Batch initWaterCollecationReacordAll ERROR:{%s} "
- , ex.getLocalizedMessage()));
- }
- }
- private static Double findFirstNonNullValue(List<List<Object>> values, int index) {
- for (List<Object> row : values) {
- Object value = row.get(index);
- if (value != null) {
- return Double.valueOf(value.toString());
- }
- }
- return null;
- }
- private static Double findLastNonNullValue(List<List<Object>> values, int index) {
- for (int i = values.size() - 1; i >= 0; i--) {
- Object value = values.get(i).get(index);
- if (value != null) {
- return Double.valueOf(value.toString());
- }
- }
- return null;
- }
- static JSONObject pumpObj = JSONObject.parseObject("{\n" +
- "\t\"UZD2A9DF3794E400878JA\": {\n" +
- "\t\t\"outRoom\": {\n" +
- "\t\t\t\"pumps\": [{\n" +
- "\t\t\t\t\t\"label\": \"1\",\n" +
- "\t\t\t\t\t\"status\": 1,\n" +
- "\t\t\t\t\t\"forecastStatus\": 0\n" +
- "\t\t\t\t},\n" +
- "\t\t\t\t{\n" +
- "\t\t\t\t\t\"label\": \"2\",\n" +
- "\t\t\t\t\t\"status\": 0,\n" +
- "\t\t\t\t\t\"forecastStatus\": 0\n" +
- "\t\t\t\t},\n" +
- "\t\t\t\t{\n" +
- "\t\t\t\t\t\"label\": \"3\",\n" +
- "\t\t\t\t\t\"status\": 0,\n" +
- "\t\t\t\t\t\"forecastStatus\": 0\n" +
- "\t\t\t\t},\n" +
- "\t\t\t\t{\n" +
- "\t\t\t\t\t\"label\": \"4\",\n" +
- "\t\t\t\t\t\"status\": 1,\n" +
- "\t\t\t\t\t\"forecastStatus\": 1\n" +
- "\t\t\t\t}\n" +
- "\t\t\t]\n" +
- "\t\t}\n" +
- "\t},\n" +
- "\t\"UZD2A9DF38A304002A0M1\": {\n" +
- "\t\t\"outRoom\": {\n" +
- "\t\t\t\"pumps\": [{\n" +
- "\t\t\t\t\t\"label\": \"1\",\n" +
- "\t\t\t\t\t\"status\": 1,\n" +
- "\t\t\t\t\t\"forecastStatus\": 1\n" +
- "\t\t\t\t},\n" +
- "\t\t\t\t{\n" +
- "\t\t\t\t\t\"label\": \"2\",\n" +
- "\t\t\t\t\t\"status\": 0,\n" +
- "\t\t\t\t\t\"forecastStatus\": 0\n" +
- "\t\t\t\t},\n" +
- "\t\t\t\t{\n" +
- "\t\t\t\t\t\"label\": \"3\",\n" +
- "\t\t\t\t\t\"status\": 0,\n" +
- "\t\t\t\t\t\"forecastStatus\": 1\n" +
- "\t\t\t\t},\n" +
- "\t\t\t\t{\n" +
- "\t\t\t\t\t\"label\": \"4\",\n" +
- "\t\t\t\t\t\"status\": 0,\n" +
- "\t\t\t\t\t\"forecastStatus\": 0\n" +
- "\t\t\t\t}\n" +
- "\t\t\t]\n" +
- "\t\t}\n" +
- "\t},\n" +
- "\t\"UZD2A9DF39435800149VR\": {\n" +
- "\t\t\"outRoom\": {\n" +
- "\t\t\t\"pumps\": [{\n" +
- "\t\t\t\t\t\"label\": \"1\",\n" +
- "\t\t\t\t\t\"status\": 1,\n" +
- "\t\t\t\t\t\"forecastStatus\": 1\n" +
- "\t\t\t\t},\n" +
- "\t\t\t\t{\n" +
- "\t\t\t\t\t\"label\": \"2\",\n" +
- "\t\t\t\t\t\"status\": 0,\n" +
- "\t\t\t\t\t\"forecastStatus\": 1\n" +
- "\t\t\t\t},\n" +
- "\t\t\t\t{\n" +
- "\t\t\t\t\t\"label\": \"3\",\n" +
- "\t\t\t\t\t\"status\": 1,\n" +
- "\t\t\t\t\t\"forecastStatus\": 1\n" +
- "\t\t\t\t}\n" +
- "\t\t\t]\n" +
- "\t\t}\n" +
- "\t}\n" +
- "}");
- //TODO 定时任务 定时添加小时预测数据 转变为之做假泵数据
- public static void insertYuceHourDataScheduled(String monthNow){
- System.out.println("添加预测小时数据正在进行"+ TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
- try {
- //先获取配置项 根据配置项来进行逻辑添加
- List<Map<String, Object>> configRes = getWaterTapWaterApi().getWaterYuceConfig(false, 0, 0, "");
- if(!CollectionUtils.isEmpty(configRes)){
- Map<String,Object> config = configRes.get(0);
- Integer isMonth = Integer.valueOf(config.get("is_month").toString());//是否预测指定月份 0 是 1否(判定是否走自添加逻辑)
- String month = config.get("month").toString();//指定预测年月 yyyy-mm
- Double randomCode = Double.valueOf(config.get("random_code").toString());//上下随机石百分比范围
- if(isMonth==0){
- DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
- String yearStr = month.split("-")[0];
- String monthStr = month.split("-")[1];
- if(!StringUtils.isEmpty(monthNow)){
- yearStr = monthNow.split("-")[0];
- monthStr = monthNow.split("-")[1];
- }
- LocalDateTime startDate = LocalDateTime.now().withYear(Integer.valueOf(yearStr))
- .withMonth(Integer.valueOf(monthStr)).withDayOfMonth(1).withHour(0).withMinute(0).withSecond(0).withNano(0);
- LocalDateTime endDate = startDate.with(TemporalAdjusters.lastDayOfMonth());
- //TODO 总体逻辑: 先查询指定月上一年的每日平均值,再按平均值的上下啊百分比去插入指定的浮动百分比数据
- List<Map<String, Object>> configList = getWaterTapWaterApi().getWaterCollectionConfigList(null);
- List<Map<String,Object>> orgConfig = getWaterTapWaterApi().getOrgConfig(false,0,0,"");
- if (!CollectionUtils.isEmpty(configList)&&!CollectionUtils.isEmpty(orgConfig)) {
- //TODO 按照组织机构分组
- Map<Object, List<Map<String, Object>>> groupedData =
- configList.stream().collect(Collectors.groupingBy(item -> item.get("zone_name")));
- final CountDownLatch latch = new CountDownLatch(groupedData.keySet().size());
- for (Object key:groupedData.keySet()){
- try {
- new Thread(() -> {
- //TODO ①计算水厂指定月每日小时平均值
- List<Map<String,Object>> recordAllRes = getWaterTapWaterApi()
- .getWaterCollectionRecordAllListAll(" WHERE 1=1 " +
- " AND zone_name = '"+key.toString()+"'" +
- " AND \"time\"::timestamp BETWEEN '"+startDate.format(formatter)+"' "+"AND '"+endDate.format(formatter)+"'");
- if(!CollectionUtils.isEmpty(recordAllRes)){
- int days = YearMonth.from(startDate).lengthOfMonth();//获取月份天数
- for (int i = 1;i<=days;i++){
- LocalDateTime dateNow = startDate.withDayOfMonth(i);//当前循环时间
- if(dateNow.isAfter(LocalDateTime.now().minusDays(-4))){
- break;
- }
- Double numAll = null;//日总值
- Double numAge = 0.0;//日小时平均值
- //TODO 筛选成当前循环时间的集合
- List<Map<String,Object>> daysRecord = recordAllRes.stream().filter(item ->
- LocalDateTime.parse(item.get("time").toString(),formatter).toLocalDate().equals(dateNow.toLocalDate()))
- .collect(Collectors.toList());
- for (Map<String,Object> mapEntity : daysRecord){
- //TODO 筛选与当前天一样的数据在计算平均值,数据相加除以24小时 (recordAllRes内容为这天每小时的数据)
- if(numAll == null){
- numAll = 0.0;
- }
- numAll += Double.valueOf(mapEntity.get("value").toString());
- }
- if (numAll!=null){
- numAge = numAll/24;
- }
- //TODO ②正式插入或修改日预测数据表
- Optional<String> orgIdOptional = orgConfig.stream()
- .filter(item -> key.toString().replace("(", "(").replace(")", ")")
- .equals(item.get("zone_name").toString().replace("(", "(").replace(")", ")")))
- .map(item -> item.get("zone_id").toString())
- .findFirst();
- String orgId = orgIdOptional.orElse(null);
- if (!StringUtils.isEmpty(orgId)) {
- //预测比对时间如果不是今年,则变为今年的时间
- //TODO 取泵号
- JSONObject pumpItem = (JSONObject)pumpObj.get(orgId);
- JSONArray pumpArray = ((JSONObject)pumpItem.get("outRoom")).getJSONArray("pumps");
- //TODO 一天24小时
- for (int j = 0;j<24;j++) {
- //TODO 数据库操作 A 调用对应时间预测接口 B 在修改实际值
- DateTimeFormatter formatter2 = DateTimeFormatter.ofPattern("yyyy-MM-dd");
- DateTimeFormatter formatter3 = DateTimeFormatter.ofPattern("HH:mm:ss");
- String dateStr = dateNow.withYear(LocalDateTime.now().getYear()).toLocalDate().format(formatter2);
- // 创建一个LocalTime实例,设置为指定小时,分钟和秒数为0
- LocalTime time = LocalTime.of(j, 0, 0);
- // 创建一个DateTimeFormatter用于格式化时间
- String timeStr = time.format(formatter3);
- // //TODO A
- // JP3TPHour tpHour = new JP3TPHour();
- // tpHour.setOrgId(orgId);
- // tpHour.setDate(dateStr);
- // try {
- // ResponseRes<String> tpRes = ServiceMgrProxy.getInstance()
- // .applyCloud3tpServiceApi().dayHourPredictSupply(tpHour);
- // if(ResponseCode.RESULT_BAD.toStrCode().equals(tpRes.getRescode())){
- // LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
- // , mStrClassName
- // , mStrClassName
- // , String.format("预测小时数据"+dateStr+"调用失败 ERROR:{%s} ",
- // tpRes.getResdata()));
- // break;
- // }
- // }catch(Exception ex){
- // LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
- // , mStrClassName
- // , mStrClassName
- // , String.format("预测小时数据"+dateStr+"调用异常 ERROR:{%s} ",
- // ex.getLocalizedMessage()));
- // }
- //TODO B
- Map<String, Object> recordAllEntity = new LinkedHashMap<>();//需要添加的实体数据,此处要用有序map
- recordAllEntity.put("date", dateStr);
- recordAllEntity.put("hour", timeStr);
- // recordAllEntity.put("HourForecastWaterWithdrawals", null);
- //TODO 随机数逻辑
- int randomUpOrDown = ThreadLocalRandom.current().nextInt(2);//随机向上或者向下 0表示向下 1表示向上
- Double randomWater = randomUpOrDown == 0 ?
- numAge - (numAge * (randomCode / 100)) :
- numAge + (numAge * (randomCode / 100));
- // recordAllEntity.put("HourForecastActualWaterSupply", randomWater);
- // recordAllEntity.put("HourActualWaterWithdrawals", "");
- //实际从该时间查询结果中得出
- List<Map<String,Object>> newRecordAllRes = getWaterTapWaterApi()
- .getWaterCollectionRecordAllListAll(" WHERE 1=1 " +
- " AND zone_name = '"+key.toString()+"'" +
- " AND \"time\" = '"+dateNow.withYear(LocalDateTime.now().getYear()).withHour(j).format(formatter)+"' ");
- String hourActualWaterSupply = null;
- if(!CollectionUtils.isEmpty(newRecordAllRes)){
- hourActualWaterSupply = newRecordAllRes.get(0).get("value").toString();
- }
- recordAllEntity.put("hour_actual_water_supply",hourActualWaterSupply);
- recordAllEntity.put("last_modify_time", LocalDateTime.now().format(formatter));
- // 如果找到了zone_id,就将其赋值给orgId,否则orgId为null
- recordAllEntity.put("zone_id", orgId);
- // //TODO 添加or修改
- // Integer insertRes = getWaterTapApi().insertOrUpdateTbmHourWater(recordAllEntity);
- // if (insertRes < 1) {
- // LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
- // , mStrClassName
- // , mStrClassName
- // , String.format("Batch insertYuceDataScheduled ERROR:{%s} ",
- // "新增或修改失败"));
- // }
- //TODO ③ 泵集合数据添加或修改
- // 生成[1, pumpArray.size()]范围内的随机数
- int randomNum = (int) (Math.random() * (pumpArray.size() - 1)) + 1;
- List<Double> splitRandom = splitValueRandomlyWithZeros(Double.valueOf((hourActualWaterSupply==null
- ?"0.0":hourActualWaterSupply)),pumpArray.size(),randomNum);//实际值随机数集合
- List<Double> splitRandomYuce = splitValueRandomlyWithZeros((randomWater==null
- ?0.0:randomWater),pumpArray.size(),randomNum);//预测值随机数集合
- //泵集合
- int kIndex = 0;
- for (Object obj:pumpArray){
- JSONObject itemObj = (JSONObject)obj;
- Map<String,Object> map = new LinkedHashMap<>();
- //TODO 泵状态根据值是否为0设置
- map.put("Date", dateNow.withYear(LocalDateTime.now().getYear()).toLocalDate()
- .format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
- // 创建一个LocalTime实例,设置为指定小时,分钟和秒数为0
- // 创建一个DateTimeFormatter用于格式化时间
- map.put("Hour", time.format(formatter3));
- map.put("PumpID",itemObj.get("label"));
- if(splitRandom.get(kIndex)==0){
- //TODO 说明泵实际值为0,状态为关
- map.put("PumpStatus",0);
- }else{
- map.put("PumpStatus",1);
- }
- map.put("HourForecastActualWaterSupply",splitRandomYuce.get(kIndex));//预测泵的供水量
- map.put("PumpWater",splitRandom.get(kIndex));//实际泵的供水量
- if(splitRandom.get(kIndex)==0){
- //TODO 说明泵实际值为0,状态为关
- map.put("RealPumpStatus",0);
- }else{
- map.put("RealPumpStatus",1);
- }
- map.put("PumpEnergy",splitRandomYuce.get(kIndex)/1000*180);
- map.put("RealPumpEnergy",splitRandom.get(kIndex)/1000*200);
- map.put("LastModifyTime", LocalDateTime.now().format(formatter));
- map.put("orgId", orgId);
- //TODO 添加or修改
- Integer insertRes2 = getWaterTapWaterApi().insertOrUpdateTbmHourwaterWatersupply(map);
- if (insertRes2 < 1) {
- log.error(String.format("Batch insertOrUpdateTbmHourwaterWatersupply ERROR:{%s} ",
- "新增或修改失败"));
- }
- kIndex++;
- }
- }
- }
- }
- }
- latch.countDown();
- }).start();
- }catch(Exception ex){
- log.error(String.format("Batch insertYuceDataScheduled ERROR:{%s} ",
- ex.getLocalizedMessage()));
- }
- }
- try{latch.await();}catch(Exception ex){}
- }
- System.out.println("添加预测泵小时数据结束"+ TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
- }else{
- }
- }
- }catch(Exception ex){
- ex.printStackTrace();
- }
- }
- //生成指定份额的随机数,并且带有0值
- public static List<Double> splitValueRandomlyWithZeros(double totalValue, int parts, int zeroCount) {
- if (zeroCount >= parts) {
- throw new IllegalArgumentException("Zero count must be less than the number of parts.");
- }
- Random random = new Random();
- List<Double> points = new ArrayList<>();
- int nonZeroParts = parts - zeroCount;
- // 添加初始点
- points.add(0.0);
- // 生成分割点
- for (int i = 0; i < nonZeroParts - 1; i++) {
- points.add(random.nextDouble() * totalValue);
- }
- // 添加终点
- points.add(totalValue);
- // 排序
- Collections.sort(points);
- // 计算每份的大小
- List<Double> splits = new ArrayList<>();
- for (int i = 1; i < points.size(); i++) {
- splits.add(points.get(i) - points.get(i - 1));
- }
- // 添加0值份额
- for (int i = 0; i < zeroCount; i++) {
- splits.add(0.0);
- }
- // 打乱列表,以确保0值分布均匀
- Collections.shuffle(splits);
- return splits;
- }
- }
|