KprAimTapWaterBizFun.java 54 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684
  1. package com.shkpr.service.aimodelpower.bizmgr;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.global.base.log.LogLevelFlag;
  4. import com.global.base.log.LogPrintMgr;
  5. import com.global.base.tools.FastJsonUtil;
  6. import com.shkpr.service.aimodelpower.commtools.TimeTool;
  7. import com.shkpr.service.aimodelpower.dbdao.DBMgrProxy;
  8. import com.shkpr.service.aimodelpower.dbdao.shizilaishuiDataSource.service.intef.WaterTapWaterService;
  9. import com.shkpr.service.aimodelpower.dto.TraceRunnable;
  10. import com.shkpr.service.aimodelpower.globalmgr.ThreadTaskMgr;
  11. import com.shkpr.service.aimodelpower.globalmgr.TraceLogMgr;
  12. import org.springframework.util.CollectionUtils;
  13. import org.springframework.util.ObjectUtils;
  14. import java.time.Duration;
  15. import java.time.LocalDateTime;
  16. import java.time.format.DateTimeFormatter;
  17. import java.util.*;
  18. import java.util.concurrent.CountDownLatch;
  19. import java.util.stream.Collectors;
  20. /**
  21. * @ClassName KprAimTapWaterBizFun
  22. * @Description: TODO
  23. * @Author LX
  24. * @Date 2024/5/22
  25. * @Version V1.0
  26. **/
  27. public class KprAimTapWaterBizFun {
  28. private static final String MSG_SUCCESS = "success.";
  29. private static final String MSG_FAILED = "failed.";
  30. private static final String mStrClassName = "KprAimTapWaterBizFun";
  31. private static final String EMPTY_NULL = "NULL";
  32. public static WaterTapWaterService getWaterTapWaterApi(){
  33. return DBMgrProxy.getInstance().applyWaterTapWaterService();
  34. }
  35. static DateTimeFormatter formater = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
  36. //TODO 小时用水量 每天整点10分检查并写入,检查范围为当前时间到昨天23点
  37. public static void checkRecordAllData(){
  38. //TODO 检查小时用水量
  39. DateTimeFormatter formater = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
  40. System.out.println("检查机制-计算小时用水量开始时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
  41. System.out.println("检查机制-计算小时用水量进行中:......");
  42. List<Map<String, Object>> configList = getWaterTapWaterApi().getWaterCollectionConfigList(null);
  43. if (!CollectionUtils.isEmpty(configList)) {
  44. //TODO 按照组织机构分组
  45. Map<Object, List<Map<String, Object>>> groupedData =
  46. configList.stream().collect(Collectors.groupingBy(item -> item.get("org_name")));
  47. final CountDownLatch latch = new CountDownLatch(groupedData.keySet().size());
  48. for (Object key:groupedData.keySet()){
  49. try {
  50. ThreadTaskMgr.runTask(new TraceRunnable(TraceLogMgr.getTraceId()) {
  51. @Override
  52. public void function() {
  53. //TODO 根据当前配置信息item 查询远通数据中的历史数据
  54. //TODO 首先查询当前水厂设备的从昨天之后到得到数据的数据
  55. LocalDateTime today = LocalDateTime.now();
  56. LocalDateTime startDateTime = today.withHour(0).withMinute(0).withSecond(0).withNano(0).minusHours(1);
  57. //TODO 需计算的循环天数
  58. //TODO 此循环天数每一天所查的是所有设备每小时数据合
  59. LocalDateTime newStartDateTime = startDateTime;
  60. String startDate = newStartDateTime.format(formater);
  61. String endDate = today.withMinute(0).withSecond(0).format(formater);
  62. List<Map<String, Object>> deviceList = groupedData.get(key);
  63. //TODO 循环获取该天该水厂每个设备数据
  64. //TODO 查询当前天日期内每小时的设备数据
  65. for (int i = 0; i < Integer.valueOf(String.valueOf(Duration.between(startDateTime, today.withMinute(0).withSecond(0)).toHours())); i++) {
  66. String startTime = newStartDateTime.minusHours(-i).format(formater);
  67. String endTime = newStartDateTime.minusHours(-(i + 1)).format(formater);
  68. Map<String, Object> recordAllEntity = new HashMap<>();//需要添加的实体数据
  69. recordAllEntity.put("org_name", key.toString());//水厂
  70. recordAllEntity.put("time", endTime);//采集时间(小时的最后时间)
  71. recordAllEntity.put("value", null);
  72. recordAllEntity.put("value_tag", "water");
  73. recordAllEntity.put("collcation_tag_array", FastJsonUtil.toJSON(deviceList));
  74. Double value = null;
  75. //TODO 此循环计算该小时所有设备的用水量
  76. for (Map<String, Object> item : deviceList) {
  77. //TODO 和尚山水厂的三个OPC的设备0点到1点的数据特殊处理,直接取最后一条数据作为0点到1点的用水量
  78. // 定义字符串日期时间的格式
  79. DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
  80. // 解析字符串以创建 LocalDateTime 实例
  81. LocalDateTime dateTime = LocalDateTime.parse(endTime, formatter);
  82. if (dateTime.getHour()==1&&("OPC_SC.HSS.HSS1.AI.LJLL1.F_CV".equals(item.get("collcation_tag").toString()) ||
  83. "OPC_SC.HSS.HSS1.AI.LJLL2.F_CV".equals(item.get("collcation_tag").toString()) ||
  84. "OPC_SC.HSS.HSS1.AI.LJLL3.F_CV".equals(item.get("collcation_tag").toString()))) {
  85. Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount(
  86. " WHERE TAG_CODE = '" + item.get("collcation_tag") + "' " +
  87. " and QCQUISITION_TIME >= to_date('" + startDate + "', 'yyyy-mm-dd hh24:mi:ss')" +
  88. " and QCQUISITION_TIME < to_date('" + endDate + "', 'yyyy-mm-dd hh24:mi:ss')");
  89. if(itemCount!=null&&itemCount>0) {
  90. List<Map<String, Object>> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(itemCount, 0,
  91. " AND TAG_CODE = '" + item.get("collcation_tag") + "' " +
  92. " and QCQUISITION_TIME >= to_date('" + startTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
  93. " and QCQUISITION_TIME < to_date('" + endTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
  94. " order by QCQUISITION_TIME");
  95. if (!CollectionUtils.isEmpty(tapWaterHistoryList) && tapWaterHistoryList.size() > 1) {
  96. if (value == null) {
  97. value = 0.00;
  98. }
  99. value += tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("val").toString()) : null;
  100. }
  101. }
  102. }else {
  103. Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount(
  104. " WHERE TAG_CODE = '" + item.get("collcation_tag") + "' " +
  105. " and QCQUISITION_TIME >= to_date('" + startDate + "', 'yyyy-mm-dd hh24:mi:ss')" +
  106. " and QCQUISITION_TIME < to_date('" + endDate + "', 'yyyy-mm-dd hh24:mi:ss')");
  107. if (itemCount != null && itemCount > 0) {
  108. List<Map<String, Object>> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(itemCount, 0,
  109. " AND TAG_CODE = '" + item.get("collcation_tag") + "' " +
  110. " and QCQUISITION_TIME >= to_date('" + startTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
  111. " and QCQUISITION_TIME < to_date('" + endTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
  112. " order by QCQUISITION_TIME");
  113. //TODO tapWaterHistoryList此集合为该设备该小时内的数据,取第一条和最后一条的相差值作为用水量
  114. if (!CollectionUtils.isEmpty(tapWaterHistoryList) && tapWaterHistoryList.size() > 1) {
  115. Double firstValue = tapWaterHistoryList.get(0).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get(0).get("val").toString()) : null;
  116. Double lastValue = tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("val").toString()) : null;
  117. if (firstValue != null && lastValue != null) {
  118. //到此处是该小时一个设备的用水量已加上
  119. if (value == null) {
  120. value = 0.00;
  121. }
  122. value += Math.abs(lastValue - firstValue);
  123. }
  124. }
  125. }
  126. }
  127. }
  128. recordAllEntity.put("value", value);
  129. List<Map<String, Object>> queryWaterRecord = getWaterTapWaterApi().getWaterCollectionRecordAllList(1, 0,
  130. " WHERE org_name = '" + recordAllEntity.get("org_name")
  131. + "' AND time = '" + recordAllEntity.get("time") + "' AND value_tag = '" + recordAllEntity.get("value_tag") + "'");
  132. if (CollectionUtils.isEmpty(queryWaterRecord)) {
  133. //TODO 说明不存在,进行插入
  134. if(!ObjectUtils.isEmpty(recordAllEntity.get("value"))) {
  135. int insertCode = getWaterTapWaterApi().insertWaterCollectionRecordAll(" (" +
  136. "'" + recordAllEntity.get("org_name") + "'," +
  137. "'" + recordAllEntity.get("time") + "'," +
  138. "'" + recordAllEntity.get("value") + "'," +
  139. "'" + recordAllEntity.get("value_tag") + "'," +
  140. "'" + recordAllEntity.get("collcation_tag_array") + "'" +
  141. ") ");
  142. if (insertCode < 0) {
  143. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
  144. , mStrClassName
  145. , mStrClassName
  146. , String.format("Batch initTapWaterDataThread 未成功:{%s} ",
  147. FastJsonUtil.toJSON(recordAllEntity)));
  148. }
  149. }
  150. }else{
  151. //TODO 说明存在,进行修改
  152. if(!ObjectUtils.isEmpty(recordAllEntity.get("value"))) {
  153. int updateCode = getWaterTapWaterApi().updateWaterCollectionRecordAll(String.valueOf(value)," WHERE " +
  154. "(" +
  155. " org_name = '" + recordAllEntity.get("org_name") + "' and" +
  156. " \"time\" = '" + recordAllEntity.get("time") + "' and" +
  157. " value_tag = '" + recordAllEntity.get("value_tag") + "'" +
  158. ") ");
  159. if (updateCode < 0) {
  160. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
  161. , mStrClassName
  162. , mStrClassName
  163. , String.format("Batch updateWaterCollectionRecordAll 未成功:{%s} ",
  164. FastJsonUtil.toJSON(recordAllEntity)));
  165. }
  166. }
  167. }
  168. }
  169. latch.countDown();
  170. }
  171. });
  172. }catch(Exception ex){}
  173. }
  174. try{latch.await();}catch(Exception ex){}
  175. System.out.println("检查机制-计算小时用水量结束时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
  176. }
  177. }
  178. //TODO 原始历史数据 每天整点15分检查并写入,检查范围为当前时间到昨天
  179. public static void checkRecordData(){
  180. //TODO ① 首先查询水厂设备配置信息
  181. try {
  182. List<Map<String, Object>> configList = getWaterTapWaterApi().getWaterCollectionConfigList(null);
  183. if(!CollectionUtils.isEmpty(configList)){
  184. //声明总数据的数据数组
  185. List<Map<String,Object>> newRecordAll = Collections.synchronizedList(new LinkedList<Map<String,Object>>());
  186. //TODO ②开启多线程并发处理各水厂设备的数据
  187. System.out.println("检查机制--开始时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
  188. System.out.println("检查机制--进行中:......");
  189. final CountDownLatch latch = new CountDownLatch(configList.size());
  190. for(Map<String,Object> item:configList){
  191. try{
  192. ThreadTaskMgr.runTask(new TraceRunnable(TraceLogMgr.getTraceId()) {
  193. @Override
  194. public void function() {
  195. //TODO 根据当前配置信息item 查询远通数据中的历史数据
  196. //TODO 首先查询当前水厂设备的昨天0点之后到当前时间最新数据的数据
  197. LocalDateTime today = LocalDateTime.now();
  198. LocalDateTime yesterday = today.minusDays(1).withHour(0).withMinute(0).withSecond(0).withNano(0);
  199. Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount(" WHERE TAG_CODE = '"+item.get("collcation_tag")+"' " +
  200. " and QCQUISITION_TIME >= to_date('"+yesterday.format(formater)+"', 'yyyy-mm-dd hh24:mi:ss')");
  201. if(itemCount!=null&&itemCount>0) {
  202. //TODO 优化 以分页方式查询所有,初始分页行数定为2000查询速率较好
  203. int pageNum = itemCount%2000==0?itemCount/2000:(itemCount/2000)+1;//总页数
  204. Integer limit = 2000;
  205. if(pageNum<=1){
  206. limit = itemCount;//说明总数比第一页小
  207. }
  208. for (int i = 0; i < pageNum; i++) {
  209. Integer offset = i*limit;
  210. //tapWaterHistoryList 为远通水量数据源
  211. //TODO 因为是orcale数据库 分页方式以伪列rownum为分页条件,因此limit实际上永远比offset大2000,因此只要不是第一页,limit=offset+2000
  212. Integer limitNew = 0;
  213. if(i>0){
  214. limitNew = offset+2000;
  215. }else{
  216. limitNew = 2000;
  217. }
  218. List<Map<String,Object>> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(limitNew,offset," " +
  219. "AND TAG_CODE = '"+item.get("collcation_tag")+"' " +
  220. "and QCQUISITION_TIME >= to_date('"+yesterday.format(formater)+"', 'yyyy-mm-dd hh24:mi:ss')");
  221. if(!CollectionUtils.isEmpty(tapWaterHistoryList)){
  222. //TODO 循环远通水量数据列表,查询数据不存在的话就插入
  223. for (int j = 0; j < tapWaterHistoryList.size(); j++) {
  224. List<Map<String,Object>> queryWaterRecord = getWaterTapWaterApi().getWaterCollectionRecordList(1,0,
  225. " WHERE collcation_tag = '"+tapWaterHistoryList.get(j).get("TAG_CODE")
  226. +"' AND org_name = '"+tapWaterHistoryList.get(j).get("NAME")+"' AND time = '"+TimeTool.convertDateStr2UTC(tapWaterHistoryList.get(j).get("QCQUISITION_TIME").toString())+"'");
  227. if(CollectionUtils.isEmpty(queryWaterRecord)){
  228. //TODO 说明没插入过本系统,执行插入
  229. String extend = " ('"+tapWaterHistoryList.get(j).get("TAG_CODE")+"','"+tapWaterHistoryList.get(j).get("NAME")+"',"
  230. + ""+TimeTool.convertDateStr2UTC(tapWaterHistoryList.get(j).get("QCQUISITION_TIME").toString())+","
  231. + "'"+tapWaterHistoryList.get(j).get("VAL")+"',"
  232. + TimeTool.convertDateStr2UTC(tapWaterHistoryList.get(j).get("UPDATE_TIME").toString())
  233. +") ";
  234. int insertCode = getWaterTapWaterApi().insertWaterCollectionRecord(extend);
  235. if(insertCode<=0){
  236. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
  237. , mStrClassName
  238. , mStrClassName
  239. , String.format("Batch initTapWaterDataThread 未成功:{%s} ",
  240. extend));
  241. }
  242. }
  243. }
  244. }
  245. }
  246. }
  247. latch.countDown();
  248. }
  249. });
  250. }catch(Exception ex){
  251. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
  252. , mStrClassName
  253. , mStrClassName
  254. , String.format("Batch initTapWaterDataThread 水厂:{%s},标识:{%s} ERROR:{%s} ",
  255. item.get("org_name"),
  256. item.get("collcation_tag")
  257. , ex.getLocalizedMessage()));
  258. }
  259. }
  260. latch.await();
  261. System.out.println("检查机制--结束时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
  262. }
  263. }catch(Exception ex){
  264. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
  265. , mStrClassName
  266. , mStrClassName
  267. , String.format("Batch initTapWaterData ERROR:{%s} "
  268. , ex.getLocalizedMessage()));
  269. }
  270. }
  271. //TODO 初始化添加对比远通数据
  272. public static void initTapWaterData(){
  273. //TODO ① 首先查询水厂设备配置信息
  274. try {
  275. List<Map<String, Object>> configList = getWaterTapWaterApi().getWaterCollectionConfigList(null);
  276. if(!CollectionUtils.isEmpty(configList)){
  277. //声明总数据的数据数组
  278. List<Map<String,Object>> newRecordAll = Collections.synchronizedList(new LinkedList<Map<String,Object>>());
  279. //TODO ②开启多线程并发处理各水厂设备的数据
  280. System.out.println("比对远通原始数据开始时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
  281. System.out.println("进行中:......");
  282. final CountDownLatch latch = new CountDownLatch(configList.size());
  283. for(Map<String,Object> item:configList){
  284. try{
  285. ThreadTaskMgr.runTask(new TraceRunnable(TraceLogMgr.getTraceId()) {
  286. @Override
  287. public void function() {
  288. //TODO 根据当前配置信息item 查询远通数据中的历史数据
  289. //TODO 首先查询当前水厂设备的2025-01-01之后到最新数据的数据总数,然后分页形式获取数据
  290. Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount(" WHERE TAG_CODE = '"+item.get("collcation_tag")+"' and QCQUISITION_TIME >= to_date('2025-01-01 00:00:00', 'yyyy-mm-dd hh24:mi:ss')");
  291. if(itemCount!=null&&itemCount>0) {
  292. //TODO 优化 以分页方式查询所有,初始分页行数定为2000查询速率较好
  293. int pageNum = itemCount%2000==0?itemCount/2000:(itemCount/2000)+1;//总页数
  294. Integer limit = 2000;
  295. if(pageNum<=1){
  296. limit = itemCount;//说明总数比第一页小
  297. }
  298. for (int i = 0; i < pageNum; i++) {
  299. Integer offset = i*limit;
  300. //tapWaterHistoryList 为远通水量数据源
  301. //TODO 因为是orcale数据库 分页方式以伪列rownum为分页条件,因此limit实际上永远比offset大2000,因此只要不是第一页,limit=offset+2000
  302. Integer limitNew = 0;
  303. if(i>0){
  304. limitNew = offset+2000;
  305. }else{
  306. limitNew = 2000;
  307. }
  308. List<Map<String,Object>> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(limitNew,offset," AND TAG_CODE = '"+item.get("collcation_tag")+"' and QCQUISITION_TIME >= to_date('2025-01-01 00:00:00', 'yyyy-mm-dd hh24:mi:ss')");
  309. if(!CollectionUtils.isEmpty(tapWaterHistoryList)){
  310. //TODO 循环远通水量数据列表,查询数据不存在的话就插入
  311. for (int j = 0; j < tapWaterHistoryList.size(); j++) {
  312. List<Map<String,Object>> queryWaterRecord = getWaterTapWaterApi().getWaterCollectionRecordList(1,0,
  313. " WHERE collcation_tag = '"+tapWaterHistoryList.get(j).get("TAG_CODE")
  314. +"' AND org_name = '"+tapWaterHistoryList.get(j).get("NAME")+"' AND time = '"+TimeTool.convertDateStr2UTC(tapWaterHistoryList.get(j).get("QCQUISITION_TIME").toString())+"'");
  315. if(CollectionUtils.isEmpty(queryWaterRecord)){
  316. //TODO 说明没插入过本系统,执行插入
  317. String extend = " ('"+tapWaterHistoryList.get(j).get("TAG_CODE")+"','"+tapWaterHistoryList.get(j).get("NAME")+"',"
  318. + ""+TimeTool.convertDateStr2UTC(tapWaterHistoryList.get(j).get("QCQUISITION_TIME").toString())+","
  319. + "'"+tapWaterHistoryList.get(j).get("VAL")+"',"
  320. + TimeTool.convertDateStr2UTC(tapWaterHistoryList.get(j).get("UPDATE_TIME").toString())
  321. +") ";
  322. int insertCode = getWaterTapWaterApi().insertWaterCollectionRecord(extend);
  323. if(insertCode<=0){
  324. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
  325. , mStrClassName
  326. , mStrClassName
  327. , String.format("Batch initTapWaterDataThread 未成功:{%s} ",
  328. extend));
  329. }
  330. }
  331. }
  332. }
  333. }
  334. }
  335. latch.countDown();
  336. }
  337. });
  338. }catch(Exception ex){
  339. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
  340. , mStrClassName
  341. , mStrClassName
  342. , String.format("Batch initTapWaterDataThread 水厂:{%s},标识:{%s} ERROR:{%s} ",
  343. item.get("org_name"),
  344. item.get("collcation_tag")
  345. , ex.getLocalizedMessage()));
  346. }
  347. }
  348. latch.await();
  349. System.out.println("比对远通原始数据结束时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
  350. }
  351. }catch(Exception ex){
  352. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
  353. , mStrClassName
  354. , mStrClassName
  355. , String.format("Batch initTapWaterData ERROR:{%s} "
  356. , ex.getLocalizedMessage()));
  357. }
  358. }
  359. //TODO 初始化添加计算水厂所有设备每日的每小时用水量计算
  360. public static void initWaterCollecationReacordAll(){
  361. //TODO ① 首先查询水厂设备配置信息
  362. try {
  363. System.out.println("计算小时用水量开始时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
  364. System.out.println("计算小时用水量进行中:......");
  365. List<Map<String, Object>> configList = getWaterTapWaterApi().getWaterCollectionConfigList(null);
  366. if (!CollectionUtils.isEmpty(configList)) {
  367. //TODO 按照组织机构分组
  368. Map<Object, List<Map<String, Object>>> groupedData =
  369. configList.stream().collect(Collectors.groupingBy(item -> item.get("org_name")));
  370. final CountDownLatch latch = new CountDownLatch(groupedData.keySet().size());
  371. //TODO 外层循环组织机构
  372. for (Object key:groupedData.keySet()){
  373. try{
  374. ThreadTaskMgr.runTask(new TraceRunnable(TraceLogMgr.getTraceId()) {
  375. @Override
  376. public void function() {
  377. //TODO 根据当前配置信息item 查询远通数据中的历史数据
  378. //TODO 首先查询当前水厂设备的从2025-01-01之后到得到数据
  379. LocalDateTime startDateTime = LocalDateTime.of(2025, 01, 01, 00, 00, 00);
  380. //TODO 需计算的循环天数
  381. Long days = 0L;
  382. days = Duration.between(startDateTime, LocalDateTime.now()).toDays();
  383. //TODO 此循环天数每一天所查的是所有设备每小时数据合
  384. final CountDownLatch latch2 = new CountDownLatch(days.intValue());
  385. for(Long k = 0L;k<days;k++) {
  386. LocalDateTime newStartDateTime = startDateTime.minusDays(-k.intValue());
  387. String startDate = newStartDateTime.format(formater);
  388. String endDate = newStartDateTime.minusDays(-1).format(formater);
  389. try {
  390. ThreadTaskMgr.runTask(new TraceRunnable(TraceLogMgr.getTraceId()) {
  391. @Override
  392. public void function() {
  393. List<Map<String, Object>> deviceList = groupedData.get(key);
  394. //TODO 循环获取该天该水厂每个设备数据
  395. //TODO 查询当前天日期内每小时的设备数据
  396. for (int i = 0; i < 24; i++) {
  397. String startTime = newStartDateTime.withHour(i).format(formater);
  398. String endTime = newStartDateTime.minusHours(-(i + 1)).format(formater);
  399. Map<String, Object> recordAllEntity = new HashMap<>();//需要添加的实体数据
  400. recordAllEntity.put("org_name", key.toString());//水厂
  401. recordAllEntity.put("time", endTime);//采集时间(小时的最后时间)
  402. recordAllEntity.put("value", null);
  403. recordAllEntity.put("value_tag", "water");
  404. recordAllEntity.put("collcation_tag_array", FastJsonUtil.toJSON(deviceList));
  405. Double value = null;
  406. //TODO 此循环计算该小时所有设备的用水量
  407. for (Map<String, Object> item : deviceList) {
  408. //TODO 和尚山水厂的三个OPC的设备0点到1点的数据特殊处理,直接取最后一条数据作为0点到1点的用水量
  409. // 定义字符串日期时间的格式
  410. DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
  411. // 解析字符串以创建 LocalDateTime 实例
  412. LocalDateTime dateTime = LocalDateTime.parse(endTime, formatter);
  413. if (dateTime.getHour()==1&&("OPC_SC.HSS.HSS1.AI.LJLL1.F_CV".equals(item.get("collcation_tag").toString()) ||
  414. "OPC_SC.HSS.HSS1.AI.LJLL2.F_CV".equals(item.get("collcation_tag").toString()) ||
  415. "OPC_SC.HSS.HSS1.AI.LJLL3.F_CV".equals(item.get("collcation_tag").toString()))) {
  416. Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount(
  417. " WHERE TAG_CODE = '" + item.get("collcation_tag") + "' " +
  418. " and QCQUISITION_TIME >= to_date('" + startDate + "', 'yyyy-mm-dd hh24:mi:ss')" +
  419. " and QCQUISITION_TIME < to_date('" + endDate + "', 'yyyy-mm-dd hh24:mi:ss')");
  420. if(itemCount!=null&&itemCount>0) {
  421. List<Map<String, Object>> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(itemCount, 0,
  422. " AND TAG_CODE = '" + item.get("collcation_tag") + "' " +
  423. " and QCQUISITION_TIME >= to_date('" + startTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
  424. " and QCQUISITION_TIME < to_date('" + endTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
  425. " order by QCQUISITION_TIME");
  426. if (!CollectionUtils.isEmpty(tapWaterHistoryList) && tapWaterHistoryList.size() > 1) {
  427. if (value == null) {
  428. value = 0.00;
  429. }
  430. value += tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("val").toString()) : null;
  431. }
  432. }
  433. }else {
  434. Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount(
  435. " WHERE TAG_CODE = '" + item.get("collcation_tag") + "' " +
  436. " and QCQUISITION_TIME >= to_date('" + startDate + "', 'yyyy-mm-dd hh24:mi:ss')" +
  437. " and QCQUISITION_TIME < to_date('" + endDate + "', 'yyyy-mm-dd hh24:mi:ss')");
  438. if (itemCount != null && itemCount > 0) {
  439. List<Map<String, Object>> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(itemCount, 0,
  440. " AND TAG_CODE = '" + item.get("collcation_tag") + "' " +
  441. " and QCQUISITION_TIME >= to_date('" + startTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
  442. " and QCQUISITION_TIME < to_date('" + endTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
  443. " order by QCQUISITION_TIME");
  444. //TODO tapWaterHistoryList此集合为该设备该小时内的数据,取第一条和最后一条的相差值作为用水量
  445. if (!CollectionUtils.isEmpty(tapWaterHistoryList) && tapWaterHistoryList.size() > 1) {
  446. Double firstValue = tapWaterHistoryList.get(0).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get(0).get("val").toString()) : null;
  447. Double lastValue = tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("val").toString()) : null;
  448. if (firstValue != null && lastValue != null) {
  449. //到此处是该小时一个设备的用水量已加上
  450. if (value == null) {
  451. value = 0.00;
  452. }
  453. value += Math.abs(lastValue - firstValue);
  454. }
  455. }
  456. }
  457. }
  458. }
  459. recordAllEntity.put("value", value);
  460. List<Map<String, Object>> queryWaterRecord = getWaterTapWaterApi().getWaterCollectionRecordAllList(1, 0,
  461. " WHERE org_name = '" + recordAllEntity.get("org_name")
  462. + "' AND time = '" + recordAllEntity.get("time") + "' AND value_tag = '" + recordAllEntity.get("value_tag") + "'");
  463. if (CollectionUtils.isEmpty(queryWaterRecord)) {
  464. //TODO 说明不存在,进行插入
  465. if(!ObjectUtils.isEmpty(recordAllEntity.get("value"))) {
  466. int insertCode = getWaterTapWaterApi().insertWaterCollectionRecordAll(" (" +
  467. "'" + recordAllEntity.get("org_name") + "'," +
  468. "'" + recordAllEntity.get("time") + "'," +
  469. "'" + recordAllEntity.get("value") + "'," +
  470. "'" + recordAllEntity.get("value_tag") + "'," +
  471. "'" + recordAllEntity.get("collcation_tag_array") + "'" +
  472. ") ");
  473. if (insertCode < 0) {
  474. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
  475. , mStrClassName
  476. , mStrClassName
  477. , String.format("Batch initTapWaterDataThread 未成功:{%s} ",
  478. FastJsonUtil.toJSON(recordAllEntity)));
  479. }
  480. }
  481. }else{
  482. //TODO 说明存在,进行修改
  483. if(!ObjectUtils.isEmpty(recordAllEntity.get("value"))) {
  484. int updateCode = getWaterTapWaterApi().updateWaterCollectionRecordAll(String.valueOf(value)," WHERE " +
  485. "(" +
  486. " org_name = '" + recordAllEntity.get("org_name") + "' and" +
  487. " \"time\" = '" + recordAllEntity.get("time") + "' and" +
  488. " value_tag = '" + recordAllEntity.get("value_tag") + "'" +
  489. ") ");
  490. if (updateCode < 0) {
  491. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
  492. , mStrClassName
  493. , mStrClassName
  494. , String.format("Batch updateWaterCollectionRecordAll 未成功:{%s} ",
  495. FastJsonUtil.toJSON(recordAllEntity)));
  496. }
  497. }
  498. }
  499. }
  500. latch2.countDown();
  501. }
  502. });
  503. }catch(Exception ex){
  504. }
  505. }
  506. try {
  507. latch2.await();
  508. }catch(Exception ex){
  509. }
  510. latch.countDown();
  511. }
  512. });
  513. }catch(Exception ex){
  514. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
  515. , mStrClassName
  516. , mStrClassName
  517. , String.format("Batch" +
  518. " initWaterReacordAll ERROR:{%s} "
  519. , ex.getLocalizedMessage()));
  520. }
  521. }
  522. latch.await();
  523. System.out.println("计算小时用水量结束时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
  524. }
  525. }catch(Exception ex){
  526. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
  527. , mStrClassName
  528. , mStrClassName
  529. , String.format("Batch initWaterCollecationReacordAll ERROR:{%s} "
  530. , ex.getLocalizedMessage()));
  531. }
  532. }
  533. //TODO 初始化添加计算泵每日的小时数据
  534. public static void initWaterPumpReacordAll(String startFindTime){
  535. //TODO ① 首先查询水厂设备配置信息
  536. try {
  537. System.out.println("添加小时泵数据开始时间:"+ TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
  538. System.out.println("添加小时泵数据起始时间:("+startFindTime+")进行中:......");
  539. List<Map<String, Object>> configList = getWaterTapWaterApi().getWaterPumpCollectionConfigList(null);
  540. if (!CollectionUtils.isEmpty(configList)) {
  541. //TODO 按照设备分组
  542. Map<Object, List<Map<String, Object>>> groupedData =
  543. configList.stream().collect(Collectors.groupingBy(item -> item.get("device_code")));
  544. final CountDownLatch latch = new CountDownLatch(groupedData.keySet().size());
  545. //TODO 外层循环组织机构
  546. for (Object key:groupedData.keySet()){
  547. try{
  548. new Thread(() -> {
  549. //TODO 根据当前配置信息item 查询远通数据中的历史数据
  550. //TODO 首先查询当前水厂设备的从startFindTime之后到得到数据
  551. LocalDateTime startDateTime = LocalDateTime.parse(startFindTime, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
  552. //TODO 需计算的循环天数
  553. Long days = 0L;
  554. days = Duration.between(startDateTime, LocalDateTime.now()).toDays();
  555. days+=1;
  556. //TODO 此循环天数每一天所查的是所有设备每小时数据合
  557. final CountDownLatch latch2 = new CountDownLatch(days.intValue());
  558. for(Long k = 0L;k<days;k++) {
  559. LocalDateTime newStartDateTime = startDateTime.minusDays(-k.intValue());
  560. String startDate = newStartDateTime.format(formater);
  561. String endDate = newStartDateTime.minusDays(-1).format(formater);
  562. try {
  563. new Thread(() -> {
  564. List<Map<String, Object>> deviceList = groupedData.get(key);
  565. //TODO 循环获取该天该水厂每个设备数据
  566. //TODO 查询当前天日期内每小时的设备数据
  567. for (int i = 0; i < 24; i++) {
  568. String startTime = newStartDateTime.withHour(i).format(formater);
  569. //TODO 需要加个05分把endTime的整点数据查出来
  570. String endTime = newStartDateTime.minusHours(-(i + 1)).withMinute(5).withSecond(0).format(formater);
  571. //TODO 此循环计算该小时所有设备的泵数据
  572. for (Map<String, Object> item : deviceList) {
  573. // 定义字符串日期时间的格式
  574. DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
  575. // 解析字符串以创建 LocalDateTime 实例
  576. LocalDateTime dateTime = LocalDateTime.parse(endTime, formatter);
  577. Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount(
  578. " WHERE 1=1 AND TAG_CODE = '" + item.get("collcation_tag") + "' " +
  579. " and QCQUISITION_TIME >= to_date('" + startDate + "', 'yyyy-mm-dd hh24:mi:ss')" +
  580. " and QCQUISITION_TIME <= to_date('" + endDate + "', 'yyyy-mm-dd hh24:mi:ss')" +
  581. " AND ABS(" +
  582. " (QCQUISITION_TIME - TRUNC(QCQUISITION_TIME, 'HH')) * 24 * 60 " +
  583. " ) <= 5");//查询 -- 采集时间接近整点,误差在5分钟内
  584. if (itemCount != null && itemCount > 0) {
  585. List<Map<String, Object>> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(itemCount, 0,
  586. " AND TAG_CODE = '" + item.get("collcation_tag") + "' " +
  587. " and QCQUISITION_TIME >= to_date('" + startTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
  588. " and QCQUISITION_TIME <= to_date('" + endTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
  589. " AND ABS( " +
  590. " (QCQUISITION_TIME - TRUNC(QCQUISITION_TIME, 'HH')) * 24 * 60 " +
  591. " ) <= 5 " +
  592. " order by QCQUISITION_TIME");
  593. //TODO 这里需要注意泵数据的采集配置表设备号可能有多个,因此,要判断对应的standard_code做对应处理
  594. if (!CollectionUtils.isEmpty(tapWaterHistoryList) && tapWaterHistoryList.size() >= 1) {
  595. //TODO 数据库操作
  596. for (Map<String,Object> mapEntity:tapWaterHistoryList) {
  597. Map<String, Object> recordAllEntity = new HashMap<>();//需要添加的实体数据
  598. recordAllEntity.put("device_code", groupedData.get(key).get(0).get("device_code"));
  599. recordAllEntity.put("time", LocalDateTime.parse(mapEntity.get("QCQUISITION_TIME").toString(), formatter)
  600. .withMinute(0).withSecond(0).format(formatter));//采集时间(小时的最后时间)
  601. if(item.get("standard_code").equals("active_energy")){
  602. recordAllEntity.put("active_energy",mapEntity.get("VAL"));
  603. }
  604. if(item.get("standard_code").equals("startup_state")){
  605. recordAllEntity.put("startup_state",mapEntity.get("VAL"));
  606. }
  607. if(item.get("standard_code").equals("phase_a_current")){
  608. if(ObjectUtils.isEmpty(mapEntity.get("VAL"))||"0".equals(mapEntity.get("VAL").toString())) {
  609. recordAllEntity.put("startup_state", 0);
  610. }else{
  611. recordAllEntity.put("startup_state", 1);
  612. }
  613. }
  614. //TODO 直接调用插入或者新增方法
  615. int insertCode = getWaterTapWaterApi().insertOrUpdateWaterPumpRecordAll(recordAllEntity);
  616. if (insertCode < 0) {
  617. System.out.print(String.format("Batch insertOrUpdateWaterPumpRecordAll 未成功:{%s} ",
  618. JSONObject.toJSON(recordAllEntity)));
  619. }
  620. }
  621. }
  622. }
  623. }
  624. }
  625. latch2.countDown();
  626. }).start();
  627. }catch(Exception ex){
  628. }
  629. }
  630. try {
  631. latch2.await();
  632. }catch(Exception ex){
  633. }
  634. latch.countDown();
  635. }).start();
  636. }catch(Exception ex){
  637. System.out.print(String.format("Batch" +
  638. " initWaterReacordAll ERROR:{%s} "
  639. , ex.getLocalizedMessage()));
  640. }
  641. }
  642. latch.await();
  643. System.out.println("添加小时泵数据检查机制("+startFindTime+")结束时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
  644. }
  645. }catch(Exception ex){
  646. System.out.print(String.format("Batch initWaterCollecationReacordAll ERROR:{%s} "
  647. , ex.getLocalizedMessage()));
  648. }
  649. }
  650. }