Procházet zdrojové kódy

当阳数据实时数据与历史数据逻辑变更 增加定制同步机制

1037015548@qq.com před 9 měsíci
rodič
revize
86a68ac6bc

+ 98 - 10
dc3-gateway/src/main/java/io/github/pnoker/gateway/bizmgr/KprDangyangWaterBizFun.java

@@ -4,10 +4,7 @@ import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
 import com.alibaba.fastjson.TypeReference;
-import io.github.pnoker.gateway.comtool.ThreadHistoryTask;
-import io.github.pnoker.gateway.comtool.ThreadPoolTaskTool;
-import io.github.pnoker.gateway.comtool.ThreadTask;
-import io.github.pnoker.gateway.comtool.TimeTool;
+import io.github.pnoker.gateway.comtool.*;
 import io.github.pnoker.gateway.dbdao.DBMgrProxy;
 import io.github.pnoker.gateway.dbdao.services.intef.DeviceKindService;
 import io.github.pnoker.gateway.dbdao.services.intef.TypeDefineService;
@@ -120,7 +117,7 @@ public class KprDangyangWaterBizFun {
     }
 
 
-    //TODO 获取一个月历史数据
+    //TODO 同步历史数据
     public static void checkOneMonth(){
         try{
             if(!checkApplicationParam()){
@@ -193,10 +190,101 @@ public class KprDangyangWaterBizFun {
                         if (groupKey==0||groupKey<0){
                             continue;
                         }
-                        ThreadHistoryTask threadHistoryTask = new ThreadHistoryTask(taskId,groupKey
-                                ,itemType.getString("key"),groupedByFrequency.get(groupKey));
-                        ThreadPoolTaskTool.scheduleHistroyTask(scheduler,taskId,threadHistoryTask,getInitialDelay()
-                                , 24 * 60 * 60);
+//                        if(itemType.getString("key").equals("FLOW_METER")) {//测试用
+                            ThreadHistoryTask threadHistoryTask = new ThreadHistoryTask(taskId, groupKey
+                                    , itemType.getString("key"), groupedByFrequency.get(groupKey));
+                            ThreadPoolTaskTool.scheduleHistroyTask(scheduler, taskId, threadHistoryTask, getInitialDelay()
+                                    , 24 * 60 * 60);
+//                        }
+                    }
+                }
+            }
+        }catch(Exception ex){
+            ex.printStackTrace();
+            log.error(TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT)
+                    +"获取历史数据异常:"+ex.getLocalizedMessage());
+        }
+    }
+
+    //TODO 同步定制历史数据 暂定每小时去取一天的数据
+    public static void checkDingzhiHistory(){
+        try{
+            if(!checkApplicationParam()){
+                log.error("获取配置参数错误");
+                return;
+            }
+            Map<String, String> headers = new HashMap<>();
+            headers.put("Authorization", "Bearer "+dangyangToken);
+            for (int i = 0; i <deviceType.size(); i++) {
+                List<Map<String,Object>> totalList = new ArrayList<>();//当前类型设备列表总集
+                //TODO 外层循环是要查一个类型的所有设备列表
+                JSONObject itemType = deviceType.getJSONObject(i);
+
+                Map<String,String> paramsTotal = new HashMap<>();
+                paramsTotal.put("pageNo","1");
+                paramsTotal.put("pageSize","1");
+                paramsTotal.put("deviceType",itemType.getString("key"));
+
+                JSONObject totalRes = JSONObject.parseObject(HttpUtil.sendGet(
+                        listUrl,paramsTotal,headers));
+                if(totalRes!=null&&totalRes.getInteger("code")==0
+                        &&!totalRes.getJSONObject("data")
+                        .getJSONArray("list").isEmpty()) {
+                    //TODO 优化 以分页方式查询所有,初始分页行数定为200查询速率较好
+                    int nTotals = totalRes.getJSONObject("data").getInteger("total");
+                    int pageNum = nTotals % 200 == 0 ? nTotals / 200 : (nTotals / 200) + 1;//总页数
+                    Integer limit = 200;
+                    if (pageNum <= 1) {
+                        limit = nTotals;//说明总数比第一页小
+                    }
+                    for (int k = 1; k <= pageNum; k++) {
+                        Map<String, String> paramsPage = new HashMap<>();
+                        paramsPage.put("pageNo", String.valueOf(k));
+                        paramsPage.put("pageSize", String.valueOf(limit));
+                        paramsPage.put("deviceType", itemType.getString("key"));
+                        JSONObject pageRes = JSONObject.parseObject(HttpUtil.sendGet(
+                                listUrl, paramsPage, headers));
+                        if (pageRes != null && pageRes.getInteger("code") == 0 && !pageRes.getJSONObject("data").getJSONArray("list").isEmpty()) {
+                            //TODO 将分页获取的列表数据添加至总集中
+                            totalList.addAll(JSON.parseObject(pageRes.getJSONObject("data").getJSONArray("list").toJSONString()
+                                    , new TypeReference<List<Map<String, Object>>>() {
+                                    }));
+                        } else {
+                            log.error(TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(), TimeTool.TIMESTAMP_FORMAT)
+                                    + "获取设备列表失败:" + itemType.getString("key") + ";" + (pageRes == null ? null : pageRes.toJSONString())
+                                    + ";param:" + JSON.toJSONString(paramsPage));
+                        }
+                    }
+
+                    List<Map<String, Object>> nullFrequencyList = totalList.stream()
+                            .filter(map -> !map.containsKey("collectionFrequency") || map.get("collectionFrequency") == null)
+                            .collect(Collectors.toList());
+                    // 输出结果
+                    nullFrequencyList.forEach(System.out::println);
+
+                    //TODO 获取到当前类型的设备列表总集后,查询总集的历史数据,并添加到infulxdb
+                    // 根据 采集频率 collectionFrequency (秒)字段进行分组
+                    // 要根据采集频率取建立定时任务
+                    Map<Integer, List<Map<String, Object>>> groupedByFrequency = totalList.stream()
+                            .collect(Collectors.groupingBy(map -> {
+                                Object frequency = map.get("collectionFrequency");
+                                return (frequency == null) ? 0 : (Integer) frequency;
+                            }));
+                    //TODO 创建任务池
+                    ScheduledExecutorService scheduler = ThreadPoolTaskTool.createTaskScheduler(groupedByFrequency.keySet().size());
+
+                    for (Integer groupKey:groupedByFrequency.keySet()){
+                        String taskId = itemType.getString("key")+"_history_dingzhi"+"_"+groupKey;
+                        //TODO 任务时间间隔
+                        if (groupKey==0||groupKey<0){
+                            continue;
+                        }
+//                        if(itemType.getString("key").equals("FLOW_METER")) {//测试用
+                        ThreadDinzhiHistoryTask dinzhiHistoryTask= new ThreadDinzhiHistoryTask(taskId, groupKey
+                                , itemType.getString("key"), groupedByFrequency.get(groupKey));
+                        ThreadPoolTaskTool.scheduleDingzhiHistroyTask(scheduler, taskId, dinzhiHistoryTask, 0L
+                                , 30 * 60);//每半小时执行
+//                        }
                     }
                 }
             }
@@ -212,7 +300,7 @@ public class KprDangyangWaterBizFun {
         Calendar nextRun = (Calendar) now.clone();
 
         // 设置为明天的0点
-        nextRun.set(Calendar.HOUR_OF_DAY, 1);
+        nextRun.set(Calendar.HOUR_OF_DAY, 0);
         nextRun.set(Calendar.MINUTE, 0);
         nextRun.set(Calendar.SECOND, 0);
         nextRun.set(Calendar.MILLISECOND, 0);

+ 9 - 0
dc3-gateway/src/main/java/io/github/pnoker/gateway/comtool/ScheduleTaskMgr.java

@@ -189,4 +189,13 @@ public class ScheduleTaskMgr {
             }
         },11000);
     }
+    @PostConstruct
+    public void initDingzhiHistory(){
+        new Timer().schedule(new TimerTask() {
+            @Override
+            public void run() {
+                KprDangyangWaterBizFun.checkDingzhiHistory();
+            }
+        },12000);
+    }
 }

+ 259 - 0
dc3-gateway/src/main/java/io/github/pnoker/gateway/comtool/ThreadDinzhiHistoryTask.java

@@ -0,0 +1,259 @@
+package io.github.pnoker.gateway.comtool;
+
+import com.alibaba.fastjson.JSONObject;
+import io.github.pnoker.gateway.bizmgr.KprDangyangWaterBizFun;
+import io.github.pnoker.gateway.bizmgr.baseInit.KprBaseInitFun;
+import io.github.pnoker.gateway.utils.HttpUtil;
+import org.influxdb.dto.Point;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigDecimal;
+import java.text.SimpleDateFormat;
+import java.time.*;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoUnit;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * @ClassName ThreadTask
+ * @Description: TODO 当阳历史数据定制时段的任务线程
+ * @Author LX
+ * @Date 2024/9/3
+ * @Version V1.0
+ **/
+public class ThreadDinzhiHistoryTask implements Runnable{
+    private static final Logger log = LoggerFactory.getLogger(ThreadDinzhiHistoryTask.class);
+
+    private final String taskId;
+
+    private final Integer collectionFrequency;//采集频率
+
+    private String deviceType;
+
+    private List<Map<String,Object>> deviceList = new ArrayList<>();
+
+
+
+    public ThreadDinzhiHistoryTask(String taskId, Integer collectionFrequency, String deviceType, List<Map<String,Object>> deviceList) {
+        this.taskId = taskId;
+        this.collectionFrequency = collectionFrequency;
+        this.deviceType = deviceType;
+        this.deviceList = deviceList;
+    }
+
+    @Override
+    public void run() {
+        try {
+            // 获取当前时间的整时0分0秒,这里的时主要还是定时器的执行任务时间去控制
+            LocalDateTime now = LocalDateTime.now().withSecond(0);
+
+            // 使用提取的值构造新的LocalDateTime对象
+            LocalDateTime firstDayOfMonthWithCurrentTime = now.minusDays(2);
+
+//            days.add(now);//测试用
+
+            List<JSONObject> deviceRealtimeDataList = new ArrayList<>();
+            for (Map<String, Object> map : deviceList) {
+//                if(String.valueOf(map.get("deviceCode")).equals("106")) {//测试用
+                        JSONObject devicerealtimeDataTotal = null;
+                        try {
+                            Map<String, String> paramRealtime = new HashMap<>();
+                            paramRealtime.put("deviceType", deviceType);
+                            paramRealtime.put("deviceCode", String.valueOf(map.get("deviceCode")));
+//                        paramRealtime.put("pageNo", String.valueOf(pageNo));
+//                        paramRealtime.put("pageSize", String.valueOf(pageSize));
+                            paramRealtime.put("startTime", now.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
+                            paramRealtime.put("endTime", firstDayOfMonthWithCurrentTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
+                            paramRealtime.put("interval", String.valueOf((collectionFrequency < 60 ? 60 : collectionFrequency)));
+                            Map<String, String> headers = new HashMap<>();
+                            headers.put("Authorization", "Bearer " + KprDangyangWaterBizFun.dangyangToken);
+                            devicerealtimeDataTotal = JSONObject.parseObject(HttpUtil.sendGet(
+                                    KprDangyangWaterBizFun.hisDataListUrl, paramRealtime, headers));
+//                            System.out.println(taskId + ",日期:" + paramRealtime.get("startTime") +
+//                                    "----" + paramRealtime.get("endTime") + ";" + paramRealtime.get("deviceCode") + "数据:" + devicerealtimeDataTotal.toJSONString());
+                        } catch (Exception ex) {
+                            log.error("定制时间段历史任务线程" + taskId + " 调用异常:" + ex.getLocalizedMessage());
+                        }
+                        if (devicerealtimeDataTotal == null || devicerealtimeDataTotal.getInteger("code") != 0 ||
+                                devicerealtimeDataTotal.getJSONArray("data") == null
+                                || devicerealtimeDataTotal.getJSONArray("data").isEmpty()) {
+                            continue;
+                        }
+                        try {
+                            //TODO 查询数据不为空,插入infulxdb
+                            for (int i = 0; i < devicerealtimeDataTotal.getJSONArray("data").size(); i++) {
+                                JSONObject deviceRealtimeData = devicerealtimeDataTotal.getJSONArray("data").getJSONObject(i);
+//                            adjustTime(deviceRealtimeData, collectionFrequency);//整点rand处理
+                                deviceRealtimeDataList.add(deviceRealtimeData);
+                            }
+                        } catch (Exception ex) {
+                            log.error("定制时间段历史任务线程" + taskId + " " + String.valueOf(map.get("deviceCode")) + "查询异常:" + ex.getLocalizedMessage() + ";resJson:" + devicerealtimeDataTotal.toJSONString());
+                        }
+//                    }
+//                    break;
+            }
+            //TODO 开始插入数据库
+            //第三方对应的字段集
+            List<String> params = KprBaseInitFun.getInstance().dangyangParams.get(deviceType);
+            for(JSONObject jsonObject : deviceRealtimeDataList){
+                //deviceType为表名
+                //time为当前数据time的纳秒时间戳,已经做过整点处理了
+                Point pointNanos = createPointFromJson(deviceType,jsonObject,params);
+                if(pointNanos!=null) {
+                    KprDangyangWaterBizFun.infulxDbUtil.insert(pointNanos);
+                }
+            }
+            System.out.println(taskId+"定时历史数据同步完毕,数据长度:"+deviceRealtimeDataList.size());
+        }catch(Exception ex){
+            ex.printStackTrace();
+        }
+    }
+
+
+    public static LocalDateTime parseStringToDateTime(String time, String format) {
+        DateTimeFormatter df = DateTimeFormatter.ofPattern(format);
+        return LocalDateTime.parse(time, df);
+    }
+
+    public static String getDateTimeToString(LocalDateTime localDateTime, String format) {
+        DateTimeFormatter formatter = DateTimeFormatter.ofPattern(format);
+        return localDateTime.format(formatter);
+    }
+
+
+
+    public static Point createPointFromJson(String deviceType, JSONObject jsonObject,List<String> params) {
+        // 获取时间戳
+        long nanoTimestamp = convertToNanoTimestamp(jsonObject.getString("time"));
+        if(nanoTimestamp==0L){
+            return null;
+        }
+        // 创建 Point.Builder 对象
+        Point.Builder pointBuilder = Point.measurement(KprBaseInitFun.getInstance().measurementMap.get(deviceType))
+                .tag("dev_id",jsonObject.getString("deviceCode"))
+                .time(nanoTimestamp, TimeUnit.NANOSECONDS);
+
+        // 遍历 JSON 对象的键值对,并添加到 Point.Builder 中
+        for (Map.Entry<String, Object> entry : jsonObject.entrySet()) {
+            String key = entry.getKey();
+            if (!key.equals("time")&&!key.equals("deviceCode")) {
+                processField(pointBuilder,key, entry.getValue(),params);
+            }
+        }
+        if(!pointBuilder.hasFields()){
+            return null;
+        }
+
+        // 构建 Point 对象
+        try {
+            return pointBuilder.build();
+        }catch(Exception ex){
+
+        }
+        return null;
+    }
+
+    //指定列处理
+    private static void processField(Point.Builder pointBuilder, String key, Object value,List<String> params) {
+        if (value == null) {
+            return; // 跳过空值
+        }
+        //String[]长度如果为0,则说明本系统没有对应字段,则用它本身,反之用第二个元素作为字段名
+        List<String[]> newParams = params.stream()
+                .map(s -> s.split("___"))
+                .collect(Collectors.toList());
+
+        for (String[] clies:newParams){
+            boolean exists = Arrays.asList(clies).contains(key);
+            if(exists){
+                String newKey = clies.length>1?clies[1]:clies[0];
+                if (value instanceof Integer) {
+                    pointBuilder.addField(newKey, (Integer) value);
+                } else if (value instanceof Long) {
+                    pointBuilder.addField(newKey, (Long) value);
+                } else if (value instanceof Boolean) {
+                    pointBuilder.addField(newKey, (Boolean) value);
+                } else if (value instanceof String) {
+                    pointBuilder.addField(newKey, (String) value);
+                }else if (value instanceof BigDecimal) {
+                    pointBuilder.addField(newKey, ((BigDecimal) value).doubleValue());
+                }else{
+                    pointBuilder.addField(newKey, (String.valueOf(value)) );
+                }
+                break;
+            }
+        }
+
+//        switch (key) {
+//            case "flow":
+//                    pointBuilder.addField("flow_cur", ((BigDecimal) value).doubleValue());
+//                break;
+//            default:
+//                if (value instanceof Integer) {
+//                    pointBuilder.addField(key, (Integer) value);
+//                } else if (value instanceof Long) {
+//                    pointBuilder.addField(key, (Long) value);
+//                } else if (value instanceof Boolean) {
+//                    pointBuilder.addField(key, (Boolean) value);
+//                } else if (value instanceof String) {
+//                    pointBuilder.addField(key, (String) value);
+//                }else if (value instanceof BigDecimal) {
+//                    pointBuilder.addField(key, ((BigDecimal) value).doubleValue());
+//                }
+//                else {
+////                    System.err.println("Unsupported data type for key " + key + ": " + value.getClass().getName());
+//                }
+//                break;
+//        }
+    }
+
+
+    //获取CST 纳秒时间戳
+    private static long convertToNanoTimestamp(String dateTimeString) {
+        // 定义日期时间格式
+        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+        // 解析字符串为 LocalDateTime 对象
+        LocalDateTime dateTime = LocalDateTime.parse(dateTimeString, formatter);
+        // 指定时区为CST(Asia/Shanghai)
+        ZoneId shanghaiZoneId = ZoneId.of("Asia/Shanghai");
+        // 将LocalDateTime对象转换为ZonedDateTime对象
+        ZonedDateTime zonedDateTime = dateTime.atZone(shanghaiZoneId);
+        // 转换为秒级别的时间戳
+        long epochSecond = dateTime.toEpochSecond(ZoneOffset.UTC);
+        // 将ZonedDateTime对象转换为Instant对象(UTC时间点)
+        Instant instant = zonedDateTime.toInstant();
+        // 计算从1970年1月1日00:00:00 UTC以来的纳秒数
+        long nanosecondsSinceEpoch = ChronoUnit.NANOS.between(Instant.EPOCH, instant);
+        long currentTimeNanos = System.currentTimeMillis() * 1_000_000L;
+        if(nanosecondsSinceEpoch>currentTimeNanos){
+            return 0L;
+        }
+        return nanosecondsSinceEpoch;
+    }
+
+    private static final SimpleDateFormat outputDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    public static void adjustTime(JSONObject deviceRealtimeData, int collectionFrequency) {
+        // 获取原始时间字符串
+        String timeString = deviceRealtimeData.getString("time");
+
+        // 定义日期时间格式
+        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+
+        // 解析为 LocalDateTime 对象
+        LocalDateTime originalTime = LocalDateTime.parse(timeString, formatter);
+
+        // 计算最接近的时间间隔
+        long seconds = originalTime.getSecond();
+        long adjustedSeconds = Math.round((double) seconds / collectionFrequency) * collectionFrequency;
+
+        // 创建新的时间
+        LocalDateTime adjustedTime = originalTime.truncatedTo(ChronoUnit.MINUTES).plusSeconds(adjustedSeconds);
+
+        // 更新 JSON 对象中的时间
+        deviceRealtimeData.put("time", adjustedTime.format(formatter));
+    }
+
+}

+ 38 - 33
dc3-gateway/src/main/java/io/github/pnoker/gateway/comtool/ThreadHistoryTask.java

@@ -49,7 +49,8 @@ public class ThreadHistoryTask implements Runnable{
     public void run() {
         try {
             // 获取当前时间的0点
-            LocalDateTime now = LocalDateTime.now().minusHours(0).minusMinutes(0).minusSeconds(0);
+            LocalDateTime now = LocalDateTime.now().withHour(0).withMinute(0).withSecond(0);
+//            LocalDateTime now = LocalDateTime.now();//测试用
 
             // 提取当前时间的年、月、时、分、秒
             int year = now.getYear();
@@ -69,45 +70,49 @@ public class ThreadHistoryTask implements Runnable{
                 // 将日期与午夜时间结合,并添加到列表中
                 days.add(date);
             }
+//            days.add(now);//测试用
 
             List<JSONObject> deviceRealtimeDataList = new ArrayList<>();
             for (Map<String, Object> map : deviceList) {
-                for (LocalDateTime day:days){
-                    JSONObject devicerealtimeDataTotal = null;
-                    try {
-                        Map<String, String> paramRealtime = new HashMap<>();
-                        paramRealtime.put("deviceType", deviceType);
-                        paramRealtime.put("deviceCode", String.valueOf(map.get("deviceCode")));
+//                if(String.valueOf(map.get("deviceCode")).equals("106")) {//测试用
+                    for (LocalDateTime day : days) {
+                        JSONObject devicerealtimeDataTotal = null;
+                        try {
+                            Map<String, String> paramRealtime = new HashMap<>();
+                            paramRealtime.put("deviceType", deviceType);
+                            paramRealtime.put("deviceCode", String.valueOf(map.get("deviceCode")));
 //                        paramRealtime.put("pageNo", String.valueOf(pageNo));
 //                        paramRealtime.put("pageSize", String.valueOf(pageSize));
-                        paramRealtime.put("startTime", day.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
-                        paramRealtime.put("endTime", day.minusDays(-2)
-                                .format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));//由于长通接口问题,暂时先每次查两天范围的数据
-                        paramRealtime.put("interval", String.valueOf((collectionFrequency<60?60:collectionFrequency)));
-                        Map<String, String> headers = new HashMap<>();
-                        headers.put("Authorization", "Bearer " + KprDangyangWaterBizFun.dangyangToken);
-                        devicerealtimeDataTotal = JSONObject.parseObject(HttpUtil.sendGet(
-                                KprDangyangWaterBizFun.hisDataListUrl, paramRealtime, headers));
-                        System.out.println(taskId+",日期:"+paramRealtime.get("startTime")+
-                                "----"+paramRealtime.get("endTime")+";"+paramRealtime.get("deviceCode")+"数据:"+devicerealtimeDataTotal.toJSONString());
-                    } catch (Exception ex) {
-                        log.error("历史任务线程" + taskId + " 调用异常:" + ex.getLocalizedMessage());
-                    }
-                    if (devicerealtimeDataTotal == null || devicerealtimeDataTotal.getInteger("code") != 0 ||
-                            devicerealtimeDataTotal.getJSONArray("data") == null
-                            || devicerealtimeDataTotal.getJSONArray("data").isEmpty()) {
-                        continue;
-                    }
-                    try {
-                        //TODO 查询数据不为空,插入infulxdb
-                        for (int i = 0; i <devicerealtimeDataTotal.getJSONArray("data").size(); i++) {
-                            JSONObject deviceRealtimeData = devicerealtimeDataTotal.getJSONArray("data").getJSONObject(i);
+                            paramRealtime.put("startTime", day.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
+                            paramRealtime.put("endTime", day.minusDays(-2)
+                                    .format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));//由于长通接口问题,暂时先每次查两天范围的数据
+                            paramRealtime.put("interval", String.valueOf((collectionFrequency < 60 ? 60 : collectionFrequency)));
+                            Map<String, String> headers = new HashMap<>();
+                            headers.put("Authorization", "Bearer " + KprDangyangWaterBizFun.dangyangToken);
+                            devicerealtimeDataTotal = JSONObject.parseObject(HttpUtil.sendGet(
+                                    KprDangyangWaterBizFun.hisDataListUrl, paramRealtime, headers));
+//                            System.out.println(taskId + ",日期:" + paramRealtime.get("startTime") +
+//                                    "----" + paramRealtime.get("endTime") + ";" + paramRealtime.get("deviceCode") + "数据:" + devicerealtimeDataTotal.toJSONString());
+                        } catch (Exception ex) {
+                            log.error("历史任务线程" + taskId + " 调用异常:" + ex.getLocalizedMessage());
+                        }
+                        if (devicerealtimeDataTotal == null || devicerealtimeDataTotal.getInteger("code") != 0 ||
+                                devicerealtimeDataTotal.getJSONArray("data") == null
+                                || devicerealtimeDataTotal.getJSONArray("data").isEmpty()) {
+                            continue;
+                        }
+                        try {
+                            //TODO 查询数据不为空,插入infulxdb
+                            for (int i = 0; i < devicerealtimeDataTotal.getJSONArray("data").size(); i++) {
+                                JSONObject deviceRealtimeData = devicerealtimeDataTotal.getJSONArray("data").getJSONObject(i);
 //                            adjustTime(deviceRealtimeData, collectionFrequency);//整点rand处理
-                            deviceRealtimeDataList.add(deviceRealtimeData);
+                                deviceRealtimeDataList.add(deviceRealtimeData);
+                            }
+                        } catch (Exception ex) {
+                            log.error("历史任务线程" + taskId + " " + String.valueOf(map.get("deviceCode")) + "查询异常:" + ex.getLocalizedMessage() + ";resJson:" + devicerealtimeDataTotal.toJSONString());
                         }
-                    } catch (Exception ex) {
-                        log.error("历史任务线程" + taskId + " " + String.valueOf(map.get("deviceCode")) + "查询异常:" + ex.getLocalizedMessage() + ";resJson:" + devicerealtimeDataTotal.toJSONString());
-                    }
+//                    }
+//                    break;
                 }
             }
             //TODO 开始插入数据库

+ 6 - 0
dc3-gateway/src/main/java/io/github/pnoker/gateway/comtool/ThreadPoolTaskTool.java

@@ -16,6 +16,7 @@ import java.util.concurrent.*;
 public class ThreadPoolTaskTool {
     public static Map<String, ScheduledFuture<?>> scheduledTasks = new ConcurrentHashMap<>();//全局唯一
     public static Map<String, ScheduledFuture<?>> scheduledHistoryTasks = new ConcurrentHashMap<>();//全局唯一
+    public static Map<String, ScheduledFuture<?>> scheduledDingzhiHistoryTasks = new ConcurrentHashMap<>();//全局唯一
 
 
     //TODO 创建任务池
@@ -38,6 +39,11 @@ public class ThreadPoolTaskTool {
         scheduledHistoryTasks.put(taskId, future);
     }
 
+    public static void scheduleDingzhiHistroyTask(ScheduledExecutorService scheduler,String taskId,ThreadDinzhiHistoryTask threadTask,Long initialDelay,Integer collectionFrequency) {
+        ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(threadTask, initialDelay, collectionFrequency, TimeUnit.SECONDS);
+        scheduledDingzhiHistoryTasks.put(taskId, future);
+    }
+
     public static void cancelTask() {
         for(String taskIdKey :scheduledTasks.keySet()){
             ScheduledFuture<?> future = scheduledTasks.get(taskIdKey);

+ 9 - 0
dc3-gateway/src/main/java/io/github/pnoker/gateway/utils/TestUtil.java

@@ -8,6 +8,8 @@ import org.influxdb.dto.Query;
 import org.influxdb.dto.QueryResult;
 
 import java.text.SimpleDateFormat;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
 import java.util.Date;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -22,6 +24,13 @@ import java.util.concurrent.TimeUnit;
 public class TestUtil {
     private static final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
     public static void main(String[] args) throws Exception{
+        // 获取当前时间的整时0分0秒,这里的时主要还是定时器的执行任务时间去控制
+        LocalDateTime now = LocalDateTime.now().withMinute(0).withSecond(0);
+
+        // 使用提取的值构造新的LocalDateTime对象
+        LocalDateTime firstDayOfMonthWithCurrentTime = now.minusHours(3);
+        System.out.println(now.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
+        System.out.println(firstDayOfMonthWithCurrentTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
         JSONObject jsonObject = new JSONObject();
         jsonObject.put("time","2024-09-03 09:54:00");
         Date timeStr = dateFormat.parse(jsonObject.getString("time"));