123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684 |
- package com.shkpr.service.aimodelpower.bizmgr;
- import com.alibaba.fastjson.JSONObject;
- import com.global.base.log.LogLevelFlag;
- import com.global.base.log.LogPrintMgr;
- import com.global.base.tools.FastJsonUtil;
- import com.shkpr.service.aimodelpower.commtools.TimeTool;
- import com.shkpr.service.aimodelpower.dbdao.DBMgrProxy;
- import com.shkpr.service.aimodelpower.dbdao.shizilaishuiDataSource.service.intef.WaterTapWaterService;
- import com.shkpr.service.aimodelpower.dto.TraceRunnable;
- import com.shkpr.service.aimodelpower.globalmgr.ThreadTaskMgr;
- import com.shkpr.service.aimodelpower.globalmgr.TraceLogMgr;
- import org.springframework.util.CollectionUtils;
- import org.springframework.util.ObjectUtils;
- import java.time.Duration;
- import java.time.LocalDateTime;
- import java.time.format.DateTimeFormatter;
- import java.util.*;
- import java.util.concurrent.CountDownLatch;
- import java.util.stream.Collectors;
- /**
- * @ClassName KprAimTapWaterBizFun
- * @Description: TODO
- * @Author LX
- * @Date 2024/5/22
- * @Version V1.0
- **/
- public class KprAimTapWaterBizFun {
- private static final String MSG_SUCCESS = "success.";
- private static final String MSG_FAILED = "failed.";
- private static final String mStrClassName = "KprAimTapWaterBizFun";
- private static final String EMPTY_NULL = "NULL";
- public static WaterTapWaterService getWaterTapWaterApi(){
- return DBMgrProxy.getInstance().applyWaterTapWaterService();
- }
- static DateTimeFormatter formater = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
- //TODO 小时用水量 每天整点10分检查并写入,检查范围为当前时间到昨天23点
- public static void checkRecordAllData(){
- //TODO 检查小时用水量
- DateTimeFormatter formater = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
- System.out.println("检查机制-计算小时用水量开始时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
- System.out.println("检查机制-计算小时用水量进行中:......");
- List<Map<String, Object>> configList = getWaterTapWaterApi().getWaterCollectionConfigList(null);
- if (!CollectionUtils.isEmpty(configList)) {
- //TODO 按照组织机构分组
- Map<Object, List<Map<String, Object>>> groupedData =
- configList.stream().collect(Collectors.groupingBy(item -> item.get("org_name")));
- final CountDownLatch latch = new CountDownLatch(groupedData.keySet().size());
- for (Object key:groupedData.keySet()){
- try {
- ThreadTaskMgr.runTask(new TraceRunnable(TraceLogMgr.getTraceId()) {
- @Override
- public void function() {
- //TODO 根据当前配置信息item 查询远通数据中的历史数据
- //TODO 首先查询当前水厂设备的从昨天之后到得到数据的数据
- LocalDateTime today = LocalDateTime.now();
- LocalDateTime startDateTime = today.withHour(0).withMinute(0).withSecond(0).withNano(0).minusHours(1);
- //TODO 需计算的循环天数
- //TODO 此循环天数每一天所查的是所有设备每小时数据合
- LocalDateTime newStartDateTime = startDateTime;
- String startDate = newStartDateTime.format(formater);
- String endDate = today.withMinute(0).withSecond(0).format(formater);
- List<Map<String, Object>> deviceList = groupedData.get(key);
- //TODO 循环获取该天该水厂每个设备数据
- //TODO 查询当前天日期内每小时的设备数据
- for (int i = 0; i < Integer.valueOf(String.valueOf(Duration.between(startDateTime, today.withMinute(0).withSecond(0)).toHours())); i++) {
- String startTime = newStartDateTime.minusHours(-i).format(formater);
- String endTime = newStartDateTime.minusHours(-(i + 1)).format(formater);
- Map<String, Object> recordAllEntity = new HashMap<>();//需要添加的实体数据
- recordAllEntity.put("org_name", key.toString());//水厂
- recordAllEntity.put("time", endTime);//采集时间(小时的最后时间)
- recordAllEntity.put("value", null);
- recordAllEntity.put("value_tag", "water");
- recordAllEntity.put("collcation_tag_array", FastJsonUtil.toJSON(deviceList));
- Double value = null;
- //TODO 此循环计算该小时所有设备的用水量
- for (Map<String, Object> item : deviceList) {
- //TODO 和尚山水厂的三个OPC的设备0点到1点的数据特殊处理,直接取最后一条数据作为0点到1点的用水量
- // 定义字符串日期时间的格式
- DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
- // 解析字符串以创建 LocalDateTime 实例
- LocalDateTime dateTime = LocalDateTime.parse(endTime, formatter);
- if (dateTime.getHour()==1&&("OPC_SC.HSS.HSS1.AI.LJLL1.F_CV".equals(item.get("collcation_tag").toString()) ||
- "OPC_SC.HSS.HSS1.AI.LJLL2.F_CV".equals(item.get("collcation_tag").toString()) ||
- "OPC_SC.HSS.HSS1.AI.LJLL3.F_CV".equals(item.get("collcation_tag").toString()))) {
- Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount(
- " WHERE TAG_CODE = '" + item.get("collcation_tag") + "' " +
- " and QCQUISITION_TIME >= to_date('" + startDate + "', 'yyyy-mm-dd hh24:mi:ss')" +
- " and QCQUISITION_TIME < to_date('" + endDate + "', 'yyyy-mm-dd hh24:mi:ss')");
- if(itemCount!=null&&itemCount>0) {
- List<Map<String, Object>> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(itemCount, 0,
- " AND TAG_CODE = '" + item.get("collcation_tag") + "' " +
- " and QCQUISITION_TIME >= to_date('" + startTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
- " and QCQUISITION_TIME < to_date('" + endTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
- " order by QCQUISITION_TIME");
- if (!CollectionUtils.isEmpty(tapWaterHistoryList) && tapWaterHistoryList.size() > 1) {
- if (value == null) {
- value = 0.00;
- }
- value += tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("val").toString()) : null;
- }
- }
- }else {
- Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount(
- " WHERE TAG_CODE = '" + item.get("collcation_tag") + "' " +
- " and QCQUISITION_TIME >= to_date('" + startDate + "', 'yyyy-mm-dd hh24:mi:ss')" +
- " and QCQUISITION_TIME < to_date('" + endDate + "', 'yyyy-mm-dd hh24:mi:ss')");
- if (itemCount != null && itemCount > 0) {
- List<Map<String, Object>> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(itemCount, 0,
- " AND TAG_CODE = '" + item.get("collcation_tag") + "' " +
- " and QCQUISITION_TIME >= to_date('" + startTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
- " and QCQUISITION_TIME < to_date('" + endTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
- " order by QCQUISITION_TIME");
- //TODO tapWaterHistoryList此集合为该设备该小时内的数据,取第一条和最后一条的相差值作为用水量
- if (!CollectionUtils.isEmpty(tapWaterHistoryList) && tapWaterHistoryList.size() > 1) {
- Double firstValue = tapWaterHistoryList.get(0).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get(0).get("val").toString()) : null;
- Double lastValue = tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("val").toString()) : null;
- if (firstValue != null && lastValue != null) {
- //到此处是该小时一个设备的用水量已加上
- if (value == null) {
- value = 0.00;
- }
- value += Math.abs(lastValue - firstValue);
- }
- }
- }
- }
- }
- recordAllEntity.put("value", value);
- List<Map<String, Object>> queryWaterRecord = getWaterTapWaterApi().getWaterCollectionRecordAllList(1, 0,
- " WHERE org_name = '" + recordAllEntity.get("org_name")
- + "' AND time = '" + recordAllEntity.get("time") + "' AND value_tag = '" + recordAllEntity.get("value_tag") + "'");
- if (CollectionUtils.isEmpty(queryWaterRecord)) {
- //TODO 说明不存在,进行插入
- if(!ObjectUtils.isEmpty(recordAllEntity.get("value"))) {
- int insertCode = getWaterTapWaterApi().insertWaterCollectionRecordAll(" (" +
- "'" + recordAllEntity.get("org_name") + "'," +
- "'" + recordAllEntity.get("time") + "'," +
- "'" + recordAllEntity.get("value") + "'," +
- "'" + recordAllEntity.get("value_tag") + "'," +
- "'" + recordAllEntity.get("collcation_tag_array") + "'" +
- ") ");
- if (insertCode < 0) {
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
- , mStrClassName
- , mStrClassName
- , String.format("Batch initTapWaterDataThread 未成功:{%s} ",
- FastJsonUtil.toJSON(recordAllEntity)));
- }
- }
- }else{
- //TODO 说明存在,进行修改
- if(!ObjectUtils.isEmpty(recordAllEntity.get("value"))) {
- int updateCode = getWaterTapWaterApi().updateWaterCollectionRecordAll(String.valueOf(value)," WHERE " +
- "(" +
- " org_name = '" + recordAllEntity.get("org_name") + "' and" +
- " \"time\" = '" + recordAllEntity.get("time") + "' and" +
- " value_tag = '" + recordAllEntity.get("value_tag") + "'" +
- ") ");
- if (updateCode < 0) {
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
- , mStrClassName
- , mStrClassName
- , String.format("Batch updateWaterCollectionRecordAll 未成功:{%s} ",
- FastJsonUtil.toJSON(recordAllEntity)));
- }
- }
- }
- }
- latch.countDown();
- }
- });
- }catch(Exception ex){}
- }
- try{latch.await();}catch(Exception ex){}
- System.out.println("检查机制-计算小时用水量结束时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
- }
- }
- //TODO 原始历史数据 每天整点15分检查并写入,检查范围为当前时间到昨天
- public static void checkRecordData(){
- //TODO ① 首先查询水厂设备配置信息
- try {
- List<Map<String, Object>> configList = getWaterTapWaterApi().getWaterCollectionConfigList(null);
- if(!CollectionUtils.isEmpty(configList)){
- //声明总数据的数据数组
- List<Map<String,Object>> newRecordAll = Collections.synchronizedList(new LinkedList<Map<String,Object>>());
- //TODO ②开启多线程并发处理各水厂设备的数据
- System.out.println("检查机制--开始时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
- System.out.println("检查机制--进行中:......");
- final CountDownLatch latch = new CountDownLatch(configList.size());
- for(Map<String,Object> item:configList){
- try{
- ThreadTaskMgr.runTask(new TraceRunnable(TraceLogMgr.getTraceId()) {
- @Override
- public void function() {
- //TODO 根据当前配置信息item 查询远通数据中的历史数据
- //TODO 首先查询当前水厂设备的昨天0点之后到当前时间最新数据的数据
- LocalDateTime today = LocalDateTime.now();
- LocalDateTime yesterday = today.minusDays(1).withHour(0).withMinute(0).withSecond(0).withNano(0);
- Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount(" WHERE TAG_CODE = '"+item.get("collcation_tag")+"' " +
- " and QCQUISITION_TIME >= to_date('"+yesterday.format(formater)+"', 'yyyy-mm-dd hh24:mi:ss')");
- if(itemCount!=null&&itemCount>0) {
- //TODO 优化 以分页方式查询所有,初始分页行数定为2000查询速率较好
- int pageNum = itemCount%2000==0?itemCount/2000:(itemCount/2000)+1;//总页数
- Integer limit = 2000;
- if(pageNum<=1){
- limit = itemCount;//说明总数比第一页小
- }
- for (int i = 0; i < pageNum; i++) {
- Integer offset = i*limit;
- //tapWaterHistoryList 为远通水量数据源
- //TODO 因为是orcale数据库 分页方式以伪列rownum为分页条件,因此limit实际上永远比offset大2000,因此只要不是第一页,limit=offset+2000
- Integer limitNew = 0;
- if(i>0){
- limitNew = offset+2000;
- }else{
- limitNew = 2000;
- }
- List<Map<String,Object>> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(limitNew,offset," " +
- "AND TAG_CODE = '"+item.get("collcation_tag")+"' " +
- "and QCQUISITION_TIME >= to_date('"+yesterday.format(formater)+"', 'yyyy-mm-dd hh24:mi:ss')");
- if(!CollectionUtils.isEmpty(tapWaterHistoryList)){
- //TODO 循环远通水量数据列表,查询数据不存在的话就插入
- for (int j = 0; j < tapWaterHistoryList.size(); j++) {
- List<Map<String,Object>> queryWaterRecord = getWaterTapWaterApi().getWaterCollectionRecordList(1,0,
- " WHERE collcation_tag = '"+tapWaterHistoryList.get(j).get("TAG_CODE")
- +"' AND org_name = '"+tapWaterHistoryList.get(j).get("NAME")+"' AND time = '"+TimeTool.convertDateStr2UTC(tapWaterHistoryList.get(j).get("QCQUISITION_TIME").toString())+"'");
- if(CollectionUtils.isEmpty(queryWaterRecord)){
- //TODO 说明没插入过本系统,执行插入
- String extend = " ('"+tapWaterHistoryList.get(j).get("TAG_CODE")+"','"+tapWaterHistoryList.get(j).get("NAME")+"',"
- + ""+TimeTool.convertDateStr2UTC(tapWaterHistoryList.get(j).get("QCQUISITION_TIME").toString())+","
- + "'"+tapWaterHistoryList.get(j).get("VAL")+"',"
- + TimeTool.convertDateStr2UTC(tapWaterHistoryList.get(j).get("UPDATE_TIME").toString())
- +") ";
- int insertCode = getWaterTapWaterApi().insertWaterCollectionRecord(extend);
- if(insertCode<=0){
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
- , mStrClassName
- , mStrClassName
- , String.format("Batch initTapWaterDataThread 未成功:{%s} ",
- extend));
- }
- }
- }
- }
- }
- }
- latch.countDown();
- }
- });
- }catch(Exception ex){
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
- , mStrClassName
- , mStrClassName
- , String.format("Batch initTapWaterDataThread 水厂:{%s},标识:{%s} ERROR:{%s} ",
- item.get("org_name"),
- item.get("collcation_tag")
- , ex.getLocalizedMessage()));
- }
- }
- latch.await();
- System.out.println("检查机制--结束时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
- }
- }catch(Exception ex){
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
- , mStrClassName
- , mStrClassName
- , String.format("Batch initTapWaterData ERROR:{%s} "
- , ex.getLocalizedMessage()));
- }
- }
- //TODO 初始化添加对比远通数据
- public static void initTapWaterData(){
- //TODO ① 首先查询水厂设备配置信息
- try {
- List<Map<String, Object>> configList = getWaterTapWaterApi().getWaterCollectionConfigList(null);
- if(!CollectionUtils.isEmpty(configList)){
- //声明总数据的数据数组
- List<Map<String,Object>> newRecordAll = Collections.synchronizedList(new LinkedList<Map<String,Object>>());
- //TODO ②开启多线程并发处理各水厂设备的数据
- System.out.println("比对远通原始数据开始时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
- System.out.println("进行中:......");
- final CountDownLatch latch = new CountDownLatch(configList.size());
- for(Map<String,Object> item:configList){
- try{
- ThreadTaskMgr.runTask(new TraceRunnable(TraceLogMgr.getTraceId()) {
- @Override
- public void function() {
- //TODO 根据当前配置信息item 查询远通数据中的历史数据
- //TODO 首先查询当前水厂设备的2025-01-01之后到最新数据的数据总数,然后分页形式获取数据
- Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount(" WHERE TAG_CODE = '"+item.get("collcation_tag")+"' and QCQUISITION_TIME >= to_date('2025-01-01 00:00:00', 'yyyy-mm-dd hh24:mi:ss')");
- if(itemCount!=null&&itemCount>0) {
- //TODO 优化 以分页方式查询所有,初始分页行数定为2000查询速率较好
- int pageNum = itemCount%2000==0?itemCount/2000:(itemCount/2000)+1;//总页数
- Integer limit = 2000;
- if(pageNum<=1){
- limit = itemCount;//说明总数比第一页小
- }
- for (int i = 0; i < pageNum; i++) {
- Integer offset = i*limit;
- //tapWaterHistoryList 为远通水量数据源
- //TODO 因为是orcale数据库 分页方式以伪列rownum为分页条件,因此limit实际上永远比offset大2000,因此只要不是第一页,limit=offset+2000
- Integer limitNew = 0;
- if(i>0){
- limitNew = offset+2000;
- }else{
- limitNew = 2000;
- }
- List<Map<String,Object>> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(limitNew,offset," AND TAG_CODE = '"+item.get("collcation_tag")+"' and QCQUISITION_TIME >= to_date('2025-01-01 00:00:00', 'yyyy-mm-dd hh24:mi:ss')");
- if(!CollectionUtils.isEmpty(tapWaterHistoryList)){
- //TODO 循环远通水量数据列表,查询数据不存在的话就插入
- for (int j = 0; j < tapWaterHistoryList.size(); j++) {
- List<Map<String,Object>> queryWaterRecord = getWaterTapWaterApi().getWaterCollectionRecordList(1,0,
- " WHERE collcation_tag = '"+tapWaterHistoryList.get(j).get("TAG_CODE")
- +"' AND org_name = '"+tapWaterHistoryList.get(j).get("NAME")+"' AND time = '"+TimeTool.convertDateStr2UTC(tapWaterHistoryList.get(j).get("QCQUISITION_TIME").toString())+"'");
- if(CollectionUtils.isEmpty(queryWaterRecord)){
- //TODO 说明没插入过本系统,执行插入
- String extend = " ('"+tapWaterHistoryList.get(j).get("TAG_CODE")+"','"+tapWaterHistoryList.get(j).get("NAME")+"',"
- + ""+TimeTool.convertDateStr2UTC(tapWaterHistoryList.get(j).get("QCQUISITION_TIME").toString())+","
- + "'"+tapWaterHistoryList.get(j).get("VAL")+"',"
- + TimeTool.convertDateStr2UTC(tapWaterHistoryList.get(j).get("UPDATE_TIME").toString())
- +") ";
- int insertCode = getWaterTapWaterApi().insertWaterCollectionRecord(extend);
- if(insertCode<=0){
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
- , mStrClassName
- , mStrClassName
- , String.format("Batch initTapWaterDataThread 未成功:{%s} ",
- extend));
- }
- }
- }
- }
- }
- }
- latch.countDown();
- }
- });
- }catch(Exception ex){
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
- , mStrClassName
- , mStrClassName
- , String.format("Batch initTapWaterDataThread 水厂:{%s},标识:{%s} ERROR:{%s} ",
- item.get("org_name"),
- item.get("collcation_tag")
- , ex.getLocalizedMessage()));
- }
- }
- latch.await();
- System.out.println("比对远通原始数据结束时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
- }
- }catch(Exception ex){
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
- , mStrClassName
- , mStrClassName
- , String.format("Batch initTapWaterData ERROR:{%s} "
- , ex.getLocalizedMessage()));
- }
- }
- //TODO 初始化添加计算水厂所有设备每日的每小时用水量计算
- public static void initWaterCollecationReacordAll(){
- //TODO ① 首先查询水厂设备配置信息
- try {
- System.out.println("计算小时用水量开始时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
- System.out.println("计算小时用水量进行中:......");
- List<Map<String, Object>> configList = getWaterTapWaterApi().getWaterCollectionConfigList(null);
- if (!CollectionUtils.isEmpty(configList)) {
- //TODO 按照组织机构分组
- Map<Object, List<Map<String, Object>>> groupedData =
- configList.stream().collect(Collectors.groupingBy(item -> item.get("org_name")));
- final CountDownLatch latch = new CountDownLatch(groupedData.keySet().size());
- //TODO 外层循环组织机构
- for (Object key:groupedData.keySet()){
- try{
- ThreadTaskMgr.runTask(new TraceRunnable(TraceLogMgr.getTraceId()) {
- @Override
- public void function() {
- //TODO 根据当前配置信息item 查询远通数据中的历史数据
- //TODO 首先查询当前水厂设备的从2025-01-01之后到得到数据
- LocalDateTime startDateTime = LocalDateTime.of(2025, 01, 01, 00, 00, 00);
- //TODO 需计算的循环天数
- Long days = 0L;
- days = Duration.between(startDateTime, LocalDateTime.now()).toDays();
- //TODO 此循环天数每一天所查的是所有设备每小时数据合
- final CountDownLatch latch2 = new CountDownLatch(days.intValue());
- for(Long k = 0L;k<days;k++) {
- LocalDateTime newStartDateTime = startDateTime.minusDays(-k.intValue());
- String startDate = newStartDateTime.format(formater);
- String endDate = newStartDateTime.minusDays(-1).format(formater);
- try {
- ThreadTaskMgr.runTask(new TraceRunnable(TraceLogMgr.getTraceId()) {
- @Override
- public void function() {
- List<Map<String, Object>> deviceList = groupedData.get(key);
- //TODO 循环获取该天该水厂每个设备数据
- //TODO 查询当前天日期内每小时的设备数据
- for (int i = 0; i < 24; i++) {
- String startTime = newStartDateTime.withHour(i).format(formater);
- String endTime = newStartDateTime.minusHours(-(i + 1)).format(formater);
- Map<String, Object> recordAllEntity = new HashMap<>();//需要添加的实体数据
- recordAllEntity.put("org_name", key.toString());//水厂
- recordAllEntity.put("time", endTime);//采集时间(小时的最后时间)
- recordAllEntity.put("value", null);
- recordAllEntity.put("value_tag", "water");
- recordAllEntity.put("collcation_tag_array", FastJsonUtil.toJSON(deviceList));
- Double value = null;
- //TODO 此循环计算该小时所有设备的用水量
- for (Map<String, Object> item : deviceList) {
- //TODO 和尚山水厂的三个OPC的设备0点到1点的数据特殊处理,直接取最后一条数据作为0点到1点的用水量
- // 定义字符串日期时间的格式
- DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
- // 解析字符串以创建 LocalDateTime 实例
- LocalDateTime dateTime = LocalDateTime.parse(endTime, formatter);
- if (dateTime.getHour()==1&&("OPC_SC.HSS.HSS1.AI.LJLL1.F_CV".equals(item.get("collcation_tag").toString()) ||
- "OPC_SC.HSS.HSS1.AI.LJLL2.F_CV".equals(item.get("collcation_tag").toString()) ||
- "OPC_SC.HSS.HSS1.AI.LJLL3.F_CV".equals(item.get("collcation_tag").toString()))) {
- Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount(
- " WHERE TAG_CODE = '" + item.get("collcation_tag") + "' " +
- " and QCQUISITION_TIME >= to_date('" + startDate + "', 'yyyy-mm-dd hh24:mi:ss')" +
- " and QCQUISITION_TIME < to_date('" + endDate + "', 'yyyy-mm-dd hh24:mi:ss')");
- if(itemCount!=null&&itemCount>0) {
- List<Map<String, Object>> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(itemCount, 0,
- " AND TAG_CODE = '" + item.get("collcation_tag") + "' " +
- " and QCQUISITION_TIME >= to_date('" + startTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
- " and QCQUISITION_TIME < to_date('" + endTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
- " order by QCQUISITION_TIME");
- if (!CollectionUtils.isEmpty(tapWaterHistoryList) && tapWaterHistoryList.size() > 1) {
- if (value == null) {
- value = 0.00;
- }
- value += tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("val").toString()) : null;
- }
- }
- }else {
- Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount(
- " WHERE TAG_CODE = '" + item.get("collcation_tag") + "' " +
- " and QCQUISITION_TIME >= to_date('" + startDate + "', 'yyyy-mm-dd hh24:mi:ss')" +
- " and QCQUISITION_TIME < to_date('" + endDate + "', 'yyyy-mm-dd hh24:mi:ss')");
- if (itemCount != null && itemCount > 0) {
- List<Map<String, Object>> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(itemCount, 0,
- " AND TAG_CODE = '" + item.get("collcation_tag") + "' " +
- " and QCQUISITION_TIME >= to_date('" + startTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
- " and QCQUISITION_TIME < to_date('" + endTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
- " order by QCQUISITION_TIME");
- //TODO tapWaterHistoryList此集合为该设备该小时内的数据,取第一条和最后一条的相差值作为用水量
- if (!CollectionUtils.isEmpty(tapWaterHistoryList) && tapWaterHistoryList.size() > 1) {
- Double firstValue = tapWaterHistoryList.get(0).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get(0).get("val").toString()) : null;
- Double lastValue = tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("val").toString()) : null;
- if (firstValue != null && lastValue != null) {
- //到此处是该小时一个设备的用水量已加上
- if (value == null) {
- value = 0.00;
- }
- value += Math.abs(lastValue - firstValue);
- }
- }
- }
- }
- }
- recordAllEntity.put("value", value);
- List<Map<String, Object>> queryWaterRecord = getWaterTapWaterApi().getWaterCollectionRecordAllList(1, 0,
- " WHERE org_name = '" + recordAllEntity.get("org_name")
- + "' AND time = '" + recordAllEntity.get("time") + "' AND value_tag = '" + recordAllEntity.get("value_tag") + "'");
- if (CollectionUtils.isEmpty(queryWaterRecord)) {
- //TODO 说明不存在,进行插入
- if(!ObjectUtils.isEmpty(recordAllEntity.get("value"))) {
- int insertCode = getWaterTapWaterApi().insertWaterCollectionRecordAll(" (" +
- "'" + recordAllEntity.get("org_name") + "'," +
- "'" + recordAllEntity.get("time") + "'," +
- "'" + recordAllEntity.get("value") + "'," +
- "'" + recordAllEntity.get("value_tag") + "'," +
- "'" + recordAllEntity.get("collcation_tag_array") + "'" +
- ") ");
- if (insertCode < 0) {
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
- , mStrClassName
- , mStrClassName
- , String.format("Batch initTapWaterDataThread 未成功:{%s} ",
- FastJsonUtil.toJSON(recordAllEntity)));
- }
- }
- }else{
- //TODO 说明存在,进行修改
- if(!ObjectUtils.isEmpty(recordAllEntity.get("value"))) {
- int updateCode = getWaterTapWaterApi().updateWaterCollectionRecordAll(String.valueOf(value)," WHERE " +
- "(" +
- " org_name = '" + recordAllEntity.get("org_name") + "' and" +
- " \"time\" = '" + recordAllEntity.get("time") + "' and" +
- " value_tag = '" + recordAllEntity.get("value_tag") + "'" +
- ") ");
- if (updateCode < 0) {
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
- , mStrClassName
- , mStrClassName
- , String.format("Batch updateWaterCollectionRecordAll 未成功:{%s} ",
- FastJsonUtil.toJSON(recordAllEntity)));
- }
- }
- }
- }
- latch2.countDown();
- }
- });
- }catch(Exception ex){
- }
- }
- try {
- latch2.await();
- }catch(Exception ex){
- }
- latch.countDown();
- }
- });
- }catch(Exception ex){
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
- , mStrClassName
- , mStrClassName
- , String.format("Batch" +
- " initWaterReacordAll ERROR:{%s} "
- , ex.getLocalizedMessage()));
- }
- }
- latch.await();
- System.out.println("计算小时用水量结束时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
- }
- }catch(Exception ex){
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
- , mStrClassName
- , mStrClassName
- , String.format("Batch initWaterCollecationReacordAll ERROR:{%s} "
- , ex.getLocalizedMessage()));
- }
- }
- //TODO 初始化添加计算泵每日的小时数据
- public static void initWaterPumpReacordAll(String startFindTime){
- //TODO ① 首先查询水厂设备配置信息
- try {
- System.out.println("添加小时泵数据开始时间:"+ TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
- System.out.println("添加小时泵数据起始时间:("+startFindTime+")进行中:......");
- List<Map<String, Object>> configList = getWaterTapWaterApi().getWaterPumpCollectionConfigList(null);
- if (!CollectionUtils.isEmpty(configList)) {
- //TODO 按照设备分组
- Map<Object, List<Map<String, Object>>> groupedData =
- configList.stream().collect(Collectors.groupingBy(item -> item.get("device_code")));
- final CountDownLatch latch = new CountDownLatch(groupedData.keySet().size());
- //TODO 外层循环组织机构
- for (Object key:groupedData.keySet()){
- try{
- new Thread(() -> {
- //TODO 根据当前配置信息item 查询远通数据中的历史数据
- //TODO 首先查询当前水厂设备的从startFindTime之后到得到数据
- LocalDateTime startDateTime = LocalDateTime.parse(startFindTime, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
- //TODO 需计算的循环天数
- Long days = 0L;
- days = Duration.between(startDateTime, LocalDateTime.now()).toDays();
- days+=1;
- //TODO 此循环天数每一天所查的是所有设备每小时数据合
- final CountDownLatch latch2 = new CountDownLatch(days.intValue());
- for(Long k = 0L;k<days;k++) {
- LocalDateTime newStartDateTime = startDateTime.minusDays(-k.intValue());
- String startDate = newStartDateTime.format(formater);
- String endDate = newStartDateTime.minusDays(-1).format(formater);
- try {
- new Thread(() -> {
- List<Map<String, Object>> deviceList = groupedData.get(key);
- //TODO 循环获取该天该水厂每个设备数据
- //TODO 查询当前天日期内每小时的设备数据
- for (int i = 0; i < 24; i++) {
- String startTime = newStartDateTime.withHour(i).format(formater);
- //TODO 需要加个05分把endTime的整点数据查出来
- String endTime = newStartDateTime.minusHours(-(i + 1)).withMinute(5).withSecond(0).format(formater);
- //TODO 此循环计算该小时所有设备的泵数据
- for (Map<String, Object> item : deviceList) {
- // 定义字符串日期时间的格式
- DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
- // 解析字符串以创建 LocalDateTime 实例
- LocalDateTime dateTime = LocalDateTime.parse(endTime, formatter);
- Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount(
- " WHERE 1=1 AND TAG_CODE = '" + item.get("collcation_tag") + "' " +
- " and QCQUISITION_TIME >= to_date('" + startDate + "', 'yyyy-mm-dd hh24:mi:ss')" +
- " and QCQUISITION_TIME <= to_date('" + endDate + "', 'yyyy-mm-dd hh24:mi:ss')" +
- " AND ABS(" +
- " (QCQUISITION_TIME - TRUNC(QCQUISITION_TIME, 'HH')) * 24 * 60 " +
- " ) <= 5");//查询 -- 采集时间接近整点,误差在5分钟内
- if (itemCount != null && itemCount > 0) {
- List<Map<String, Object>> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(itemCount, 0,
- " AND TAG_CODE = '" + item.get("collcation_tag") + "' " +
- " and QCQUISITION_TIME >= to_date('" + startTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
- " and QCQUISITION_TIME <= to_date('" + endTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
- " AND ABS( " +
- " (QCQUISITION_TIME - TRUNC(QCQUISITION_TIME, 'HH')) * 24 * 60 " +
- " ) <= 5 " +
- " order by QCQUISITION_TIME");
- //TODO 这里需要注意泵数据的采集配置表设备号可能有多个,因此,要判断对应的standard_code做对应处理
- if (!CollectionUtils.isEmpty(tapWaterHistoryList) && tapWaterHistoryList.size() >= 1) {
- //TODO 数据库操作
- for (Map<String,Object> mapEntity:tapWaterHistoryList) {
- Map<String, Object> recordAllEntity = new HashMap<>();//需要添加的实体数据
- recordAllEntity.put("device_code", groupedData.get(key).get(0).get("device_code"));
- recordAllEntity.put("time", LocalDateTime.parse(mapEntity.get("QCQUISITION_TIME").toString(), formatter)
- .withMinute(0).withSecond(0).format(formatter));//采集时间(小时的最后时间)
- if(item.get("standard_code").equals("active_energy")){
- recordAllEntity.put("active_energy",mapEntity.get("VAL"));
- }
- if(item.get("standard_code").equals("startup_state")){
- recordAllEntity.put("startup_state",mapEntity.get("VAL"));
- }
- if(item.get("standard_code").equals("phase_a_current")){
- if(ObjectUtils.isEmpty(mapEntity.get("VAL"))||"0".equals(mapEntity.get("VAL").toString())) {
- recordAllEntity.put("startup_state", 0);
- }else{
- recordAllEntity.put("startup_state", 1);
- }
- }
- //TODO 直接调用插入或者新增方法
- int insertCode = getWaterTapWaterApi().insertOrUpdateWaterPumpRecordAll(recordAllEntity);
- if (insertCode < 0) {
- System.out.print(String.format("Batch insertOrUpdateWaterPumpRecordAll 未成功:{%s} ",
- JSONObject.toJSON(recordAllEntity)));
- }
- }
- }
- }
- }
- }
- latch2.countDown();
- }).start();
- }catch(Exception ex){
- }
- }
- try {
- latch2.await();
- }catch(Exception ex){
- }
- latch.countDown();
- }).start();
- }catch(Exception ex){
- System.out.print(String.format("Batch" +
- " initWaterReacordAll ERROR:{%s} "
- , ex.getLocalizedMessage()));
- }
- }
- latch.await();
- System.out.println("添加小时泵数据检查机制("+startFindTime+")结束时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
- }
- }catch(Exception ex){
- System.out.print(String.format("Batch initWaterCollecationReacordAll ERROR:{%s} "
- , ex.getLocalizedMessage()));
- }
- }
- }
|