|
@@ -0,0 +1,145 @@
|
|
|
+package io.github.pnoker.gateway.bizmgr;
|
|
|
+
|
|
|
+import io.github.pnoker.gateway.dbdao.DBMgrProxy;
|
|
|
+import io.github.pnoker.gateway.dbdao.tangyinSource.service.TangyinPressDataService;
|
|
|
+import io.github.pnoker.gateway.utils.InfulxDbUtil;
|
|
|
+import io.github.pnoker.gateway.utils.InfulxTyDbUtil;
|
|
|
+import org.influxdb.dto.QueryResult;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
+
|
|
|
+import java.text.SimpleDateFormat;
|
|
|
+import java.time.Instant;
|
|
|
+import java.time.LocalDateTime;
|
|
|
+import java.time.ZoneOffset;
|
|
|
+import java.time.ZonedDateTime;
|
|
|
+import java.time.format.DateTimeFormatter;
|
|
|
+import java.util.*;
|
|
|
+import java.util.concurrent.CountDownLatch;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
+
|
|
|
+/**
|
|
|
+ * @ClassName KprTangyinWaterBizFun
|
|
|
+ * @Description: TODO 汤阴二期相关业务功能类
|
|
|
+ * @Author LX
|
|
|
+ * @Date 2025/6/4
|
|
|
+ * @Version V1.0
|
|
|
+ **/
|
|
|
+public class KprTangyinWaterBizFun {
|
|
|
+ private static final Logger log = LoggerFactory.getLogger(KprTangyinWaterBizFun.class);
|
|
|
+
|
|
|
+ public static InfulxTyDbUtil infulxTyDbUtil = null;//infulx工具类对象
|
|
|
+
|
|
|
+ public static TangyinPressDataService getApi(){
|
|
|
+ return DBMgrProxy.getInstance().applyTangyinPressDataService();
|
|
|
+ }
|
|
|
+ private static long formaterUTCnano(LocalDateTime localDateTime){
|
|
|
+ // 转换为 Instant(基于 UTC 时间)
|
|
|
+ Instant instant = localDateTime.toInstant(ZoneOffset.UTC);
|
|
|
+
|
|
|
+ // 获取纳秒时间戳
|
|
|
+ long nanoTimestamp = instant.getEpochSecond() * 1_000_000_000L + instant.getNano();
|
|
|
+ return nanoTimestamp;
|
|
|
+ }
|
|
|
+ //TODO 查询一年的数据调用selectPressCollecation
|
|
|
+ public static void selectOneYear(){
|
|
|
+ log.info("启动时执行一年压力数据操作");
|
|
|
+ log.info("进行中.....");
|
|
|
+ // 当前时间
|
|
|
+ LocalDateTime now = LocalDateTime.now();
|
|
|
+ // 一年前的时间
|
|
|
+ LocalDateTime startDateTime = now.minusYears(1);
|
|
|
+
|
|
|
+ // 创建Map来存储时间范围
|
|
|
+ Map<LocalDateTime, LocalDateTime> timeRanges = new LinkedHashMap<>();
|
|
|
+
|
|
|
+ // 初始化开始时间
|
|
|
+ LocalDateTime currentStart = startDateTime; // 一年前
|
|
|
+ for (int i = 0; i < 12; i++) {
|
|
|
+ // 当前时间段的结束时间:当前月的最后一天 23:59:59
|
|
|
+ LocalDateTime currentEnd = currentStart.plusMonths(1).minusSeconds(1);
|
|
|
+
|
|
|
+ // 将开始和结束时间存储到Map中
|
|
|
+ timeRanges.put(currentStart, currentEnd);
|
|
|
+
|
|
|
+ // 将开始时间推进到下一个月
|
|
|
+ currentStart = currentStart.plusMonths(1);
|
|
|
+ }
|
|
|
+
|
|
|
+ final CountDownLatch latch = new CountDownLatch(timeRanges.keySet().size());
|
|
|
+ for (LocalDateTime startTime:timeRanges.keySet()){
|
|
|
+ new Thread(() -> {
|
|
|
+ long startLong = formaterUTCnano(startTime);
|
|
|
+ long endLong = formaterUTCnano(timeRanges.get(startTime));
|
|
|
+ String queryInfluxdbSql = "SELECT time,dev_id,press_cur FROM WaterMeter WHERE " +
|
|
|
+ " time >= "+startLong+" and time<="+endLong+" ORDER BY time desc tz('Asia/Shanghai')";
|
|
|
+ log.info(startTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))+"---"+
|
|
|
+ timeRanges.get(startTime).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))+"进行中.....");
|
|
|
+ selectPressCollecation(queryInfluxdbSql);
|
|
|
+ log.info(startTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))+"---"+
|
|
|
+ timeRanges.get(startTime).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))+"已完成");
|
|
|
+ latch.countDown();
|
|
|
+ }).start();
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ latch.await();
|
|
|
+ }catch(Exception ex){
|
|
|
+
|
|
|
+ }
|
|
|
+ log.info("启动时执行一年压力数据操作完成");
|
|
|
+ }
|
|
|
+
|
|
|
+ //TODO 查询压力数据并插入到pgsql数据库
|
|
|
+ public static void selectPressCollecation(String queryInfluxdbSql){
|
|
|
+ try{
|
|
|
+ if(infulxTyDbUtil!=null){
|
|
|
+ log.info("查询压力并插入到pgsql开始,SQL语句:"+queryInfluxdbSql);
|
|
|
+ QueryResult queryResult = infulxTyDbUtil.queryDatabase(queryInfluxdbSql);
|
|
|
+ List<QueryResult.Series> seriesList = queryResult.getResults().get(0).getSeries();
|
|
|
+ if (seriesList != null) {
|
|
|
+ for (QueryResult.Series series : seriesList) {
|
|
|
+ List<List<Object>> values = series.getValues();
|
|
|
+ for (List<Object> value : values) {
|
|
|
+ try {
|
|
|
+ // 解析字段
|
|
|
+ String timeString = (String) value.get(0); // 时间戳
|
|
|
+ String devId = (String) value.get(1); // 设备ID
|
|
|
+ Double pressCur = (Double) value.get(2); // 当前压力
|
|
|
+
|
|
|
+ // 解析ISO 8601格式的日期时间
|
|
|
+ ZonedDateTime zonedDateTime = ZonedDateTime.parse(timeString);
|
|
|
+
|
|
|
+ // 转换为LocalDateTime(去除时区信息)
|
|
|
+ LocalDateTime localDateTime = zonedDateTime.toLocalDateTime();
|
|
|
+
|
|
|
+ // 定义目标格式
|
|
|
+ DateTimeFormatter targetFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
|
|
+
|
|
|
+ // 格式化为目标格式字符串
|
|
|
+ String formattedDate = localDateTime.format(targetFormatter);
|
|
|
+
|
|
|
+ // 打印结果
|
|
|
+ System.out.println("Formatted Date: " + formattedDate + ", Dev ID: " + devId + ", Press Cur: " + pressCur);
|
|
|
+ //TODO 将数据插入或修改到pgsql
|
|
|
+ Map<String,Object> map = new HashMap<>();
|
|
|
+ map.put("collecation_time",formattedDate);
|
|
|
+ map.put("dev_id",devId);
|
|
|
+ map.put("press_cur",pressCur);
|
|
|
+ int resultDb = getApi().insertOrUpdateTbmHourLevel(map);
|
|
|
+ if(!(resultDb>0)){
|
|
|
+ log.error("function selectPressCollecation insertOrUpdate error:"+resultDb);
|
|
|
+ }
|
|
|
+ } catch (Exception ex) {
|
|
|
+ ex.printStackTrace();
|
|
|
+ log.error("function selectPressCollecation forEach error:"+ex.getLocalizedMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ log.info("function selectPressCollecation success:"+queryInfluxdbSql);
|
|
|
+ }
|
|
|
+ }catch (Exception ex){
|
|
|
+ log.error("function selectPressCollecation error:"+ex.getLocalizedMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|