123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188 |
- package io.github.pnoker.gateway.bizmgr;
- import com.alibaba.fastjson.JSONObject;
- import io.github.pnoker.gateway.comtool.TimeTool;
- import io.github.pnoker.gateway.dbdao.DBMgrProxy;
- import io.github.pnoker.gateway.dbdao.zilaishuiSource.service.ZilaishuiRealListService;
- import org.influxdb.dto.QueryResult;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.util.CollectionUtils;
- import org.springframework.util.ObjectUtils;
- import java.time.Duration;
- import java.time.LocalDateTime;
- import java.time.format.DateTimeFormatter;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- import java.util.concurrent.CountDownLatch;
- import java.util.stream.Collectors;
- /**
- * @ClassName KprZilaishuiLevelBizFun
- * @Description: TODO 市自来水水位预测
- * @Author LX
- * @Date 2024/12/11
- * @Version V1.0
- **/
- public class KprZilaishuiLevelBizFun {
- private static final Logger log = LoggerFactory.getLogger(KprZilaishuiLevelBizFun.class);
- private final static String mStrClassName = "KprZilaishuiLevelBizFun";
- private final static String EMPTY_NULL = "NULL";
- public static ZilaishuiRealListService getZilaishuiApi(){
- return DBMgrProxy.getInstance().applyZilaishuiDbApi();
- }
- static DateTimeFormatter formater = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
- //TODO 初始化添加计算每个液位计站点每日的每小时液位量计算
- public static void initWaterLevelReacordAll(String startFindTime){
- //TODO ① 首先查询水厂设备配置信息
- try {
- System.out.println("计算小时液位开始时间:"+ TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
- System.out.println("计算小时液位起始时间:("+startFindTime+")进行中:......");
- List<Map<String, Object>> configList = getZilaishuiApi().getWaterLevelCollectionConfigList(null);
- if (!CollectionUtils.isEmpty(configList)) {
- //TODO 按照设备分组
- Map<Object, List<Map<String, Object>>> groupedData =
- configList.stream().collect(Collectors.groupingBy(item -> item.get("device_code")));
- final CountDownLatch latch = new CountDownLatch(groupedData.keySet().size());
- //TODO 外层循环组织机构
- for (Object key:groupedData.keySet()){
- try{
- new Thread(() -> {
- //TODO 根据当前配置信息item 查询远通数据中的历史数据
- //TODO 首先查询当前水厂设备的从startFindTime之后到得到数据
- LocalDateTime startDateTime = LocalDateTime.parse(startFindTime, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
- //TODO 需计算的循环天数
- Long days = 0L;
- days = Duration.between(startDateTime, LocalDateTime.now()).toDays();
- days+=1;
- //TODO 此循环天数每一天所查的是所有设备每小时数据合
- final CountDownLatch latch2 = new CountDownLatch(days.intValue());
- for(Long k = 0L;k<days;k++) {
- LocalDateTime newStartDateTime = startDateTime.minusDays(-k.intValue());
- String startDate = newStartDateTime.format(formater);
- String endDate = newStartDateTime.minusDays(-1).format(formater);
- try {
- new Thread(() -> {
- List<Map<String, Object>> deviceList = groupedData.get(key);
- //TODO 循环获取该天该水厂每个设备数据
- //TODO 查询当前天日期内每小时的设备数据
- for (int i = 0; i < 24; i++) {
- String startTime = newStartDateTime.withHour(i).format(formater);
- //TODO 需要加个05分把endTime的整点数据查出来
- String endTime = newStartDateTime.minusHours(-(i + 1)).withMinute(5).withSecond(0).format(formater);
- //TODO 此循环计算该小时所有设备的用水量
- for (Map<String, Object> item : deviceList) {
- // 定义字符串日期时间的格式
- DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
- // 解析字符串以创建 LocalDateTime 实例
- LocalDateTime dateTime = LocalDateTime.parse(endTime, formatter);
- Integer itemCount = getZilaishuiApi().getTabWaterHistoryCount(
- " AND TAG_CODE = '" + item.get("collcation_tag") + "' " +
- " and QCQUISITION_TIME >= to_date('" + startDate + "', 'yyyy-mm-dd hh24:mi:ss')" +
- " and QCQUISITION_TIME <= to_date('" + endDate + "', 'yyyy-mm-dd hh24:mi:ss')" +
- " AND ABS(" +
- " (QCQUISITION_TIME - TRUNC(QCQUISITION_TIME, 'HH')) * 24 * 60 " +
- " ) <= 5");//查询 -- 采集时间接近整点,误差在5分钟内
- if (itemCount != null && itemCount > 0) {
- List<Map<String, Object>> tapWaterHistoryList = getZilaishuiApi().getPageZILAISHUI_HISTORY2(itemCount, 0,
- " AND TAG_CODE = '" + item.get("collcation_tag") + "' " +
- " and QCQUISITION_TIME >= to_date('" + startTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
- " and QCQUISITION_TIME <= to_date('" + endTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
- " AND ABS( " +
- " (QCQUISITION_TIME - TRUNC(QCQUISITION_TIME, 'HH')) * 24 * 60 " +
- " ) <= 5 " +
- " order by QCQUISITION_TIME");
- //TODO 直接插入即可 查询已经筛选出两条了
- if (!CollectionUtils.isEmpty(tapWaterHistoryList) && tapWaterHistoryList.size() > 1) {
- //TODO 数据库操作
- for (Map<String,Object> mapEntity:tapWaterHistoryList) {
- Map<String, Object> recordAllEntity = new HashMap<>();//需要添加的实体数据
- recordAllEntity.put("device_code", groupedData.get(key).get(0).get("device_code"));
- recordAllEntity.put("value", mapEntity.get("VAL"));
- recordAllEntity.put("time", LocalDateTime.parse(mapEntity.get("QCQUISITION_TIME").toString(), formatter)
- .withMinute(0).withSecond(0).format(formatter));//采集时间(小时的最后时间)
- recordAllEntity.put("value_tag", "level");
- List<Map<String, Object>> queryLevelRecord = getZilaishuiApi().getWaterLevelRecordAllList(1, 0,
- " WHERE device_code = '" + recordAllEntity.get("device_code")
- + "' AND time = '" + recordAllEntity.get("time") + "' AND value_tag = '" + recordAllEntity.get("value_tag") + "'");
- if (CollectionUtils.isEmpty(queryLevelRecord)) {
- //TODO 说明不存在,进行插入
- if(!ObjectUtils.isEmpty(recordAllEntity.get("value"))) {
- int insertCode = getZilaishuiApi().insertWaterLevelRecordAll(" (" +
- "'" + recordAllEntity.get("device_code") + "'," +
- "'" + recordAllEntity.get("value") + "'," +
- "'" + recordAllEntity.get("value_tag") + "'," +
- "'" + recordAllEntity.get("time") + "'" +
- ") ");
- if (insertCode < 0) {
- log.error(String.format("Batch insertWaterLevelRecordAll 未成功:{%s} ",
- JSONObject.toJSON(recordAllEntity)));
- }
- }
- }else{
- //TODO 说明存在,进行修改
- if(!ObjectUtils.isEmpty(recordAllEntity.get("value"))) {
- int updateCode = getZilaishuiApi().updateWaterLevelRecordAll(String.valueOf(recordAllEntity.get("value"))," WHERE " +
- "(" +
- " device_code = '" + recordAllEntity.get("device_code") + "' and" +
- " \"time\" = '" + recordAllEntity.get("time") + "' and" +
- " value_tag = '" + recordAllEntity.get("value_tag") + "'" +
- ") ");
- if (updateCode < 0) {
- log.error(String.format("Batch updateWaterLevelRecordAll 未成功:{%s} ",
- JSONObject.toJSON(recordAllEntity)));
- }
- }
- }
- }
- }
- }
- }
- }
- latch2.countDown();
- }).start();
- }catch(Exception ex){
- }
- }
- try {
- latch2.await();
- }catch(Exception ex){
- }
- latch.countDown();
- }).start();
- }catch(Exception ex){
- log.error(String.format("Batch" +
- " initWaterReacordAll ERROR:{%s} "
- , ex.getLocalizedMessage()));
- }
- }
- latch.await();
- System.out.println("计算小时液位检查机制("+startFindTime+")结束时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
- }
- }catch(Exception ex){
- log.error(String.format("Batch initWaterCollecationReacordAll ERROR:{%s} "
- , ex.getLocalizedMessage()));
- }
- }
- public static void insertDailyData(){
- KprZilaishuiLevelBizFun.getZilaishuiApi().insertDailyData();
- }
- public static void insertForecastData(){
- KprZilaishuiLevelBizFun.getZilaishuiApi().insertForecastData();
- }
- }
|