Browse Source

修改采集频率,优化influxdb写入

欧阳劲驰 2 weeks ago
parent
commit
47a91e5e04

+ 44 - 51
custom-gateway-core/src/main/java/com/shkpr/service/customgateway/core/utils/InfluxDbUtil.java

@@ -49,73 +49,28 @@ public class InfluxDbUtil {
     }
 
     /**
-     * 获取series
-     *
-     * @param queryResult 查询结果
-     * @return series
-     */
-    public static QueryResult.Series getSeries(QueryResult queryResult) {
-        if (queryResult == null) return null;
-
-        //获取结果集合
-        List<QueryResult.Result> results = queryResult.getResults();
-        if (results == null || results.isEmpty()) return null;
-
-        //获取第一个结果
-        QueryResult.Result result = results.get(0);
-        if (result == null) return null;
-
-        //获取series
-        List<QueryResult.Series> series = result.getSeries();
-        if (series == null || series.isEmpty()) return null;
-
-        //获取第一个series
-        return series.get(0);
-    }
-
-    /**
-     * 获取值集合
-     *
-     * @param queryResult 查询结果
-     * @return 值集合
-     */
-    public static List<List<Object>> getValues(QueryResult queryResult) {
-        //获取series
-        QueryResult.Series firstSeries = getSeries(queryResult);
-        if (firstSeries == null) return Collections.emptyList();
-
-        //获取值集合
-        List<List<Object>> values = firstSeries.getValues();
-        return values != null ? values : Collections.emptyList();
-    }
-
-    /**
-     * 插入
+     * 写入
      *
      * @param point 点
-     * @return 插入状态
      */
-    public Boolean insert(Point point) {
+    public void write(Point point) {
         try {
             for (InfluxDbClient influxDb : influxDBClients.clients()) {
                 influxDb.write(point);
             }
-            return true;
         } catch (Exception e) {
             LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR, BIZ_TYPE, CLASS_NAME
                     , String.format("插入InfluxDb失败 error:%s", e)
             );
-            return false;
         }
     }
 
     /**
-     * 批量插
+     * 写入
      *
      * @param points 批量点
-     * @return 插入状态
      */
-    public Boolean insertBatch(List<Point> points) {
+    public void write(List<Point> points) {
         try {
             LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, BIZ_TYPE, CLASS_NAME
                     , String.format("开始批量写入InfluxDb,数据量:%d", points.size()));
@@ -134,12 +89,10 @@ public class InfluxDbUtil {
             long end = System.currentTimeMillis();
             LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, BIZ_TYPE, CLASS_NAME
                     , String.format("批量写入InfluxDb成功 用时(毫秒):%d", (end - begin)));
-            return true;
         } catch (Exception e) {
             LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR, BIZ_TYPE, CLASS_NAME
                     , String.format("插入InfluxDb失败 error:%s", e)
             );
-            return false;
         }
     }
 
@@ -229,4 +182,44 @@ public class InfluxDbUtil {
         return dates;
     }
 
+    /**
+     * 获取series
+     *
+     * @param queryResult 查询结果
+     * @return series
+     */
+    private QueryResult.Series getSeries(QueryResult queryResult) {
+        if (queryResult == null) return null;
+
+        //获取结果集合
+        List<QueryResult.Result> results = queryResult.getResults();
+        if (results == null || results.isEmpty()) return null;
+
+        //获取第一个结果
+        QueryResult.Result result = results.get(0);
+        if (result == null) return null;
+
+        //获取series
+        List<QueryResult.Series> series = result.getSeries();
+        if (series == null || series.isEmpty()) return null;
+
+        //获取第一个series
+        return series.get(0);
+    }
+
+    /**
+     * 获取值集合
+     *
+     * @param queryResult 查询结果
+     * @return 值集合
+     */
+    private List<List<Object>> getValues(QueryResult queryResult) {
+        //获取series
+        QueryResult.Series firstSeries = getSeries(queryResult);
+        if (firstSeries == null) return Collections.emptyList();
+
+        //获取值集合
+        List<List<Object>> values = firstSeries.getValues();
+        return values != null ? values : Collections.emptyList();
+    }
 }

+ 11 - 3
custom-gateway-zydma/src/main/java/com/shkpr/service/customgateway/zydma/components/IotCollector.java

@@ -105,6 +105,9 @@ public class IotCollector {
         LocalDateTime beginTime = LocalDateTime.now().truncatedTo(ChronoUnit.HOURS).minusHours(previousHours);
         LocalDateTime endTime = LocalDateTime.now().truncatedTo(ChronoUnit.HOURS);
 
+        //结果
+        List<Point> results = new ArrayList<>();
+
         //===================按设备类型遍历===================
         for (DeviceMapping deviceMapping : DeviceMapping.values()) {
             //参数
@@ -128,6 +131,7 @@ public class IotCollector {
             List<Device> devices = deviceRegistry.findAll();
             //===================按设备遍历===================
             for (Device device : devices) {
+                if (!snGroup.containsKey(device.getDeviceSn())) continue;
                 //当前设备项
                 List<IotPlatformData> deviceItems = snGroup.get(device.getDeviceSn());
 
@@ -146,13 +150,17 @@ public class IotCollector {
                     List<IotPlatformData.Data> dates = tagItems.stream()
                             .flatMap(it -> it.getDataModel().stream())
                             .collect(Collectors.toList());
-                    //===================写入influxdb===================
-                    List<Point> points = dates.stream().map(it -> dataToPoint(it, device, tag)).collect(Collectors.toList());
-                    influxDbUtil.insertBatch(points);
+                    //===================构建influxdb===================
+                    results.addAll(dates.stream()
+                            .map(it -> dataToPoint(it, device, tag))
+                            .collect(Collectors.toList()));
                 }
             }
         }
 
+        //写入influxdb
+        influxDbUtil.write(results);
+
         long end = System.currentTimeMillis();
         LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, BIZ_TYPE, CLASS_NAME
                 , String.format("结束采集流量数据 用时(毫秒):%d", (end - begin))

+ 2 - 2
custom-gateway-zydma/src/main/java/com/shkpr/service/customgateway/zydma/manager/IotManager.java

@@ -37,9 +37,9 @@ public class IotManager {
     /**
      * 小时任务
      */
-    @Scheduled(cron = "0 15 0,6,12,18 * * ?")
+    @Scheduled(cron = "0 */15 0,6,12,18 * * ?")
     public void minuteTask() {
         //采集流量数据
-        taskScheduler.execute(() -> iotCollector.collectFlow(12));
+        taskScheduler.execute(() -> iotCollector.collectFlow(2 * 6));
     }
 }