Bläddra i källkod

更新最新镜像配置, 更新当阳数据采集至influxdb历史数据补全机制

1037015548@qq.com 9 månader sedan
förälder
incheckning
d086eef3b1

+ 115 - 2
dc3-gateway/src/main/java/io/github/pnoker/gateway/bizmgr/KprDangyangWaterBizFun.java

@@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
 import com.alibaba.fastjson.TypeReference;
+import io.github.pnoker.gateway.comtool.ThreadHistoryTask;
 import io.github.pnoker.gateway.comtool.ThreadPoolTaskTool;
 import io.github.pnoker.gateway.comtool.ThreadTask;
 import io.github.pnoker.gateway.comtool.TimeTool;
@@ -117,6 +118,96 @@ public class KprDangyangWaterBizFun {
         }
     }
 
+
+    //TODO 获取一个月历史数据
+    public static void checkOneMonth(){
+        try{
+            if(!checkApplicationParam()){
+                log.error("获取配置参数错误");
+                return;
+            }
+            Map<String, String> headers = new HashMap<>();
+            headers.put("Authorization", "Bearer "+dangyangToken);
+            for (int i = 0; i <deviceType.size(); i++) {
+                List<Map<String,Object>> totalList = new ArrayList<>();//当前类型设备列表总集
+                //TODO 外层循环是要查一个类型的所有设备列表
+                JSONObject itemType = deviceType.getJSONObject(i);
+
+                Map<String,String> paramsTotal = new HashMap<>();
+                paramsTotal.put("pageNo","1");
+                paramsTotal.put("pageSize","1");
+                paramsTotal.put("deviceType",itemType.getString("key"));
+
+                JSONObject totalRes = JSONObject.parseObject(HttpUtil.sendGet(
+                        listUrl,paramsTotal,headers));
+                if(totalRes!=null&&totalRes.getInteger("code")==0
+                        &&!totalRes.getJSONObject("data")
+                        .getJSONArray("list").isEmpty()) {
+                    //TODO 优化 以分页方式查询所有,初始分页行数定为200查询速率较好
+                    int nTotals = totalRes.getJSONObject("data").getInteger("total");
+                    int pageNum = nTotals % 200 == 0 ? nTotals / 200 : (nTotals / 200) + 1;//总页数
+                    Integer limit = 200;
+                    if (pageNum <= 1) {
+                        limit = nTotals;//说明总数比第一页小
+                    }
+                    for (int k = 1; k <= pageNum; k++) {
+                        Map<String, String> paramsPage = new HashMap<>();
+                        paramsPage.put("pageNo", String.valueOf(k));
+                        paramsPage.put("pageSize", String.valueOf(limit));
+                        paramsPage.put("deviceType", itemType.getString("key"));
+                        JSONObject pageRes = JSONObject.parseObject(HttpUtil.sendGet(
+                                listUrl, paramsPage, headers));
+                        if (pageRes != null && pageRes.getInteger("code") == 0 && !pageRes.getJSONObject("data").getJSONArray("list").isEmpty()) {
+                            //TODO 将分页获取的列表数据添加至总集中
+                            totalList.addAll(JSON.parseObject(pageRes.getJSONObject("data").getJSONArray("list").toJSONString()
+                                    , new TypeReference<List<Map<String, Object>>>() {
+                                    }));
+                        } else {
+                            log.error(TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(), TimeTool.TIMESTAMP_FORMAT)
+                                    + "获取设备列表失败:" + itemType.getString("key") + ";" + (pageRes == null ? null : pageRes.toJSONString())
+                                    + ";param:" + JSON.toJSONString(paramsPage));
+                        }
+                    }
+
+                    List<Map<String, Object>> nullFrequencyList = totalList.stream()
+                            .filter(map -> !map.containsKey("collectionFrequency") || map.get("collectionFrequency") == null)
+                            .collect(Collectors.toList());
+                    // 输出结果
+                    nullFrequencyList.forEach(System.out::println);
+
+                    //TODO 获取到当前类型的设备列表总集后,查询总集的历史数据,并添加到infulxdb
+                    // 根据 采集频率 collectionFrequency (秒)字段进行分组
+                    // 要根据采集频率取建立定时任务
+                    Map<Integer, List<Map<String, Object>>> groupedByFrequency = totalList.stream()
+                            .collect(Collectors.groupingBy(map -> {
+                                Object frequency = map.get("collectionFrequency");
+                                return (frequency == null) ? 0 : (Integer) frequency;
+                            }));
+                    //TODO 创建任务池
+                    ThreadPoolTaskScheduler scheduler = ThreadPoolTaskTool.createTaskScheduler(groupedByFrequency.keySet().size(),
+                            "schedule_dangyang_history_"+itemType.getString("key"));
+
+                    for (Integer groupKey:groupedByFrequency.keySet()){
+                        String taskId = itemType.getString("key")+"_history"+"_"+groupKey;
+                        //TODO 任务时间间隔/分 = 采集频率(秒)/60
+                        Integer fc = groupKey;
+                        if(groupKey<60){
+                            fc = 60;
+                        }
+                        Integer taskTime = fc/60;
+                        ThreadHistoryTask threadHistoryTask = new ThreadHistoryTask(taskId,taskTime
+                                ,itemType.getString("key"),groupedByFrequency.get(groupKey));
+                        ThreadPoolTaskTool.scheduleCronHistroyTask(scheduler,taskId,threadHistoryTask,"0 59 9 * * ?");
+                    }
+                }
+            }
+        }catch(Exception ex){
+            ex.printStackTrace();
+            log.error(TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT)
+                    +"获取历史数据异常:"+ex.getLocalizedMessage());
+        }
+    }
+
     //TODO 获取实时数据
     /**
      * ①首先查询所有的设备列表
@@ -170,18 +261,24 @@ public class KprDangyangWaterBizFun {
                     }
 
                     //TODO 放入定时线程执行设备数据备份
-                    new Timer().schedule(new TimerTask() {
+                    Timer timer = new Timer();
+                    timer.schedule(new TimerTask() {
                         @Override
                         public void run() {
                             createExcel(itemType.getString("key"), totalList);
+                            scheduleHourlyTask(timer,itemType.getString("key"), totalList);
                         }
                     }, 5000);
 
+
                     //TODO 获取到当前类型的设备列表总集后,查询总集的实时数据,并添加到infulxdb
                     // 根据 采集频率 collectionFrequency (秒)字段进行分组
                     // 要根据采集频率取建立定时任务
                     Map<Integer, List<Map<String, Object>>> groupedByFrequency = totalList.stream()
-                            .collect(Collectors.groupingBy(map -> (Integer) map.get("collectionFrequency")));
+                            .collect(Collectors.groupingBy(map -> {
+                                Object frequency = map.get("collectionFrequency");
+                                return (frequency == null) ? 0 : (Integer) frequency;
+                            }));
                     //TODO 创建任务池
                     ThreadPoolTaskScheduler scheduler = ThreadPoolTaskTool.createTaskScheduler(groupedByFrequency.keySet().size(),
                             "schedule_dangyang_"+itemType.getString("key"));
@@ -210,6 +307,22 @@ public class KprDangyangWaterBizFun {
         }
     }
 
+    //TODO 每小时更新设备列表信息excel
+    private static void scheduleHourlyTask(Timer timer,String key,List<Map<String,Object>> totalList) {
+        // 创建一个每小时执行的任务
+        TimerTask hourlyTask = new TimerTask() {
+            @Override
+            public void run() {
+                createExcel(key, totalList);
+            }
+        };
+
+        // 设置任务每小时执行一次
+        long delay = 0; // 不延迟,立即开始
+        long period = 3600000; // 每小时执行一次(3600000毫秒 = 1小时)
+        timer.scheduleAtFixedRate(hourlyTask, delay, period);
+    }
+
     public static void createExcel(String deviceType,List<Map<String,Object>> totalList){
         // 创建一个新的工作簿
         Workbook workbook = new XSSFWorkbook();

+ 10 - 1
dc3-gateway/src/main/java/io/github/pnoker/gateway/comtool/ScheduleTaskMgr.java

@@ -172,7 +172,7 @@ public class ScheduleTaskMgr {
 //    }
 
     @PostConstruct
-    public void initTest(){
+    public void initReal(){
         new Timer().schedule(new TimerTask() {
             @Override
             public void run() {
@@ -180,4 +180,13 @@ public class ScheduleTaskMgr {
             }
         },10000);
     }
+    @PostConstruct
+    public void initHistory(){
+        new Timer().schedule(new TimerTask() {
+            @Override
+            public void run() {
+                KprDangyangWaterBizFun.checkOneMonth();
+            }
+        },11000);
+    }
 }

+ 237 - 0
dc3-gateway/src/main/java/io/github/pnoker/gateway/comtool/ThreadHistoryTask.java

@@ -0,0 +1,237 @@
+package io.github.pnoker.gateway.comtool;
+
+import com.alibaba.fastjson.JSONObject;
+import io.github.pnoker.gateway.bizmgr.KprDangyangWaterBizFun;
+import io.github.pnoker.gateway.bizmgr.baseInit.KprBaseInitFun;
+import io.github.pnoker.gateway.utils.HttpUtil;
+import org.influxdb.dto.Point;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigDecimal;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.time.*;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoUnit;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * @ClassName ThreadTask
+ * @Description: TODO 当阳历史数据任务线程
+ * @Author LX
+ * @Date 2024/9/3
+ * @Version V1.0
+ **/
+public class ThreadHistoryTask implements Runnable{
+    private static final Logger log = LoggerFactory.getLogger(ThreadHistoryTask.class);
+
+    private final String taskId;
+
+    private final Integer collectionFrequency;//采集频率
+
+    private String deviceType;
+
+    private List<Map<String,Object>> deviceList = new ArrayList<>();
+
+
+
+    public ThreadHistoryTask(String taskId, Integer collectionFrequency, String deviceType, List<Map<String,Object>> deviceList) {
+        this.taskId = taskId;
+        this.collectionFrequency = collectionFrequency;
+        this.deviceType = deviceType;
+        this.deviceList = deviceList;
+    }
+
+    @Override
+    public void run() {
+        try {
+            LocalDateTime nowTime = LocalDateTime.now();
+            String nowTimeStr = nowTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
+            String oneMonthStr = nowTime.minusMonths(1).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
+            List<JSONObject> deviceRealtimeDataList = new ArrayList<>();
+            int pageSize = 1000;
+            for (Map<String, Object> map : deviceList) {
+                int pageNo = 1;
+                while (true) {
+                    JSONObject devicerealtimeDataTotal = null;
+                    try {
+                        Map<String, String> paramRealtime = new HashMap<>();
+                        paramRealtime.put("deviceType", deviceType);
+                        paramRealtime.put("deviceCode", String.valueOf(map.get("deviceCode")));
+                        paramRealtime.put("pageNo", String.valueOf(pageNo));
+                        paramRealtime.put("pageSize", String.valueOf(pageSize));
+                        paramRealtime.put("startTime", oneMonthStr);
+                        paramRealtime.put("endTime", nowTimeStr);
+                        Map<String, String> headers = new HashMap<>();
+                        headers.put("Authorization", "Bearer " + KprDangyangWaterBizFun.dangyangToken);
+                        devicerealtimeDataTotal = JSONObject.parseObject(HttpUtil.sendGet(
+                                KprDangyangWaterBizFun.hisDataListUrl, paramRealtime, headers));
+                    } catch (Exception ex) {
+                        log.error("历史任务线程" + taskId + " 调用异常:" + ex.getLocalizedMessage());
+                    }
+                    if (devicerealtimeDataTotal == null || devicerealtimeDataTotal.getInteger("code") != 0 ||
+                            devicerealtimeDataTotal.getJSONObject("data") == null
+                            || devicerealtimeDataTotal.getJSONObject("data").getJSONArray("list") == null
+                            || devicerealtimeDataTotal.getJSONObject("data").getJSONArray("list").isEmpty()) {
+                        break;
+                    }
+                    try {
+                        //TODO 查询数据不为空,插入infulxdb
+                        for (int i = 0; i <devicerealtimeDataTotal.getJSONObject("data")
+                                .getJSONArray("list").size(); i++) {
+                            JSONObject deviceRealtimeData = devicerealtimeDataTotal.getJSONObject("data")
+                                    .getJSONArray("list").getJSONObject(i);
+                            adjustTime(deviceRealtimeData, collectionFrequency);//整点rand处理
+                            deviceRealtimeDataList.add(deviceRealtimeData);
+                        }
+                    } catch (Exception ex) {
+                        log.error("历史任务线程" + taskId + " " + String.valueOf(map.get("deviceCode")) + "查询异常:" + ex.getLocalizedMessage() + ";resJson:" + devicerealtimeDataTotal.toJSONString());
+                    }
+                    pageNo++;
+                }
+            }
+            //TODO 开始插入数据库
+            //第三方对应的字段集
+            List<String> params = KprBaseInitFun.getInstance().dangyangParams.get(deviceType);
+            for(JSONObject jsonObject : deviceRealtimeDataList){
+                //deviceType为表名
+                //time为当前数据time的纳秒时间戳,已经做过整点处理了
+                Point pointNanos = createPointFromJson(deviceType,jsonObject,params);
+                KprDangyangWaterBizFun.infulxDbUtil.insert(pointNanos);
+            }
+
+        }catch(Exception ex){
+            ex.printStackTrace();
+        }
+    }
+
+    public static Point createPointFromJson(String deviceType, JSONObject jsonObject,List<String> params) {
+        // 获取时间戳
+        long nanoTimestamp = convertToNanoTimestamp(jsonObject.getString("time"));
+
+        // 创建 Point.Builder 对象
+        Point.Builder pointBuilder = Point.measurement(KprBaseInitFun.getInstance().measurementMap.get(deviceType))
+                .tag("dev_id",jsonObject.getString("deviceCode"))
+                .time(nanoTimestamp, TimeUnit.NANOSECONDS);
+
+        // 遍历 JSON 对象的键值对,并添加到 Point.Builder 中
+        for (Map.Entry<String, Object> entry : jsonObject.entrySet()) {
+            String key = entry.getKey();
+            if (!key.equals("time")&&!key.equals("deviceCode")) {
+                processField(pointBuilder,key, entry.getValue(),params);
+            }
+        }
+
+        // 构建 Point 对象
+        try {
+            return pointBuilder.build();
+        }catch(Exception ex){
+
+        }
+        return null;
+    }
+
+    //指定列处理
+    private static void processField(Point.Builder pointBuilder, String key, Object value,List<String> params) {
+        if (value == null) {
+            return; // 跳过空值
+        }
+        //String[]长度如果为0,则说明本系统没有对应字段,则用它本身,反之用第二个元素作为字段名
+        List<String[]> newParams = params.stream()
+                .map(s -> s.split("___"))
+                .collect(Collectors.toList());
+
+        for (String[] clies:newParams){
+            boolean exists = Arrays.asList(clies).contains(key);
+            if(exists){
+                String newKey = clies.length>1?clies[1]:clies[0];
+                if (value instanceof Integer) {
+                    pointBuilder.addField(newKey, (Integer) value);
+                } else if (value instanceof Long) {
+                    pointBuilder.addField(newKey, (Long) value);
+                } else if (value instanceof Boolean) {
+                    pointBuilder.addField(newKey, (Boolean) value);
+                } else if (value instanceof String) {
+                    pointBuilder.addField(newKey, (String) value);
+                }else if (value instanceof BigDecimal) {
+                    pointBuilder.addField(newKey, ((BigDecimal) value).doubleValue());
+                }else{
+                    pointBuilder.addField(newKey, (String.valueOf(value)) );
+                }
+                break;
+            }
+        }
+
+//        switch (key) {
+//            case "flow":
+//                    pointBuilder.addField("flow_cur", ((BigDecimal) value).doubleValue());
+//                break;
+//            default:
+//                if (value instanceof Integer) {
+//                    pointBuilder.addField(key, (Integer) value);
+//                } else if (value instanceof Long) {
+//                    pointBuilder.addField(key, (Long) value);
+//                } else if (value instanceof Boolean) {
+//                    pointBuilder.addField(key, (Boolean) value);
+//                } else if (value instanceof String) {
+//                    pointBuilder.addField(key, (String) value);
+//                }else if (value instanceof BigDecimal) {
+//                    pointBuilder.addField(key, ((BigDecimal) value).doubleValue());
+//                }
+//                else {
+////                    System.err.println("Unsupported data type for key " + key + ": " + value.getClass().getName());
+//                }
+//                break;
+//        }
+    }
+
+
+    //获取CST 纳秒时间戳
+    private static long convertToNanoTimestamp(String dateTimeString) {
+        // 定义日期时间格式
+        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+        // 解析字符串为 LocalDateTime 对象
+        LocalDateTime dateTime = LocalDateTime.parse(dateTimeString, formatter);
+        // 指定时区为CST(Asia/Shanghai)
+        ZoneId shanghaiZoneId = ZoneId.of("Asia/Shanghai");
+        // 将LocalDateTime对象转换为ZonedDateTime对象
+        ZonedDateTime zonedDateTime = dateTime.atZone(shanghaiZoneId);
+        // 转换为秒级别的时间戳
+        long epochSecond = dateTime.toEpochSecond(ZoneOffset.UTC);
+        // 将ZonedDateTime对象转换为Instant对象(UTC时间点)
+        Instant instant = zonedDateTime.toInstant();
+        // 计算从1970年1月1日00:00:00 UTC以来的纳秒数
+        long nanosecondsSinceEpoch = ChronoUnit.NANOS.between(Instant.EPOCH, instant);
+        return nanosecondsSinceEpoch;
+    }
+
+    private static final SimpleDateFormat outputDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    public static void adjustTime(JSONObject jsonObject, int collectionFrequency) throws ParseException {
+        String timeStr = jsonObject.getString("time");
+        Long oriLong =  TimeTool.convertDateStr2UTC(timeStr);
+        Date originalDate = new Date(oriLong);
+
+        // 计算最近的整分钟时间
+        Date roundedDate = getNearestTime(originalDate, collectionFrequency);
+
+        // 格式化回字符串
+        String adjustedTimeStr = outputDateFormat.format(roundedDate);
+
+        // 更新 JSON 对象中的时间
+        jsonObject.put("time", adjustedTimeStr);
+    }
+
+    private static Date getNearestTime(Date date, int collectionFrequency) {
+        long milliseconds = date.getTime();
+        long minutes = milliseconds / (60 * 1000);
+        long remainder = minutes % collectionFrequency;
+        long adjustment = remainder < collectionFrequency / 2 ? -remainder : collectionFrequency - remainder;
+
+        // 调整到最近的整分钟时间
+        long adjustedMinutes = minutes + adjustment;
+        return new Date(adjustedMinutes * 60 * 1000);
+    }
+}

+ 5 - 0
dc3-gateway/src/main/java/io/github/pnoker/gateway/comtool/ThreadPoolTaskTool.java

@@ -39,4 +39,9 @@ public class ThreadPoolTaskTool {
         ScheduledFuture<?> future = taskScheduler.schedule(threadTask, cronTrigger);
         scheduledTasks.put(taskId, future);
     }
+    public static void scheduleCronHistroyTask(ThreadPoolTaskScheduler taskScheduler,String taskId,ThreadHistoryTask threadTask,String cronExpression) {
+        CronTrigger cronTrigger = new CronTrigger(cronExpression);
+        ScheduledFuture<?> future = taskScheduler.schedule(threadTask, cronTrigger);
+        scheduledTasks.put(taskId, future);
+    }
 }

+ 1 - 1
dc3-gateway/src/main/java/io/github/pnoker/gateway/comtool/ThreadTask.java

@@ -24,7 +24,7 @@ import java.util.stream.Collectors;
 
 /**
  * @ClassName ThreadTask
- * @Description: TODO 当阳任务线程
+ * @Description: TODO 当阳实时数据任务线程
  * @Author LX
  * @Date 2024/9/3
  * @Version V1.0