KprAimTapWaterBizFun.java 104 KB


  1. package com.shkpr.service.aimodelpower.bizmgr;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.global.base.log.LogLevelFlag;
  4. import com.global.base.log.LogPrintMgr;
  5. import com.global.base.tools.FastJsonUtil;
  6. import com.shkpr.service.aimodelpower.commtools.TimeTool;
  7. import com.shkpr.service.aimodelpower.dbdao.DBMgrProxy;
  8. import com.shkpr.service.aimodelpower.dbdao.shizilaishuiDataSource.service.intef.WaterCollecationService;
  9. import com.shkpr.service.aimodelpower.dbdao.shizilaishuiDataSource.service.intef.WaterTapWaterService;
  10. import com.shkpr.service.aimodelpower.dto.TraceRunnable;
  11. import com.shkpr.service.aimodelpower.globalmgr.ThreadTaskMgr;
  12. import com.shkpr.service.aimodelpower.globalmgr.TraceLogMgr;
  13. import org.springframework.util.CollectionUtils;
  14. import org.springframework.util.ObjectUtils;
  15. import java.io.FileInputStream;
  16. import java.io.IOException;
  17. import java.io.InputStream;
  18. import java.time.Duration;
  19. import java.time.LocalDate;
  20. import java.time.LocalDateTime;
  21. import java.time.LocalTime;
  22. import java.time.format.DateTimeFormatter;
  23. import java.util.*;
  24. import java.util.concurrent.CountDownLatch;
  25. import java.util.stream.Collectors;
  26. /**
  27. * @ClassName KprAimTapWaterBizFun
  28. * @Description: TODO
  29. * @Author LX
  30. * @Date 2024/5/22
  31. * @Version V1.0
  32. **/
  33. public class KprAimTapWaterBizFun {
  // Generic status message constants; no use of them appears in the visible part of this class.
  34. private static final String MSG_SUCCESS = "success.";
  35. private static final String MSG_FAILED = "failed.";
  // Passed as both the class and the method tag in the LogPrintMgr.printLogMsg calls of this class.
  36. private static final String mStrClassName = "KprAimTapWaterBizFun";
  // Literal "NULL" marker string; no use of it appears in the visible part of this class.
  37. private static final String EMPTY_NULL = "NULL";
  38. public static WaterTapWaterService getWaterTapWaterApi(){
  39. return DBMgrProxy.getInstance().applyWaterTapWaterService();
  40. }
  41. public static WaterCollecationService getWaterCollecationApi(){
  42. return DBMgrProxy.getInstance().applyWaterCollecationService();
  43. }
  // Package-private formatter for "yyyy-MM-dd HH:mm:ss" timestamps.
  // The checkBusinessRecord*/checkRecordAllData* methods declare a local variable with the same
  // name and pattern, which shadows this field inside those methods.
  44. static DateTimeFormatter formater = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
  45. //TODO 15分间隔的预测插值(做假暂时用,不用时注释)
  46. //TODO 思路:等预测小时的原程序完成后, 原预测表中为小时预测数据,分钟字段为空,将预测表中的数据进行二次清洗然后插入新预测表
  47. //TODO beforeDays为往后推清洗多少天 n-1为逻辑, 例如传1就是0天当天
  48. public static void yuceZuojia(int beforeDays){
  49. try{
  50. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"预测15分钟插值开始时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
  51. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"预测15分钟插值进行中:......");
  52. List<Map<String, Object>> orgConfig = getWaterCollecationApi().getOrgConfig(false, 0, 0, " AND org_name != '井口水厂' AND org_name != '丰收坝水厂' AND org_name != '沙坪坝水厂' AND org_name != '北碚红工水厂' AND org_name != '大溪沟水厂' ");
  53. final CountDownLatch latch = new CountDownLatch(orgConfig.size());
  54. DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
  55. LocalDateTime startDate = LocalDateTime.now().withMinute(0).withSecond(0);
  56. for (Map<String,Object> org:orgConfig) {
  57. ThreadTaskMgr.runTask(new TraceRunnable(TraceLogMgr.getTraceId()) {
  58. @Override
  59. public void function() {
  60. try {
  61. for (int dayNum=0;dayNum<beforeDays;dayNum++) {
  62. String dateStr = startDate.minusDays(-dayNum).format(formatter);
  63. List<Map<String, Object>> resHourList = getWaterCollecationApi().getTbMHourwater(false,
  64. 20, 0, " AND \"orgId\" = '" + org.get("org_id") + "' AND \"Date\" = '" + dateStr + "' ORDER BY \"Date\",\"Hour\" ASC ");
  65. processWaterData(resHourList);
  66. }
  67. latch.countDown();
  68. }catch(Exception ex){
  69. ex.printStackTrace();
  70. }
  71. }
  72. });
  73. }
  74. latch.await();
  75. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"预测15分钟插值已结束");
  76. }catch(Exception ex){
  77. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
  78. , mStrClassName
  79. , mStrClassName
  80. , String.format("Batch yuceZuojia ERROR:{%s} "
  81. , ex.getLocalizedMessage()));
  82. }
  83. }
  // Date-only ("yyyy-MM-dd") and time-only ("HH:mm:ss") patterns; processWaterData uses them to
  // write the "Date" and "Hour" values of each generated 15-minute row.
  84. private static DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
  85. private static DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss");
  86. public static void processWaterData(List<Map<String, Object>> resHourList) {
  87. List<List<Map<String, Object>>> trendGroups = new ArrayList<>();
  88. List<Map<String, Object>> currentGroup = new ArrayList<>();
  89. for (int i = 0; i < resHourList.size(); i++) {
  90. Map<String, Object> currentMap = resHourList.get(i);
  91. double currentHourData = getDoubleValue(currentMap, "HourForecastActualWaterSupply");
  92. if (i == 0) {
  93. currentGroup.add(currentMap);
  94. } else {
  95. double previousHourData = getDoubleValue(resHourList.get(i - 1), "HourForecastActualWaterSupply");
  96. boolean isIncreasing = currentHourData > previousHourData;
  97. if (currentGroup.size() > 0) {
  98. double lastGroupData = getDoubleValue(currentGroup.get(currentGroup.size() - 1), "HourForecastActualWaterSupply");
  99. boolean wasIncreasing = lastGroupData < currentHourData;
  100. if (isIncreasing != wasIncreasing) {
  101. trendGroups.add(new ArrayList<>(currentGroup));
  102. currentGroup.clear();
  103. }
  104. }
  105. currentGroup.add(currentMap);
  106. }
  107. }
  108. if (!currentGroup.isEmpty()) {
  109. trendGroups.add(currentGroup);
  110. }
  111. for (List<Map<String, Object>> group : trendGroups) {
  112. for (int i = 0; i < group.size() - 1; i++) {
  113. Map<String, Object> currentMap = group.get(i);
  114. double currentHourData = getDoubleValue(currentMap, "HourForecastActualWaterSupply");
  115. double nextHourData = getDoubleValue(group.get(i + 1), "HourForecastActualWaterSupply");
  116. boolean isIncreasing = nextHourData > currentHourData;
  117. double[] splitData = splitHourData(currentHourData, isIncreasing);
  118. // 步骤2:平滑处理
  119. double alpha = 0.3; // 平滑系数
  120. double[] smoothedData = exponentialSmoothing(splitData,alpha);
  121. // 解析日期字符串
  122. LocalDate date = LocalDate.parse(currentMap.get("Date").toString(), DateTimeFormatter.ISO_LOCAL_DATE);
  123. // 解析时间字符串
  124. LocalTime time = LocalTime.parse(currentMap.get("Hour").toString(), DateTimeFormatter.ISO_LOCAL_TIME);
  125. LocalDateTime dateTime = LocalDateTime.of(date, time).minusMinutes(45);
  126. for (int j = 0; j < smoothedData.length; j++) {
  127. Map<String, Object> newMap = new HashMap<>(currentMap);
  128. newMap.put("HourForecastActualWaterSupply", smoothedData[j]);
  129. LocalDateTime quarterHourTime = dateTime.plusMinutes(j * 15);
  130. newMap.put("Date", quarterHourTime.format(dateFormatter));
  131. newMap.put("Hour", quarterHourTime.format(timeFormatter));
  132. // Here you would query the database and insert/update data
  133. // For example:
  134. double originalValue = queryOriginalValue(quarterHourTime);
  135. newMap.put("HourActualWaterSupply", originalValue);
  136. newMap.put("LastModifyTime", LocalDateTime.now().format(DateTimeFormatter.ofPattern(TimeTool.TIMESTAMP_FORMAT)));
  137. // Retain only specific keys
  138. Map<String, Object> filteredMap = filterMap(newMap, "Date", "Hour", "HourForecastActualWaterSupply", "HourActualWaterSupply", "LastModifyTime", "orgId");
  139. // Insert or update the database
  140. // int insertRes = insertOrUpdateDatabase(filteredMap);
  141. // Handle the result if needed
  142. Integer insertRes = getWaterTapWaterApi().insertOrUpdateTbmHourWaterNew(filteredMap);
  143. if (insertRes < 1) {
  144. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
  145. , mStrClassName
  146. , mStrClassName
  147. , String.format("Batch yuceZuojia ERROR:{%s} ",
  148. "新增或修改失败"));
  149. }
  150. }
  151. }
  152. }
  153. }
  154. private static double getDoubleValue(Map<String, Object> map, String key) {
  155. Object value = map.get(key);
  156. return value != null ? Double.parseDouble(value.toString()) : 0.0;
  157. }
  158. //拆分函数
  159. private static double[] splitHourData(double hourData, boolean isIncreasing) {
  160. double[] splitData = new double[4];
  161. if (isIncreasing) {
  162. // 调整比例以实现更平滑的上升曲线
  163. splitData[0] = hourData * 0.24;
  164. splitData[1] = hourData * 0.26;
  165. splitData[2] = hourData * 0.25;
  166. splitData[3] = hourData * 0.25;
  167. } else {
  168. // 调整比例以实现更平滑的下降曲线
  169. splitData[0] = hourData * 0.25;
  170. splitData[1] = hourData * 0.25;
  171. splitData[2] = hourData * 0.26;
  172. splitData[3] = hourData * 0.24;
  173. }
  174. return splitData;
  175. }
  176. //平滑函数
  177. private static double[] smoothData(double[] data) {
  178. double[] smoothedData = new double[data.length];
  179. int windowSize = 3; // 移动平均窗口大小
  180. for (int i = 0; i < data.length; i++) {
  181. double sum = 0;
  182. int count = 0;
  183. for (int j = Math.max(0, i - windowSize / 2); j <= Math.min(data.length - 1, i + windowSize / 2); j++) {
  184. sum += data[j];
  185. count++;
  186. }
  187. smoothedData[i] = sum / count;
  188. }
  189. return smoothedData;
  190. }
  191. //指数平滑
  192. private static double[] exponentialSmoothing(double[] data, double alpha) {
  193. double[] smoothedData = new double[data.length];
  194. smoothedData[0] = data[0]; // 初始化第一个值
  195. for (int i = 1; i < data.length; i++) {
  196. smoothedData[i] = alpha * data[i] + (1 - alpha) * smoothedData[i - 1];
  197. }
  198. return smoothedData;
  199. }
  200. private static double queryOriginalValue(LocalDateTime dateTime) {
  201. // Simulate querying the database for the original value
  202. // Replace with actual database query
  203. return 100.0; // Placeholder value
  204. }
  205. private static Map<String, Object> filterMap(Map<String, Object> map, String... keys) {
  206. Map<String, Object> filteredMap = new LinkedHashMap<>();
  207. for (String key : keys) {
  208. if (map.containsKey(key)) {
  209. filteredMap.put(key, map.get(key));
  210. }
  211. }
  212. return filteredMap;
  213. }
  214. private static Map<String, List<String>> parseConfig(String configstr) {
  215. // Initialize the map to hold the parsed configuration
  216. Map<String, List<String>> configMap = new HashMap<>();
  217. // Get the configuration string
  218. String configString = configstr;
  219. if (configString != null) {
  220. // Split the configuration by different locations
  221. String[] locations = configString.split("},");
  222. for (String location : locations) {
  223. // Find the index of the opening brace
  224. int braceIndex = location.indexOf('{');
  225. if (braceIndex > 0) {
  226. // Extract the location name
  227. String locationName = location.substring(0,braceIndex).trim();
  228. // Extract the tags and split by comma
  229. String tagsString = location.substring(braceIndex + 1).replace("}", "").trim();
  230. List<String> tags = Arrays.asList(tagsString.split(","));
  231. // Add to the map
  232. configMap.put(locationName, tags);
  233. }
  234. }
  235. }
  236. return configMap;
  237. }
  238. //TODO 小时营业所片区用水量(自供+(供入-供出))
  239. //TODO beforHour是小时数,当前时间往前扣多少个小时 按15分钟刻度计算用水量
  240. public static void checkBusinessRecordAllData(int beforHour,String selfconfessStr,String supplyinStr,String confessStr){
  241. //TODO 检查小时用水量
  242. DateTimeFormatter formater = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
  243. try{
  244. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"检查机制-计算营业所15分钟用水量开始时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
  245. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"检查机制-计算营业所15分钟用水量进行中:......");
  246. //获取配置关系
  247. //TODO 自供
  248. Map<String,List<String>> selfconfess = parseConfig(selfconfessStr);
  249. //TODO 供入
  250. Map<String,List<String>> supplyin = parseConfig(supplyinStr);
  251. //TODO 供出
  252. Map<String,List<String>> confess = parseConfig(confessStr);
  253. //TODO 根据当前配置信息item 查询远通数据中的历史数据
  254. //TODO 首先查询当前水厂设备的从昨天之后到得到数据的数据
  255. LocalDateTime today = LocalDateTime.now();
  256. LocalDateTime startDateTime = today.withMinute(0).withSecond(0).withNano(0).minusHours(beforHour);
  257. //TODO 需计算的循环天数
  258. //TODO 此循环天数每一天所查的是所有设备每小时数据合
  259. LocalDateTime newStartDateTime = startDateTime;
  260. String startDate = newStartDateTime.format(formater);
  261. String endDate = today.withMinute(1).withSecond(0).format(formater);
  262. final CountDownLatch latch = new CountDownLatch(selfconfess.keySet().size());
  263. List<String> closeTag = new ArrayList<>();
  264. closeTag.add("FSBSC_MAS.MAS.TOTALFLOWD");
  265. closeTag.add("fsbsc_mas.mas.totalflowg");
  266. for (String orgName:selfconfess.keySet()) {
  267. try {
  268. ThreadTaskMgr.runTask(new TraceRunnable(TraceLogMgr.getTraceId()) {
  269. @Override
  270. public void function() {
  271. for (int i = 0; i < Integer.valueOf(String.valueOf(Duration.between(startDateTime, today.withSecond(0)).toMinutes() / 15)); i++) {
  272. String startTime = newStartDateTime.minusMinutes(-(i * 15)).format(formater);
  273. String endTime = newStartDateTime.minusMinutes(-1).minusMinutes(-((i + 1) * 15)).format(formater);//查询时间加一分钟
  274. Map<String, Object> recordAllEntity = new HashMap<>();//需要添加的实体数据
  275. recordAllEntity.put("org_name", orgName);//水厂
  276. recordAllEntity.put("time", newStartDateTime.minusMinutes(-((i + 1) * 15)).format(formater));//采集时间(小时的最后时间)
  277. recordAllEntity.put("value", null);
  278. recordAllEntity.put("value_tag", "water");
  279. recordAllEntity.put("collcation_tag_array", "");
  280. //TODO 此循环计算该小时所有设备的用水量
  281. //TODO 先查各个营业所的自供值
  282. List<String> tagTags = selfconfess.get(orgName);//自供
  283. List<String> tagTags2 = supplyin.get(orgName);//供入
  284. List<String> tagTags3 = confess.get(orgName);//供出
  285. Double selfValue = null;
  286. Double supplyInValue = null;
  287. Double confessValue = null;
  288. try {
  289. selfValue = selfconfess(tagTags, startDate, endDate, startTime, endTime,orgName+"(自供)",closeTag);//自供
  290. supplyInValue = selfconfess(tagTags2, startDate, endDate, startTime, endTime,orgName+"(供入)",null);//供入
  291. confessValue = selfconfess(tagTags3, startDate, endDate, startTime, endTime,orgName+"(供出)",null);//供出
  292. } catch (Exception ex) {
  293. }
  294. Double value = null;//总计算值
  295. if (selfValue != null) {
  296. if (supplyInValue != null && confessValue == null) {
  297. value = selfValue + supplyInValue;
  298. } else if (supplyInValue != null && confessValue != null) {
  299. value = selfValue + supplyInValue - confessValue;
  300. } else if (supplyInValue == null && confessValue == null) {
  301. value = selfValue;
  302. } else if (supplyInValue == null && confessValue != null) {
  303. value = selfValue - confessValue;
  304. }
  305. }
  306. recordAllEntity.put("value", value);
  307. // System.out.println("营业所片区" + orgName + "值:" + value + ",采集时间:" + newStartDateTime.minusMinutes(-((i + 1) * 15)).format(formater));
  308. List<Map<String, Object>> queryWaterRecord = getWaterTapWaterApi().getWaterCollectionRecordAllListNew(1, 0,
  309. " WHERE org_name = '" + recordAllEntity.get("org_name")
  310. + "' AND time = '" + recordAllEntity.get("time") + "' AND value_tag = '" + recordAllEntity.get("value_tag") + "'");
  311. if (CollectionUtils.isEmpty(queryWaterRecord)) {
  312. // //TODO 说明不存在,进行插入
  313. if (!ObjectUtils.isEmpty(recordAllEntity.get("value"))) {
  314. int insertCode = getWaterTapWaterApi().insertWaterCollectionRecordAllNew(" (" +
  315. "'" + recordAllEntity.get("org_name") + "'," +
  316. "'" + recordAllEntity.get("time") + "'," +
  317. "'" + recordAllEntity.get("value") + "'," +
  318. "'" + recordAllEntity.get("value_tag") + "'," +
  319. "'" + recordAllEntity.get("collcation_tag_array") + "'" +
  320. ") ");
  321. if (insertCode < 0) {
  322. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
  323. , mStrClassName
  324. , mStrClassName
  325. , String.format("Batch initTapWaterDataThread 未成功:{%s} ",
  326. FastJsonUtil.toJSON(recordAllEntity)));
  327. }
  328. }
  329. } else {
  330. //TODO 说明存在,进行修改
  331. if (!ObjectUtils.isEmpty(recordAllEntity.get("value"))) {
  332. int updateCode = getWaterTapWaterApi().updateWaterCollectionRecordAllNew(String.valueOf(value), " WHERE " +
  333. "(" +
  334. " org_name = '" + recordAllEntity.get("org_name") + "' and" +
  335. " \"time\" = '" + recordAllEntity.get("time") + "' and" +
  336. " value_tag = '" + recordAllEntity.get("value_tag") + "'" +
  337. ") ");
  338. if (updateCode < 0) {
  339. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
  340. , mStrClassName
  341. , mStrClassName
  342. , String.format("Batch updateWaterCollectionRecordAll 未成功:{%s} ",
  343. FastJsonUtil.toJSON(recordAllEntity)));
  344. }
  345. }
  346. }
  347. }
  348. latch.countDown();
  349. }
  350. });
  351. }catch(Exception ex){
  352. }
  353. }
  354. latch.await();
  355. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"检查机制-计算营业所15分钟用水量已完成");
  356. }catch(Exception ex){
  357. ex.printStackTrace();
  358. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
  359. , mStrClassName
  360. , mStrClassName
  361. , String.format("Batch checkBusinessRecordAllData ERROR:{%s} "
  362. , ex.getLocalizedMessage()));
  363. }
  364. }
  365. //TODO beforHour是小时数,当前时间往前扣多少个小时 按一小时刻度计算用水量
  366. public static void checkBusinessRecordOldAllData(int beforHour,String selfconfessStr,String supplyinStr,String confessStr){
  367. //TODO 检查小时用水量
  368. DateTimeFormatter formater = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
  369. try{
  370. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"检查机制-计算营业所小时用水量开始时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
  371. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"检查机制-计算营业所小时用水量进行中:......");
  372. //获取配置关系
  373. //TODO 自供
  374. Map<String,List<String>> selfconfess = parseConfig(selfconfessStr);
  375. //TODO 供入
  376. Map<String,List<String>> supplyin = parseConfig(supplyinStr);
  377. //TODO 供出
  378. Map<String,List<String>> confess = parseConfig(confessStr);
  379. //TODO 根据当前配置信息item 查询远通数据中的历史数据
  380. //TODO 首先查询当前水厂设备的从昨天之后到得到数据的数据
  381. LocalDateTime today = LocalDateTime.now();
  382. LocalDateTime startDateTime = today.withMinute(0).withSecond(0).withNano(0).minusHours(beforHour);
  383. //TODO 需计算的循环天数
  384. //TODO 此循环天数每一天所查的是所有设备每小时数据合
  385. LocalDateTime newStartDateTime = startDateTime;
  386. String startDate = newStartDateTime.format(formater);
  387. String endDate = today.withMinute(1).withSecond(0).format(formater);
  388. List<String> closeTag = new ArrayList<>();
  389. closeTag.add("FSBSC_MAS.MAS.TOTALFLOWD");
  390. closeTag.add("fsbsc_mas.mas.totalflowg");
  391. final CountDownLatch latch = new CountDownLatch(selfconfess.keySet().size());
  392. for (String orgName:selfconfess.keySet()) {
  393. try {
  394. ThreadTaskMgr.runTask(new TraceRunnable(TraceLogMgr.getTraceId()) {
  395. @Override
  396. public void function() {
  397. for (int i = 0; i < Integer.valueOf(String.valueOf(Duration.between(startDateTime, today.withMinute(0).withSecond(0)).toHours())); i++) {
  398. String startTime = newStartDateTime.minusHours(-i).format(formater);
  399. String endTime = newStartDateTime.minusMinutes(-1).minusHours(-(i+1)).format(formater);//查询时间加一分钟
  400. Map<String, Object> recordAllEntity = new HashMap<>();//需要添加的实体数据
  401. recordAllEntity.put("org_name", orgName);//水厂
  402. recordAllEntity.put("time", newStartDateTime.minusHours(-(i+1)).format(formater));//采集时间(小时的最后时间)
  403. recordAllEntity.put("value", null);
  404. recordAllEntity.put("value_tag", "water");
  405. recordAllEntity.put("collcation_tag_array", "");
  406. //TODO 此循环计算该小时所有设备的用水量
  407. //TODO 先查各个营业所的自供值
  408. List<String> tagTags = selfconfess.get(orgName);//自供
  409. List<String> tagTags2 = supplyin.get(orgName);//供入
  410. List<String> tagTags3 = confess.get(orgName);//供出
  411. Double selfValue = null;
  412. Double supplyInValue = null;
  413. Double confessValue = null;
  414. try {
  415. selfValue = selfconfess(tagTags, startDate, endDate, startTime, endTime,orgName+"(自供)",closeTag);//自供
  416. supplyInValue = selfconfess(tagTags2, startDate, endDate, startTime, endTime,orgName+"(供入)",null);//供入
  417. confessValue = selfconfess(tagTags3, startDate, endDate, startTime, endTime,orgName+"(供出)",null);//供出
  418. } catch (Exception ex) {
  419. }
  420. Double value = null;//总计算值
  421. if (selfValue != null) {
  422. if (supplyInValue != null && confessValue == null) {
  423. value = selfValue + supplyInValue;
  424. } else if (supplyInValue != null && confessValue != null) {
  425. value = selfValue + supplyInValue - confessValue;
  426. } else if (supplyInValue == null && confessValue == null) {
  427. value = selfValue;
  428. } else if (supplyInValue == null && confessValue != null) {
  429. value = selfValue - confessValue;
  430. }
  431. }
  432. recordAllEntity.put("value", value);
  433. System.out.println("营业所片区" + orgName + "值:" + value + ",采集时间:" + newStartDateTime.minusMinutes(-((i + 1) * 15)).format(formater));
  434. List<Map<String, Object>> queryWaterRecord = getWaterTapWaterApi().getWaterCollectionRecordAllList(1, 0,
  435. " WHERE org_name = '" + recordAllEntity.get("org_name")
  436. + "' AND time = '" + recordAllEntity.get("time") + "' AND value_tag = '" + recordAllEntity.get("value_tag") + "'");
  437. if (CollectionUtils.isEmpty(queryWaterRecord)) {
  438. // //TODO 说明不存在,进行插入
  439. if (!ObjectUtils.isEmpty(recordAllEntity.get("value"))) {
  440. int insertCode = getWaterTapWaterApi().insertWaterCollectionRecordAll(" (" +
  441. "'" + recordAllEntity.get("org_name") + "'," +
  442. "'" + recordAllEntity.get("time") + "'," +
  443. "'" + recordAllEntity.get("value") + "'," +
  444. "'" + recordAllEntity.get("value_tag") + "'," +
  445. "'" + recordAllEntity.get("collcation_tag_array") + "'" +
  446. ") ");
  447. if (insertCode < 0) {
  448. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
  449. , mStrClassName
  450. , mStrClassName
  451. , String.format("Batch initTapWaterDataThread 未成功:{%s} ",
  452. FastJsonUtil.toJSON(recordAllEntity)));
  453. }
  454. }
  455. } else {
  456. //TODO 说明存在,进行修改
  457. if (!ObjectUtils.isEmpty(recordAllEntity.get("value"))) {
  458. int updateCode = getWaterTapWaterApi().updateWaterCollectionRecordAll(String.valueOf(value), " WHERE " +
  459. "(" +
  460. " org_name = '" + recordAllEntity.get("org_name") + "' and" +
  461. " \"time\" = '" + recordAllEntity.get("time") + "' and" +
  462. " value_tag = '" + recordAllEntity.get("value_tag") + "'" +
  463. ") ");
  464. if (updateCode < 0) {
  465. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
  466. , mStrClassName
  467. , mStrClassName
  468. , String.format("Batch updateWaterCollectionRecordAll 未成功:{%s} ",
  469. FastJsonUtil.toJSON(recordAllEntity)));
  470. }
  471. }
  472. }
  473. }
  474. latch.countDown();
  475. }
  476. });
  477. }catch(Exception ex){
  478. }
  479. }
  480. latch.await();
  481. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"检查机制-计算营业所小时用水量已完成");
  482. }catch(Exception ex){
  483. ex.printStackTrace();
  484. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
  485. , mStrClassName
  486. , mStrClassName
  487. , String.format("Batch checkBusinessRecordAllData ERROR:{%s} "
  488. , ex.getLocalizedMessage()));
  489. }
  490. }
  491. //TODO 计算总值
  492. private static Double selfconfess(List<String> tagTags,String startDate,String endDate,String startTime,String endTime,String desc,List<String> closeTag) throws Exception{
  493. Double value = null;
  494. if (tagTags==null){
  495. return value;
  496. }
  497. for (String tagTag : tagTags) {
  498. // 定义字符串日期时间的格式
  499. DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
  500. // 解析字符串以创建 LocalDateTime 实例
  501. LocalDateTime dateTime = LocalDateTime.parse(endTime, formatter);
  502. if(closeTag!=null&&closeTag.size()>0){
  503. if(closeTag.contains(tagTag)){
  504. //如果存在就过滤 结束当前 循环
  505. continue;
  506. }
  507. }
  508. if (dateTime.getHour()==0&&dateTime.getMinute()==16&&("SS.SSCOM.F36:2".equals(tagTag) ||
  509. "SS.SSCOM.F36:5".equals(tagTag) ||
  510. "OPC_SC.HSS.HSS1.AI.LJLL3.F_CV".equals(tagTag))) {
  511. Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount(
  512. " WHERE TAG_CODE = '" + tagTag + "' " +
  513. " and QCQUISITION_TIME >= to_date('" + startDate + "', 'yyyy-mm-dd hh24:mi:ss')" +
  514. " and QCQUISITION_TIME < to_date('" + endDate + "', 'yyyy-mm-dd hh24:mi:ss')");
  515. if(itemCount!=null&&itemCount>0) {
  516. List<Map<String, Object>> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(itemCount, 0,
  517. " AND TAG_CODE = '" + tagTag + "' " +
  518. " and QCQUISITION_TIME >= to_date('" + startTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
  519. " and QCQUISITION_TIME < to_date('" + endTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
  520. " order by QCQUISITION_TIME");
  521. if (!CollectionUtils.isEmpty(tapWaterHistoryList) && tapWaterHistoryList.size() > 1) {
  522. if (value == null) {
  523. value = 0.00;
  524. }
  525. value += tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("val").toString()) : null;
  526. }
  527. }
  528. System.out.println("设施名:"+desc+";时间范围:"+startTime+"--"+endTime+";设备标签:"+tagTag+";用水量:"+value);
  529. }else {
  530. Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount(
  531. " WHERE TAG_CODE = '" + tagTag + "' " +
  532. " and QCQUISITION_TIME >= to_date('" + startDate + "', 'yyyy-mm-dd hh24:mi:ss')" +
  533. " and QCQUISITION_TIME < to_date('" + endDate + "', 'yyyy-mm-dd hh24:mi:ss')");
  534. if (itemCount != null && itemCount > 0) {
  535. List<Map<String, Object>> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(itemCount, 0,
  536. " AND TAG_CODE = '" + tagTag + "' " +
  537. " and QCQUISITION_TIME >= to_date('" + startTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
  538. " and QCQUISITION_TIME < to_date('" + endTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
  539. " order by QCQUISITION_TIME");
  540. //TODO tapWaterHistoryList此集合为该设备该小时内的数据,取第一条和最后一条的相差值作为用水量
  541. if (!CollectionUtils.isEmpty(tapWaterHistoryList) && tapWaterHistoryList.size() > 1) {
  542. Double firstValue = tapWaterHistoryList.get(0).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get(0).get("val").toString()) : null;
  543. Double lastValue = tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("val").toString()) : null;
  544. if (firstValue != null && lastValue != null) {
  545. //到此处是该小时一个设备的用水量已加上
  546. if (value == null) {
  547. value = 0.00;
  548. }
  549. value += Math.abs(lastValue - firstValue);
  550. }
  551. }
  552. System.out.println("设施名:"+desc+";时间范围:"+startTime+"--"+endTime+";设备标签:"+tagTag+";用水量:"+value);
  553. }
  554. }
  555. }
  556. // System.out.println("设施名:"+desc+";时间范围:"+startTime+"--"+endTime+";用水量:"+value+";总值:"+value);
  557. return value;
  558. }
  559. //TODO 15分钟 分区用水量 每天整点10分检查并写入,检查范围为当前时间到昨天23点
  560. public static void checkRecordAllDataBusinessFenqu(int beforHour){
  561. //TODO 检查小时用水量
  562. DateTimeFormatter formater = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
  563. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"检查机制-计算15分钟用水量开始时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
  564. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"检查机制-计算15分钟用水量进行中:......");
  565. List<Map<String, Object>> configList = getWaterTapWaterApi().getWaterCollectionConfigList("WHERE 1=1 AND org_name not like '%营业所' AND org_name!='北碚水厂' AND org_name!='渝中区水厂'");
  566. if (!CollectionUtils.isEmpty(configList)) {
  567. //TODO 按照组织机构分组
  568. Map<Object, List<Map<String, Object>>> groupedData =
  569. configList.stream().collect(Collectors.groupingBy(item -> item.get("org_name")));
  570. final CountDownLatch latch = new CountDownLatch(groupedData.keySet().size());
  571. for (Object key:groupedData.keySet()){
  572. try {
  573. ThreadTaskMgr.runTask(new TraceRunnable(TraceLogMgr.getTraceId()) {
  574. @Override
  575. public void function() {
  576. //TODO 根据当前配置信息item 查询远通数据中的历史数据
  577. //TODO 首先查询当前水厂设备的从昨天之后到得到数据的数据
  578. LocalDateTime today = LocalDateTime.now();
  579. LocalDateTime startDateTime = today.withMinute(0).withSecond(0).withNano(0).minusHours(beforHour);
  580. //TODO 需计算的循环天数
  581. //TODO 此循环天数每一天所查的是所有设备每小时数据合
  582. LocalDateTime newStartDateTime = startDateTime;
  583. String startDate = newStartDateTime.format(formater);
  584. String endDate = today.withMinute(1).withSecond(0).format(formater);
  585. List<Map<String, Object>> deviceList = groupedData.get(key);
  586. //TODO 循环获取该天该水厂每个设备数据
  587. //TODO 查询当前天日期内每小时的设备数据
  588. for (int i = 0; i < Integer.valueOf(String.valueOf(Duration.between(startDateTime, today.withSecond(0)).toMinutes()/15)); i++) {
  589. String startTime = newStartDateTime.minusMinutes(-(i * 15)).format(formater);
  590. String endTime = newStartDateTime.minusMinutes(-1).minusMinutes(-((i + 1)*15)).format(formater);//查询时间加一分钟
  591. Map<String, Object> recordAllEntity = new HashMap<>();//需要添加的实体数据
  592. recordAllEntity.put("org_name", key.toString());//水厂
  593. recordAllEntity.put("time", newStartDateTime.minusMinutes(-((i + 1)*15)).format(formater));//采集时间(小时的最后时间)
  594. recordAllEntity.put("value", null);
  595. recordAllEntity.put("value_tag", "water");
  596. recordAllEntity.put("collcation_tag_array", FastJsonUtil.toJSON(deviceList));
  597. Double value = null;
  598. //TODO 此循环计算该小时所有设备的用水量
  599. for (Map<String, Object> item : deviceList) {
  600. //TODO 和尚山水厂的三个OPC的设备0点到1点的数据特殊处理,直接取最后一条数据作为0点到1点的用水量
  601. // 定义字符串日期时间的格式
  602. DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
  603. // 解析字符串以创建 LocalDateTime 实例
  604. LocalDateTime dateTime = LocalDateTime.parse(endTime, formatter);
  605. if (dateTime.getHour()==0&&dateTime.getMinute()==16&&("SS.SSCOM.F36:2".equals(item.get("collcation_tag").toString()) ||
  606. "SS.SSCOM.F36:5".equals(item.get("collcation_tag").toString()) ||
  607. "OPC_SC.HSS.HSS1.AI.LJLL3.F_CV".equals(item.get("collcation_tag").toString()))) {
  608. Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount(
  609. " WHERE TAG_CODE = '" + item.get("collcation_tag") + "' " +
  610. " and QCQUISITION_TIME >= to_date('" + startDate + "', 'yyyy-mm-dd hh24:mi:ss')" +
  611. " and QCQUISITION_TIME < to_date('" + endDate + "', 'yyyy-mm-dd hh24:mi:ss')");
  612. if(itemCount!=null&&itemCount>0) {
  613. List<Map<String, Object>> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(itemCount, 0,
  614. " AND TAG_CODE = '" + item.get("collcation_tag") + "' " +
  615. " and QCQUISITION_TIME >= to_date('" + startTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
  616. " and QCQUISITION_TIME < to_date('" + endTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
  617. " order by QCQUISITION_TIME");
  618. if (!CollectionUtils.isEmpty(tapWaterHistoryList) && tapWaterHistoryList.size() > 1) {
  619. if (value == null) {
  620. value = 0.00;
  621. }
  622. value += tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("val").toString()) : null;
  623. }
  624. }
  625. }else {
  626. Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount(
  627. " WHERE TAG_CODE = '" + item.get("collcation_tag") + "' " +
  628. " and QCQUISITION_TIME >= to_date('" + startDate + "', 'yyyy-mm-dd hh24:mi:ss')" +
  629. " and QCQUISITION_TIME < to_date('" + endDate + "', 'yyyy-mm-dd hh24:mi:ss')");
  630. if (itemCount != null && itemCount > 0) {
  631. List<Map<String, Object>> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(itemCount, 0,
  632. " AND TAG_CODE = '" + item.get("collcation_tag") + "' " +
  633. " and QCQUISITION_TIME >= to_date('" + startTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
  634. " and QCQUISITION_TIME < to_date('" + endTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
  635. " order by QCQUISITION_TIME");
  636. //TODO tapWaterHistoryList此集合为该设备该小时内的数据,取第一条和最后一条的相差值作为用水量
  637. if (!CollectionUtils.isEmpty(tapWaterHistoryList) && tapWaterHistoryList.size() > 1) {
  638. Double firstValue = tapWaterHistoryList.get(0).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get(0).get("val").toString()) : null;
  639. Double lastValue = tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("val").toString()) : null;
  640. if (firstValue != null && lastValue != null) {
  641. //到此处是该小时一个设备的用水量已加上
  642. if (value == null) {
  643. value = 0.00;
  644. }
  645. value += Math.abs(lastValue - firstValue);
  646. }
  647. }
  648. }
  649. }
  650. }
  651. recordAllEntity.put("value", value);
  652. List<Map<String, Object>> queryWaterRecord = getWaterTapWaterApi().getWaterCollectionRecordAllListNew(1, 0,
  653. " WHERE org_name = '" + recordAllEntity.get("org_name")
  654. + "' AND time = '" + recordAllEntity.get("time") + "' AND value_tag = '" + recordAllEntity.get("value_tag") + "'");
  655. if (CollectionUtils.isEmpty(queryWaterRecord)) {
  656. //TODO 说明不存在,进行插入
  657. if(!ObjectUtils.isEmpty(recordAllEntity.get("value"))) {
  658. int insertCode = getWaterTapWaterApi().insertWaterCollectionRecordAllNew(" (" +
  659. "'" + recordAllEntity.get("org_name") + "'," +
  660. "'" + recordAllEntity.get("time") + "'," +
  661. "'" + recordAllEntity.get("value") + "'," +
  662. "'" + recordAllEntity.get("value_tag") + "'," +
  663. "'" + recordAllEntity.get("collcation_tag_array") + "'" +
  664. ") ");
  665. if (insertCode < 0) {
  666. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
  667. , mStrClassName
  668. , mStrClassName
  669. , String.format("Batch initTapWaterDataThread 未成功:{%s} ",
  670. FastJsonUtil.toJSON(recordAllEntity)));
  671. }
  672. }
  673. }else{
  674. //TODO 说明存在,进行修改
  675. if(!ObjectUtils.isEmpty(recordAllEntity.get("value"))) {
  676. int updateCode = getWaterTapWaterApi().updateWaterCollectionRecordAllNew(String.valueOf(value)," WHERE " +
  677. "(" +
  678. " org_name = '" + recordAllEntity.get("org_name") + "' and" +
  679. " \"time\" = '" + recordAllEntity.get("time") + "' and" +
  680. " value_tag = '" + recordAllEntity.get("value_tag") + "'" +
  681. ") ");
  682. if (updateCode < 0) {
  683. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
  684. , mStrClassName
  685. , mStrClassName
  686. , String.format("Batch updateWaterCollectionRecordAll 未成功:{%s} ",
  687. FastJsonUtil.toJSON(recordAllEntity)));
  688. }
  689. }
  690. }
  691. }
  692. latch.countDown();
  693. }
  694. });
  695. }catch(Exception ex){}
  696. }
  697. try{latch.await();}catch(Exception ex){}
  698. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"检查机制-计算15分钟用水量结束时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
  699. }
  700. }
  701. //TODO 小时用水量 每天整点10分检查并写入,检查范围为当前时间到昨天23点
  702. public static void checkRecordAllData(int beforHour){
  703. //TODO 检查小时用水量
  704. DateTimeFormatter formater = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
  705. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"检查机制-计算小时用水量开始时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
  706. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"检查机制-计算小时用水量进行中:......");
  707. List<Map<String, Object>> configList = getWaterTapWaterApi().getWaterCollectionConfigList("WHERE 1=1 AND org_name not like '%营业所' AND org_name!='北碚水厂' AND org_name!='渝中区水厂'");
  708. if (!CollectionUtils.isEmpty(configList)) {
  709. //TODO 按照组织机构分组
  710. Map<Object, List<Map<String, Object>>> groupedData =
  711. configList.stream().collect(Collectors.groupingBy(item -> item.get("org_name")));
  712. final CountDownLatch latch = new CountDownLatch(groupedData.keySet().size());
  713. for (Object key:groupedData.keySet()){
  714. try {
  715. ThreadTaskMgr.runTask(new TraceRunnable(TraceLogMgr.getTraceId()) {
  716. @Override
  717. public void function() {
  718. //TODO 根据当前配置信息item 查询远通数据中的历史数据
  719. //TODO 首先查询当前水厂设备的从昨天之后到得到数据的数据
  720. LocalDateTime today = LocalDateTime.now();
  721. LocalDateTime startDateTime = today.withMinute(0).withSecond(0).withNano(0).minusHours(beforHour);
  722. //TODO 需计算的循环天数
  723. //TODO 此循环天数每一天所查的是所有设备每小时数据合
  724. LocalDateTime newStartDateTime = startDateTime;
  725. String startDate = newStartDateTime.format(formater);
  726. String endDate = today.withMinute(1).withSecond(0).format(formater);
  727. List<Map<String, Object>> deviceList = groupedData.get(key);
  728. //TODO 循环获取该天该水厂每个设备数据
  729. //TODO 查询当前天日期内每小时的设备数据
  730. for (int i = 0; i < Integer.valueOf(String.valueOf(Duration.between(startDateTime, today.withMinute(0).withSecond(0)).toHours())); i++) {
  731. String startTime = newStartDateTime.minusHours(-i).format(formater);
  732. String endTime = newStartDateTime.minusMinutes(-1).minusHours(-(i+1)).format(formater);//查询时间加一分钟
  733. Map<String, Object> recordAllEntity = new HashMap<>();//需要添加的实体数据
  734. recordAllEntity.put("org_name", key.toString());//水厂
  735. recordAllEntity.put("time", newStartDateTime.minusHours(-(i+1)).format(formater));//采集时间(小时的最后时间)
  736. recordAllEntity.put("value", null);
  737. recordAllEntity.put("value_tag", "water");
  738. recordAllEntity.put("collcation_tag_array", FastJsonUtil.toJSON(deviceList));
  739. Double value = null;
  740. //TODO 此循环计算该小时所有设备的用水量
  741. for (Map<String, Object> item : deviceList) {
  742. //TODO 和尚山水厂的三个OPC的设备0点到1点的数据特殊处理,直接取最后一条数据作为0点到1点的用水量
  743. // 定义字符串日期时间的格式
  744. DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
  745. // 解析字符串以创建 LocalDateTime 实例
  746. LocalDateTime dateTime = LocalDateTime.parse(endTime, formatter);
  747. if (dateTime.getHour()==1&&("SS.SSCOM.F36:2".equals(item.get("collcation_tag").toString()) ||
  748. "SS.SSCOM.F36:5".equals(item.get("collcation_tag").toString()) ||
  749. "OPC_SC.HSS.HSS1.AI.LJLL3.F_CV".equals(item.get("collcation_tag").toString()))) {
  750. Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount(
  751. " WHERE TAG_CODE = '" + item.get("collcation_tag") + "' " +
  752. " and QCQUISITION_TIME >= to_date('" + startDate + "', 'yyyy-mm-dd hh24:mi:ss')" +
  753. " and QCQUISITION_TIME < to_date('" + endDate + "', 'yyyy-mm-dd hh24:mi:ss')");
  754. if(itemCount!=null&&itemCount>0) {
  755. List<Map<String, Object>> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(itemCount, 0,
  756. " AND TAG_CODE = '" + item.get("collcation_tag") + "' " +
  757. " and QCQUISITION_TIME >= to_date('" + startTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
  758. " and QCQUISITION_TIME < to_date('" + endTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
  759. " order by QCQUISITION_TIME");
  760. if (!CollectionUtils.isEmpty(tapWaterHistoryList) && tapWaterHistoryList.size() > 1) {
  761. if (value == null) {
  762. value = 0.00;
  763. }
  764. value += tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("val").toString()) : null;
  765. }
  766. }
  767. }else {
  768. Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount(
  769. " WHERE TAG_CODE = '" + item.get("collcation_tag") + "' " +
  770. " and QCQUISITION_TIME >= to_date('" + startDate + "', 'yyyy-mm-dd hh24:mi:ss')" +
  771. " and QCQUISITION_TIME < to_date('" + endDate + "', 'yyyy-mm-dd hh24:mi:ss')");
  772. if (itemCount != null && itemCount > 0) {
  773. List<Map<String, Object>> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(itemCount, 0,
  774. " AND TAG_CODE = '" + item.get("collcation_tag") + "' " +
  775. " and QCQUISITION_TIME >= to_date('" + startTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
  776. " and QCQUISITION_TIME < to_date('" + endTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
  777. " order by QCQUISITION_TIME");
  778. //TODO tapWaterHistoryList此集合为该设备该小时内的数据,取第一条和最后一条的相差值作为用水量
  779. if (!CollectionUtils.isEmpty(tapWaterHistoryList) && tapWaterHistoryList.size() > 1) {
  780. Double firstValue = tapWaterHistoryList.get(0).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get(0).get("val").toString()) : null;
  781. Double lastValue = tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("val").toString()) : null;
  782. if (firstValue != null && lastValue != null) {
  783. //到此处是该小时一个设备的用水量已加上
  784. if (value == null) {
  785. value = 0.00;
  786. }
  787. value += Math.abs(lastValue - firstValue);
  788. }
  789. }
  790. }
  791. }
  792. }
  793. recordAllEntity.put("value", value);
  794. List<Map<String, Object>> queryWaterRecord = getWaterTapWaterApi().getWaterCollectionRecordAllList(1, 0,
  795. " WHERE org_name = '" + recordAllEntity.get("org_name")
  796. + "' AND time = '" + recordAllEntity.get("time") + "' AND value_tag = '" + recordAllEntity.get("value_tag") + "'");
  797. if (CollectionUtils.isEmpty(queryWaterRecord)) {
  798. //TODO 说明不存在,进行插入
  799. if(!ObjectUtils.isEmpty(recordAllEntity.get("value"))) {
  800. int insertCode = getWaterTapWaterApi().insertWaterCollectionRecordAll(" (" +
  801. "'" + recordAllEntity.get("org_name") + "'," +
  802. "'" + recordAllEntity.get("time") + "'," +
  803. "'" + recordAllEntity.get("value") + "'," +
  804. "'" + recordAllEntity.get("value_tag") + "'," +
  805. "'" + recordAllEntity.get("collcation_tag_array") + "'" +
  806. ") ");
  807. if (insertCode < 0) {
  808. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
  809. , mStrClassName
  810. , mStrClassName
  811. , String.format("Batch initTapWaterDataThread 未成功:{%s} ",
  812. FastJsonUtil.toJSON(recordAllEntity)));
  813. }
  814. }
  815. }else{
  816. //TODO 说明存在,进行修改
  817. if(!ObjectUtils.isEmpty(recordAllEntity.get("value"))) {
  818. int updateCode = getWaterTapWaterApi().updateWaterCollectionRecordAll(String.valueOf(value)," WHERE " +
  819. "(" +
  820. " org_name = '" + recordAllEntity.get("org_name") + "' and" +
  821. " \"time\" = '" + recordAllEntity.get("time") + "' and" +
  822. " value_tag = '" + recordAllEntity.get("value_tag") + "'" +
  823. ") ");
  824. if (updateCode < 0) {
  825. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
  826. , mStrClassName
  827. , mStrClassName
  828. , String.format("Batch updateWaterCollectionRecordAll 未成功:{%s} ",
  829. FastJsonUtil.toJSON(recordAllEntity)));
  830. }
  831. }
  832. }
  833. }
  834. latch.countDown();
  835. }
  836. });
  837. }catch(Exception ex){}
  838. }
  839. try{latch.await();}catch(Exception ex){}
  840. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"检查机制-计算小时用水量结束时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
  841. }
  842. }
  843. //TODO 原始历史数据 每天整点15分检查并写入,检查范围为当前时间到昨天
  844. public static void checkRecordData(){
  845. //TODO ① 首先查询水厂设备配置信息
  846. try {
  847. List<Map<String, Object>> configList = getWaterTapWaterApi().getWaterCollectionConfigList("WHERE 1=1 AND org_name not like '%营业所' AND org_name!='北碚水厂' AND org_name!='渝中区水厂'");
  848. if(!CollectionUtils.isEmpty(configList)){
  849. //声明总数据的数据数组
  850. List<Map<String,Object>> newRecordAll = Collections.synchronizedList(new LinkedList<Map<String,Object>>());
  851. //TODO ②开启多线程并发处理各水厂设备的数据
  852. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"检查机制--开始时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
  853. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"检查机制--进行中:......");
  854. final CountDownLatch latch = new CountDownLatch(configList.size());
  855. for(Map<String,Object> item:configList){
  856. try{
  857. ThreadTaskMgr.runTask(new TraceRunnable(TraceLogMgr.getTraceId()) {
  858. @Override
  859. public void function() {
  860. //TODO 根据当前配置信息item 查询远通数据中的历史数据
  861. //TODO 首先查询当前水厂设备的昨天0点之后到当前时间最新数据的数据
  862. LocalDateTime today = LocalDateTime.now();
  863. LocalDateTime yesterday = today.minusDays(1).withHour(0).withMinute(0).withSecond(0).withNano(0);
  864. Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount(" WHERE TAG_CODE = '"+item.get("collcation_tag")+"' " +
  865. " and QCQUISITION_TIME >= to_date('"+yesterday.format(formater)+"', 'yyyy-mm-dd hh24:mi:ss')");
  866. if(itemCount!=null&&itemCount>0) {
  867. //TODO 优化 以分页方式查询所有,初始分页行数定为2000查询速率较好
  868. int pageNum = itemCount%2000==0?itemCount/2000:(itemCount/2000)+1;//总页数
  869. Integer limit = 2000;
  870. if(pageNum<=1){
  871. limit = itemCount;//说明总数比第一页小
  872. }
  873. for (int i = 0; i < pageNum; i++) {
  874. Integer offset = i*limit;
  875. //tapWaterHistoryList 为远通水量数据源
  876. //TODO 因为是orcale数据库 分页方式以伪列rownum为分页条件,因此limit实际上永远比offset大2000,因此只要不是第一页,limit=offset+2000
  877. Integer limitNew = 0;
  878. if(i>0){
  879. limitNew = offset+2000;
  880. }else{
  881. limitNew = 2000;
  882. }
  883. List<Map<String,Object>> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(limitNew,offset," " +
  884. "AND TAG_CODE = '"+item.get("collcation_tag")+"' " +
  885. "and QCQUISITION_TIME >= to_date('"+yesterday.format(formater)+"', 'yyyy-mm-dd hh24:mi:ss')");
  886. if(!CollectionUtils.isEmpty(tapWaterHistoryList)){
  887. //TODO 循环远通水量数据列表,查询数据不存在的话就插入
  888. for (int j = 0; j < tapWaterHistoryList.size(); j++) {
  889. List<Map<String,Object>> queryWaterRecord = getWaterTapWaterApi().getWaterCollectionRecordList(1,0,
  890. " WHERE collcation_tag = '"+tapWaterHistoryList.get(j).get("TAG_CODE")
  891. +"' AND org_name = '"+tapWaterHistoryList.get(j).get("NAME")+"' AND time = '"+TimeTool.convertDateStr2UTC(tapWaterHistoryList.get(j).get("QCQUISITION_TIME").toString())+"'");
  892. if(CollectionUtils.isEmpty(queryWaterRecord)){
  893. //TODO 说明没插入过本系统,执行插入
  894. String extend = " ('"+tapWaterHistoryList.get(j).get("TAG_CODE")+"','"+tapWaterHistoryList.get(j).get("NAME")+"',"
  895. + ""+TimeTool.convertDateStr2UTC(tapWaterHistoryList.get(j).get("QCQUISITION_TIME").toString())+","
  896. + "'"+tapWaterHistoryList.get(j).get("VAL")+"',"
  897. + TimeTool.convertDateStr2UTC(tapWaterHistoryList.get(j).get("UPDATE_TIME").toString())
  898. +") ";
  899. int insertCode = getWaterTapWaterApi().insertWaterCollectionRecord(extend);
  900. if(insertCode<=0){
  901. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
  902. , mStrClassName
  903. , mStrClassName
  904. , String.format("Batch initTapWaterDataThread 未成功:{%s} ",
  905. extend));
  906. }
  907. }
  908. }
  909. }
  910. }
  911. }
  912. latch.countDown();
  913. }
  914. });
  915. }catch(Exception ex){
  916. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
  917. , mStrClassName
  918. , mStrClassName
  919. , String.format("Batch initTapWaterDataThread 水厂:{%s},标识:{%s} ERROR:{%s} ",
  920. item.get("org_name"),
  921. item.get("collcation_tag")
  922. , ex.getLocalizedMessage()));
  923. }
  924. }
  925. latch.await();
  926. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"检查机制--结束时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
  927. }
  928. }catch(Exception ex){
  929. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
  930. , mStrClassName
  931. , mStrClassName
  932. , String.format("Batch initTapWaterData ERROR:{%s} "
  933. , ex.getLocalizedMessage()));
  934. }
  935. }
  936. //TODO 初始化添加对比远通数据
  937. public static void initTapWaterData(){
  938. //TODO ① 首先查询水厂设备配置信息
  939. try {
  940. List<Map<String, Object>> configList = getWaterTapWaterApi().getWaterCollectionConfigList("WHERE 1=1 AND org_name not like '%营业所' AND org_name!='北碚水厂' AND org_name!='渝中区水厂'");
  941. if(!CollectionUtils.isEmpty(configList)){
  942. //声明总数据的数据数组
  943. List<Map<String,Object>> newRecordAll = Collections.synchronizedList(new LinkedList<Map<String,Object>>());
  944. //TODO ②开启多线程并发处理各水厂设备的数据
  945. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"比对远通原始数据开始时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
  946. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"进行中:......");
  947. final CountDownLatch latch = new CountDownLatch(configList.size());
  948. LocalDateTime startTimeLocal = LocalDateTime.now().minusDays(1).minusHours(1);
  949. for(Map<String,Object> item:configList){
  950. try{
  951. ThreadTaskMgr.runTask(new TraceRunnable(TraceLogMgr.getTraceId()) {
  952. @Override
  953. public void function() {
  954. //TODO 根据当前配置信息item 查询远通数据中的历史数据
  955. //TODO 首先查询当前水厂设备的2025-01-01之后到最新数据的数据总数,然后分页形式获取数据
  956. Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount(" WHERE TAG_CODE = '"+item.get("collcation_tag")+"' and QCQUISITION_TIME >= to_date('"+startTimeLocal.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))+"', 'yyyy-mm-dd hh24:mi:ss')");
  957. if(itemCount!=null&&itemCount>0) {
  958. //TODO 优化 以分页方式查询所有,初始分页行数定为2000查询速率较好
  959. int pageNum = itemCount%2000==0?itemCount/2000:(itemCount/2000)+1;//总页数
  960. Integer limit = 2000;
  961. if(pageNum<=1){
  962. limit = itemCount;//说明总数比第一页小
  963. }
  964. for (int i = 0; i < pageNum; i++) {
  965. Integer offset = i*limit;
  966. //tapWaterHistoryList 为远通水量数据源
  967. //TODO 因为是orcale数据库 分页方式以伪列rownum为分页条件,因此limit实际上永远比offset大2000,因此只要不是第一页,limit=offset+2000
  968. Integer limitNew = 0;
  969. if(i>0){
  970. limitNew = offset+2000;
  971. }else{
  972. limitNew = 2000;
  973. }
  974. List<Map<String,Object>> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(limitNew,offset," AND TAG_CODE = '"+item.get("collcation_tag")+"' and QCQUISITION_TIME >= to_date('"+startTimeLocal.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))+"', 'yyyy-mm-dd hh24:mi:ss')");
  975. if(!CollectionUtils.isEmpty(tapWaterHistoryList)){
  976. //TODO 循环远通水量数据列表,查询数据不存在的话就插入
  977. for (int j = 0; j < tapWaterHistoryList.size(); j++) {
  978. List<Map<String,Object>> queryWaterRecord = getWaterTapWaterApi().getWaterCollectionRecordList(1,0,
  979. " WHERE collcation_tag = '"+tapWaterHistoryList.get(j).get("TAG_CODE")
  980. +"' AND org_name = '"+tapWaterHistoryList.get(j).get("NAME")+"' AND time = '"+TimeTool.convertDateStr2UTC(tapWaterHistoryList.get(j).get("QCQUISITION_TIME").toString())+"'");
  981. if(CollectionUtils.isEmpty(queryWaterRecord)){
  982. //TODO 说明没插入过本系统,执行插入
  983. String extend = " ('"+tapWaterHistoryList.get(j).get("TAG_CODE")+"','"+tapWaterHistoryList.get(j).get("NAME")+"',"
  984. + ""+TimeTool.convertDateStr2UTC(tapWaterHistoryList.get(j).get("QCQUISITION_TIME").toString())+","
  985. + "'"+tapWaterHistoryList.get(j).get("VAL")+"',"
  986. + TimeTool.convertDateStr2UTC(tapWaterHistoryList.get(j).get("UPDATE_TIME").toString())
  987. +") ";
  988. int insertCode = getWaterTapWaterApi().insertWaterCollectionRecord(extend);
  989. if(insertCode<=0){
  990. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
  991. , mStrClassName
  992. , mStrClassName
  993. , String.format("Batch initTapWaterDataThread 未成功:{%s} ",
  994. extend));
  995. }
  996. }
  997. }
  998. }
  999. }
  1000. }
  1001. latch.countDown();
  1002. }
  1003. });
  1004. }catch(Exception ex){
  1005. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
  1006. , mStrClassName
  1007. , mStrClassName
  1008. , String.format("Batch initTapWaterDataThread 水厂:{%s},标识:{%s} ERROR:{%s} ",
  1009. item.get("org_name"),
  1010. item.get("collcation_tag")
  1011. , ex.getLocalizedMessage()));
  1012. }
  1013. }
  1014. latch.await();
  1015. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"比对远通原始数据结束时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
  1016. }
  1017. }catch(Exception ex){
  1018. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
  1019. , mStrClassName
  1020. , mStrClassName
  1021. , String.format("Batch initTapWaterData ERROR:{%s} "
  1022. , ex.getLocalizedMessage()));
  1023. }
  1024. }
  1025. //TODO 初始化添加计算水厂所有设备每日的每小时用水量计算
  1026. public static void initWaterCollecationReacordAll(){
  1027. //TODO ① 首先查询水厂设备配置信息
  1028. try {
  1029. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"计算小时用水量开始时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
  1030. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"计算小时用水量进行中:......");
  1031. List<Map<String, Object>> configList = getWaterTapWaterApi().getWaterCollectionConfigList("WHERE 1=1 AND org_name not like '%营业所' AND org_name!='北碚水厂' AND org_name!='渝中区水厂'");
  1032. if (!CollectionUtils.isEmpty(configList)) {
  1033. //TODO 按照组织机构分组
  1034. Map<Object, List<Map<String, Object>>> groupedData =
  1035. configList.stream().collect(Collectors.groupingBy(item -> item.get("org_name")));
  1036. final CountDownLatch latch = new CountDownLatch(groupedData.keySet().size());
  1037. //TODO 外层循环组织机构
  1038. for (Object key:groupedData.keySet()){
  1039. try{
  1040. ThreadTaskMgr.runTask(new TraceRunnable(TraceLogMgr.getTraceId()) {
  1041. @Override
  1042. public void function() {
  1043. //TODO 根据当前配置信息item 查询远通数据中的历史数据
  1044. //TODO 首先查询当前水厂设备的从2025-01-01之后到得到数据
  1045. // LocalDateTime startDateTime = LocalDateTime.now().withMinute(0).withSecond(0)
  1046. // .minusDays(1).minusHours(1);
  1047. LocalDateTime startDateTime = LocalDateTime.now().withHour(0).withMinute(0).withSecond(0)
  1048. .minusDays(3);
  1049. //TODO 需计算的循环天数
  1050. Long days = 0L;
  1051. days = Duration.between(startDateTime, LocalDateTime.now()).toDays();
  1052. //TODO 此循环天数每一天所查的是所有设备每小时数据合
  1053. final CountDownLatch latch2 = new CountDownLatch(days.intValue());
  1054. for(Long k = 0L;k<days;k++) {
  1055. LocalDateTime newStartDateTime = startDateTime.minusDays(-k.intValue());
  1056. String startDate = newStartDateTime.format(formater);
  1057. String endDate = newStartDateTime.withMinute(1).minusDays(-1).format(formater);
  1058. try {
  1059. ThreadTaskMgr.runTask(new TraceRunnable(TraceLogMgr.getTraceId()) {
  1060. @Override
  1061. public void function() {
  1062. List<Map<String, Object>> deviceList = groupedData.get(key);
  1063. //TODO 循环获取该天该水厂每个设备数据
  1064. //TODO 查询当前天日期内每15分钟的设备数据
  1065. for (int i = 0; i < 96; i++) {
  1066. String startTime = newStartDateTime.minusHours(-i).format(formater);
  1067. String endTime = newStartDateTime.minusMinutes(-1).minusHours(-(i+1)).format(formater);//查询时间加一分钟
  1068. Map<String, Object> recordAllEntity = new HashMap<>();//需要添加的实体数据
  1069. recordAllEntity.put("org_name", key.toString());//水厂
  1070. recordAllEntity.put("time", newStartDateTime.minusHours(-(i+1)).format(formater));//采集时间(小时的最后时间)
  1071. recordAllEntity.put("value", null);
  1072. recordAllEntity.put("value_tag", "water");
  1073. recordAllEntity.put("collcation_tag_array", FastJsonUtil.toJSON(deviceList));
  1074. Double value = null;
  1075. //TODO 此循环计算该小时所有设备的用水量
  1076. for (Map<String, Object> item : deviceList) {
  1077. //TODO 和尚山水厂的三个OPC的设备0点到1点的数据特殊处理,直接取最后一条数据作为0点到1点的用水量
  1078. // 定义字符串日期时间的格式
  1079. DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
  1080. // 解析字符串以创建 LocalDateTime 实例
  1081. LocalDateTime dateTime = LocalDateTime.parse(endTime, formatter);
  1082. if (dateTime.getHour()==1&&("SS.SSCOM.F36:2".equals(item.get("collcation_tag").toString()) ||
  1083. "SS.SSCOM.F36:5".equals(item.get("collcation_tag").toString()) ||
  1084. "OPC_SC.HSS.HSS1.AI.LJLL3.F_CV".equals(item.get("collcation_tag").toString()))) {
  1085. Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount(
  1086. " WHERE TAG_CODE = '" + item.get("collcation_tag") + "' " +
  1087. " and QCQUISITION_TIME >= to_date('" + startDate + "', 'yyyy-mm-dd hh24:mi:ss')" +
  1088. " and QCQUISITION_TIME < to_date('" + endDate + "', 'yyyy-mm-dd hh24:mi:ss')");
  1089. if(itemCount!=null&&itemCount>0) {
  1090. List<Map<String, Object>> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(itemCount, 0,
  1091. " AND TAG_CODE = '" + item.get("collcation_tag") + "' " +
  1092. " and QCQUISITION_TIME >= to_date('" + startTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
  1093. " and QCQUISITION_TIME < to_date('" + endTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
  1094. " order by QCQUISITION_TIME");
  1095. if (!CollectionUtils.isEmpty(tapWaterHistoryList) && tapWaterHistoryList.size() > 1) {
  1096. if (value == null) {
  1097. value = 0.00;
  1098. }
  1099. value += tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("val").toString()) : null;
  1100. }
  1101. }
  1102. }else {
  1103. Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount(
  1104. " WHERE TAG_CODE = '" + item.get("collcation_tag") + "' " +
  1105. " and QCQUISITION_TIME >= to_date('" + startDate + "', 'yyyy-mm-dd hh24:mi:ss')" +
  1106. " and QCQUISITION_TIME < to_date('" + endDate + "', 'yyyy-mm-dd hh24:mi:ss')");
  1107. if (itemCount != null && itemCount > 0) {
  1108. List<Map<String, Object>> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(itemCount, 0,
  1109. " AND TAG_CODE = '" + item.get("collcation_tag") + "' " +
  1110. " and QCQUISITION_TIME >= to_date('" + startTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
  1111. " and QCQUISITION_TIME < to_date('" + endTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
  1112. " order by QCQUISITION_TIME");
  1113. //TODO tapWaterHistoryList此集合为该设备该小时内的数据,取第一条和最后一条的相差值作为用水量
  1114. if (!CollectionUtils.isEmpty(tapWaterHistoryList) && tapWaterHistoryList.size() > 1) {
  1115. Double firstValue = tapWaterHistoryList.get(0).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get(0).get("val").toString()) : null;
  1116. Double lastValue = tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("val").toString()) : null;
  1117. if (firstValue != null && lastValue != null) {
  1118. //到此处是该小时一个设备的用水量已加上
  1119. if (value == null) {
  1120. value = 0.00;
  1121. }
  1122. value += Math.abs(lastValue - firstValue);
  1123. }
  1124. }
  1125. }
  1126. }
  1127. }
  1128. recordAllEntity.put("value", value);
  1129. List<Map<String, Object>> queryWaterRecord = getWaterTapWaterApi().getWaterCollectionRecordAllList(1, 0,
  1130. " WHERE org_name = '" + recordAllEntity.get("org_name")
  1131. + "' AND time = '" + recordAllEntity.get("time") + "' AND value_tag = '" + recordAllEntity.get("value_tag") + "'");
  1132. if (CollectionUtils.isEmpty(queryWaterRecord)) {
  1133. //TODO 说明不存在,进行插入
  1134. if(!ObjectUtils.isEmpty(recordAllEntity.get("value"))) {
  1135. int insertCode = getWaterTapWaterApi().insertWaterCollectionRecordAll(" (" +
  1136. "'" + recordAllEntity.get("org_name") + "'," +
  1137. "'" + recordAllEntity.get("time") + "'," +
  1138. "'" + recordAllEntity.get("value") + "'," +
  1139. "'" + recordAllEntity.get("value_tag") + "'," +
  1140. "'" + recordAllEntity.get("collcation_tag_array") + "'" +
  1141. ") ");
  1142. if (insertCode < 0) {
  1143. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
  1144. , mStrClassName
  1145. , mStrClassName
  1146. , String.format("Batch initTapWaterDataThread 未成功:{%s} ",
  1147. FastJsonUtil.toJSON(recordAllEntity)));
  1148. }
  1149. }
  1150. }else{
  1151. //TODO 说明存在,进行修改
  1152. if(!ObjectUtils.isEmpty(recordAllEntity.get("value"))) {
  1153. int updateCode = getWaterTapWaterApi().updateWaterCollectionRecordAll(String.valueOf(value)," WHERE " +
  1154. "(" +
  1155. " org_name = '" + recordAllEntity.get("org_name") + "' and" +
  1156. " \"time\" = '" + recordAllEntity.get("time") + "' and" +
  1157. " value_tag = '" + recordAllEntity.get("value_tag") + "'" +
  1158. ") ");
  1159. if (updateCode < 0) {
  1160. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
  1161. , mStrClassName
  1162. , mStrClassName
  1163. , String.format("Batch updateWaterCollectionRecordAll 未成功:{%s} ",
  1164. FastJsonUtil.toJSON(recordAllEntity)));
  1165. }
  1166. }
  1167. }
  1168. }
  1169. latch2.countDown();
  1170. }
  1171. });
  1172. }catch(Exception ex){
  1173. }
  1174. }
  1175. try {
  1176. latch2.await();
  1177. }catch(Exception ex){
  1178. }
  1179. latch.countDown();
  1180. }
  1181. });
  1182. }catch(Exception ex){
  1183. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
  1184. , mStrClassName
  1185. , mStrClassName
  1186. , String.format("Batch" +
  1187. " initWaterReacordAll ERROR:{%s} "
  1188. , ex.getLocalizedMessage()));
  1189. }
  1190. }
  1191. latch.await();
  1192. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"计算小时用水量结束时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
  1193. }
  1194. }catch(Exception ex){
  1195. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
  1196. , mStrClassName
  1197. , mStrClassName
  1198. , String.format("Batch initWaterCollecationReacordAll ERROR:{%s} "
  1199. , ex.getLocalizedMessage()));
  1200. }
  1201. }
  1202. //TODO 初始化添加计算泵每日的小时数据
  1203. public static void initWaterPumpReacordAll(String startFindTime){
  1204. //TODO ① 首先查询水厂设备配置信息
  1205. try {
  1206. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"添加小时泵数据开始时间:"+ TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
  1207. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"添加小时泵数据起始时间:("+startFindTime+")进行中:......");
  1208. List<Map<String, Object>> configList = getWaterTapWaterApi().getWaterPumpCollectionConfigList(null);
  1209. if (!CollectionUtils.isEmpty(configList)) {
  1210. //TODO 按照设备分组
  1211. Map<Object, List<Map<String, Object>>> groupedData =
  1212. configList.stream().collect(Collectors.groupingBy(item -> item.get("device_code")));
  1213. final CountDownLatch latch = new CountDownLatch(groupedData.keySet().size());
  1214. //TODO 外层循环组织机构
  1215. for (Object key:groupedData.keySet()){
  1216. try{
  1217. new Thread(() -> {
  1218. //TODO 根据当前配置信息item 查询远通数据中的历史数据
  1219. //TODO 首先查询当前水厂设备的从startFindTime之后到得到数据
  1220. LocalDateTime startDateTime = LocalDateTime.parse(startFindTime, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
  1221. //TODO 需计算的循环天数
  1222. Long days = 0L;
  1223. days = Duration.between(startDateTime, LocalDateTime.now()).toDays();
  1224. days+=1;
  1225. //TODO 此循环天数每一天所查的是所有设备每小时数据合
  1226. final CountDownLatch latch2 = new CountDownLatch(days.intValue());
  1227. for(Long k = 0L;k<days;k++) {
  1228. LocalDateTime newStartDateTime = startDateTime.minusDays(-k.intValue());
  1229. String startDate = newStartDateTime.format(formater);
  1230. String endDate = newStartDateTime.withMinute(1).minusDays(-1).format(formater);
  1231. try {
  1232. new Thread(() -> {
  1233. List<Map<String, Object>> deviceList = groupedData.get(key);
  1234. //TODO 循环获取该天该水厂每个设备数据
  1235. //TODO 查询当前天日期内每小时的设备数据
  1236. for (int i = 0; i < 24; i++) {
  1237. String startTime = newStartDateTime.withHour(i).format(formater);
  1238. //TODO 需要加个05分把endTime的整点数据查出来
  1239. String endTime = newStartDateTime.withHour(i).minusHours(-1).withMinute(5).withSecond(0).format(formater);
  1240. //TODO 此循环计算该小时所有设备的泵数据
  1241. for (Map<String, Object> item : deviceList) {
  1242. // 定义字符串日期时间的格式
  1243. DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
  1244. // 解析字符串以创建 LocalDateTime 实例
  1245. LocalDateTime dateTime = LocalDateTime.parse(endTime, formatter);
  1246. Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount(
  1247. " WHERE 1=1 AND TAG_CODE = '" + item.get("collcation_tag") + "' " +
  1248. " and QCQUISITION_TIME >= to_date('" + startDate + "', 'yyyy-mm-dd hh24:mi:ss')" +
  1249. " and QCQUISITION_TIME <= to_date('" + endDate + "', 'yyyy-mm-dd hh24:mi:ss')" +
  1250. " AND ABS(" +
  1251. " (QCQUISITION_TIME - TRUNC(QCQUISITION_TIME, 'HH')) * 24 * 60 " +
  1252. " ) <= 5");//查询 -- 采集时间接近整点,误差在5分钟内
  1253. if (itemCount != null && itemCount > 0) {
  1254. List<Map<String, Object>> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(itemCount, 0,
  1255. " AND TAG_CODE = '" + item.get("collcation_tag") + "' " +
  1256. " and QCQUISITION_TIME >= to_date('" + startTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
  1257. " and QCQUISITION_TIME <= to_date('" + endTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
  1258. " AND ABS( " +
  1259. " (QCQUISITION_TIME - TRUNC(QCQUISITION_TIME, 'HH')) * 24 * 60 " +
  1260. " ) <= 5 " +
  1261. " order by QCQUISITION_TIME");
  1262. //TODO 这里需要注意泵数据的采集配置表设备号可能有多个,因此,要判断对应的standard_code做对应处理
  1263. if (!CollectionUtils.isEmpty(tapWaterHistoryList) && tapWaterHistoryList.size() >= 1) {
  1264. //TODO 数据库操作
  1265. for (Map<String,Object> mapEntity:tapWaterHistoryList) {
  1266. Map<String, Object> recordAllEntity = new HashMap<>();//需要添加的实体数据
  1267. recordAllEntity.put("device_code", groupedData.get(key).get(0).get("device_code"));
  1268. recordAllEntity.put("time", LocalDateTime.parse(mapEntity.get("QCQUISITION_TIME").toString(), formatter)
  1269. .withMinute(0).withSecond(0).format(formatter));//采集时间(小时的最后时间)
  1270. if(item.get("standard_code").equals("active_energy")){
  1271. recordAllEntity.put("active_energy",mapEntity.get("VAL"));
  1272. }
  1273. if(item.get("standard_code").equals("startup_state")){
  1274. recordAllEntity.put("startup_state",mapEntity.get("VAL"));
  1275. }
  1276. if(item.get("standard_code").equals("phase_a_current")){
  1277. if(ObjectUtils.isEmpty(mapEntity.get("VAL"))||"0".equals(mapEntity.get("VAL").toString())) {
  1278. recordAllEntity.put("startup_state", 0);
  1279. }else{
  1280. recordAllEntity.put("startup_state", 1);
  1281. }
  1282. }
  1283. //TODO 直接调用插入或者新增方法
  1284. int insertCode = getWaterTapWaterApi().insertOrUpdateWaterPumpRecordAll(recordAllEntity);
  1285. if (insertCode < 0) {
  1286. System.out.print(String.format("Batch insertOrUpdateWaterPumpRecordAll 未成功:{%s} ",
  1287. JSONObject.toJSON(recordAllEntity)));
  1288. }
  1289. }
  1290. }
  1291. }
  1292. }
  1293. }
  1294. latch2.countDown();
  1295. }).start();
  1296. }catch(Exception ex){
  1297. }
  1298. }
  1299. try {
  1300. latch2.await();
  1301. }catch(Exception ex){
  1302. }
  1303. latch.countDown();
  1304. }).start();
  1305. }catch(Exception ex){
  1306. System.out.print(String.format("Batch" +
  1307. " initWaterReacordAll ERROR:{%s} "
  1308. , ex.getLocalizedMessage()));
  1309. }
  1310. }
  1311. latch.await();
  1312. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"添加小时泵数据检查机制("+startFindTime+")结束时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
  1313. }
  1314. }catch(Exception ex){
  1315. System.out.print(String.format("Batch initWaterCollecationReacordAll ERROR:{%s} "
  1316. , ex.getLocalizedMessage()));
  1317. }
  1318. }
  1319. }