package com.shkpr.service.aimodelpower.bizmgr; import com.alibaba.fastjson.JSONObject; import com.global.base.log.LogLevelFlag; import com.global.base.log.LogPrintMgr; import com.global.base.tools.FastJsonUtil; import com.shkpr.service.aimodelpower.commtools.TimeTool; import com.shkpr.service.aimodelpower.dbdao.DBMgrProxy; import com.shkpr.service.aimodelpower.dbdao.shizilaishuiDataSource.service.intef.WaterCollecationService; import com.shkpr.service.aimodelpower.dbdao.shizilaishuiDataSource.service.intef.WaterTapWaterService; import com.shkpr.service.aimodelpower.dto.TraceRunnable; import com.shkpr.service.aimodelpower.globalmgr.ThreadTaskMgr; import com.shkpr.service.aimodelpower.globalmgr.TraceLogMgr; import org.springframework.util.CollectionUtils; import org.springframework.util.ObjectUtils; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.time.Duration; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; import java.time.format.DateTimeFormatter; import java.util.*; import java.util.concurrent.CountDownLatch; import java.util.stream.Collectors; /** * @ClassName KprAimTapWaterBizFun * @Description: TODO * @Author LX * @Date 2024/5/22 * @Version V1.0 **/ public class KprAimTapWaterBizFun { private static final String MSG_SUCCESS = "success."; private static final String MSG_FAILED = "failed."; private static final String mStrClassName = "KprAimTapWaterBizFun"; private static final String EMPTY_NULL = "NULL"; public static WaterTapWaterService getWaterTapWaterApi(){ return DBMgrProxy.getInstance().applyWaterTapWaterService(); } public static WaterCollecationService getWaterCollecationApi(){ return DBMgrProxy.getInstance().applyWaterCollecationService(); } static DateTimeFormatter formater = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); //TODO 15分间隔的预测插值(做假暂时用,不用时注释) //TODO 思路:等预测小时的原程序完成后, 原预测表中为小时预测数据,分钟字段为空,将预测表中的数据进行二次清洗然后插入新预测表 //TODO beforeDays为往后推清洗多少天 n-1为逻辑, 例如传1就是0天当天 public static void yuceZuojia(int beforeDays){ try{ LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"预测15分钟插值开始时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT)); LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"预测15分钟插值进行中:......"); List> orgConfig = getWaterCollecationApi().getOrgConfig(false, 0, 0, " AND org_name not like '%营业所' and (org_name!='北碚水厂' and org_name!='渝中区水厂')"); final CountDownLatch latch = new CountDownLatch(orgConfig.size()); DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd"); LocalDateTime startDate = LocalDateTime.now().withMinute(0).withSecond(0); for (Map org:orgConfig) { ThreadTaskMgr.runTask(new TraceRunnable(TraceLogMgr.getTraceId()) { @Override public void function() { try { for (int dayNum=0;dayNum> resHourList = getWaterCollecationApi().getTbMHourwater(false, 20, 0, " AND \"orgId\" = '" + org.get("org_id") + "' AND \"Date\" = '" + startDate.minusDays(-dayNum).format(formatter) + "' ORDER BY \"Date\",\"Hour\" ASC "); processWaterData(resHourList); } latch.countDown(); }catch(Exception ex){ ex.printStackTrace(); } } }); } latch.await(); LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"预测15分钟插值已结束"); }catch(Exception ex){ LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR , mStrClassName , mStrClassName , String.format("Batch yuceZuojia ERROR:{%s} " , ex.getLocalizedMessage())); } } private static DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd"); private static DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss"); public static void processWaterData(List> resHourList) { List>> trendGroups = new ArrayList<>(); List> currentGroup = new ArrayList<>(); for (int i = 0; i < resHourList.size(); i++) { Map currentMap = resHourList.get(i); double currentHourData = getDoubleValue(currentMap, "HourForecastActualWaterSupply"); if (i == 0) { currentGroup.add(currentMap); } else { double previousHourData = getDoubleValue(resHourList.get(i - 1), "HourForecastActualWaterSupply"); boolean isIncreasing = currentHourData > previousHourData; if (currentGroup.size() > 0) { double lastGroupData = getDoubleValue(currentGroup.get(currentGroup.size() - 1), "HourForecastActualWaterSupply"); boolean wasIncreasing = lastGroupData < currentHourData; if (isIncreasing != wasIncreasing) { trendGroups.add(new ArrayList<>(currentGroup)); currentGroup.clear(); } } currentGroup.add(currentMap); } } if (!currentGroup.isEmpty()) { trendGroups.add(currentGroup); } for (List> group : trendGroups) { for (int i = 0; i < group.size() - 1; i++) { Map currentMap = group.get(i); double currentHourData = getDoubleValue(currentMap, "HourForecastActualWaterSupply"); double nextHourData = getDoubleValue(group.get(i + 1), "HourForecastActualWaterSupply"); boolean isIncreasing = nextHourData > currentHourData; double[] splitData = splitHourData(currentHourData, isIncreasing); // 步骤2:平滑处理 double alpha = 0.3; // 平滑系数 double[] smoothedData = exponentialSmoothing(splitData,alpha); // 解析日期字符串 LocalDate date = LocalDate.parse(currentMap.get("Date").toString(), DateTimeFormatter.ISO_LOCAL_DATE); // 解析时间字符串 LocalTime time = LocalTime.parse(currentMap.get("Hour").toString(), DateTimeFormatter.ISO_LOCAL_TIME); LocalDateTime dateTime = LocalDateTime.of(date, time).minusMinutes(45); for (int j = 0; j < smoothedData.length; j++) { Map newMap = new HashMap<>(currentMap); newMap.put("HourForecastActualWaterSupply", smoothedData[j]); LocalDateTime quarterHourTime = dateTime.plusMinutes(j * 15); newMap.put("Date", quarterHourTime.format(dateFormatter)); newMap.put("Hour", quarterHourTime.format(timeFormatter)); // Here you would query the database and insert/update data // For example: double originalValue = queryOriginalValue(quarterHourTime); newMap.put("HourActualWaterSupply", originalValue); // Retain only specific keys Map filteredMap = filterMap(newMap, "Date", "Hour", "HourForecastActualWaterSupply", "HourActualWaterSupply", "LastModifyTime", "orgId"); // Insert or update the database // int insertRes = insertOrUpdateDatabase(filteredMap); // Handle the result if needed Integer insertRes = getWaterTapWaterApi().insertOrUpdateTbmHourWaterNew(filteredMap); if (insertRes < 1) { LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR , mStrClassName , mStrClassName , String.format("Batch yuceZuojia ERROR:{%s} ", "新增或修改失败")); } } } } } private static double getDoubleValue(Map map, String key) { Object value = map.get(key); return value != null ? Double.parseDouble(value.toString()) : 0.0; } //拆分函数 private static double[] splitHourData(double hourData, boolean isIncreasing) { double[] splitData = new double[4]; if (isIncreasing) { // 调整比例以实现更平滑的上升曲线 splitData[0] = hourData * 0.24; splitData[1] = hourData * 0.26; splitData[2] = hourData * 0.25; splitData[3] = hourData * 0.25; } else { // 调整比例以实现更平滑的下降曲线 splitData[0] = hourData * 0.25; splitData[1] = hourData * 0.25; splitData[2] = hourData * 0.26; splitData[3] = hourData * 0.24; } return splitData; } //平滑函数 private static double[] smoothData(double[] data) { double[] smoothedData = new double[data.length]; int windowSize = 3; // 移动平均窗口大小 for (int i = 0; i < data.length; i++) { double sum = 0; int count = 0; for (int j = Math.max(0, i - windowSize / 2); j <= Math.min(data.length - 1, i + windowSize / 2); j++) { sum += data[j]; count++; } smoothedData[i] = sum / count; } return smoothedData; } //指数平滑 private static double[] exponentialSmoothing(double[] data, double alpha) { double[] smoothedData = new double[data.length]; smoothedData[0] = data[0]; // 初始化第一个值 for (int i = 1; i < data.length; i++) { smoothedData[i] = alpha * data[i] + (1 - alpha) * smoothedData[i - 1]; } return smoothedData; } private static double queryOriginalValue(LocalDateTime dateTime) { // Simulate querying the database for the original value // Replace with actual database query return 100.0; // Placeholder value } private static Map filterMap(Map map, String... keys) { Map filteredMap = new LinkedHashMap<>(); for (String key : keys) { if (map.containsKey(key)) { filteredMap.put(key, map.get(key)); } } return filteredMap; } private static Map> parseConfig(String configstr) { // Initialize the map to hold the parsed configuration Map> configMap = new HashMap<>(); // Get the configuration string String configString = configstr; if (configString != null) { // Split the configuration by different locations String[] locations = configString.split("},"); for (String location : locations) { // Find the index of the opening brace int braceIndex = location.indexOf('{'); if (braceIndex > 0) { // Extract the location name String locationName = location.substring(0,braceIndex).trim(); // Extract the tags and split by comma String tagsString = location.substring(braceIndex + 1).replace("}", "").trim(); List tags = Arrays.asList(tagsString.split(",")); // Add to the map configMap.put(locationName, tags); } } } return configMap; } //TODO 小时营业所片区用水量(自供+(供入-供出)) //TODO beforHour是小时数,当前时间往前扣多少个小时 按15分钟刻度计算用水量 public static void checkBusinessRecordAllData(int beforHour,String selfconfessStr,String supplyinStr,String confessStr){ //TODO 检查小时用水量 DateTimeFormatter formater = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); try{ LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"检查机制-计算营业所小时用水量开始时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT)); LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"检查机制-计算营业所小时用水量进行中:......"); //获取配置关系 //TODO 自供 Map> selfconfess = parseConfig(selfconfessStr); //TODO 供入 Map> supplyin = parseConfig(supplyinStr); //TODO 供出 Map> confess = parseConfig(confessStr); //TODO 根据当前配置信息item 查询远通数据中的历史数据 //TODO 首先查询当前水厂设备的从昨天之后到得到数据的数据 LocalDateTime today = LocalDateTime.now(); LocalDateTime startDateTime = today.withMinute(0).withSecond(0).withNano(0).minusHours(beforHour); //TODO 需计算的循环天数 //TODO 此循环天数每一天所查的是所有设备每小时数据合 LocalDateTime newStartDateTime = startDateTime; String startDate = newStartDateTime.format(formater); String endDate = today.withMinute(1).withSecond(0).format(formater); final CountDownLatch latch = new CountDownLatch(selfconfess.keySet().size()); for (String orgName:selfconfess.keySet()) { try { ThreadTaskMgr.runTask(new TraceRunnable(TraceLogMgr.getTraceId()) { @Override public void function() { for (int i = 0; i < Integer.valueOf(String.valueOf(Duration.between(startDateTime, today.withSecond(0)).toMinutes() / 15)); i++) { String startTime = newStartDateTime.minusMinutes(-(i * 15)).format(formater); String endTime = newStartDateTime.minusMinutes(-1).minusMinutes(-((i + 1) * 15)).format(formater);//查询时间加一分钟 Map recordAllEntity = new HashMap<>();//需要添加的实体数据 recordAllEntity.put("org_name", orgName);//水厂 recordAllEntity.put("time", newStartDateTime.minusMinutes(-((i + 1) * 15)).format(formater));//采集时间(小时的最后时间) recordAllEntity.put("value", null); recordAllEntity.put("value_tag", "water"); recordAllEntity.put("collcation_tag_array", ""); //TODO 此循环计算该小时所有设备的用水量 //TODO 先查各个营业所的自供值 List tagTags = selfconfess.get(orgName);//自供 List tagTags2 = supplyin.get(orgName);//供入 List tagTags3 = confess.get(orgName);//供出 Double selfValue = null; Double supplyInValue = null; Double confessValue = null; try { selfValue = selfconfess(tagTags, startDate, endDate, startTime, endTime);//自供 supplyInValue = selfconfess(tagTags2, startDate, endDate, startTime, endTime);//供入 confessValue = selfconfess(tagTags3, startDate, endDate, startTime, endTime);//供出 } catch (Exception ex) { } Double value = null;//总计算值 if (selfValue != null) { if (supplyInValue != null && confessValue == null) { value = selfValue + supplyInValue; } else if (supplyInValue != null && confessValue != null) { value = selfValue + supplyInValue - confessValue; } else if (supplyInValue == null && confessValue == null) { value = selfValue; } else if (supplyInValue == null && confessValue != null) { value = selfValue - confessValue; } } recordAllEntity.put("value", value); System.out.println("营业所片区" + orgName + "值:" + value + ",采集时间:" + newStartDateTime.minusMinutes(-((i + 1) * 15)).format(formater)); List> queryWaterRecord = getWaterTapWaterApi().getWaterCollectionRecordAllListNew(1, 0, " WHERE org_name = '" + recordAllEntity.get("org_name") + "' AND time = '" + recordAllEntity.get("time") + "' AND value_tag = '" + recordAllEntity.get("value_tag") + "'"); if (CollectionUtils.isEmpty(queryWaterRecord)) { // //TODO 说明不存在,进行插入 if (!ObjectUtils.isEmpty(recordAllEntity.get("value"))) { int insertCode = getWaterTapWaterApi().insertWaterCollectionRecordAllNew(" (" + "'" + recordAllEntity.get("org_name") + "'," + "'" + recordAllEntity.get("time") + "'," + "'" + recordAllEntity.get("value") + "'," + "'" + recordAllEntity.get("value_tag") + "'," + "'" + recordAllEntity.get("collcation_tag_array") + "'" + ") "); if (insertCode < 0) { LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR , mStrClassName , mStrClassName , String.format("Batch initTapWaterDataThread 未成功:{%s} ", FastJsonUtil.toJSON(recordAllEntity))); } } } else { //TODO 说明存在,进行修改 if (!ObjectUtils.isEmpty(recordAllEntity.get("value"))) { int updateCode = getWaterTapWaterApi().updateWaterCollectionRecordAllNew(String.valueOf(value), " WHERE " + "(" + " org_name = '" + recordAllEntity.get("org_name") + "' and" + " \"time\" = '" + recordAllEntity.get("time") + "' and" + " value_tag = '" + recordAllEntity.get("value_tag") + "'" + ") "); if (updateCode < 0) { LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR , mStrClassName , mStrClassName , String.format("Batch updateWaterCollectionRecordAll 未成功:{%s} ", FastJsonUtil.toJSON(recordAllEntity))); } } } } latch.countDown(); } }); }catch(Exception ex){ } } latch.await(); }catch(Exception ex){ ex.printStackTrace(); LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR , mStrClassName , mStrClassName , String.format("Batch checkBusinessRecordAllData ERROR:{%s} " , ex.getLocalizedMessage())); } } //TODO 计算总值 private static Double selfconfess(List tagTags,String startDate,String endDate,String startTime,String endTime) throws Exception{ Double value = null; if (tagTags==null){ return value; } for (String tagTag : tagTags) { // 定义字符串日期时间的格式 DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); // 解析字符串以创建 LocalDateTime 实例 LocalDateTime dateTime = LocalDateTime.parse(endTime, formatter); if (dateTime.getHour()==1&&("OPC_SC.HSS.HSS1.AI.LJLL1.F_CV".equals(tagTag) || "OPC_SC.HSS.HSS1.AI.LJLL2.F_CV".equals(tagTag) || "OPC_SC.HSS.HSS1.AI.LJLL3.F_CV".equals(tagTag))) { Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount( " WHERE TAG_CODE = '" + tagTag + "' " + " and QCQUISITION_TIME >= to_date('" + startDate + "', 'yyyy-mm-dd hh24:mi:ss')" + " and QCQUISITION_TIME < to_date('" + endDate + "', 'yyyy-mm-dd hh24:mi:ss')"); if(itemCount!=null&&itemCount>0) { List> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(itemCount, 0, " AND TAG_CODE = '" + tagTag + "' " + " and QCQUISITION_TIME >= to_date('" + startTime + "', 'yyyy-mm-dd hh24:mi:ss')" + " and QCQUISITION_TIME < to_date('" + endTime + "', 'yyyy-mm-dd hh24:mi:ss')" + " order by QCQUISITION_TIME"); if (!CollectionUtils.isEmpty(tapWaterHistoryList) && tapWaterHistoryList.size() > 1) { if (value == null) { value = 0.00; } value += tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("val").toString()) : null; } } }else { Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount( " WHERE TAG_CODE = '" + tagTag + "' " + " and QCQUISITION_TIME >= to_date('" + startDate + "', 'yyyy-mm-dd hh24:mi:ss')" + " and QCQUISITION_TIME < to_date('" + endDate + "', 'yyyy-mm-dd hh24:mi:ss')"); if (itemCount != null && itemCount > 0) { List> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(itemCount, 0, " AND TAG_CODE = '" + tagTag + "' " + " and QCQUISITION_TIME >= to_date('" + startTime + "', 'yyyy-mm-dd hh24:mi:ss')" + " and QCQUISITION_TIME < to_date('" + endTime + "', 'yyyy-mm-dd hh24:mi:ss')" + " order by QCQUISITION_TIME"); //TODO tapWaterHistoryList此集合为该设备该小时内的数据,取第一条和最后一条的相差值作为用水量 if (!CollectionUtils.isEmpty(tapWaterHistoryList) && tapWaterHistoryList.size() > 1) { Double firstValue = tapWaterHistoryList.get(0).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get(0).get("val").toString()) : null; Double lastValue = tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("val").toString()) : null; if (firstValue != null && lastValue != null) { //到此处是该小时一个设备的用水量已加上 if (value == null) { value = 0.00; } value += Math.abs(lastValue - firstValue); } } } } } return value; } //TODO 15分钟 分区用水量 每天整点10分检查并写入,检查范围为当前时间到昨天23点 public static void checkRecordAllDataBusinessFenqu(int beforHour){ //TODO 检查小时用水量 DateTimeFormatter formater = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"检查机制-计算15分钟用水量开始时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT)); LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"检查机制-计算15分钟用水量进行中:......"); List> configList = getWaterTapWaterApi().getWaterCollectionConfigList("WHERE 1=1 AND org_name not like '%营业所' AND org_name!='北碚水厂' AND org_name!='渝中区水厂'"); if (!CollectionUtils.isEmpty(configList)) { //TODO 按照组织机构分组 Map>> groupedData = configList.stream().collect(Collectors.groupingBy(item -> item.get("org_name"))); final CountDownLatch latch = new CountDownLatch(groupedData.keySet().size()); for (Object key:groupedData.keySet()){ try { ThreadTaskMgr.runTask(new TraceRunnable(TraceLogMgr.getTraceId()) { @Override public void function() { //TODO 根据当前配置信息item 查询远通数据中的历史数据 //TODO 首先查询当前水厂设备的从昨天之后到得到数据的数据 LocalDateTime today = LocalDateTime.now(); LocalDateTime startDateTime = today.withMinute(0).withSecond(0).withNano(0).minusHours(beforHour); //TODO 需计算的循环天数 //TODO 此循环天数每一天所查的是所有设备每小时数据合 LocalDateTime newStartDateTime = startDateTime; String startDate = newStartDateTime.format(formater); String endDate = today.withMinute(1).withSecond(0).format(formater); List> deviceList = groupedData.get(key); //TODO 循环获取该天该水厂每个设备数据 //TODO 查询当前天日期内每小时的设备数据 for (int i = 0; i < Integer.valueOf(String.valueOf(Duration.between(startDateTime, today.withSecond(0)).toMinutes()/15)); i++) { String startTime = newStartDateTime.minusMinutes(-(i * 15)).format(formater); String endTime = newStartDateTime.minusMinutes(-1).minusMinutes(-((i + 1)*15)).format(formater);//查询时间加一分钟 Map recordAllEntity = new HashMap<>();//需要添加的实体数据 recordAllEntity.put("org_name", key.toString());//水厂 recordAllEntity.put("time", newStartDateTime.minusMinutes(-((i + 1)*15)).format(formater));//采集时间(小时的最后时间) recordAllEntity.put("value", null); recordAllEntity.put("value_tag", "water"); recordAllEntity.put("collcation_tag_array", FastJsonUtil.toJSON(deviceList)); Double value = null; //TODO 此循环计算该小时所有设备的用水量 for (Map item : deviceList) { //TODO 和尚山水厂的三个OPC的设备0点到1点的数据特殊处理,直接取最后一条数据作为0点到1点的用水量 // 定义字符串日期时间的格式 DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); // 解析字符串以创建 LocalDateTime 实例 LocalDateTime dateTime = LocalDateTime.parse(endTime, formatter); if (dateTime.getHour()==1&&("OPC_SC.HSS.HSS1.AI.LJLL1.F_CV".equals(item.get("collcation_tag").toString()) || "OPC_SC.HSS.HSS1.AI.LJLL2.F_CV".equals(item.get("collcation_tag").toString()) || "OPC_SC.HSS.HSS1.AI.LJLL3.F_CV".equals(item.get("collcation_tag").toString()))) { Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount( " WHERE TAG_CODE = '" + item.get("collcation_tag") + "' " + " and QCQUISITION_TIME >= to_date('" + startDate + "', 'yyyy-mm-dd hh24:mi:ss')" + " and QCQUISITION_TIME < to_date('" + endDate + "', 'yyyy-mm-dd hh24:mi:ss')"); if(itemCount!=null&&itemCount>0) { List> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(itemCount, 0, " AND TAG_CODE = '" + item.get("collcation_tag") + "' " + " and QCQUISITION_TIME >= to_date('" + startTime + "', 'yyyy-mm-dd hh24:mi:ss')" + " and QCQUISITION_TIME < to_date('" + endTime + "', 'yyyy-mm-dd hh24:mi:ss')" + " order by QCQUISITION_TIME"); if (!CollectionUtils.isEmpty(tapWaterHistoryList) && tapWaterHistoryList.size() > 1) { if (value == null) { value = 0.00; } value += tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("val").toString()) : null; } } }else { Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount( " WHERE TAG_CODE = '" + item.get("collcation_tag") + "' " + " and QCQUISITION_TIME >= to_date('" + startDate + "', 'yyyy-mm-dd hh24:mi:ss')" + " and QCQUISITION_TIME < to_date('" + endDate + "', 'yyyy-mm-dd hh24:mi:ss')"); if (itemCount != null && itemCount > 0) { List> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(itemCount, 0, " AND TAG_CODE = '" + item.get("collcation_tag") + "' " + " and QCQUISITION_TIME >= to_date('" + startTime + "', 'yyyy-mm-dd hh24:mi:ss')" + " and QCQUISITION_TIME < to_date('" + endTime + "', 'yyyy-mm-dd hh24:mi:ss')" + " order by QCQUISITION_TIME"); //TODO tapWaterHistoryList此集合为该设备该小时内的数据,取第一条和最后一条的相差值作为用水量 if (!CollectionUtils.isEmpty(tapWaterHistoryList) && tapWaterHistoryList.size() > 1) { Double firstValue = tapWaterHistoryList.get(0).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get(0).get("val").toString()) : null; Double lastValue = tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("val").toString()) : null; if (firstValue != null && lastValue != null) { //到此处是该小时一个设备的用水量已加上 if (value == null) { value = 0.00; } value += Math.abs(lastValue - firstValue); } } } } } recordAllEntity.put("value", value); List> queryWaterRecord = getWaterTapWaterApi().getWaterCollectionRecordAllListNew(1, 0, " WHERE org_name = '" + recordAllEntity.get("org_name") + "' AND time = '" + recordAllEntity.get("time") + "' AND value_tag = '" + recordAllEntity.get("value_tag") + "'"); if (CollectionUtils.isEmpty(queryWaterRecord)) { //TODO 说明不存在,进行插入 if(!ObjectUtils.isEmpty(recordAllEntity.get("value"))) { int insertCode = getWaterTapWaterApi().insertWaterCollectionRecordAllNew(" (" + "'" + recordAllEntity.get("org_name") + "'," + "'" + recordAllEntity.get("time") + "'," + "'" + recordAllEntity.get("value") + "'," + "'" + recordAllEntity.get("value_tag") + "'," + "'" + recordAllEntity.get("collcation_tag_array") + "'" + ") "); if (insertCode < 0) { LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR , mStrClassName , mStrClassName , String.format("Batch initTapWaterDataThread 未成功:{%s} ", FastJsonUtil.toJSON(recordAllEntity))); } } }else{ //TODO 说明存在,进行修改 if(!ObjectUtils.isEmpty(recordAllEntity.get("value"))) { int updateCode = getWaterTapWaterApi().updateWaterCollectionRecordAllNew(String.valueOf(value)," WHERE " + "(" + " org_name = '" + recordAllEntity.get("org_name") + "' and" + " \"time\" = '" + recordAllEntity.get("time") + "' and" + " value_tag = '" + recordAllEntity.get("value_tag") + "'" + ") "); if (updateCode < 0) { LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR , mStrClassName , mStrClassName , String.format("Batch updateWaterCollectionRecordAll 未成功:{%s} ", FastJsonUtil.toJSON(recordAllEntity))); } } } } latch.countDown(); } }); }catch(Exception ex){} } try{latch.await();}catch(Exception ex){} LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"检查机制-计算15分钟用水量结束时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT)); } } //TODO 小时用水量 每天整点10分检查并写入,检查范围为当前时间到昨天23点 public static void checkRecordAllData(int beforHour){ //TODO 检查小时用水量 DateTimeFormatter formater = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"检查机制-计算小时用水量开始时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT)); LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"检查机制-计算小时用水量进行中:......"); List> configList = getWaterTapWaterApi().getWaterCollectionConfigList("WHERE 1=1 AND org_name not like '%营业所' AND org_name!='北碚水厂' AND org_name!='渝中区水厂'"); if (!CollectionUtils.isEmpty(configList)) { //TODO 按照组织机构分组 Map>> groupedData = configList.stream().collect(Collectors.groupingBy(item -> item.get("org_name"))); final CountDownLatch latch = new CountDownLatch(groupedData.keySet().size()); for (Object key:groupedData.keySet()){ try { ThreadTaskMgr.runTask(new TraceRunnable(TraceLogMgr.getTraceId()) { @Override public void function() { //TODO 根据当前配置信息item 查询远通数据中的历史数据 //TODO 首先查询当前水厂设备的从昨天之后到得到数据的数据 LocalDateTime today = LocalDateTime.now(); LocalDateTime startDateTime = today.withMinute(0).withSecond(0).withNano(0).minusHours(beforHour); //TODO 需计算的循环天数 //TODO 此循环天数每一天所查的是所有设备每小时数据合 LocalDateTime newStartDateTime = startDateTime; String startDate = newStartDateTime.format(formater); String endDate = today.withMinute(1).withSecond(0).format(formater); List> deviceList = groupedData.get(key); //TODO 循环获取该天该水厂每个设备数据 //TODO 查询当前天日期内每小时的设备数据 for (int i = 0; i < Integer.valueOf(String.valueOf(Duration.between(startDateTime, today.withSecond(0)).toMinutes()/15)); i++) { String startTime = newStartDateTime.minusHours(-i).format(formater); String endTime = newStartDateTime.minusMinutes(-1).minusHours(-(i+1)).format(formater);//查询时间加一分钟 Map recordAllEntity = new HashMap<>();//需要添加的实体数据 recordAllEntity.put("org_name", key.toString());//水厂 recordAllEntity.put("time", newStartDateTime.minusHours(-(i+1)).format(formater));//采集时间(小时的最后时间) recordAllEntity.put("value", null); recordAllEntity.put("value_tag", "water"); recordAllEntity.put("collcation_tag_array", FastJsonUtil.toJSON(deviceList)); Double value = null; //TODO 此循环计算该小时所有设备的用水量 for (Map item : deviceList) { //TODO 和尚山水厂的三个OPC的设备0点到1点的数据特殊处理,直接取最后一条数据作为0点到1点的用水量 // 定义字符串日期时间的格式 DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); // 解析字符串以创建 LocalDateTime 实例 LocalDateTime dateTime = LocalDateTime.parse(endTime, formatter); if (dateTime.getHour()==1&&("OPC_SC.HSS.HSS1.AI.LJLL1.F_CV".equals(item.get("collcation_tag").toString()) || "OPC_SC.HSS.HSS1.AI.LJLL2.F_CV".equals(item.get("collcation_tag").toString()) || "OPC_SC.HSS.HSS1.AI.LJLL3.F_CV".equals(item.get("collcation_tag").toString()))) { Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount( " WHERE TAG_CODE = '" + item.get("collcation_tag") + "' " + " and QCQUISITION_TIME >= to_date('" + startDate + "', 'yyyy-mm-dd hh24:mi:ss')" + " and QCQUISITION_TIME < to_date('" + endDate + "', 'yyyy-mm-dd hh24:mi:ss')"); if(itemCount!=null&&itemCount>0) { List> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(itemCount, 0, " AND TAG_CODE = '" + item.get("collcation_tag") + "' " + " and QCQUISITION_TIME >= to_date('" + startTime + "', 'yyyy-mm-dd hh24:mi:ss')" + " and QCQUISITION_TIME < to_date('" + endTime + "', 'yyyy-mm-dd hh24:mi:ss')" + " order by QCQUISITION_TIME"); if (!CollectionUtils.isEmpty(tapWaterHistoryList) && tapWaterHistoryList.size() > 1) { if (value == null) { value = 0.00; } value += tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("val").toString()) : null; } } }else { Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount( " WHERE TAG_CODE = '" + item.get("collcation_tag") + "' " + " and QCQUISITION_TIME >= to_date('" + startDate + "', 'yyyy-mm-dd hh24:mi:ss')" + " and QCQUISITION_TIME < to_date('" + endDate + "', 'yyyy-mm-dd hh24:mi:ss')"); if (itemCount != null && itemCount > 0) { List> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(itemCount, 0, " AND TAG_CODE = '" + item.get("collcation_tag") + "' " + " and QCQUISITION_TIME >= to_date('" + startTime + "', 'yyyy-mm-dd hh24:mi:ss')" + " and QCQUISITION_TIME < to_date('" + endTime + "', 'yyyy-mm-dd hh24:mi:ss')" + " order by QCQUISITION_TIME"); //TODO tapWaterHistoryList此集合为该设备该小时内的数据,取第一条和最后一条的相差值作为用水量 if (!CollectionUtils.isEmpty(tapWaterHistoryList) && tapWaterHistoryList.size() > 1) { Double firstValue = tapWaterHistoryList.get(0).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get(0).get("val").toString()) : null; Double lastValue = tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("val").toString()) : null; if (firstValue != null && lastValue != null) { //到此处是该小时一个设备的用水量已加上 if (value == null) { value = 0.00; } value += Math.abs(lastValue - firstValue); } } } } } recordAllEntity.put("value", value); List> queryWaterRecord = getWaterTapWaterApi().getWaterCollectionRecordAllList(1, 0, " WHERE org_name = '" + recordAllEntity.get("org_name") + "' AND time = '" + recordAllEntity.get("time") + "' AND value_tag = '" + recordAllEntity.get("value_tag") + "'"); if (CollectionUtils.isEmpty(queryWaterRecord)) { //TODO 说明不存在,进行插入 if(!ObjectUtils.isEmpty(recordAllEntity.get("value"))) { int insertCode = getWaterTapWaterApi().insertWaterCollectionRecordAll(" (" + "'" + recordAllEntity.get("org_name") + "'," + "'" + recordAllEntity.get("time") + "'," + "'" + recordAllEntity.get("value") + "'," + "'" + recordAllEntity.get("value_tag") + "'," + "'" + recordAllEntity.get("collcation_tag_array") + "'" + ") "); if (insertCode < 0) { LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR , mStrClassName , mStrClassName , String.format("Batch initTapWaterDataThread 未成功:{%s} ", FastJsonUtil.toJSON(recordAllEntity))); } } }else{ //TODO 说明存在,进行修改 if(!ObjectUtils.isEmpty(recordAllEntity.get("value"))) { int updateCode = getWaterTapWaterApi().updateWaterCollectionRecordAll(String.valueOf(value)," WHERE " + "(" + " org_name = '" + recordAllEntity.get("org_name") + "' and" + " \"time\" = '" + recordAllEntity.get("time") + "' and" + " value_tag = '" + recordAllEntity.get("value_tag") + "'" + ") "); if (updateCode < 0) { LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR , mStrClassName , mStrClassName , String.format("Batch updateWaterCollectionRecordAll 未成功:{%s} ", FastJsonUtil.toJSON(recordAllEntity))); } } } } latch.countDown(); } }); }catch(Exception ex){} } try{latch.await();}catch(Exception ex){} LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"检查机制-计算小时用水量结束时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT)); } } //TODO 原始历史数据 每天整点15分检查并写入,检查范围为当前时间到昨天 public static void checkRecordData(){ //TODO ① 首先查询水厂设备配置信息 try { List> configList = getWaterTapWaterApi().getWaterCollectionConfigList("WHERE 1=1 AND org_name not like '%营业所' AND org_name!='北碚水厂' AND org_name!='渝中区水厂'"); if(!CollectionUtils.isEmpty(configList)){ //声明总数据的数据数组 List> newRecordAll = Collections.synchronizedList(new LinkedList>()); //TODO ②开启多线程并发处理各水厂设备的数据 LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"检查机制--开始时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT)); LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"检查机制--进行中:......"); final CountDownLatch latch = new CountDownLatch(configList.size()); for(Map item:configList){ try{ ThreadTaskMgr.runTask(new TraceRunnable(TraceLogMgr.getTraceId()) { @Override public void function() { //TODO 根据当前配置信息item 查询远通数据中的历史数据 //TODO 首先查询当前水厂设备的昨天0点之后到当前时间最新数据的数据 LocalDateTime today = LocalDateTime.now(); LocalDateTime yesterday = today.minusDays(1).withHour(0).withMinute(0).withSecond(0).withNano(0); Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount(" WHERE TAG_CODE = '"+item.get("collcation_tag")+"' " + " and QCQUISITION_TIME >= to_date('"+yesterday.format(formater)+"', 'yyyy-mm-dd hh24:mi:ss')"); if(itemCount!=null&&itemCount>0) { //TODO 优化 以分页方式查询所有,初始分页行数定为2000查询速率较好 int pageNum = itemCount%2000==0?itemCount/2000:(itemCount/2000)+1;//总页数 Integer limit = 2000; if(pageNum<=1){ limit = itemCount;//说明总数比第一页小 } for (int i = 0; i < pageNum; i++) { Integer offset = i*limit; //tapWaterHistoryList 为远通水量数据源 //TODO 因为是orcale数据库 分页方式以伪列rownum为分页条件,因此limit实际上永远比offset大2000,因此只要不是第一页,limit=offset+2000 Integer limitNew = 0; if(i>0){ limitNew = offset+2000; }else{ limitNew = 2000; } List> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(limitNew,offset," " + "AND TAG_CODE = '"+item.get("collcation_tag")+"' " + "and QCQUISITION_TIME >= to_date('"+yesterday.format(formater)+"', 'yyyy-mm-dd hh24:mi:ss')"); if(!CollectionUtils.isEmpty(tapWaterHistoryList)){ //TODO 循环远通水量数据列表,查询数据不存在的话就插入 for (int j = 0; j < tapWaterHistoryList.size(); j++) { List> queryWaterRecord = getWaterTapWaterApi().getWaterCollectionRecordList(1,0, " WHERE collcation_tag = '"+tapWaterHistoryList.get(j).get("TAG_CODE") +"' AND org_name = '"+tapWaterHistoryList.get(j).get("NAME")+"' AND time = '"+TimeTool.convertDateStr2UTC(tapWaterHistoryList.get(j).get("QCQUISITION_TIME").toString())+"'"); if(CollectionUtils.isEmpty(queryWaterRecord)){ //TODO 说明没插入过本系统,执行插入 String extend = " ('"+tapWaterHistoryList.get(j).get("TAG_CODE")+"','"+tapWaterHistoryList.get(j).get("NAME")+"'," + ""+TimeTool.convertDateStr2UTC(tapWaterHistoryList.get(j).get("QCQUISITION_TIME").toString())+"," + "'"+tapWaterHistoryList.get(j).get("VAL")+"'," + TimeTool.convertDateStr2UTC(tapWaterHistoryList.get(j).get("UPDATE_TIME").toString()) +") "; int insertCode = getWaterTapWaterApi().insertWaterCollectionRecord(extend); if(insertCode<=0){ LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR , mStrClassName , mStrClassName , String.format("Batch initTapWaterDataThread 未成功:{%s} ", extend)); } } } } } } latch.countDown(); } }); }catch(Exception ex){ LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR , mStrClassName , mStrClassName , String.format("Batch initTapWaterDataThread 水厂:{%s},标识:{%s} ERROR:{%s} ", item.get("org_name"), item.get("collcation_tag") , ex.getLocalizedMessage())); } } latch.await(); LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"检查机制--结束时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT)); } }catch(Exception ex){ LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR , mStrClassName , mStrClassName , String.format("Batch initTapWaterData ERROR:{%s} " , ex.getLocalizedMessage())); } } //TODO 初始化添加对比远通数据 public static void initTapWaterData(){ //TODO ① 首先查询水厂设备配置信息 try { List> configList = getWaterTapWaterApi().getWaterCollectionConfigList("WHERE 1=1 AND org_name not like '%营业所' AND org_name!='北碚水厂' AND org_name!='渝中区水厂'"); if(!CollectionUtils.isEmpty(configList)){ //声明总数据的数据数组 List> newRecordAll = Collections.synchronizedList(new LinkedList>()); //TODO ②开启多线程并发处理各水厂设备的数据 LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"比对远通原始数据开始时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT)); LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"进行中:......"); final CountDownLatch latch = new CountDownLatch(configList.size()); LocalDateTime startTimeLocal = LocalDateTime.now().minusDays(1).minusHours(1); for(Map item:configList){ try{ ThreadTaskMgr.runTask(new TraceRunnable(TraceLogMgr.getTraceId()) { @Override public void function() { //TODO 根据当前配置信息item 查询远通数据中的历史数据 //TODO 首先查询当前水厂设备的2025-01-01之后到最新数据的数据总数,然后分页形式获取数据 Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount(" WHERE TAG_CODE = '"+item.get("collcation_tag")+"' and QCQUISITION_TIME >= to_date('"+startTimeLocal.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))+"', 'yyyy-mm-dd hh24:mi:ss')"); if(itemCount!=null&&itemCount>0) { //TODO 优化 以分页方式查询所有,初始分页行数定为2000查询速率较好 int pageNum = itemCount%2000==0?itemCount/2000:(itemCount/2000)+1;//总页数 Integer limit = 2000; if(pageNum<=1){ limit = itemCount;//说明总数比第一页小 } for (int i = 0; i < pageNum; i++) { Integer offset = i*limit; //tapWaterHistoryList 为远通水量数据源 //TODO 因为是orcale数据库 分页方式以伪列rownum为分页条件,因此limit实际上永远比offset大2000,因此只要不是第一页,limit=offset+2000 Integer limitNew = 0; if(i>0){ limitNew = offset+2000; }else{ limitNew = 2000; } List> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(limitNew,offset," AND TAG_CODE = '"+item.get("collcation_tag")+"' and QCQUISITION_TIME >= to_date('"+startTimeLocal.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))+"', 'yyyy-mm-dd hh24:mi:ss')"); if(!CollectionUtils.isEmpty(tapWaterHistoryList)){ //TODO 循环远通水量数据列表,查询数据不存在的话就插入 for (int j = 0; j < tapWaterHistoryList.size(); j++) { List> queryWaterRecord = getWaterTapWaterApi().getWaterCollectionRecordList(1,0, " WHERE collcation_tag = '"+tapWaterHistoryList.get(j).get("TAG_CODE") +"' AND org_name = '"+tapWaterHistoryList.get(j).get("NAME")+"' AND time = '"+TimeTool.convertDateStr2UTC(tapWaterHistoryList.get(j).get("QCQUISITION_TIME").toString())+"'"); if(CollectionUtils.isEmpty(queryWaterRecord)){ //TODO 说明没插入过本系统,执行插入 String extend = " ('"+tapWaterHistoryList.get(j).get("TAG_CODE")+"','"+tapWaterHistoryList.get(j).get("NAME")+"'," + ""+TimeTool.convertDateStr2UTC(tapWaterHistoryList.get(j).get("QCQUISITION_TIME").toString())+"," + "'"+tapWaterHistoryList.get(j).get("VAL")+"'," + TimeTool.convertDateStr2UTC(tapWaterHistoryList.get(j).get("UPDATE_TIME").toString()) +") "; int insertCode = getWaterTapWaterApi().insertWaterCollectionRecord(extend); if(insertCode<=0){ LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR , mStrClassName , mStrClassName , String.format("Batch initTapWaterDataThread 未成功:{%s} ", extend)); } } } } } } latch.countDown(); } }); }catch(Exception ex){ LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR , mStrClassName , mStrClassName , String.format("Batch initTapWaterDataThread 水厂:{%s},标识:{%s} ERROR:{%s} ", item.get("org_name"), item.get("collcation_tag") , ex.getLocalizedMessage())); } } latch.await(); LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"比对远通原始数据结束时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT)); } }catch(Exception ex){ LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR , mStrClassName , mStrClassName , String.format("Batch initTapWaterData ERROR:{%s} " , ex.getLocalizedMessage())); } } //TODO 初始化添加计算水厂所有设备每日的每小时用水量计算 public static void initWaterCollecationReacordAll(){ //TODO ① 首先查询水厂设备配置信息 try { LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"计算小时用水量开始时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT)); LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"计算小时用水量进行中:......"); List> configList = getWaterTapWaterApi().getWaterCollectionConfigList("WHERE 1=1 AND org_name not like '%营业所' AND org_name!='北碚水厂' AND org_name!='渝中区水厂'"); if (!CollectionUtils.isEmpty(configList)) { //TODO 按照组织机构分组 Map>> groupedData = configList.stream().collect(Collectors.groupingBy(item -> item.get("org_name"))); final CountDownLatch latch = new CountDownLatch(groupedData.keySet().size()); //TODO 外层循环组织机构 for (Object key:groupedData.keySet()){ try{ ThreadTaskMgr.runTask(new TraceRunnable(TraceLogMgr.getTraceId()) { @Override public void function() { //TODO 根据当前配置信息item 查询远通数据中的历史数据 //TODO 首先查询当前水厂设备的从2025-01-01之后到得到数据 // LocalDateTime startDateTime = LocalDateTime.now().withMinute(0).withSecond(0) // .minusDays(1).minusHours(1); LocalDateTime startDateTime = LocalDateTime.now().withHour(0).withMinute(0).withSecond(0) .minusDays(3); //TODO 需计算的循环天数 Long days = 0L; days = Duration.between(startDateTime, LocalDateTime.now()).toDays(); //TODO 此循环天数每一天所查的是所有设备每小时数据合 final CountDownLatch latch2 = new CountDownLatch(days.intValue()); for(Long k = 0L;k> deviceList = groupedData.get(key); //TODO 循环获取该天该水厂每个设备数据 //TODO 查询当前天日期内每15分钟的设备数据 for (int i = 0; i < 96; i++) { String startTime = newStartDateTime.minusHours(-i).format(formater); String endTime = newStartDateTime.minusMinutes(-1).minusHours(-(i+1)).format(formater);//查询时间加一分钟 Map recordAllEntity = new HashMap<>();//需要添加的实体数据 recordAllEntity.put("org_name", key.toString());//水厂 recordAllEntity.put("time", newStartDateTime.minusHours(-(i+1)).format(formater));//采集时间(小时的最后时间) recordAllEntity.put("value", null); recordAllEntity.put("value_tag", "water"); recordAllEntity.put("collcation_tag_array", FastJsonUtil.toJSON(deviceList)); Double value = null; //TODO 此循环计算该小时所有设备的用水量 for (Map item : deviceList) { //TODO 和尚山水厂的三个OPC的设备0点到1点的数据特殊处理,直接取最后一条数据作为0点到1点的用水量 // 定义字符串日期时间的格式 DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); // 解析字符串以创建 LocalDateTime 实例 LocalDateTime dateTime = LocalDateTime.parse(endTime, formatter); if (dateTime.getHour()==1&&("OPC_SC.HSS.HSS1.AI.LJLL1.F_CV".equals(item.get("collcation_tag").toString()) || "OPC_SC.HSS.HSS1.AI.LJLL2.F_CV".equals(item.get("collcation_tag").toString()) || "OPC_SC.HSS.HSS1.AI.LJLL3.F_CV".equals(item.get("collcation_tag").toString()))) { Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount( " WHERE TAG_CODE = '" + item.get("collcation_tag") + "' " + " and QCQUISITION_TIME >= to_date('" + startDate + "', 'yyyy-mm-dd hh24:mi:ss')" + " and QCQUISITION_TIME < to_date('" + endDate + "', 'yyyy-mm-dd hh24:mi:ss')"); if(itemCount!=null&&itemCount>0) { List> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(itemCount, 0, " AND TAG_CODE = '" + item.get("collcation_tag") + "' " + " and QCQUISITION_TIME >= to_date('" + startTime + "', 'yyyy-mm-dd hh24:mi:ss')" + " and QCQUISITION_TIME < to_date('" + endTime + "', 'yyyy-mm-dd hh24:mi:ss')" + " order by QCQUISITION_TIME"); if (!CollectionUtils.isEmpty(tapWaterHistoryList) && tapWaterHistoryList.size() > 1) { if (value == null) { value = 0.00; } value += tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("val").toString()) : null; } } }else { Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount( " WHERE TAG_CODE = '" + item.get("collcation_tag") + "' " + " and QCQUISITION_TIME >= to_date('" + startDate + "', 'yyyy-mm-dd hh24:mi:ss')" + " and QCQUISITION_TIME < to_date('" + endDate + "', 'yyyy-mm-dd hh24:mi:ss')"); if (itemCount != null && itemCount > 0) { List> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(itemCount, 0, " AND TAG_CODE = '" + item.get("collcation_tag") + "' " + " and QCQUISITION_TIME >= to_date('" + startTime + "', 'yyyy-mm-dd hh24:mi:ss')" + " and QCQUISITION_TIME < to_date('" + endTime + "', 'yyyy-mm-dd hh24:mi:ss')" + " order by QCQUISITION_TIME"); //TODO tapWaterHistoryList此集合为该设备该小时内的数据,取第一条和最后一条的相差值作为用水量 if (!CollectionUtils.isEmpty(tapWaterHistoryList) && tapWaterHistoryList.size() > 1) { Double firstValue = tapWaterHistoryList.get(0).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get(0).get("val").toString()) : null; Double lastValue = tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("val").toString()) : null; if (firstValue != null && lastValue != null) { //到此处是该小时一个设备的用水量已加上 if (value == null) { value = 0.00; } value += Math.abs(lastValue - firstValue); } } } } } recordAllEntity.put("value", value); List> queryWaterRecord = getWaterTapWaterApi().getWaterCollectionRecordAllList(1, 0, " WHERE org_name = '" + recordAllEntity.get("org_name") + "' AND time = '" + recordAllEntity.get("time") + "' AND value_tag = '" + recordAllEntity.get("value_tag") + "'"); if (CollectionUtils.isEmpty(queryWaterRecord)) { //TODO 说明不存在,进行插入 if(!ObjectUtils.isEmpty(recordAllEntity.get("value"))) { int insertCode = getWaterTapWaterApi().insertWaterCollectionRecordAll(" (" + "'" + recordAllEntity.get("org_name") + "'," + "'" + recordAllEntity.get("time") + "'," + "'" + recordAllEntity.get("value") + "'," + "'" + recordAllEntity.get("value_tag") + "'," + "'" + recordAllEntity.get("collcation_tag_array") + "'" + ") "); if (insertCode < 0) { LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR , mStrClassName , mStrClassName , String.format("Batch initTapWaterDataThread 未成功:{%s} ", FastJsonUtil.toJSON(recordAllEntity))); } } }else{ //TODO 说明存在,进行修改 if(!ObjectUtils.isEmpty(recordAllEntity.get("value"))) { int updateCode = getWaterTapWaterApi().updateWaterCollectionRecordAll(String.valueOf(value)," WHERE " + "(" + " org_name = '" + recordAllEntity.get("org_name") + "' and" + " \"time\" = '" + recordAllEntity.get("time") + "' and" + " value_tag = '" + recordAllEntity.get("value_tag") + "'" + ") "); if (updateCode < 0) { LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR , mStrClassName , mStrClassName , String.format("Batch updateWaterCollectionRecordAll 未成功:{%s} ", FastJsonUtil.toJSON(recordAllEntity))); } } } } latch2.countDown(); } }); }catch(Exception ex){ } } try { latch2.await(); }catch(Exception ex){ } latch.countDown(); } }); }catch(Exception ex){ LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR , mStrClassName , mStrClassName , String.format("Batch" + " initWaterReacordAll ERROR:{%s} " , ex.getLocalizedMessage())); } } latch.await(); LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"计算小时用水量结束时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT)); } }catch(Exception ex){ LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR , mStrClassName , mStrClassName , String.format("Batch initWaterCollecationReacordAll ERROR:{%s} " , ex.getLocalizedMessage())); } } //TODO 初始化添加计算泵每日的小时数据 public static void initWaterPumpReacordAll(String startFindTime){ //TODO ① 首先查询水厂设备配置信息 try { LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"添加小时泵数据开始时间:"+ TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT)); LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"添加小时泵数据起始时间:("+startFindTime+")进行中:......"); List> configList = getWaterTapWaterApi().getWaterPumpCollectionConfigList(null); if (!CollectionUtils.isEmpty(configList)) { //TODO 按照设备分组 Map>> groupedData = configList.stream().collect(Collectors.groupingBy(item -> item.get("device_code"))); final CountDownLatch latch = new CountDownLatch(groupedData.keySet().size()); //TODO 外层循环组织机构 for (Object key:groupedData.keySet()){ try{ new Thread(() -> { //TODO 根据当前配置信息item 查询远通数据中的历史数据 //TODO 首先查询当前水厂设备的从startFindTime之后到得到数据 LocalDateTime startDateTime = LocalDateTime.parse(startFindTime, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); //TODO 需计算的循环天数 Long days = 0L; days = Duration.between(startDateTime, LocalDateTime.now()).toDays(); days+=1; //TODO 此循环天数每一天所查的是所有设备每小时数据合 final CountDownLatch latch2 = new CountDownLatch(days.intValue()); for(Long k = 0L;k { List> deviceList = groupedData.get(key); //TODO 循环获取该天该水厂每个设备数据 //TODO 查询当前天日期内每小时的设备数据 for (int i = 0; i < 24; i++) { String startTime = newStartDateTime.withHour(i).format(formater); //TODO 需要加个05分把endTime的整点数据查出来 String endTime = newStartDateTime.withHour(i).minusHours(-1).withMinute(5).withSecond(0).format(formater); //TODO 此循环计算该小时所有设备的泵数据 for (Map item : deviceList) { // 定义字符串日期时间的格式 DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); // 解析字符串以创建 LocalDateTime 实例 LocalDateTime dateTime = LocalDateTime.parse(endTime, formatter); Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount( " WHERE 1=1 AND TAG_CODE = '" + item.get("collcation_tag") + "' " + " and QCQUISITION_TIME >= to_date('" + startDate + "', 'yyyy-mm-dd hh24:mi:ss')" + " and QCQUISITION_TIME <= to_date('" + endDate + "', 'yyyy-mm-dd hh24:mi:ss')" + " AND ABS(" + " (QCQUISITION_TIME - TRUNC(QCQUISITION_TIME, 'HH')) * 24 * 60 " + " ) <= 5");//查询 -- 采集时间接近整点,误差在5分钟内 if (itemCount != null && itemCount > 0) { List> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(itemCount, 0, " AND TAG_CODE = '" + item.get("collcation_tag") + "' " + " and QCQUISITION_TIME >= to_date('" + startTime + "', 'yyyy-mm-dd hh24:mi:ss')" + " and QCQUISITION_TIME <= to_date('" + endTime + "', 'yyyy-mm-dd hh24:mi:ss')" + " AND ABS( " + " (QCQUISITION_TIME - TRUNC(QCQUISITION_TIME, 'HH')) * 24 * 60 " + " ) <= 5 " + " order by QCQUISITION_TIME"); //TODO 这里需要注意泵数据的采集配置表设备号可能有多个,因此,要判断对应的standard_code做对应处理 if (!CollectionUtils.isEmpty(tapWaterHistoryList) && tapWaterHistoryList.size() >= 1) { //TODO 数据库操作 for (Map mapEntity:tapWaterHistoryList) { Map recordAllEntity = new HashMap<>();//需要添加的实体数据 recordAllEntity.put("device_code", groupedData.get(key).get(0).get("device_code")); recordAllEntity.put("time", LocalDateTime.parse(mapEntity.get("QCQUISITION_TIME").toString(), formatter) .withMinute(0).withSecond(0).format(formatter));//采集时间(小时的最后时间) if(item.get("standard_code").equals("active_energy")){ recordAllEntity.put("active_energy",mapEntity.get("VAL")); } if(item.get("standard_code").equals("startup_state")){ recordAllEntity.put("startup_state",mapEntity.get("VAL")); } if(item.get("standard_code").equals("phase_a_current")){ if(ObjectUtils.isEmpty(mapEntity.get("VAL"))||"0".equals(mapEntity.get("VAL").toString())) { recordAllEntity.put("startup_state", 0); }else{ recordAllEntity.put("startup_state", 1); } } //TODO 直接调用插入或者新增方法 int insertCode = getWaterTapWaterApi().insertOrUpdateWaterPumpRecordAll(recordAllEntity); if (insertCode < 0) { System.out.print(String.format("Batch insertOrUpdateWaterPumpRecordAll 未成功:{%s} ", JSONObject.toJSON(recordAllEntity))); } } } } } } latch2.countDown(); }).start(); }catch(Exception ex){ } } try { latch2.await(); }catch(Exception ex){ } latch.countDown(); }).start(); }catch(Exception ex){ System.out.print(String.format("Batch" + " initWaterReacordAll ERROR:{%s} " , ex.getLocalizedMessage())); } } latch.await(); LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"添加小时泵数据检查机制("+startFindTime+")结束时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT)); } }catch(Exception ex){ System.out.print(String.format("Batch initWaterCollecationReacordAll ERROR:{%s} " , ex.getLocalizedMessage())); } } }