KprAimTapWaterBizFun.java 93 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245
  1. package com.shkpr.service.aimodelpower.bizmgr;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.global.base.log.LogLevelFlag;
  4. import com.global.base.log.LogPrintMgr;
  5. import com.global.base.tools.FastJsonUtil;
  6. import com.shkpr.service.aimodelpower.commtools.TimeTool;
  7. import com.shkpr.service.aimodelpower.dbdao.DBMgrProxy;
  8. import com.shkpr.service.aimodelpower.dbdao.shizilaishuiDataSource.service.intef.WaterCollecationService;
  9. import com.shkpr.service.aimodelpower.dbdao.shizilaishuiDataSource.service.intef.WaterTapWaterService;
  10. import com.shkpr.service.aimodelpower.dto.TraceRunnable;
  11. import com.shkpr.service.aimodelpower.globalmgr.ThreadTaskMgr;
  12. import com.shkpr.service.aimodelpower.globalmgr.TraceLogMgr;
  13. import org.springframework.util.CollectionUtils;
  14. import org.springframework.util.ObjectUtils;
  15. import java.io.FileInputStream;
  16. import java.io.IOException;
  17. import java.io.InputStream;
  18. import java.time.Duration;
  19. import java.time.LocalDate;
  20. import java.time.LocalDateTime;
  21. import java.time.LocalTime;
  22. import java.time.format.DateTimeFormatter;
  23. import java.util.*;
  24. import java.util.concurrent.CountDownLatch;
  25. import java.util.stream.Collectors;
  26. /**
  27. * @ClassName KprAimTapWaterBizFun
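  28. * @Description: Static batch jobs for tap-water data: 15-minute forecast interpolation and periodic per-organization water-usage calculation/backfill.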
  29. * @Author LX
  30. * @Date 2024/5/22
  31. * @Version V1.0
  32. **/
  33. public class KprAimTapWaterBizFun {
  /** Generic success marker string; not referenced in the visible part of this file. */
  private static final String MSG_SUCCESS = "success.";
  /** Generic failure marker string; not referenced in the visible part of this file. */
  private static final String MSG_FAILED = "failed.";
  /** This class's name; passed twice to every LogPrintMgr.printLogMsg call below (presumably as class and method tag). */
  private static final String mStrClassName = "KprAimTapWaterBizFun";
  /** Literal "NULL" placeholder string; not referenced in the visible part of this file. */
  private static final String EMPTY_NULL = "NULL";
  38. public static WaterTapWaterService getWaterTapWaterApi(){
  39. return DBMgrProxy.getInstance().applyWaterTapWaterService();
  40. }
  41. public static WaterCollecationService getWaterCollecationApi(){
  42. return DBMgrProxy.getInstance().applyWaterCollecationService();
  43. }
  // Class-level "yyyy-MM-dd HH:mm:ss" formatter. Several methods below declare a local
  // variable with the same name and pattern, which shadows this field inside them.
  static DateTimeFormatter formater = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
  45. //TODO 15分间隔的预测插值(做假暂时用,不用时注释)
  46. //TODO 思路:等预测小时的原程序完成后, 原预测表中为小时预测数据,分钟字段为空,将预测表中的数据进行二次清洗然后插入新预测表
  47. //TODO beforeDays为往后推清洗多少天 n-1为逻辑, 例如传1就是0天当天
  48. public static void yuceZuojia(int beforeDays){
  49. try{
  50. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"预测15分钟插值开始时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
  51. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"预测15分钟插值进行中:......");
  52. List<Map<String, Object>> orgConfig = getWaterCollecationApi().getOrgConfig(false, 0, 0, " AND org_name not like '%营业所' and (org_name!='北碚水厂' and org_name!='渝中区水厂')");
  53. final CountDownLatch latch = new CountDownLatch(orgConfig.size());
  54. DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
  55. LocalDateTime startDate = LocalDateTime.now().withMinute(0).withSecond(0);
  56. for (Map<String,Object> org:orgConfig) {
  57. ThreadTaskMgr.runTask(new TraceRunnable(TraceLogMgr.getTraceId()) {
  58. @Override
  59. public void function() {
  60. try {
  61. for (int dayNum=0;dayNum<beforeDays;dayNum++) {
  62. List<Map<String, Object>> resHourList = getWaterCollecationApi().getTbMHourwater(false,
  63. 20, 0, " AND \"orgId\" = '" + org.get("org_id") + "' AND \"Date\" = '" + startDate.minusDays(-dayNum).format(formatter) + "' ORDER BY \"Date\",\"Hour\" ASC ");
  64. processWaterData(resHourList);
  65. }
  66. latch.countDown();
  67. }catch(Exception ex){
  68. ex.printStackTrace();
  69. }
  70. }
  71. });
  72. }
  73. latch.await();
  74. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"预测15分钟插值已结束");
  75. }catch(Exception ex){
  76. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
  77. , mStrClassName
  78. , mStrClassName
  79. , String.format("Batch yuceZuojia ERROR:{%s} "
  80. , ex.getLocalizedMessage()));
  81. }
  82. }
  // Formats the date part ("yyyy-MM-dd") of a generated 15-minute row in processWaterData.
  private static DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
  // Formats the time part ("HH:mm:ss") of a generated 15-minute row in processWaterData.
  private static DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss");
  85. public static void processWaterData(List<Map<String, Object>> resHourList) {
  86. List<List<Map<String, Object>>> trendGroups = new ArrayList<>();
  87. List<Map<String, Object>> currentGroup = new ArrayList<>();
  88. for (int i = 0; i < resHourList.size(); i++) {
  89. Map<String, Object> currentMap = resHourList.get(i);
  90. double currentHourData = getDoubleValue(currentMap, "HourForecastActualWaterSupply");
  91. if (i == 0) {
  92. currentGroup.add(currentMap);
  93. } else {
  94. double previousHourData = getDoubleValue(resHourList.get(i - 1), "HourForecastActualWaterSupply");
  95. boolean isIncreasing = currentHourData > previousHourData;
  96. if (currentGroup.size() > 0) {
  97. double lastGroupData = getDoubleValue(currentGroup.get(currentGroup.size() - 1), "HourForecastActualWaterSupply");
  98. boolean wasIncreasing = lastGroupData < currentHourData;
  99. if (isIncreasing != wasIncreasing) {
  100. trendGroups.add(new ArrayList<>(currentGroup));
  101. currentGroup.clear();
  102. }
  103. }
  104. currentGroup.add(currentMap);
  105. }
  106. }
  107. if (!currentGroup.isEmpty()) {
  108. trendGroups.add(currentGroup);
  109. }
  110. for (List<Map<String, Object>> group : trendGroups) {
  111. for (int i = 0; i < group.size() - 1; i++) {
  112. Map<String, Object> currentMap = group.get(i);
  113. double currentHourData = getDoubleValue(currentMap, "HourForecastActualWaterSupply");
  114. double nextHourData = getDoubleValue(group.get(i + 1), "HourForecastActualWaterSupply");
  115. boolean isIncreasing = nextHourData > currentHourData;
  116. double[] splitData = splitHourData(currentHourData, isIncreasing);
  117. // 步骤2:平滑处理
  118. double alpha = 0.3; // 平滑系数
  119. double[] smoothedData = exponentialSmoothing(splitData,alpha);
  120. // 解析日期字符串
  121. LocalDate date = LocalDate.parse(currentMap.get("Date").toString(), DateTimeFormatter.ISO_LOCAL_DATE);
  122. // 解析时间字符串
  123. LocalTime time = LocalTime.parse(currentMap.get("Hour").toString(), DateTimeFormatter.ISO_LOCAL_TIME);
  124. LocalDateTime dateTime = LocalDateTime.of(date, time).minusMinutes(45);
  125. for (int j = 0; j < smoothedData.length; j++) {
  126. Map<String, Object> newMap = new HashMap<>(currentMap);
  127. newMap.put("HourForecastActualWaterSupply", smoothedData[j]);
  128. LocalDateTime quarterHourTime = dateTime.plusMinutes(j * 15);
  129. newMap.put("Date", quarterHourTime.format(dateFormatter));
  130. newMap.put("Hour", quarterHourTime.format(timeFormatter));
  131. // Here you would query the database and insert/update data
  132. // For example:
  133. double originalValue = queryOriginalValue(quarterHourTime);
  134. newMap.put("HourActualWaterSupply", originalValue);
  135. // Retain only specific keys
  136. Map<String, Object> filteredMap = filterMap(newMap, "Date", "Hour", "HourForecastActualWaterSupply", "HourActualWaterSupply", "LastModifyTime", "orgId");
  137. // Insert or update the database
  138. // int insertRes = insertOrUpdateDatabase(filteredMap);
  139. // Handle the result if needed
  140. Integer insertRes = getWaterTapWaterApi().insertOrUpdateTbmHourWaterNew(filteredMap);
  141. if (insertRes < 1) {
  142. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
  143. , mStrClassName
  144. , mStrClassName
  145. , String.format("Batch yuceZuojia ERROR:{%s} ",
  146. "新增或修改失败"));
  147. }
  148. }
  149. }
  150. }
  151. }
  152. private static double getDoubleValue(Map<String, Object> map, String key) {
  153. Object value = map.get(key);
  154. return value != null ? Double.parseDouble(value.toString()) : 0.0;
  155. }
  156. //拆分函数
  157. private static double[] splitHourData(double hourData, boolean isIncreasing) {
  158. double[] splitData = new double[4];
  159. if (isIncreasing) {
  160. // 调整比例以实现更平滑的上升曲线
  161. splitData[0] = hourData * 0.24;
  162. splitData[1] = hourData * 0.26;
  163. splitData[2] = hourData * 0.25;
  164. splitData[3] = hourData * 0.25;
  165. } else {
  166. // 调整比例以实现更平滑的下降曲线
  167. splitData[0] = hourData * 0.25;
  168. splitData[1] = hourData * 0.25;
  169. splitData[2] = hourData * 0.26;
  170. splitData[3] = hourData * 0.24;
  171. }
  172. return splitData;
  173. }
  174. //平滑函数
  175. private static double[] smoothData(double[] data) {
  176. double[] smoothedData = new double[data.length];
  177. int windowSize = 3; // 移动平均窗口大小
  178. for (int i = 0; i < data.length; i++) {
  179. double sum = 0;
  180. int count = 0;
  181. for (int j = Math.max(0, i - windowSize / 2); j <= Math.min(data.length - 1, i + windowSize / 2); j++) {
  182. sum += data[j];
  183. count++;
  184. }
  185. smoothedData[i] = sum / count;
  186. }
  187. return smoothedData;
  188. }
  189. //指数平滑
  190. private static double[] exponentialSmoothing(double[] data, double alpha) {
  191. double[] smoothedData = new double[data.length];
  192. smoothedData[0] = data[0]; // 初始化第一个值
  193. for (int i = 1; i < data.length; i++) {
  194. smoothedData[i] = alpha * data[i] + (1 - alpha) * smoothedData[i - 1];
  195. }
  196. return smoothedData;
  197. }
  198. private static double queryOriginalValue(LocalDateTime dateTime) {
  199. // Simulate querying the database for the original value
  200. // Replace with actual database query
  201. return 100.0; // Placeholder value
  202. }
  203. private static Map<String, Object> filterMap(Map<String, Object> map, String... keys) {
  204. Map<String, Object> filteredMap = new LinkedHashMap<>();
  205. for (String key : keys) {
  206. if (map.containsKey(key)) {
  207. filteredMap.put(key, map.get(key));
  208. }
  209. }
  210. return filteredMap;
  211. }
  212. private static Map<String, List<String>> parseConfig(String configstr) {
  213. // Initialize the map to hold the parsed configuration
  214. Map<String, List<String>> configMap = new HashMap<>();
  215. // Get the configuration string
  216. String configString = configstr;
  217. if (configString != null) {
  218. // Split the configuration by different locations
  219. String[] locations = configString.split("},");
  220. for (String location : locations) {
  221. // Find the index of the opening brace
  222. int braceIndex = location.indexOf('{');
  223. if (braceIndex > 0) {
  224. // Extract the location name
  225. String locationName = location.substring(0,braceIndex).trim();
  226. // Extract the tags and split by comma
  227. String tagsString = location.substring(braceIndex + 1).replace("}", "").trim();
  228. List<String> tags = Arrays.asList(tagsString.split(","));
  229. // Add to the map
  230. configMap.put(locationName, tags);
  231. }
  232. }
  233. }
  234. return configMap;
  235. }
  236. //TODO 小时营业所片区用水量(自供+(供入-供出))
  237. //TODO beforHour是小时数,当前时间往前扣多少个小时 按15分钟刻度计算用水量
  238. public static void checkBusinessRecordAllData(int beforHour,String selfconfessStr,String supplyinStr,String confessStr){
  239. //TODO 检查小时用水量
  240. DateTimeFormatter formater = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
  241. try{
  242. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"检查机制-计算营业所小时用水量开始时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
  243. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"检查机制-计算营业所小时用水量进行中:......");
  244. //获取配置关系
  245. //TODO 自供
  246. Map<String,List<String>> selfconfess = parseConfig(selfconfessStr);
  247. //TODO 供入
  248. Map<String,List<String>> supplyin = parseConfig(supplyinStr);
  249. //TODO 供出
  250. Map<String,List<String>> confess = parseConfig(confessStr);
  251. //TODO 根据当前配置信息item 查询远通数据中的历史数据
  252. //TODO 首先查询当前水厂设备的从昨天之后到得到数据的数据
  253. LocalDateTime today = LocalDateTime.now();
  254. LocalDateTime startDateTime = today.withMinute(0).withSecond(0).withNano(0).minusHours(beforHour);
  255. //TODO 需计算的循环天数
  256. //TODO 此循环天数每一天所查的是所有设备每小时数据合
  257. LocalDateTime newStartDateTime = startDateTime;
  258. String startDate = newStartDateTime.format(formater);
  259. String endDate = today.withMinute(1).withSecond(0).format(formater);
  260. final CountDownLatch latch = new CountDownLatch(selfconfess.keySet().size());
  261. for (String orgName:selfconfess.keySet()) {
  262. try {
  263. ThreadTaskMgr.runTask(new TraceRunnable(TraceLogMgr.getTraceId()) {
  264. @Override
  265. public void function() {
  266. for (int i = 0; i < Integer.valueOf(String.valueOf(Duration.between(startDateTime, today.withSecond(0)).toMinutes() / 15)); i++) {
  267. String startTime = newStartDateTime.minusMinutes(-(i * 15)).format(formater);
  268. String endTime = newStartDateTime.minusMinutes(-1).minusMinutes(-((i + 1) * 15)).format(formater);//查询时间加一分钟
  269. Map<String, Object> recordAllEntity = new HashMap<>();//需要添加的实体数据
  270. recordAllEntity.put("org_name", orgName);//水厂
  271. recordAllEntity.put("time", newStartDateTime.minusMinutes(-((i + 1) * 15)).format(formater));//采集时间(小时的最后时间)
  272. recordAllEntity.put("value", null);
  273. recordAllEntity.put("value_tag", "water");
  274. recordAllEntity.put("collcation_tag_array", "");
  275. //TODO 此循环计算该小时所有设备的用水量
  276. //TODO 先查各个营业所的自供值
  277. List<String> tagTags = selfconfess.get(orgName);//自供
  278. List<String> tagTags2 = supplyin.get(orgName);//供入
  279. List<String> tagTags3 = confess.get(orgName);//供出
  280. Double selfValue = null;
  281. Double supplyInValue = null;
  282. Double confessValue = null;
  283. try {
  284. selfValue = selfconfess(tagTags, startDate, endDate, startTime, endTime);//自供
  285. supplyInValue = selfconfess(tagTags2, startDate, endDate, startTime, endTime);//供入
  286. confessValue = selfconfess(tagTags3, startDate, endDate, startTime, endTime);//供出
  287. } catch (Exception ex) {
  288. }
  289. Double value = null;//总计算值
  290. if (selfValue != null) {
  291. if (supplyInValue != null && confessValue == null) {
  292. value = selfValue + supplyInValue;
  293. } else if (supplyInValue != null && confessValue != null) {
  294. value = selfValue + supplyInValue - confessValue;
  295. } else if (supplyInValue == null && confessValue == null) {
  296. value = selfValue;
  297. } else if (supplyInValue == null && confessValue != null) {
  298. value = selfValue - confessValue;
  299. }
  300. }
  301. recordAllEntity.put("value", value);
  302. System.out.println("营业所片区" + orgName + "值:" + value + ",采集时间:" + newStartDateTime.minusMinutes(-((i + 1) * 15)).format(formater));
  303. List<Map<String, Object>> queryWaterRecord = getWaterTapWaterApi().getWaterCollectionRecordAllListNew(1, 0,
  304. " WHERE org_name = '" + recordAllEntity.get("org_name")
  305. + "' AND time = '" + recordAllEntity.get("time") + "' AND value_tag = '" + recordAllEntity.get("value_tag") + "'");
  306. if (CollectionUtils.isEmpty(queryWaterRecord)) {
  307. // //TODO 说明不存在,进行插入
  308. if (!ObjectUtils.isEmpty(recordAllEntity.get("value"))) {
  309. int insertCode = getWaterTapWaterApi().insertWaterCollectionRecordAllNew(" (" +
  310. "'" + recordAllEntity.get("org_name") + "'," +
  311. "'" + recordAllEntity.get("time") + "'," +
  312. "'" + recordAllEntity.get("value") + "'," +
  313. "'" + recordAllEntity.get("value_tag") + "'," +
  314. "'" + recordAllEntity.get("collcation_tag_array") + "'" +
  315. ") ");
  316. if (insertCode < 0) {
  317. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
  318. , mStrClassName
  319. , mStrClassName
  320. , String.format("Batch initTapWaterDataThread 未成功:{%s} ",
  321. FastJsonUtil.toJSON(recordAllEntity)));
  322. }
  323. }
  324. } else {
  325. //TODO 说明存在,进行修改
  326. if (!ObjectUtils.isEmpty(recordAllEntity.get("value"))) {
  327. int updateCode = getWaterTapWaterApi().updateWaterCollectionRecordAllNew(String.valueOf(value), " WHERE " +
  328. "(" +
  329. " org_name = '" + recordAllEntity.get("org_name") + "' and" +
  330. " \"time\" = '" + recordAllEntity.get("time") + "' and" +
  331. " value_tag = '" + recordAllEntity.get("value_tag") + "'" +
  332. ") ");
  333. if (updateCode < 0) {
  334. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
  335. , mStrClassName
  336. , mStrClassName
  337. , String.format("Batch updateWaterCollectionRecordAll 未成功:{%s} ",
  338. FastJsonUtil.toJSON(recordAllEntity)));
  339. }
  340. }
  341. }
  342. }
  343. latch.countDown();
  344. }
  345. });
  346. }catch(Exception ex){
  347. }
  348. }
  349. latch.await();
  350. }catch(Exception ex){
  351. ex.printStackTrace();
  352. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
  353. , mStrClassName
  354. , mStrClassName
  355. , String.format("Batch checkBusinessRecordAllData ERROR:{%s} "
  356. , ex.getLocalizedMessage()));
  357. }
  358. }
  359. //TODO 计算总值
  360. private static Double selfconfess(List<String> tagTags,String startDate,String endDate,String startTime,String endTime) throws Exception{
  361. Double value = null;
  362. if (tagTags==null){
  363. return value;
  364. }
  365. for (String tagTag : tagTags) {
  366. // 定义字符串日期时间的格式
  367. DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
  368. // 解析字符串以创建 LocalDateTime 实例
  369. LocalDateTime dateTime = LocalDateTime.parse(endTime, formatter);
  370. if (dateTime.getHour()==1&&("OPC_SC.HSS.HSS1.AI.LJLL1.F_CV".equals(tagTag) ||
  371. "OPC_SC.HSS.HSS1.AI.LJLL2.F_CV".equals(tagTag) ||
  372. "OPC_SC.HSS.HSS1.AI.LJLL3.F_CV".equals(tagTag))) {
  373. Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount(
  374. " WHERE TAG_CODE = '" + tagTag + "' " +
  375. " and QCQUISITION_TIME >= to_date('" + startDate + "', 'yyyy-mm-dd hh24:mi:ss')" +
  376. " and QCQUISITION_TIME < to_date('" + endDate + "', 'yyyy-mm-dd hh24:mi:ss')");
  377. if(itemCount!=null&&itemCount>0) {
  378. List<Map<String, Object>> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(itemCount, 0,
  379. " AND TAG_CODE = '" + tagTag + "' " +
  380. " and QCQUISITION_TIME >= to_date('" + startTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
  381. " and QCQUISITION_TIME < to_date('" + endTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
  382. " order by QCQUISITION_TIME");
  383. if (!CollectionUtils.isEmpty(tapWaterHistoryList) && tapWaterHistoryList.size() > 1) {
  384. if (value == null) {
  385. value = 0.00;
  386. }
  387. value += tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("val").toString()) : null;
  388. }
  389. }
  390. }else {
  391. Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount(
  392. " WHERE TAG_CODE = '" + tagTag + "' " +
  393. " and QCQUISITION_TIME >= to_date('" + startDate + "', 'yyyy-mm-dd hh24:mi:ss')" +
  394. " and QCQUISITION_TIME < to_date('" + endDate + "', 'yyyy-mm-dd hh24:mi:ss')");
  395. if (itemCount != null && itemCount > 0) {
  396. List<Map<String, Object>> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(itemCount, 0,
  397. " AND TAG_CODE = '" + tagTag + "' " +
  398. " and QCQUISITION_TIME >= to_date('" + startTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
  399. " and QCQUISITION_TIME < to_date('" + endTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
  400. " order by QCQUISITION_TIME");
  401. //TODO tapWaterHistoryList此集合为该设备该小时内的数据,取第一条和最后一条的相差值作为用水量
  402. if (!CollectionUtils.isEmpty(tapWaterHistoryList) && tapWaterHistoryList.size() > 1) {
  403. Double firstValue = tapWaterHistoryList.get(0).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get(0).get("val").toString()) : null;
  404. Double lastValue = tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("val").toString()) : null;
  405. if (firstValue != null && lastValue != null) {
  406. //到此处是该小时一个设备的用水量已加上
  407. if (value == null) {
  408. value = 0.00;
  409. }
  410. value += Math.abs(lastValue - firstValue);
  411. }
  412. }
  413. }
  414. }
  415. }
  416. return value;
  417. }
  418. //TODO 15分钟 分区用水量 每天整点10分检查并写入,检查范围为当前时间到昨天23点
  419. public static void checkRecordAllDataBusinessFenqu(int beforHour){
  420. //TODO 检查小时用水量
  421. DateTimeFormatter formater = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
  422. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"检查机制-计算15分钟用水量开始时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
  423. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"检查机制-计算15分钟用水量进行中:......");
  424. List<Map<String, Object>> configList = getWaterTapWaterApi().getWaterCollectionConfigList("WHERE 1=1 AND org_name not like '%营业所' AND org_name!='北碚水厂' AND org_name!='渝中区水厂'");
  425. if (!CollectionUtils.isEmpty(configList)) {
  426. //TODO 按照组织机构分组
  427. Map<Object, List<Map<String, Object>>> groupedData =
  428. configList.stream().collect(Collectors.groupingBy(item -> item.get("org_name")));
  429. final CountDownLatch latch = new CountDownLatch(groupedData.keySet().size());
  430. for (Object key:groupedData.keySet()){
  431. try {
  432. ThreadTaskMgr.runTask(new TraceRunnable(TraceLogMgr.getTraceId()) {
  433. @Override
  434. public void function() {
  435. //TODO 根据当前配置信息item 查询远通数据中的历史数据
  436. //TODO 首先查询当前水厂设备的从昨天之后到得到数据的数据
  437. LocalDateTime today = LocalDateTime.now();
  438. LocalDateTime startDateTime = today.withMinute(0).withSecond(0).withNano(0).minusHours(beforHour);
  439. //TODO 需计算的循环天数
  440. //TODO 此循环天数每一天所查的是所有设备每小时数据合
  441. LocalDateTime newStartDateTime = startDateTime;
  442. String startDate = newStartDateTime.format(formater);
  443. String endDate = today.withMinute(1).withSecond(0).format(formater);
  444. List<Map<String, Object>> deviceList = groupedData.get(key);
  445. //TODO 循环获取该天该水厂每个设备数据
  446. //TODO 查询当前天日期内每小时的设备数据
  447. for (int i = 0; i < Integer.valueOf(String.valueOf(Duration.between(startDateTime, today.withSecond(0)).toMinutes()/15)); i++) {
  448. String startTime = newStartDateTime.minusMinutes(-(i * 15)).format(formater);
  449. String endTime = newStartDateTime.minusMinutes(-1).minusMinutes(-((i + 1)*15)).format(formater);//查询时间加一分钟
  450. Map<String, Object> recordAllEntity = new HashMap<>();//需要添加的实体数据
  451. recordAllEntity.put("org_name", key.toString());//水厂
  452. recordAllEntity.put("time", newStartDateTime.minusMinutes(-((i + 1)*15)).format(formater));//采集时间(小时的最后时间)
  453. recordAllEntity.put("value", null);
  454. recordAllEntity.put("value_tag", "water");
  455. recordAllEntity.put("collcation_tag_array", FastJsonUtil.toJSON(deviceList));
  456. Double value = null;
  457. //TODO 此循环计算该小时所有设备的用水量
  458. for (Map<String, Object> item : deviceList) {
  459. //TODO 和尚山水厂的三个OPC的设备0点到1点的数据特殊处理,直接取最后一条数据作为0点到1点的用水量
  460. // 定义字符串日期时间的格式
  461. DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
  462. // 解析字符串以创建 LocalDateTime 实例
  463. LocalDateTime dateTime = LocalDateTime.parse(endTime, formatter);
  464. if (dateTime.getHour()==1&&("OPC_SC.HSS.HSS1.AI.LJLL1.F_CV".equals(item.get("collcation_tag").toString()) ||
  465. "OPC_SC.HSS.HSS1.AI.LJLL2.F_CV".equals(item.get("collcation_tag").toString()) ||
  466. "OPC_SC.HSS.HSS1.AI.LJLL3.F_CV".equals(item.get("collcation_tag").toString()))) {
  467. Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount(
  468. " WHERE TAG_CODE = '" + item.get("collcation_tag") + "' " +
  469. " and QCQUISITION_TIME >= to_date('" + startDate + "', 'yyyy-mm-dd hh24:mi:ss')" +
  470. " and QCQUISITION_TIME < to_date('" + endDate + "', 'yyyy-mm-dd hh24:mi:ss')");
  471. if(itemCount!=null&&itemCount>0) {
  472. List<Map<String, Object>> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(itemCount, 0,
  473. " AND TAG_CODE = '" + item.get("collcation_tag") + "' " +
  474. " and QCQUISITION_TIME >= to_date('" + startTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
  475. " and QCQUISITION_TIME < to_date('" + endTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
  476. " order by QCQUISITION_TIME");
  477. if (!CollectionUtils.isEmpty(tapWaterHistoryList) && tapWaterHistoryList.size() > 1) {
  478. if (value == null) {
  479. value = 0.00;
  480. }
  481. value += tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("val").toString()) : null;
  482. }
  483. }
  484. }else {
  485. Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount(
  486. " WHERE TAG_CODE = '" + item.get("collcation_tag") + "' " +
  487. " and QCQUISITION_TIME >= to_date('" + startDate + "', 'yyyy-mm-dd hh24:mi:ss')" +
  488. " and QCQUISITION_TIME < to_date('" + endDate + "', 'yyyy-mm-dd hh24:mi:ss')");
  489. if (itemCount != null && itemCount > 0) {
  490. List<Map<String, Object>> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(itemCount, 0,
  491. " AND TAG_CODE = '" + item.get("collcation_tag") + "' " +
  492. " and QCQUISITION_TIME >= to_date('" + startTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
  493. " and QCQUISITION_TIME < to_date('" + endTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
  494. " order by QCQUISITION_TIME");
  495. //TODO tapWaterHistoryList此集合为该设备该小时内的数据,取第一条和最后一条的相差值作为用水量
  496. if (!CollectionUtils.isEmpty(tapWaterHistoryList) && tapWaterHistoryList.size() > 1) {
  497. Double firstValue = tapWaterHistoryList.get(0).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get(0).get("val").toString()) : null;
  498. Double lastValue = tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("val").toString()) : null;
  499. if (firstValue != null && lastValue != null) {
  500. //到此处是该小时一个设备的用水量已加上
  501. if (value == null) {
  502. value = 0.00;
  503. }
  504. value += Math.abs(lastValue - firstValue);
  505. }
  506. }
  507. }
  508. }
  509. }
  510. recordAllEntity.put("value", value);
  511. List<Map<String, Object>> queryWaterRecord = getWaterTapWaterApi().getWaterCollectionRecordAllListNew(1, 0,
  512. " WHERE org_name = '" + recordAllEntity.get("org_name")
  513. + "' AND time = '" + recordAllEntity.get("time") + "' AND value_tag = '" + recordAllEntity.get("value_tag") + "'");
  514. if (CollectionUtils.isEmpty(queryWaterRecord)) {
  515. //TODO 说明不存在,进行插入
  516. if(!ObjectUtils.isEmpty(recordAllEntity.get("value"))) {
  517. int insertCode = getWaterTapWaterApi().insertWaterCollectionRecordAllNew(" (" +
  518. "'" + recordAllEntity.get("org_name") + "'," +
  519. "'" + recordAllEntity.get("time") + "'," +
  520. "'" + recordAllEntity.get("value") + "'," +
  521. "'" + recordAllEntity.get("value_tag") + "'," +
  522. "'" + recordAllEntity.get("collcation_tag_array") + "'" +
  523. ") ");
  524. if (insertCode < 0) {
  525. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
  526. , mStrClassName
  527. , mStrClassName
  528. , String.format("Batch initTapWaterDataThread 未成功:{%s} ",
  529. FastJsonUtil.toJSON(recordAllEntity)));
  530. }
  531. }
  532. }else{
  533. //TODO 说明存在,进行修改
  534. if(!ObjectUtils.isEmpty(recordAllEntity.get("value"))) {
  535. int updateCode = getWaterTapWaterApi().updateWaterCollectionRecordAllNew(String.valueOf(value)," WHERE " +
  536. "(" +
  537. " org_name = '" + recordAllEntity.get("org_name") + "' and" +
  538. " \"time\" = '" + recordAllEntity.get("time") + "' and" +
  539. " value_tag = '" + recordAllEntity.get("value_tag") + "'" +
  540. ") ");
  541. if (updateCode < 0) {
  542. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
  543. , mStrClassName
  544. , mStrClassName
  545. , String.format("Batch updateWaterCollectionRecordAll 未成功:{%s} ",
  546. FastJsonUtil.toJSON(recordAllEntity)));
  547. }
  548. }
  549. }
  550. }
  551. latch.countDown();
  552. }
  553. });
  554. }catch(Exception ex){}
  555. }
  556. try{latch.await();}catch(Exception ex){}
  557. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"检查机制-计算15分钟用水量结束时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
  558. }
  559. }
  560. //TODO 小时用水量 每天整点10分检查并写入,检查范围为当前时间到昨天23点
  561. public static void checkRecordAllData(int beforHour){
  562. //TODO 检查小时用水量
  563. DateTimeFormatter formater = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
  564. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"检查机制-计算小时用水量开始时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
  565. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"检查机制-计算小时用水量进行中:......");
  566. List<Map<String, Object>> configList = getWaterTapWaterApi().getWaterCollectionConfigList("WHERE 1=1 AND org_name not like '%营业所' AND org_name!='北碚水厂' AND org_name!='渝中区水厂'");
  567. if (!CollectionUtils.isEmpty(configList)) {
  568. //TODO 按照组织机构分组
  569. Map<Object, List<Map<String, Object>>> groupedData =
  570. configList.stream().collect(Collectors.groupingBy(item -> item.get("org_name")));
  571. final CountDownLatch latch = new CountDownLatch(groupedData.keySet().size());
  572. for (Object key:groupedData.keySet()){
  573. try {
  574. ThreadTaskMgr.runTask(new TraceRunnable(TraceLogMgr.getTraceId()) {
  575. @Override
  576. public void function() {
  577. //TODO 根据当前配置信息item 查询远通数据中的历史数据
  578. //TODO 首先查询当前水厂设备的从昨天之后到得到数据的数据
  579. LocalDateTime today = LocalDateTime.now();
  580. LocalDateTime startDateTime = today.withMinute(0).withSecond(0).withNano(0).minusHours(beforHour);
  581. //TODO 需计算的循环天数
  582. //TODO 此循环天数每一天所查的是所有设备每小时数据合
  583. LocalDateTime newStartDateTime = startDateTime;
  584. String startDate = newStartDateTime.format(formater);
  585. String endDate = today.withMinute(1).withSecond(0).format(formater);
  586. List<Map<String, Object>> deviceList = groupedData.get(key);
  587. //TODO 循环获取该天该水厂每个设备数据
  588. //TODO 查询当前天日期内每小时的设备数据
  589. for (int i = 0; i < Integer.valueOf(String.valueOf(Duration.between(startDateTime, today.withSecond(0)).toMinutes()/15)); i++) {
  590. String startTime = newStartDateTime.minusHours(-i).format(formater);
  591. String endTime = newStartDateTime.minusMinutes(-1).minusHours(-(i+1)).format(formater);//查询时间加一分钟
  592. Map<String, Object> recordAllEntity = new HashMap<>();//需要添加的实体数据
  593. recordAllEntity.put("org_name", key.toString());//水厂
  594. recordAllEntity.put("time", newStartDateTime.minusHours(-(i+1)).format(formater));//采集时间(小时的最后时间)
  595. recordAllEntity.put("value", null);
  596. recordAllEntity.put("value_tag", "water");
  597. recordAllEntity.put("collcation_tag_array", FastJsonUtil.toJSON(deviceList));
  598. Double value = null;
  599. //TODO 此循环计算该小时所有设备的用水量
  600. for (Map<String, Object> item : deviceList) {
  601. //TODO 和尚山水厂的三个OPC的设备0点到1点的数据特殊处理,直接取最后一条数据作为0点到1点的用水量
  602. // 定义字符串日期时间的格式
  603. DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
  604. // 解析字符串以创建 LocalDateTime 实例
  605. LocalDateTime dateTime = LocalDateTime.parse(endTime, formatter);
  606. if (dateTime.getHour()==1&&("OPC_SC.HSS.HSS1.AI.LJLL1.F_CV".equals(item.get("collcation_tag").toString()) ||
  607. "OPC_SC.HSS.HSS1.AI.LJLL2.F_CV".equals(item.get("collcation_tag").toString()) ||
  608. "OPC_SC.HSS.HSS1.AI.LJLL3.F_CV".equals(item.get("collcation_tag").toString()))) {
  609. Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount(
  610. " WHERE TAG_CODE = '" + item.get("collcation_tag") + "' " +
  611. " and QCQUISITION_TIME >= to_date('" + startDate + "', 'yyyy-mm-dd hh24:mi:ss')" +
  612. " and QCQUISITION_TIME < to_date('" + endDate + "', 'yyyy-mm-dd hh24:mi:ss')");
  613. if(itemCount!=null&&itemCount>0) {
  614. List<Map<String, Object>> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(itemCount, 0,
  615. " AND TAG_CODE = '" + item.get("collcation_tag") + "' " +
  616. " and QCQUISITION_TIME >= to_date('" + startTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
  617. " and QCQUISITION_TIME < to_date('" + endTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
  618. " order by QCQUISITION_TIME");
  619. if (!CollectionUtils.isEmpty(tapWaterHistoryList) && tapWaterHistoryList.size() > 1) {
  620. if (value == null) {
  621. value = 0.00;
  622. }
  623. value += tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("val").toString()) : null;
  624. }
  625. }
  626. }else {
  627. Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount(
  628. " WHERE TAG_CODE = '" + item.get("collcation_tag") + "' " +
  629. " and QCQUISITION_TIME >= to_date('" + startDate + "', 'yyyy-mm-dd hh24:mi:ss')" +
  630. " and QCQUISITION_TIME < to_date('" + endDate + "', 'yyyy-mm-dd hh24:mi:ss')");
  631. if (itemCount != null && itemCount > 0) {
  632. List<Map<String, Object>> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(itemCount, 0,
  633. " AND TAG_CODE = '" + item.get("collcation_tag") + "' " +
  634. " and QCQUISITION_TIME >= to_date('" + startTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
  635. " and QCQUISITION_TIME < to_date('" + endTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
  636. " order by QCQUISITION_TIME");
  637. //TODO tapWaterHistoryList此集合为该设备该小时内的数据,取第一条和最后一条的相差值作为用水量
  638. if (!CollectionUtils.isEmpty(tapWaterHistoryList) && tapWaterHistoryList.size() > 1) {
  639. Double firstValue = tapWaterHistoryList.get(0).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get(0).get("val").toString()) : null;
  640. Double lastValue = tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("val").toString()) : null;
  641. if (firstValue != null && lastValue != null) {
  642. //到此处是该小时一个设备的用水量已加上
  643. if (value == null) {
  644. value = 0.00;
  645. }
  646. value += Math.abs(lastValue - firstValue);
  647. }
  648. }
  649. }
  650. }
  651. }
  652. recordAllEntity.put("value", value);
  653. List<Map<String, Object>> queryWaterRecord = getWaterTapWaterApi().getWaterCollectionRecordAllList(1, 0,
  654. " WHERE org_name = '" + recordAllEntity.get("org_name")
  655. + "' AND time = '" + recordAllEntity.get("time") + "' AND value_tag = '" + recordAllEntity.get("value_tag") + "'");
  656. if (CollectionUtils.isEmpty(queryWaterRecord)) {
  657. //TODO 说明不存在,进行插入
  658. if(!ObjectUtils.isEmpty(recordAllEntity.get("value"))) {
  659. int insertCode = getWaterTapWaterApi().insertWaterCollectionRecordAll(" (" +
  660. "'" + recordAllEntity.get("org_name") + "'," +
  661. "'" + recordAllEntity.get("time") + "'," +
  662. "'" + recordAllEntity.get("value") + "'," +
  663. "'" + recordAllEntity.get("value_tag") + "'," +
  664. "'" + recordAllEntity.get("collcation_tag_array") + "'" +
  665. ") ");
  666. if (insertCode < 0) {
  667. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
  668. , mStrClassName
  669. , mStrClassName
  670. , String.format("Batch initTapWaterDataThread 未成功:{%s} ",
  671. FastJsonUtil.toJSON(recordAllEntity)));
  672. }
  673. }
  674. }else{
  675. //TODO 说明存在,进行修改
  676. if(!ObjectUtils.isEmpty(recordAllEntity.get("value"))) {
  677. int updateCode = getWaterTapWaterApi().updateWaterCollectionRecordAll(String.valueOf(value)," WHERE " +
  678. "(" +
  679. " org_name = '" + recordAllEntity.get("org_name") + "' and" +
  680. " \"time\" = '" + recordAllEntity.get("time") + "' and" +
  681. " value_tag = '" + recordAllEntity.get("value_tag") + "'" +
  682. ") ");
  683. if (updateCode < 0) {
  684. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
  685. , mStrClassName
  686. , mStrClassName
  687. , String.format("Batch updateWaterCollectionRecordAll 未成功:{%s} ",
  688. FastJsonUtil.toJSON(recordAllEntity)));
  689. }
  690. }
  691. }
  692. }
  693. latch.countDown();
  694. }
  695. });
  696. }catch(Exception ex){}
  697. }
  698. try{latch.await();}catch(Exception ex){}
  699. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"检查机制-计算小时用水量结束时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
  700. }
  701. }
  702. //TODO 原始历史数据 每天整点15分检查并写入,检查范围为当前时间到昨天
  703. public static void checkRecordData(){
  704. //TODO ① 首先查询水厂设备配置信息
  705. try {
  706. List<Map<String, Object>> configList = getWaterTapWaterApi().getWaterCollectionConfigList("WHERE 1=1 AND org_name not like '%营业所' AND org_name!='北碚水厂' AND org_name!='渝中区水厂'");
  707. if(!CollectionUtils.isEmpty(configList)){
  708. //声明总数据的数据数组
  709. List<Map<String,Object>> newRecordAll = Collections.synchronizedList(new LinkedList<Map<String,Object>>());
  710. //TODO ②开启多线程并发处理各水厂设备的数据
  711. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"检查机制--开始时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
  712. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"检查机制--进行中:......");
  713. final CountDownLatch latch = new CountDownLatch(configList.size());
  714. for(Map<String,Object> item:configList){
  715. try{
  716. ThreadTaskMgr.runTask(new TraceRunnable(TraceLogMgr.getTraceId()) {
  717. @Override
  718. public void function() {
  719. //TODO 根据当前配置信息item 查询远通数据中的历史数据
  720. //TODO 首先查询当前水厂设备的昨天0点之后到当前时间最新数据的数据
  721. LocalDateTime today = LocalDateTime.now();
  722. LocalDateTime yesterday = today.minusDays(1).withHour(0).withMinute(0).withSecond(0).withNano(0);
  723. Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount(" WHERE TAG_CODE = '"+item.get("collcation_tag")+"' " +
  724. " and QCQUISITION_TIME >= to_date('"+yesterday.format(formater)+"', 'yyyy-mm-dd hh24:mi:ss')");
  725. if(itemCount!=null&&itemCount>0) {
  726. //TODO 优化 以分页方式查询所有,初始分页行数定为2000查询速率较好
  727. int pageNum = itemCount%2000==0?itemCount/2000:(itemCount/2000)+1;//总页数
  728. Integer limit = 2000;
  729. if(pageNum<=1){
  730. limit = itemCount;//说明总数比第一页小
  731. }
  732. for (int i = 0; i < pageNum; i++) {
  733. Integer offset = i*limit;
  734. //tapWaterHistoryList 为远通水量数据源
  735. //TODO 因为是orcale数据库 分页方式以伪列rownum为分页条件,因此limit实际上永远比offset大2000,因此只要不是第一页,limit=offset+2000
  736. Integer limitNew = 0;
  737. if(i>0){
  738. limitNew = offset+2000;
  739. }else{
  740. limitNew = 2000;
  741. }
  742. List<Map<String,Object>> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(limitNew,offset," " +
  743. "AND TAG_CODE = '"+item.get("collcation_tag")+"' " +
  744. "and QCQUISITION_TIME >= to_date('"+yesterday.format(formater)+"', 'yyyy-mm-dd hh24:mi:ss')");
  745. if(!CollectionUtils.isEmpty(tapWaterHistoryList)){
  746. //TODO 循环远通水量数据列表,查询数据不存在的话就插入
  747. for (int j = 0; j < tapWaterHistoryList.size(); j++) {
  748. List<Map<String,Object>> queryWaterRecord = getWaterTapWaterApi().getWaterCollectionRecordList(1,0,
  749. " WHERE collcation_tag = '"+tapWaterHistoryList.get(j).get("TAG_CODE")
  750. +"' AND org_name = '"+tapWaterHistoryList.get(j).get("NAME")+"' AND time = '"+TimeTool.convertDateStr2UTC(tapWaterHistoryList.get(j).get("QCQUISITION_TIME").toString())+"'");
  751. if(CollectionUtils.isEmpty(queryWaterRecord)){
  752. //TODO 说明没插入过本系统,执行插入
  753. String extend = " ('"+tapWaterHistoryList.get(j).get("TAG_CODE")+"','"+tapWaterHistoryList.get(j).get("NAME")+"',"
  754. + ""+TimeTool.convertDateStr2UTC(tapWaterHistoryList.get(j).get("QCQUISITION_TIME").toString())+","
  755. + "'"+tapWaterHistoryList.get(j).get("VAL")+"',"
  756. + TimeTool.convertDateStr2UTC(tapWaterHistoryList.get(j).get("UPDATE_TIME").toString())
  757. +") ";
  758. int insertCode = getWaterTapWaterApi().insertWaterCollectionRecord(extend);
  759. if(insertCode<=0){
  760. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
  761. , mStrClassName
  762. , mStrClassName
  763. , String.format("Batch initTapWaterDataThread 未成功:{%s} ",
  764. extend));
  765. }
  766. }
  767. }
  768. }
  769. }
  770. }
  771. latch.countDown();
  772. }
  773. });
  774. }catch(Exception ex){
  775. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
  776. , mStrClassName
  777. , mStrClassName
  778. , String.format("Batch initTapWaterDataThread 水厂:{%s},标识:{%s} ERROR:{%s} ",
  779. item.get("org_name"),
  780. item.get("collcation_tag")
  781. , ex.getLocalizedMessage()));
  782. }
  783. }
  784. latch.await();
  785. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"检查机制--结束时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
  786. }
  787. }catch(Exception ex){
  788. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
  789. , mStrClassName
  790. , mStrClassName
  791. , String.format("Batch initTapWaterData ERROR:{%s} "
  792. , ex.getLocalizedMessage()));
  793. }
  794. }
  795. //TODO 初始化添加对比远通数据
  796. public static void initTapWaterData(){
  797. //TODO ① 首先查询水厂设备配置信息
  798. try {
  799. List<Map<String, Object>> configList = getWaterTapWaterApi().getWaterCollectionConfigList("WHERE 1=1 AND org_name not like '%营业所' AND org_name!='北碚水厂' AND org_name!='渝中区水厂'");
  800. if(!CollectionUtils.isEmpty(configList)){
  801. //声明总数据的数据数组
  802. List<Map<String,Object>> newRecordAll = Collections.synchronizedList(new LinkedList<Map<String,Object>>());
  803. //TODO ②开启多线程并发处理各水厂设备的数据
  804. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"比对远通原始数据开始时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
  805. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"进行中:......");
  806. final CountDownLatch latch = new CountDownLatch(configList.size());
  807. LocalDateTime startTimeLocal = LocalDateTime.now().minusDays(1).minusHours(1);
  808. for(Map<String,Object> item:configList){
  809. try{
  810. ThreadTaskMgr.runTask(new TraceRunnable(TraceLogMgr.getTraceId()) {
  811. @Override
  812. public void function() {
  813. //TODO 根据当前配置信息item 查询远通数据中的历史数据
  814. //TODO 首先查询当前水厂设备的2025-01-01之后到最新数据的数据总数,然后分页形式获取数据
  815. Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount(" WHERE TAG_CODE = '"+item.get("collcation_tag")+"' and QCQUISITION_TIME >= to_date('"+startTimeLocal.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))+"', 'yyyy-mm-dd hh24:mi:ss')");
  816. if(itemCount!=null&&itemCount>0) {
  817. //TODO 优化 以分页方式查询所有,初始分页行数定为2000查询速率较好
  818. int pageNum = itemCount%2000==0?itemCount/2000:(itemCount/2000)+1;//总页数
  819. Integer limit = 2000;
  820. if(pageNum<=1){
  821. limit = itemCount;//说明总数比第一页小
  822. }
  823. for (int i = 0; i < pageNum; i++) {
  824. Integer offset = i*limit;
  825. //tapWaterHistoryList 为远通水量数据源
  826. //TODO 因为是orcale数据库 分页方式以伪列rownum为分页条件,因此limit实际上永远比offset大2000,因此只要不是第一页,limit=offset+2000
  827. Integer limitNew = 0;
  828. if(i>0){
  829. limitNew = offset+2000;
  830. }else{
  831. limitNew = 2000;
  832. }
  833. List<Map<String,Object>> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(limitNew,offset," AND TAG_CODE = '"+item.get("collcation_tag")+"' and QCQUISITION_TIME >= to_date('"+startTimeLocal.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))+"', 'yyyy-mm-dd hh24:mi:ss')");
  834. if(!CollectionUtils.isEmpty(tapWaterHistoryList)){
  835. //TODO 循环远通水量数据列表,查询数据不存在的话就插入
  836. for (int j = 0; j < tapWaterHistoryList.size(); j++) {
  837. List<Map<String,Object>> queryWaterRecord = getWaterTapWaterApi().getWaterCollectionRecordList(1,0,
  838. " WHERE collcation_tag = '"+tapWaterHistoryList.get(j).get("TAG_CODE")
  839. +"' AND org_name = '"+tapWaterHistoryList.get(j).get("NAME")+"' AND time = '"+TimeTool.convertDateStr2UTC(tapWaterHistoryList.get(j).get("QCQUISITION_TIME").toString())+"'");
  840. if(CollectionUtils.isEmpty(queryWaterRecord)){
  841. //TODO 说明没插入过本系统,执行插入
  842. String extend = " ('"+tapWaterHistoryList.get(j).get("TAG_CODE")+"','"+tapWaterHistoryList.get(j).get("NAME")+"',"
  843. + ""+TimeTool.convertDateStr2UTC(tapWaterHistoryList.get(j).get("QCQUISITION_TIME").toString())+","
  844. + "'"+tapWaterHistoryList.get(j).get("VAL")+"',"
  845. + TimeTool.convertDateStr2UTC(tapWaterHistoryList.get(j).get("UPDATE_TIME").toString())
  846. +") ";
  847. int insertCode = getWaterTapWaterApi().insertWaterCollectionRecord(extend);
  848. if(insertCode<=0){
  849. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
  850. , mStrClassName
  851. , mStrClassName
  852. , String.format("Batch initTapWaterDataThread 未成功:{%s} ",
  853. extend));
  854. }
  855. }
  856. }
  857. }
  858. }
  859. }
  860. latch.countDown();
  861. }
  862. });
  863. }catch(Exception ex){
  864. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
  865. , mStrClassName
  866. , mStrClassName
  867. , String.format("Batch initTapWaterDataThread 水厂:{%s},标识:{%s} ERROR:{%s} ",
  868. item.get("org_name"),
  869. item.get("collcation_tag")
  870. , ex.getLocalizedMessage()));
  871. }
  872. }
  873. latch.await();
  874. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"比对远通原始数据结束时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
  875. }
  876. }catch(Exception ex){
  877. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
  878. , mStrClassName
  879. , mStrClassName
  880. , String.format("Batch initTapWaterData ERROR:{%s} "
  881. , ex.getLocalizedMessage()));
  882. }
  883. }
  884. //TODO 初始化添加计算水厂所有设备每日的每小时用水量计算
  885. public static void initWaterCollecationReacordAll(){
  886. //TODO ① 首先查询水厂设备配置信息
  887. try {
  888. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"计算小时用水量开始时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
  889. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"计算小时用水量进行中:......");
  890. List<Map<String, Object>> configList = getWaterTapWaterApi().getWaterCollectionConfigList("WHERE 1=1 AND org_name not like '%营业所' AND org_name!='北碚水厂' AND org_name!='渝中区水厂'");
  891. if (!CollectionUtils.isEmpty(configList)) {
  892. //TODO 按照组织机构分组
  893. Map<Object, List<Map<String, Object>>> groupedData =
  894. configList.stream().collect(Collectors.groupingBy(item -> item.get("org_name")));
  895. final CountDownLatch latch = new CountDownLatch(groupedData.keySet().size());
  896. //TODO 外层循环组织机构
  897. for (Object key:groupedData.keySet()){
  898. try{
  899. ThreadTaskMgr.runTask(new TraceRunnable(TraceLogMgr.getTraceId()) {
  900. @Override
  901. public void function() {
  902. //TODO 根据当前配置信息item 查询远通数据中的历史数据
  903. //TODO 首先查询当前水厂设备的从2025-01-01之后到得到数据
  904. // LocalDateTime startDateTime = LocalDateTime.now().withMinute(0).withSecond(0)
  905. // .minusDays(1).minusHours(1);
  906. LocalDateTime startDateTime = LocalDateTime.now().withHour(0).withMinute(0).withSecond(0)
  907. .minusDays(3);
  908. //TODO 需计算的循环天数
  909. Long days = 0L;
  910. days = Duration.between(startDateTime, LocalDateTime.now()).toDays();
  911. //TODO 此循环天数每一天所查的是所有设备每小时数据合
  912. final CountDownLatch latch2 = new CountDownLatch(days.intValue());
  913. for(Long k = 0L;k<days;k++) {
  914. LocalDateTime newStartDateTime = startDateTime.minusDays(-k.intValue());
  915. String startDate = newStartDateTime.format(formater);
  916. String endDate = newStartDateTime.withMinute(1).minusDays(-1).format(formater);
  917. try {
  918. ThreadTaskMgr.runTask(new TraceRunnable(TraceLogMgr.getTraceId()) {
  919. @Override
  920. public void function() {
  921. List<Map<String, Object>> deviceList = groupedData.get(key);
  922. //TODO 循环获取该天该水厂每个设备数据
  923. //TODO 查询当前天日期内每15分钟的设备数据
  924. for (int i = 0; i < 96; i++) {
  925. String startTime = newStartDateTime.minusHours(-i).format(formater);
  926. String endTime = newStartDateTime.minusMinutes(-1).minusHours(-(i+1)).format(formater);//查询时间加一分钟
  927. Map<String, Object> recordAllEntity = new HashMap<>();//需要添加的实体数据
  928. recordAllEntity.put("org_name", key.toString());//水厂
  929. recordAllEntity.put("time", newStartDateTime.minusHours(-(i+1)).format(formater));//采集时间(小时的最后时间)
  930. recordAllEntity.put("value", null);
  931. recordAllEntity.put("value_tag", "water");
  932. recordAllEntity.put("collcation_tag_array", FastJsonUtil.toJSON(deviceList));
  933. Double value = null;
  934. //TODO 此循环计算该小时所有设备的用水量
  935. for (Map<String, Object> item : deviceList) {
  936. //TODO 和尚山水厂的三个OPC的设备0点到1点的数据特殊处理,直接取最后一条数据作为0点到1点的用水量
  937. // 定义字符串日期时间的格式
  938. DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
  939. // 解析字符串以创建 LocalDateTime 实例
  940. LocalDateTime dateTime = LocalDateTime.parse(endTime, formatter);
  941. if (dateTime.getHour()==1&&("OPC_SC.HSS.HSS1.AI.LJLL1.F_CV".equals(item.get("collcation_tag").toString()) ||
  942. "OPC_SC.HSS.HSS1.AI.LJLL2.F_CV".equals(item.get("collcation_tag").toString()) ||
  943. "OPC_SC.HSS.HSS1.AI.LJLL3.F_CV".equals(item.get("collcation_tag").toString()))) {
  944. Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount(
  945. " WHERE TAG_CODE = '" + item.get("collcation_tag") + "' " +
  946. " and QCQUISITION_TIME >= to_date('" + startDate + "', 'yyyy-mm-dd hh24:mi:ss')" +
  947. " and QCQUISITION_TIME < to_date('" + endDate + "', 'yyyy-mm-dd hh24:mi:ss')");
  948. if(itemCount!=null&&itemCount>0) {
  949. List<Map<String, Object>> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(itemCount, 0,
  950. " AND TAG_CODE = '" + item.get("collcation_tag") + "' " +
  951. " and QCQUISITION_TIME >= to_date('" + startTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
  952. " and QCQUISITION_TIME < to_date('" + endTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
  953. " order by QCQUISITION_TIME");
  954. if (!CollectionUtils.isEmpty(tapWaterHistoryList) && tapWaterHistoryList.size() > 1) {
  955. if (value == null) {
  956. value = 0.00;
  957. }
  958. value += tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("val").toString()) : null;
  959. }
  960. }
  961. }else {
  962. Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount(
  963. " WHERE TAG_CODE = '" + item.get("collcation_tag") + "' " +
  964. " and QCQUISITION_TIME >= to_date('" + startDate + "', 'yyyy-mm-dd hh24:mi:ss')" +
  965. " and QCQUISITION_TIME < to_date('" + endDate + "', 'yyyy-mm-dd hh24:mi:ss')");
  966. if (itemCount != null && itemCount > 0) {
  967. List<Map<String, Object>> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(itemCount, 0,
  968. " AND TAG_CODE = '" + item.get("collcation_tag") + "' " +
  969. " and QCQUISITION_TIME >= to_date('" + startTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
  970. " and QCQUISITION_TIME < to_date('" + endTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
  971. " order by QCQUISITION_TIME");
  972. //TODO tapWaterHistoryList此集合为该设备该小时内的数据,取第一条和最后一条的相差值作为用水量
  973. if (!CollectionUtils.isEmpty(tapWaterHistoryList) && tapWaterHistoryList.size() > 1) {
  974. Double firstValue = tapWaterHistoryList.get(0).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get(0).get("val").toString()) : null;
  975. Double lastValue = tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("val").toString()) : null;
  976. if (firstValue != null && lastValue != null) {
  977. //到此处是该小时一个设备的用水量已加上
  978. if (value == null) {
  979. value = 0.00;
  980. }
  981. value += Math.abs(lastValue - firstValue);
  982. }
  983. }
  984. }
  985. }
  986. }
  987. recordAllEntity.put("value", value);
  988. List<Map<String, Object>> queryWaterRecord = getWaterTapWaterApi().getWaterCollectionRecordAllList(1, 0,
  989. " WHERE org_name = '" + recordAllEntity.get("org_name")
  990. + "' AND time = '" + recordAllEntity.get("time") + "' AND value_tag = '" + recordAllEntity.get("value_tag") + "'");
  991. if (CollectionUtils.isEmpty(queryWaterRecord)) {
  992. //TODO 说明不存在,进行插入
  993. if(!ObjectUtils.isEmpty(recordAllEntity.get("value"))) {
  994. int insertCode = getWaterTapWaterApi().insertWaterCollectionRecordAll(" (" +
  995. "'" + recordAllEntity.get("org_name") + "'," +
  996. "'" + recordAllEntity.get("time") + "'," +
  997. "'" + recordAllEntity.get("value") + "'," +
  998. "'" + recordAllEntity.get("value_tag") + "'," +
  999. "'" + recordAllEntity.get("collcation_tag_array") + "'" +
  1000. ") ");
  1001. if (insertCode < 0) {
  1002. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
  1003. , mStrClassName
  1004. , mStrClassName
  1005. , String.format("Batch initTapWaterDataThread 未成功:{%s} ",
  1006. FastJsonUtil.toJSON(recordAllEntity)));
  1007. }
  1008. }
  1009. }else{
  1010. //TODO 说明存在,进行修改
  1011. if(!ObjectUtils.isEmpty(recordAllEntity.get("value"))) {
  1012. int updateCode = getWaterTapWaterApi().updateWaterCollectionRecordAll(String.valueOf(value)," WHERE " +
  1013. "(" +
  1014. " org_name = '" + recordAllEntity.get("org_name") + "' and" +
  1015. " \"time\" = '" + recordAllEntity.get("time") + "' and" +
  1016. " value_tag = '" + recordAllEntity.get("value_tag") + "'" +
  1017. ") ");
  1018. if (updateCode < 0) {
  1019. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
  1020. , mStrClassName
  1021. , mStrClassName
  1022. , String.format("Batch updateWaterCollectionRecordAll 未成功:{%s} ",
  1023. FastJsonUtil.toJSON(recordAllEntity)));
  1024. }
  1025. }
  1026. }
  1027. }
  1028. latch2.countDown();
  1029. }
  1030. });
  1031. }catch(Exception ex){
  1032. }
  1033. }
  1034. try {
  1035. latch2.await();
  1036. }catch(Exception ex){
  1037. }
  1038. latch.countDown();
  1039. }
  1040. });
  1041. }catch(Exception ex){
  1042. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
  1043. , mStrClassName
  1044. , mStrClassName
  1045. , String.format("Batch" +
  1046. " initWaterReacordAll ERROR:{%s} "
  1047. , ex.getLocalizedMessage()));
  1048. }
  1049. }
  1050. latch.await();
  1051. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"计算小时用水量结束时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
  1052. }
  1053. }catch(Exception ex){
  1054. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
  1055. , mStrClassName
  1056. , mStrClassName
  1057. , String.format("Batch initWaterCollecationReacordAll ERROR:{%s} "
  1058. , ex.getLocalizedMessage()));
  1059. }
  1060. }
  1061. //TODO 初始化添加计算泵每日的小时数据
  1062. public static void initWaterPumpReacordAll(String startFindTime){
  1063. //TODO ① 首先查询水厂设备配置信息
  1064. try {
  1065. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"添加小时泵数据开始时间:"+ TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
  1066. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"添加小时泵数据起始时间:("+startFindTime+")进行中:......");
  1067. List<Map<String, Object>> configList = getWaterTapWaterApi().getWaterPumpCollectionConfigList(null);
  1068. if (!CollectionUtils.isEmpty(configList)) {
  1069. //TODO 按照设备分组
  1070. Map<Object, List<Map<String, Object>>> groupedData =
  1071. configList.stream().collect(Collectors.groupingBy(item -> item.get("device_code")));
  1072. final CountDownLatch latch = new CountDownLatch(groupedData.keySet().size());
  1073. //TODO 外层循环组织机构
  1074. for (Object key:groupedData.keySet()){
  1075. try{
  1076. new Thread(() -> {
  1077. //TODO 根据当前配置信息item 查询远通数据中的历史数据
  1078. //TODO 首先查询当前水厂设备的从startFindTime之后到得到数据
  1079. LocalDateTime startDateTime = LocalDateTime.parse(startFindTime, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
  1080. //TODO 需计算的循环天数
  1081. Long days = 0L;
  1082. days = Duration.between(startDateTime, LocalDateTime.now()).toDays();
  1083. days+=1;
  1084. //TODO 此循环天数每一天所查的是所有设备每小时数据合
  1085. final CountDownLatch latch2 = new CountDownLatch(days.intValue());
  1086. for(Long k = 0L;k<days;k++) {
  1087. LocalDateTime newStartDateTime = startDateTime.minusDays(-k.intValue());
  1088. String startDate = newStartDateTime.format(formater);
  1089. String endDate = newStartDateTime.withMinute(1).minusDays(-1).format(formater);
  1090. try {
  1091. new Thread(() -> {
  1092. List<Map<String, Object>> deviceList = groupedData.get(key);
  1093. //TODO 循环获取该天该水厂每个设备数据
  1094. //TODO 查询当前天日期内每小时的设备数据
  1095. for (int i = 0; i < 24; i++) {
  1096. String startTime = newStartDateTime.withHour(i).format(formater);
  1097. //TODO 需要加个05分把endTime的整点数据查出来
  1098. String endTime = newStartDateTime.withHour(i).minusHours(-1).withMinute(5).withSecond(0).format(formater);
  1099. //TODO 此循环计算该小时所有设备的泵数据
  1100. for (Map<String, Object> item : deviceList) {
  1101. // 定义字符串日期时间的格式
  1102. DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
  1103. // 解析字符串以创建 LocalDateTime 实例
  1104. LocalDateTime dateTime = LocalDateTime.parse(endTime, formatter);
  1105. Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount(
  1106. " WHERE 1=1 AND TAG_CODE = '" + item.get("collcation_tag") + "' " +
  1107. " and QCQUISITION_TIME >= to_date('" + startDate + "', 'yyyy-mm-dd hh24:mi:ss')" +
  1108. " and QCQUISITION_TIME <= to_date('" + endDate + "', 'yyyy-mm-dd hh24:mi:ss')" +
  1109. " AND ABS(" +
  1110. " (QCQUISITION_TIME - TRUNC(QCQUISITION_TIME, 'HH')) * 24 * 60 " +
  1111. " ) <= 5");//查询 -- 采集时间接近整点,误差在5分钟内
  1112. if (itemCount != null && itemCount > 0) {
  1113. List<Map<String, Object>> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(itemCount, 0,
  1114. " AND TAG_CODE = '" + item.get("collcation_tag") + "' " +
  1115. " and QCQUISITION_TIME >= to_date('" + startTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
  1116. " and QCQUISITION_TIME <= to_date('" + endTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
  1117. " AND ABS( " +
  1118. " (QCQUISITION_TIME - TRUNC(QCQUISITION_TIME, 'HH')) * 24 * 60 " +
  1119. " ) <= 5 " +
  1120. " order by QCQUISITION_TIME");
  1121. //TODO 这里需要注意泵数据的采集配置表设备号可能有多个,因此,要判断对应的standard_code做对应处理
  1122. if (!CollectionUtils.isEmpty(tapWaterHistoryList) && tapWaterHistoryList.size() >= 1) {
  1123. //TODO 数据库操作
  1124. for (Map<String,Object> mapEntity:tapWaterHistoryList) {
  1125. Map<String, Object> recordAllEntity = new HashMap<>();//需要添加的实体数据
  1126. recordAllEntity.put("device_code", groupedData.get(key).get(0).get("device_code"));
  1127. recordAllEntity.put("time", LocalDateTime.parse(mapEntity.get("QCQUISITION_TIME").toString(), formatter)
  1128. .withMinute(0).withSecond(0).format(formatter));//采集时间(小时的最后时间)
  1129. if(item.get("standard_code").equals("active_energy")){
  1130. recordAllEntity.put("active_energy",mapEntity.get("VAL"));
  1131. }
  1132. if(item.get("standard_code").equals("startup_state")){
  1133. recordAllEntity.put("startup_state",mapEntity.get("VAL"));
  1134. }
  1135. if(item.get("standard_code").equals("phase_a_current")){
  1136. if(ObjectUtils.isEmpty(mapEntity.get("VAL"))||"0".equals(mapEntity.get("VAL").toString())) {
  1137. recordAllEntity.put("startup_state", 0);
  1138. }else{
  1139. recordAllEntity.put("startup_state", 1);
  1140. }
  1141. }
  1142. //TODO 直接调用插入或者新增方法
  1143. int insertCode = getWaterTapWaterApi().insertOrUpdateWaterPumpRecordAll(recordAllEntity);
  1144. if (insertCode < 0) {
  1145. System.out.print(String.format("Batch insertOrUpdateWaterPumpRecordAll 未成功:{%s} ",
  1146. JSONObject.toJSON(recordAllEntity)));
  1147. }
  1148. }
  1149. }
  1150. }
  1151. }
  1152. }
  1153. latch2.countDown();
  1154. }).start();
  1155. }catch(Exception ex){
  1156. }
  1157. }
  1158. try {
  1159. latch2.await();
  1160. }catch(Exception ex){
  1161. }
  1162. latch.countDown();
  1163. }).start();
  1164. }catch(Exception ex){
  1165. System.out.print(String.format("Batch" +
  1166. " initWaterReacordAll ERROR:{%s} "
  1167. , ex.getLocalizedMessage()));
  1168. }
  1169. }
  1170. latch.await();
  1171. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO,mStrClassName, mStrClassName,"添加小时泵数据检查机制("+startFindTime+")结束时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
  1172. }
  1173. }catch(Exception ex){
  1174. System.out.print(String.format("Batch initWaterCollecationReacordAll ERROR:{%s} "
  1175. , ex.getLocalizedMessage()));
  1176. }
  1177. }
  1178. }