Sfoglia il codice sorgente

filed为空则不插入infulxdb 更改时间间隔秒级round逻辑

1037015548@qq.com 9 mesi fa
parent
commit
2f9e0e3036

+ 33 - 22
dc3-gateway/src/main/java/io/github/pnoker/gateway/bizmgr/KprDangyangWaterBizFun.java

@@ -29,6 +29,7 @@ import org.springframework.util.StringUtils;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.*;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -184,23 +185,18 @@ public class KprDangyangWaterBizFun {
                                 return (frequency == null) ? 0 : (Integer) frequency;
                             }));
                     //TODO 创建任务池
-                    ThreadPoolTaskScheduler scheduler = ThreadPoolTaskTool.createTaskScheduler(groupedByFrequency.keySet().size(),
-                            "schedule_dangyang_history_"+itemType.getString("key"));
+                    ScheduledExecutorService scheduler = ThreadPoolTaskTool.createTaskScheduler(groupedByFrequency.keySet().size());
 
                     for (Integer groupKey:groupedByFrequency.keySet()){
                         String taskId = itemType.getString("key")+"_history"+"_"+groupKey;
-                        //TODO 任务时间间隔/分 = 采集频率(秒)/60
-                        Integer fc = groupKey;
-                        if(groupKey<60){
-                            fc = 60;
+                        //TODO 任务时间间隔
+                        if (groupKey==0||groupKey<0){
+                            continue;
                         }
-                        Integer taskTime = fc/60;
-                        ThreadHistoryTask threadHistoryTask = new ThreadHistoryTask(taskId,taskTime
+                        ThreadHistoryTask threadHistoryTask = new ThreadHistoryTask(taskId,groupKey
                                 ,itemType.getString("key"),groupedByFrequency.get(groupKey));
-                        ThreadPoolTaskTool.scheduleCronHistroyTask(scheduler,taskId,threadHistoryTask,"0 0 0 * * * ");
-//                        if(itemType.getString("key").equals("ELEC_CTRL_VALVE")) {
-//                            ThreadPoolTaskTool.scheduleCronHistroyTask(scheduler, taskId, threadHistoryTask, "0 01 12 * * ?");
-//                        }
+                        ThreadPoolTaskTool.scheduleHistroyTask(scheduler,taskId,threadHistoryTask,getInitialDelay()
+                                , 24 * 60 * 60);
                     }
                 }
             }
@@ -211,6 +207,25 @@ public class KprDangyangWaterBizFun {
         }
     }
 
+    private static long getInitialDelay() {
+        Calendar now = Calendar.getInstance();
+        Calendar nextRun = (Calendar) now.clone();
+
+        // 设置为明天的0点
+        nextRun.set(Calendar.HOUR_OF_DAY, 2);
+        nextRun.set(Calendar.MINUTE, 0);
+        nextRun.set(Calendar.SECOND, 0);
+        nextRun.set(Calendar.MILLISECOND, 0);
+
+        // 如果现在已经过了0点,设置为明天的0点
+        if (now.after(nextRun)) {
+            nextRun.add(Calendar.DAY_OF_MONTH, 1);
+        }
+
+        // 计算从现在到下一个0点的延迟时间(以秒为单位)
+        return (nextRun.getTimeInMillis() - now.getTimeInMillis()) / 1000;
+    }
+
     //TODO 获取实时数据
     /**
      * ①首先查询所有的设备列表
@@ -283,21 +298,17 @@ public class KprDangyangWaterBizFun {
                                 return (frequency == null) ? 0 : (Integer) frequency;
                             }));
                     //TODO 创建任务池
-                    ThreadPoolTaskScheduler scheduler = ThreadPoolTaskTool.createTaskScheduler(groupedByFrequency.keySet().size(),
-                            "schedule_dangyang_"+itemType.getString("key"));
+                    ScheduledExecutorService scheduler = ThreadPoolTaskTool.createTaskScheduler(groupedByFrequency.keySet().size());
                     for (Integer groupKey:groupedByFrequency.keySet()){
                         String taskId = itemType.getString("key")+"_"+groupKey;
-                        //TODO 任务时间间隔/分 = 采集频率(秒)/60
-                        Integer fc = groupKey;
-                        if(groupKey<60){
-                            fc = 60;
+                        //TODO 任务时间间隔
+                        if (groupKey==0||groupKey<0){
+                            continue;
                         }
-                        Integer taskTime = fc/60;
-
-                        ThreadTask threadTask = new ThreadTask(taskId,taskTime
+                        ThreadTask threadTask = new ThreadTask(taskId,groupKey
                         ,itemType.getString("key"),groupedByFrequency.get(groupKey));
 //                        if(itemType.getString("key").equals("ELEC_CTRL_VALVE")) {
-                            ThreadPoolTaskTool.scheduleCronTask(scheduler, taskId, threadTask, "0 */" + taskTime + " * * * ?");
+                            ThreadPoolTaskTool.scheduleSecTask(scheduler, taskId, threadTask, groupKey);
 //                        }
                     }
                 }else{

+ 16 - 23
dc3-gateway/src/main/java/io/github/pnoker/gateway/comtool/ThreadHistoryTask.java

@@ -220,32 +220,25 @@ public class ThreadHistoryTask implements Runnable{
     }
 
     private static final SimpleDateFormat outputDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-    public static void adjustTime(JSONObject jsonObject, int collectionFrequency) throws ParseException {
-        String timeStr = jsonObject.getString("time");
-        Long oriLong =  TimeTool.convertDateStr2UTC(timeStr);
-        Date originalDate = new Date(oriLong);
-
-        // 计算最近的整分钟时间
-        Date roundedDate = getNearestTime(originalDate, collectionFrequency);
-        if(roundedDate.after(new Date())){
-            System.out.println();
-        }
+    public static void adjustTime(JSONObject deviceRealtimeData, int collectionFrequency) {
+        // 获取原始时间字符串
+        String timeString = deviceRealtimeData.getString("time");
 
-        // 格式化回字符串
-        String adjustedTimeStr = outputDateFormat.format(roundedDate);
+        // 定义日期时间格式
+        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
 
-        // 更新 JSON 对象中的时间
-        jsonObject.put("time", adjustedTimeStr);
-    }
+        // 解析为 LocalDateTime 对象
+        LocalDateTime originalTime = LocalDateTime.parse(timeString, formatter);
+
+        // 计算最接近的时间间隔
+        long seconds = originalTime.getSecond();
+        long adjustedSeconds = Math.round((double) seconds / collectionFrequency) * collectionFrequency;
 
-    private static Date getNearestTime(Date date, int collectionFrequency) {
-        long milliseconds = date.getTime();
-        long minutes = milliseconds / (60 * 1000);
-        long remainder = minutes % collectionFrequency;
-        long adjustment = remainder < collectionFrequency / 2 ? -remainder : collectionFrequency - remainder;
+        // 创建新的时间
+        LocalDateTime adjustedTime = originalTime.truncatedTo(ChronoUnit.MINUTES).plusSeconds(adjustedSeconds);
 
-        // 调整到最近的整分钟时间
-        long adjustedMinutes = minutes + adjustment;
-        return new Date(adjustedMinutes * 60 * 1000);
+        // 更新 JSON 对象中的时间
+        deviceRealtimeData.put("time", adjustedTime.format(formatter));
     }
+
 }

+ 8 - 16
dc3-gateway/src/main/java/io/github/pnoker/gateway/comtool/ThreadPoolTaskTool.java

@@ -4,8 +4,7 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
 import org.springframework.scheduling.support.CronTrigger;
 
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.*;
 
 /**
  * @ClassName ThreadPoolTaskTool
@@ -22,26 +21,19 @@ public class ThreadPoolTaskTool {
     /**
      *
      * @param poolSize 任务池大小
-     * @param threadNamePrefix 任务名前缀
      * @return
      */
-    public static ThreadPoolTaskScheduler createTaskScheduler(int poolSize,String threadNamePrefix) {
-        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
-        scheduler.setPoolSize(poolSize);
-        scheduler.setThreadNamePrefix(threadNamePrefix);
-        scheduler.initialize();
+    public static ScheduledExecutorService createTaskScheduler(int poolSize) {
+        ScheduledExecutorService  scheduler =  Executors.newScheduledThreadPool(poolSize);
+
         return scheduler;
     }
-
-    //TODO 根据corn表达式执行指定任务池的任务
-    public static void scheduleCronTask(ThreadPoolTaskScheduler taskScheduler,String taskId,ThreadTask threadTask,String cronExpression) {
-        CronTrigger cronTrigger = new CronTrigger(cronExpression);
-        ScheduledFuture<?> future = taskScheduler.schedule(threadTask, cronTrigger);
+    public static void scheduleSecTask(ScheduledExecutorService scheduler,String taskId,ThreadTask threadTask,Integer collectionFrequency) {
+        ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(threadTask, 0, collectionFrequency, TimeUnit.SECONDS);
         scheduledTasks.put(taskId, future);
     }
-    public static void scheduleCronHistroyTask(ThreadPoolTaskScheduler taskScheduler,String taskId,ThreadHistoryTask threadTask,String cronExpression) {
-        CronTrigger cronTrigger = new CronTrigger(cronExpression);
-        ScheduledFuture<?> future = taskScheduler.schedule(threadTask, cronTrigger);
+    public static void scheduleHistroyTask(ScheduledExecutorService scheduler,String taskId,ThreadHistoryTask threadTask,Long initialDelay,Integer collectionFrequency) {
+        ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(threadTask, initialDelay, collectionFrequency, TimeUnit.SECONDS);
         scheduledTasks.put(taskId, future);
     }
 }

+ 17 - 24
dc3-gateway/src/main/java/io/github/pnoker/gateway/comtool/ThreadTask.java

@@ -75,7 +75,7 @@ public class ThreadTask implements Runnable{
                         //TODO 查询数据不为空,插入infulxdb
                         JSONObject deviceRealtimeData = devicerealtimeDataTotal.getJSONObject("data")
                                 .getJSONArray("list").getJSONObject(0);
-                        adjustTime(deviceRealtimeData, collectionFrequency);//整点rand处理
+                        adjustTime(deviceRealtimeData, collectionFrequency);//时间间隔round处理
                         deviceRealtimeDataList.add(deviceRealtimeData);
                     } else {
                         log.error("任务线程" + taskId + " 执行设备"
@@ -214,32 +214,25 @@ public class ThreadTask implements Runnable{
     }
 
     private static final SimpleDateFormat outputDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-    public static void adjustTime(JSONObject jsonObject, int collectionFrequency) throws ParseException {
-        String timeStr = jsonObject.getString("time");
-        Long oriLong =  TimeTool.convertDateStr2UTC(timeStr);
-        Date originalDate = new Date(oriLong);
-
-        // 计算最近的整分钟时间
-        Date roundedDate = getNearestTime(originalDate, collectionFrequency);
-        if(roundedDate.after(new Date())){
-            System.out.println();
-        }
+    public static void adjustTime(JSONObject deviceRealtimeData, int collectionFrequency) {
+        // 获取原始时间字符串
+        String timeString = deviceRealtimeData.getString("time");
 
-        // 格式化回字符串
-        String adjustedTimeStr = outputDateFormat.format(roundedDate);
+        // 定义日期时间格式
+        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
 
-        // 更新 JSON 对象中的时间
-        jsonObject.put("time", adjustedTimeStr);
-    }
+        // 解析为 LocalDateTime 对象
+        LocalDateTime originalTime = LocalDateTime.parse(timeString, formatter);
+
+        // 计算最接近的时间间隔
+        long seconds = originalTime.getSecond();
+        long adjustedSeconds = Math.round((double) seconds / collectionFrequency) * collectionFrequency;
 
-    private static Date getNearestTime(Date date, int collectionFrequency) {
-        long milliseconds = date.getTime();
-        long minutes = milliseconds / (60 * 1000);
-        long remainder = minutes % collectionFrequency;
-        long adjustment = remainder < collectionFrequency / 2 ? -remainder : collectionFrequency - remainder;
+        // 创建新的时间
+        LocalDateTime adjustedTime = originalTime.truncatedTo(ChronoUnit.MINUTES).plusSeconds(adjustedSeconds);
 
-        // 调整到最近的整分钟时间
-        long adjustedMinutes = minutes + adjustment;
-        return new Date(adjustedMinutes * 60 * 1000);
+        // 更新 JSON 对象中的时间
+        deviceRealtimeData.put("time", adjustedTime.format(formatter));
     }
+
 }