package io.github.pnoker.gateway.bizmgr; import com.alibaba.fastjson.JSONObject; import io.github.pnoker.gateway.comtool.TimeTool; import io.github.pnoker.gateway.dbdao.DBMgrProxy; import io.github.pnoker.gateway.dbdao.zilaishuiSource.service.ZilaishuiRealListService; import org.influxdb.dto.QueryResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.CollectionUtils; import org.springframework.util.ObjectUtils; import java.time.Duration; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.stream.Collectors; /** * @ClassName KprZilaishuiLevelBizFun * @Description: TODO 市自来水水位预测 * @Author LX * @Date 2024/12/11 * @Version V1.0 **/ public class KprZilaishuiLevelBizFun { private static final Logger log = LoggerFactory.getLogger(KprZilaishuiLevelBizFun.class); private final static String mStrClassName = "KprZilaishuiLevelBizFun"; private final static String EMPTY_NULL = "NULL"; public static ZilaishuiRealListService getZilaishuiApi(){ return DBMgrProxy.getInstance().applyZilaishuiDbApi(); } static DateTimeFormatter formater = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); //TODO 初始化添加计算每个液位计站点每日的每小时液位量计算 public static void initWaterLevelReacordAll(String startFindTime){ //TODO ① 首先查询水厂设备配置信息 try { System.out.println("计算小时液位开始时间:"+ TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT)); System.out.println("计算小时液位起始时间:("+startFindTime+")进行中:......"); List> configList = getZilaishuiApi().getWaterLevelCollectionConfigList(null); if (!CollectionUtils.isEmpty(configList)) { //TODO 按照设备分组 Map>> groupedData = configList.stream().collect(Collectors.groupingBy(item -> item.get("device_code"))); final CountDownLatch latch = new CountDownLatch(groupedData.keySet().size()); //TODO 外层循环组织机构 for (Object key:groupedData.keySet()){ try{ new Thread(() -> { //TODO 根据当前配置信息item 查询远通数据中的历史数据 //TODO 首先查询当前水厂设备的从startFindTime之后到得到数据 LocalDateTime startDateTime = LocalDateTime.parse(startFindTime, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); //TODO 需计算的循环天数 Long days = 0L; days = Duration.between(startDateTime, LocalDateTime.now()).toDays(); days+=1; //TODO 此循环天数每一天所查的是所有设备每小时数据合 final CountDownLatch latch2 = new CountDownLatch(days.intValue()); for(Long k = 0L;k { List> deviceList = groupedData.get(key); //TODO 循环获取该天该水厂每个设备数据 //TODO 查询当前天日期内每小时的设备数据 for (int i = 0; i < 24; i++) { String startTime = newStartDateTime.withHour(i).format(formater); //TODO 需要加个05分把endTime的整点数据查出来 String endTime = newStartDateTime.minusHours(-(i + 1)).withMinute(5).withSecond(0).format(formater); //TODO 此循环计算该小时所有设备的用水量 for (Map item : deviceList) { // 定义字符串日期时间的格式 DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); // 解析字符串以创建 LocalDateTime 实例 LocalDateTime dateTime = LocalDateTime.parse(endTime, formatter); Integer itemCount = getZilaishuiApi().getTabWaterHistoryCount( " AND TAG_CODE = '" + item.get("collcation_tag") + "' " + " and QCQUISITION_TIME >= to_date('" + startDate + "', 'yyyy-mm-dd hh24:mi:ss')" + " and QCQUISITION_TIME <= to_date('" + endDate + "', 'yyyy-mm-dd hh24:mi:ss')" + " AND ABS(" + " (QCQUISITION_TIME - TRUNC(QCQUISITION_TIME, 'HH')) * 24 * 60 " + " ) <= 5");//查询 -- 采集时间接近整点,误差在5分钟内 if (itemCount != null && itemCount > 0) { List> tapWaterHistoryList = getZilaishuiApi().getPageZILAISHUI_HISTORY2(itemCount, 0, " AND TAG_CODE = '" + item.get("collcation_tag") + "' " + " and QCQUISITION_TIME >= to_date('" + startTime + "', 'yyyy-mm-dd hh24:mi:ss')" + " and QCQUISITION_TIME <= to_date('" + endTime + "', 'yyyy-mm-dd hh24:mi:ss')" + " AND ABS( " + " (QCQUISITION_TIME - TRUNC(QCQUISITION_TIME, 'HH')) * 24 * 60 " + " ) <= 5 " + " order by QCQUISITION_TIME"); //TODO 直接插入即可 查询已经筛选出两条了 if (!CollectionUtils.isEmpty(tapWaterHistoryList) && tapWaterHistoryList.size() > 1) { //TODO 数据库操作 for (Map mapEntity:tapWaterHistoryList) { Map recordAllEntity = new HashMap<>();//需要添加的实体数据 recordAllEntity.put("device_code", groupedData.get(key).get(0).get("device_code")); recordAllEntity.put("value", mapEntity.get("VAL")); recordAllEntity.put("time", LocalDateTime.parse(mapEntity.get("QCQUISITION_TIME").toString(), formatter) .withMinute(0).withSecond(0).format(formatter));//采集时间(小时的最后时间) recordAllEntity.put("value_tag", "level"); List> queryLevelRecord = getZilaishuiApi().getWaterLevelRecordAllList(1, 0, " WHERE device_code = '" + recordAllEntity.get("device_code") + "' AND time = '" + recordAllEntity.get("time") + "' AND value_tag = '" + recordAllEntity.get("value_tag") + "'"); if (CollectionUtils.isEmpty(queryLevelRecord)) { //TODO 说明不存在,进行插入 if(!ObjectUtils.isEmpty(recordAllEntity.get("value"))) { int insertCode = getZilaishuiApi().insertWaterLevelRecordAll(" (" + "'" + recordAllEntity.get("device_code") + "'," + "'" + recordAllEntity.get("value") + "'," + "'" + recordAllEntity.get("value_tag") + "'," + "'" + recordAllEntity.get("time") + "'" + ") "); if (insertCode < 0) { log.error(String.format("Batch insertWaterLevelRecordAll 未成功:{%s} ", JSONObject.toJSON(recordAllEntity))); } } }else{ //TODO 说明存在,进行修改 if(!ObjectUtils.isEmpty(recordAllEntity.get("value"))) { int updateCode = getZilaishuiApi().updateWaterLevelRecordAll(String.valueOf(recordAllEntity.get("value"))," WHERE " + "(" + " device_code = '" + recordAllEntity.get("device_code") + "' and" + " \"time\" = '" + recordAllEntity.get("time") + "' and" + " value_tag = '" + recordAllEntity.get("value_tag") + "'" + ") "); if (updateCode < 0) { log.error(String.format("Batch updateWaterLevelRecordAll 未成功:{%s} ", JSONObject.toJSON(recordAllEntity))); } } } } } } } } latch2.countDown(); }).start(); }catch(Exception ex){ } } try { latch2.await(); }catch(Exception ex){ } latch.countDown(); }).start(); }catch(Exception ex){ log.error(String.format("Batch" + " initWaterReacordAll ERROR:{%s} " , ex.getLocalizedMessage())); } } latch.await(); System.out.println("计算小时液位检查机制("+startFindTime+")结束时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT)); } }catch(Exception ex){ log.error(String.format("Batch initWaterCollecationReacordAll ERROR:{%s} " , ex.getLocalizedMessage())); } } public static void insertDailyData(){ KprZilaishuiLevelBizFun.getZilaishuiApi().insertDailyData(); } public static void insertForecastData(){ KprZilaishuiLevelBizFun.getZilaishuiApi().insertForecastData(); } }