|
@@ -0,0 +1,466 @@
|
|
|
+package com.shkpr.service.aimodelpower.bizmgr;
|
|
|
+
|
|
|
+import com.global.base.log.LogLevelFlag;
|
|
|
+import com.global.base.log.LogPrintMgr;
|
|
|
+import com.global.base.tools.FastJsonUtil;
|
|
|
+import com.shkpr.service.aimodelpower.commtools.TimeTool;
|
|
|
+import com.shkpr.service.aimodelpower.constants.LogFlagBusiType;
|
|
|
+import com.shkpr.service.aimodelpower.dbdao.DBMgrProxy;
|
|
|
+import com.shkpr.service.aimodelpower.dbdao.otherDataSource.service.intef.WaterTapWaterService;
|
|
|
+import com.shkpr.service.aimodelpower.dto.TraceRunnable;
|
|
|
+import com.shkpr.service.aimodelpower.globalmgr.ThreadTaskMgr;
|
|
|
+import com.shkpr.service.aimodelpower.globalmgr.TraceLogMgr;
|
|
|
+import org.apache.commons.collections.list.SynchronizedList;
|
|
|
+import org.springframework.util.CollectionUtils;
|
|
|
+
|
|
|
+import java.text.DateFormat;
|
|
|
+import java.time.Duration;
|
|
|
+import java.time.LocalDateTime;
|
|
|
+import java.time.format.DateTimeFormatter;
|
|
|
+import java.util.*;
|
|
|
+import java.util.concurrent.CountDownLatch;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.stream.Collectors;
|
|
|
+
|
|
|
+/**
|
|
|
+ * @ClassName KprAimTapWaterBizFun
|
|
|
+ * @Description: TODO
|
|
|
+ * @Author LX
|
|
|
+ * @Date 2024/5/22
|
|
|
+ * @Version V1.0
|
|
|
+ **/
|
|
|
+public class KprAimTapWaterBizFun {
|
|
|
+ private static final String MSG_SUCCESS = "success.";
|
|
|
+ private static final String MSG_FAILED = "failed.";
|
|
|
+ private static final String mStrClassName = "KprAimTapWaterBizFun";
|
|
|
+ private static final String EMPTY_NULL = "NULL";
|
|
|
+
|
|
|
+ public static WaterTapWaterService getWaterTapWaterApi(){
|
|
|
+ return DBMgrProxy.getInstance().applyWaterTapWaterService();
|
|
|
+ }
|
|
|
+ static DateTimeFormatter formater = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
|
|
+
|
|
|
+ //TODO 小时用水量 每天整点10分检查并写入,检查范围为当前时间到今天0点
|
|
|
+ public static void checkRecordAllData(){
|
|
|
+ //TODO 检查小时用水量
|
|
|
+ DateTimeFormatter formater = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
|
|
+
|
|
|
+ System.out.println("检查机制-计算小时用水量开始时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
|
|
|
+ System.out.println("检查机制-计算小时用水量进行中:......");
|
|
|
+ List<Map<String, Object>> configList = getWaterTapWaterApi().getWaterCollectionConfigList(null);
|
|
|
+ if (!CollectionUtils.isEmpty(configList)) {
|
|
|
+ //TODO 按照组织机构分组
|
|
|
+ Map<Object, List<Map<String, Object>>> groupedData =
|
|
|
+ configList.stream().collect(Collectors.groupingBy(item -> item.get("org_name")));
|
|
|
+ final CountDownLatch latch = new CountDownLatch(configList.size());
|
|
|
+ for (Object key:groupedData.keySet()){
|
|
|
+ try {
|
|
|
+ ThreadTaskMgr.runTask(new TraceRunnable(TraceLogMgr.getTraceId()) {
|
|
|
+ @Override
|
|
|
+ public void function() {
|
|
|
+ //TODO 根据当前配置信息item 查询远通数据中的历史数据
|
|
|
+ //TODO 首先查询当前水厂设备的从昨天之后到得到数据的数据
|
|
|
+ LocalDateTime today = LocalDateTime.now();
|
|
|
+ LocalDateTime startDateTime = today.withHour(0).withMinute(0).withSecond(0).withNano(0);
|
|
|
+
|
|
|
+ //TODO 需计算的循环天数
|
|
|
+
|
|
|
+ //TODO 此循环天数每一天所查的是所有设备每小时数据合
|
|
|
+ LocalDateTime newStartDateTime = startDateTime;
|
|
|
+ String startDate = newStartDateTime.format(formater);
|
|
|
+ String endDate = today.withMinute(0).withSecond(0).format(formater);
|
|
|
+ List<Map<String, Object>> deviceList = groupedData.get(key);
|
|
|
+ //TODO 循环获取该天该水厂每个设备数据
|
|
|
+
|
|
|
+ //TODO 查询当前天日期内每小时的设备数据
|
|
|
+ for (int i = 0; i < Integer.valueOf(String.valueOf(Duration.between(startDateTime, today.withMinute(0).withSecond(0)).toHours())); i++) {
|
|
|
+ String startTime = newStartDateTime.withHour(i).format(formater);
|
|
|
+ String endTime = newStartDateTime.minusHours(-(i + 1)).format(formater);
|
|
|
+
|
|
|
+ Map<String, Object> recordAllEntity = new HashMap<>();//需要添加的实体数据
|
|
|
+ recordAllEntity.put("org_name", key.toString());//水厂
|
|
|
+ recordAllEntity.put("time", endTime);//采集时间(小时的最后时间)
|
|
|
+ recordAllEntity.put("value", null);
|
|
|
+ recordAllEntity.put("value_tag", "water");
|
|
|
+ recordAllEntity.put("collcation_tag_array", FastJsonUtil.toJSON(deviceList));
|
|
|
+ Double value = null;
|
|
|
+ //TODO 此循环计算该小时所有设备的用水量
|
|
|
+ for (Map<String, Object> item : deviceList) {
|
|
|
+ Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount(
|
|
|
+ " WHERE TAG_CODE = '" + item.get("collcation_tag") + "' " +
|
|
|
+ " and QCQUISITION_TIME >= to_date('" + startDate + "', 'yyyy-mm-dd hh24:mi:ss')" +
|
|
|
+ " and QCQUISITION_TIME < to_date('" + endDate + "', 'yyyy-mm-dd hh24:mi:ss')");
|
|
|
+ if (itemCount != null && itemCount > 0) {
|
|
|
+ List<Map<String, Object>> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(itemCount, 0,
|
|
|
+ " AND TAG_CODE = '" + item.get("collcation_tag") + "' " +
|
|
|
+ " and QCQUISITION_TIME >= to_date('" + startTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
|
|
|
+ " and QCQUISITION_TIME < to_date('" + endTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
|
|
|
+ " order by QCQUISITION_TIME");
|
|
|
+ //TODO tapWaterHistoryList此集合为该设备该小时内的数据,取第一条和最后一条的相差值作为用水量
|
|
|
+ if (!CollectionUtils.isEmpty(tapWaterHistoryList) && tapWaterHistoryList.size() > 1) {
|
|
|
+ Double firstValue = tapWaterHistoryList.get(0).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get(0).get("val").toString()) : null;
|
|
|
+ Double lastValue = tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("val").toString()) : null;
|
|
|
+ if (firstValue != null && lastValue != null) {
|
|
|
+ //到此处是该小时一个设备的用水量已加上
|
|
|
+ if (value == null) {
|
|
|
+ value = 0.00;
|
|
|
+ }
|
|
|
+ System.out.println();
|
|
|
+ value += Math.abs(lastValue - firstValue);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ recordAllEntity.put("value", value);
|
|
|
+ List<Map<String, Object>> queryWaterRecord = getWaterTapWaterApi().getWaterCollectionRecordAllList(1, 0,
|
|
|
+ " WHERE org_name = '" + recordAllEntity.get("org_name")
|
|
|
+ + "' AND time = '" + recordAllEntity.get("time") + "' AND value_tag = '" + recordAllEntity.get("value_tag") + "'");
|
|
|
+ if (CollectionUtils.isEmpty(queryWaterRecord)) {
|
|
|
+ //TODO 说明不存在,进行插入
|
|
|
+ int insertCode = getWaterTapWaterApi().insertWaterCollectionRecordAll(" (" +
|
|
|
+ "'" + recordAllEntity.get("org_name") + "'," +
|
|
|
+ "'" + recordAllEntity.get("time") + "'," +
|
|
|
+ "'" + recordAllEntity.get("value") + "'," +
|
|
|
+ "'" + recordAllEntity.get("value_tag") + "'," +
|
|
|
+ "'" + recordAllEntity.get("collcation_tag_array") + "'" +
|
|
|
+ ") ");
|
|
|
+ if (insertCode < 0) {
|
|
|
+ LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
|
|
|
+ , mStrClassName
|
|
|
+ , mStrClassName
|
|
|
+ , String.format("Batch initTapWaterDataThread 未成功:{%s} ",
|
|
|
+ FastJsonUtil.toJSON(recordAllEntity)));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ latch.countDown();
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }catch(Exception ex){}
|
|
|
+ }
|
|
|
+ try{latch.await();}catch(Exception ex){}
|
|
|
+ System.out.println("检查机制-计算小时用水量结束时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ //TODO 原始历史数据 每天整点15分检查并写入,检查范围为当前时间到昨天
|
|
|
+ public static void checkRecordData(){
|
|
|
+//TODO ① 首先查询水厂设备配置信息
|
|
|
+ try {
|
|
|
+ List<Map<String, Object>> configList = getWaterTapWaterApi().getWaterCollectionConfigList(null);
|
|
|
+ if(!CollectionUtils.isEmpty(configList)){
|
|
|
+ //声明总数据的数据数组
|
|
|
+ List<Map<String,Object>> newRecordAll = Collections.synchronizedList(new LinkedList<Map<String,Object>>());
|
|
|
+ //TODO ②开启多线程并发处理各水厂设备的数据
|
|
|
+ System.out.println("检查机制--开始时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
|
|
|
+ System.out.println("检查机制--进行中:......");
|
|
|
+ final CountDownLatch latch = new CountDownLatch(configList.size());
|
|
|
+ for(Map<String,Object> item:configList){
|
|
|
+ try{
|
|
|
+ ThreadTaskMgr.runTask(new TraceRunnable(TraceLogMgr.getTraceId()) {
|
|
|
+ @Override
|
|
|
+ public void function() {
|
|
|
+ //TODO 根据当前配置信息item 查询远通数据中的历史数据
|
|
|
+ //TODO 首先查询当前水厂设备的昨天0点之后到当前时间最新数据的数据
|
|
|
+ LocalDateTime today = LocalDateTime.now();
|
|
|
+ LocalDateTime yesterday = today.minusDays(1).withHour(0).withMinute(0).withSecond(0).withNano(0);
|
|
|
+ Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount(" WHERE TAG_CODE = '"+item.get("collcation_tag")+"' " +
|
|
|
+ " and QCQUISITION_TIME >= to_date('"+yesterday.format(formater)+"', 'yyyy-mm-dd hh24:mi:ss')");
|
|
|
+ if(itemCount!=null&&itemCount>0) {
|
|
|
+ //TODO 优化 以分页方式查询所有,初始分页行数定为2000查询速率较好
|
|
|
+ int pageNum = itemCount%2000==0?itemCount/2000:(itemCount/2000)+1;//总页数
|
|
|
+ Integer limit = 2000;
|
|
|
+ if(pageNum<=1){
|
|
|
+ limit = itemCount;//说明总数比第一页小
|
|
|
+ }
|
|
|
+ for (int i = 0; i < pageNum; i++) {
|
|
|
+ Integer offset = i*limit;
|
|
|
+ //tapWaterHistoryList 为远通水量数据源
|
|
|
+ //TODO 因为是orcale数据库 分页方式以伪列rownum为分页条件,因此limit实际上永远比offset大2000,因此只要不是第一页,limit=offset+2000
|
|
|
+ Integer limitNew = 0;
|
|
|
+ if(i>0){
|
|
|
+ limitNew = offset+2000;
|
|
|
+ }else{
|
|
|
+ limitNew = 2000;
|
|
|
+ }
|
|
|
+ List<Map<String,Object>> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(limitNew,offset," " +
|
|
|
+ "AND TAG_CODE = '"+item.get("collcation_tag")+"' " +
|
|
|
+ "and QCQUISITION_TIME >= to_date('"+yesterday.format(formater)+"', 'yyyy-mm-dd hh24:mi:ss')");
|
|
|
+ if(!CollectionUtils.isEmpty(tapWaterHistoryList)){
|
|
|
+ //TODO 循环远通水量数据列表,查询数据不存在的话就插入
|
|
|
+ for (int j = 0; j < tapWaterHistoryList.size(); j++) {
|
|
|
+ List<Map<String,Object>> queryWaterRecord = getWaterTapWaterApi().getWaterCollectionRecordList(1,0,
|
|
|
+ " WHERE collcation_tag = '"+tapWaterHistoryList.get(j).get("TAG_CODE")
|
|
|
+ +"' AND org_name = '"+tapWaterHistoryList.get(j).get("NAME")+"' AND time = '"+TimeTool.convertDateStr2UTC(tapWaterHistoryList.get(j).get("QCQUISITION_TIME").toString())+"'");
|
|
|
+ if(CollectionUtils.isEmpty(queryWaterRecord)){
|
|
|
+ //TODO 说明没插入过本系统,执行插入
|
|
|
+ String extend = " ('"+tapWaterHistoryList.get(j).get("TAG_CODE")+"','"+tapWaterHistoryList.get(j).get("NAME")+"',"
|
|
|
+ + ""+TimeTool.convertDateStr2UTC(tapWaterHistoryList.get(j).get("QCQUISITION_TIME").toString())+","
|
|
|
+ + "'"+tapWaterHistoryList.get(j).get("VAL")+"',"
|
|
|
+ + TimeTool.convertDateStr2UTC(tapWaterHistoryList.get(j).get("UPDATE_TIME").toString())
|
|
|
+ +") ";
|
|
|
+ int insertCode = getWaterTapWaterApi().insertWaterCollectionRecord(extend);
|
|
|
+ if(insertCode<=0){
|
|
|
+ LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
|
|
|
+ , mStrClassName
|
|
|
+ , mStrClassName
|
|
|
+ , String.format("Batch initTapWaterDataThread 未成功:{%s} ",
|
|
|
+ extend));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ latch.countDown();
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }catch(Exception ex){
|
|
|
+ LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
|
|
|
+ , mStrClassName
|
|
|
+ , mStrClassName
|
|
|
+ , String.format("Batch initTapWaterDataThread 水厂:{%s},标识:{%s} ERROR:{%s} ",
|
|
|
+ item.get("org_name"),
|
|
|
+ item.get("collcation_tag")
|
|
|
+ , ex.getLocalizedMessage()));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ latch.await();
|
|
|
+ System.out.println("检查机制--结束时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
|
|
|
+ }
|
|
|
+ }catch(Exception ex){
|
|
|
+ LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
|
|
|
+ , mStrClassName
|
|
|
+ , mStrClassName
|
|
|
+ , String.format("Batch initTapWaterData ERROR:{%s} "
|
|
|
+ , ex.getLocalizedMessage()));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ //TODO 初始化添加对比远通数据
|
|
|
+ public static void initTapWaterData(){
|
|
|
+ //TODO ① 首先查询水厂设备配置信息
|
|
|
+ try {
|
|
|
+ List<Map<String, Object>> configList = getWaterTapWaterApi().getWaterCollectionConfigList(null);
|
|
|
+ if(!CollectionUtils.isEmpty(configList)){
|
|
|
+ //声明总数据的数据数组
|
|
|
+ List<Map<String,Object>> newRecordAll = Collections.synchronizedList(new LinkedList<Map<String,Object>>());
|
|
|
+ //TODO ②开启多线程并发处理各水厂设备的数据
|
|
|
+ System.out.println("开始时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
|
|
|
+ System.out.println("进行中:......");
|
|
|
+ final CountDownLatch latch = new CountDownLatch(configList.size());
|
|
|
+ for(Map<String,Object> item:configList){
|
|
|
+ try{
|
|
|
+ ThreadTaskMgr.runTask(new TraceRunnable(TraceLogMgr.getTraceId()) {
|
|
|
+ @Override
|
|
|
+ public void function() {
|
|
|
+ //TODO 根据当前配置信息item 查询远通数据中的历史数据
|
|
|
+ //TODO 首先查询当前水厂设备的2023-01-01之后到最新数据的数据总数,然后分页形式获取数据
|
|
|
+ Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount(" WHERE TAG_CODE = '"+item.get("collcation_tag")+"' and QCQUISITION_TIME >= to_date('2023-01-01 00:00:00', 'yyyy-mm-dd hh24:mi:ss')");
|
|
|
+ if(itemCount!=null&&itemCount>0) {
|
|
|
+ //TODO 优化 以分页方式查询所有,初始分页行数定为2000查询速率较好
|
|
|
+ int pageNum = itemCount%2000==0?itemCount/2000:(itemCount/2000)+1;//总页数
|
|
|
+ Integer limit = 2000;
|
|
|
+ if(pageNum<=1){
|
|
|
+ limit = itemCount;//说明总数比第一页小
|
|
|
+ }
|
|
|
+ for (int i = 0; i < pageNum; i++) {
|
|
|
+ Integer offset = i*limit;
|
|
|
+ //tapWaterHistoryList 为远通水量数据源
|
|
|
+ //TODO 因为是orcale数据库 分页方式以伪列rownum为分页条件,因此limit实际上永远比offset大2000,因此只要不是第一页,limit=offset+2000
|
|
|
+ Integer limitNew = 0;
|
|
|
+ if(i>0){
|
|
|
+ limitNew = offset+2000;
|
|
|
+ }else{
|
|
|
+ limitNew = 2000;
|
|
|
+ }
|
|
|
+ List<Map<String,Object>> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(limitNew,offset," AND TAG_CODE = '"+item.get("collcation_tag")+"' and QCQUISITION_TIME >= to_date('2023-01-01 00:00:00', 'yyyy-mm-dd hh24:mi:ss')");
|
|
|
+ if(!CollectionUtils.isEmpty(tapWaterHistoryList)){
|
|
|
+ //TODO 循环远通水量数据列表,查询数据不存在的话就插入
|
|
|
+ for (int j = 0; j < tapWaterHistoryList.size(); j++) {
|
|
|
+ List<Map<String,Object>> queryWaterRecord = getWaterTapWaterApi().getWaterCollectionRecordList(1,0,
|
|
|
+ " WHERE collcation_tag = '"+tapWaterHistoryList.get(j).get("TAG_CODE")
|
|
|
+ +"' AND org_name = '"+tapWaterHistoryList.get(j).get("NAME")+"' AND time = '"+TimeTool.convertDateStr2UTC(tapWaterHistoryList.get(j).get("QCQUISITION_TIME").toString())+"'");
|
|
|
+ if(CollectionUtils.isEmpty(queryWaterRecord)){
|
|
|
+ //TODO 说明没插入过本系统,执行插入
|
|
|
+ String extend = " ('"+tapWaterHistoryList.get(j).get("TAG_CODE")+"','"+tapWaterHistoryList.get(j).get("NAME")+"',"
|
|
|
+ + ""+TimeTool.convertDateStr2UTC(tapWaterHistoryList.get(j).get("QCQUISITION_TIME").toString())+","
|
|
|
+ + "'"+tapWaterHistoryList.get(j).get("VAL")+"',"
|
|
|
+ + TimeTool.convertDateStr2UTC(tapWaterHistoryList.get(j).get("UPDATE_TIME").toString())
|
|
|
+ +") ";
|
|
|
+ int insertCode = getWaterTapWaterApi().insertWaterCollectionRecord(extend);
|
|
|
+ if(insertCode<=0){
|
|
|
+ LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
|
|
|
+ , mStrClassName
|
|
|
+ , mStrClassName
|
|
|
+ , String.format("Batch initTapWaterDataThread 未成功:{%s} ",
|
|
|
+ extend));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ latch.countDown();
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }catch(Exception ex){
|
|
|
+ LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
|
|
|
+ , mStrClassName
|
|
|
+ , mStrClassName
|
|
|
+ , String.format("Batch initTapWaterDataThread 水厂:{%s},标识:{%s} ERROR:{%s} ",
|
|
|
+ item.get("org_name"),
|
|
|
+ item.get("collcation_tag")
|
|
|
+ , ex.getLocalizedMessage()));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ latch.await();
|
|
|
+ System.out.println("结束时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
|
|
|
+ }
|
|
|
+ }catch(Exception ex){
|
|
|
+ LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
|
|
|
+ , mStrClassName
|
|
|
+ , mStrClassName
|
|
|
+ , String.format("Batch initTapWaterData ERROR:{%s} "
|
|
|
+ , ex.getLocalizedMessage()));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ //TODO 初始化添加计算水厂所有设备每日的每小时用水量计算
|
|
|
+ public static void initWaterCollecationReacordAll(){
|
|
|
+ //TODO ① 首先查询水厂设备配置信息
|
|
|
+ try {
|
|
|
+
|
|
|
+ System.out.println("计算小时用水量开始时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
|
|
|
+ System.out.println("计算小时用水量进行中:......");
|
|
|
+ List<Map<String, Object>> configList = getWaterTapWaterApi().getWaterCollectionConfigList(null);
|
|
|
+ if (!CollectionUtils.isEmpty(configList)) {
|
|
|
+ //TODO 按照组织机构分组
|
|
|
+ Map<Object, List<Map<String, Object>>> groupedData =
|
|
|
+ configList.stream().collect(Collectors.groupingBy(item -> item.get("org_name")));
|
|
|
+ final CountDownLatch latch = new CountDownLatch(groupedData.keySet().size());
|
|
|
+
|
|
|
+ //TODO 外层循环组织机构
|
|
|
+ for (Object key:groupedData.keySet()){
|
|
|
+ try{
|
|
|
+ ThreadTaskMgr.runTask(new TraceRunnable(TraceLogMgr.getTraceId()) {
|
|
|
+ @Override
|
|
|
+ public void function() {
|
|
|
+ //TODO 根据当前配置信息item 查询远通数据中的历史数据
|
|
|
+ //TODO 首先查询当前水厂设备的从2023-01-01之后到得到数据
|
|
|
+ LocalDateTime startDateTime = LocalDateTime.of(2023, 01, 01, 00, 00, 00);
|
|
|
+
|
|
|
+ //TODO 需计算的循环天数
|
|
|
+ Long days = 0L;
|
|
|
+ days = Duration.between(startDateTime, LocalDateTime.now()).toDays();
|
|
|
+
|
|
|
+ //TODO 此循环天数每一天所查的是所有设备每小时数据合
|
|
|
+ final CountDownLatch latch2 = new CountDownLatch(days.intValue());
|
|
|
+ for(Long k = 0L;k<days;k++) {
|
|
|
+ LocalDateTime newStartDateTime = startDateTime.minusDays(-k.intValue());
|
|
|
+ String startDate = newStartDateTime.format(formater);
|
|
|
+ String endDate = newStartDateTime.minusDays(-1).format(formater);
|
|
|
+ try {
|
|
|
+ ThreadTaskMgr.runTask(new TraceRunnable(TraceLogMgr.getTraceId()) {
|
|
|
+ @Override
|
|
|
+ public void function() {
|
|
|
+
|
|
|
+ List<Map<String, Object>> deviceList = groupedData.get(key);
|
|
|
+
|
|
|
+ //TODO 循环获取该天该水厂每个设备数据
|
|
|
+
|
|
|
+ //TODO 查询当前天日期内每小时的设备数据
|
|
|
+ for (int i = 0; i < 24; i++) {
|
|
|
+ String startTime = newStartDateTime.withHour(i).format(formater);
|
|
|
+ String endTime = newStartDateTime.minusHours(-(i + 1)).format(formater);
|
|
|
+
|
|
|
+ Map<String, Object> recordAllEntity = new HashMap<>();//需要添加的实体数据
|
|
|
+ recordAllEntity.put("org_name", key.toString());//水厂
|
|
|
+ recordAllEntity.put("time", endTime);//采集时间(小时的最后时间)
|
|
|
+ recordAllEntity.put("value", null);
|
|
|
+ recordAllEntity.put("value_tag", "water");
|
|
|
+ recordAllEntity.put("collcation_tag_array", FastJsonUtil.toJSON(deviceList));
|
|
|
+ Double value = null;
|
|
|
+ //TODO 此循环计算该小时所有设备的用水量
|
|
|
+ for (Map<String, Object> item : deviceList) {
|
|
|
+ Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount(
|
|
|
+ " WHERE TAG_CODE = '" + item.get("collcation_tag") + "' " +
|
|
|
+ " and QCQUISITION_TIME >= to_date('" + startDate + "', 'yyyy-mm-dd hh24:mi:ss')" +
|
|
|
+ " and QCQUISITION_TIME < to_date('" + endDate + "', 'yyyy-mm-dd hh24:mi:ss')");
|
|
|
+ if (itemCount != null && itemCount > 0) {
|
|
|
+ List<Map<String, Object>> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(itemCount, 0,
|
|
|
+ " AND TAG_CODE = '" + item.get("collcation_tag") + "' " +
|
|
|
+ " and QCQUISITION_TIME >= to_date('" + startTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
|
|
|
+ " and QCQUISITION_TIME < to_date('" + endTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
|
|
|
+ " order by QCQUISITION_TIME");
|
|
|
+ //TODO tapWaterHistoryList此集合为该设备该小时内的数据,取第一条和最后一条的相差值作为用水量
|
|
|
+ if (!CollectionUtils.isEmpty(tapWaterHistoryList) && tapWaterHistoryList.size() > 1) {
|
|
|
+ Double firstValue = tapWaterHistoryList.get(0).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get(0).get("val").toString()) : null;
|
|
|
+ Double lastValue = tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("val").toString()) : null;
|
|
|
+ if (firstValue != null && lastValue != null) {
|
|
|
+ //到此处是该小时一个设备的用水量已加上
|
|
|
+ if (value == null) {
|
|
|
+ value = 0.00;
|
|
|
+ }
|
|
|
+ value += Math.abs(lastValue - firstValue);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ recordAllEntity.put("value", value);
|
|
|
+ List<Map<String, Object>> queryWaterRecord = getWaterTapWaterApi().getWaterCollectionRecordAllList(1, 0,
|
|
|
+ " WHERE org_name = '" + recordAllEntity.get("org_name")
|
|
|
+ + "' AND time = '" + recordAllEntity.get("time") + "' AND value_tag = '" + recordAllEntity.get("value_tag") + "'");
|
|
|
+ if (CollectionUtils.isEmpty(queryWaterRecord)) {
|
|
|
+ //TODO 说明不存在,进行插入
|
|
|
+ int insertCode = getWaterTapWaterApi().insertWaterCollectionRecordAll(" (" +
|
|
|
+ "'" + recordAllEntity.get("org_name") + "'," +
|
|
|
+ "'" + recordAllEntity.get("time") + "'," +
|
|
|
+ "'" + recordAllEntity.get("value") + "'," +
|
|
|
+ "'" + recordAllEntity.get("value_tag") + "'," +
|
|
|
+ "'" + recordAllEntity.get("collcation_tag_array") + "'" +
|
|
|
+ ") ");
|
|
|
+ if (insertCode < 0) {
|
|
|
+ LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
|
|
|
+ , mStrClassName
|
|
|
+ , mStrClassName
|
|
|
+ , String.format("Batch initTapWaterDataThread 未成功:{%s} ",
|
|
|
+ FastJsonUtil.toJSON(recordAllEntity)));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ latch2.countDown();
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }catch(Exception ex){
|
|
|
+
|
|
|
+ }
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ latch2.await();
|
|
|
+ }catch(Exception ex){
|
|
|
+
|
|
|
+ }
|
|
|
+ latch.countDown();
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }catch(Exception ex){
|
|
|
+ LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
|
|
|
+ , mStrClassName
|
|
|
+ , mStrClassName
|
|
|
+ , String.format("Batch initWaterReacordAll ERROR:{%s} "
|
|
|
+ , ex.getLocalizedMessage()));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ latch.await();
|
|
|
+ System.out.println("计算小时用水量结束时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
|
|
|
+ }
|
|
|
+ }catch(Exception ex){
|
|
|
+ LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
|
|
|
+ , mStrClassName
|
|
|
+ , mStrClassName
|
|
|
+ , String.format("Batch initWaterCollecationReacordAll ERROR:{%s} "
|
|
|
+ , ex.getLocalizedMessage()));
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|