Sfoglia il codice sorgente

优化influxDb采集数据解析

欧阳劲驰 2 mesi fa
parent
commit
06cfc6ebeb

+ 21 - 7
src/main/java/com/shkpr/service/alambizplugin/commtools/InfluxDbUtil.java

@@ -20,6 +20,7 @@ import java.lang.reflect.InvocationTargetException;
 import java.time.LocalDateTime;
 import java.time.ZonedDateTime;
 import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -155,6 +156,13 @@ public class InfluxDbUtil {
         QueryResult queryResult = influxDb.query(new Query(sql, properties.getDatabase()));
         QueryResult.Series series = getSeries(queryResult);
         if (series == null) return Collections.emptyList();
+        LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName
+                , String.format(
+                        "读取InfluxDb成功,数据量:%s,开始解析"
+                        , series.getValues().size()
+                )
+        );
+
         //获取字段
         List<String> columns = series.getColumns();
         //字段映射
@@ -174,13 +182,11 @@ public class InfluxDbUtil {
                                     .findFirst().orElse(-1);
                         }, Function.identity(),
                         (it1, it2) -> it2,
-                        HashMap::new
+                        ConcurrentHashMap::new
                 ));
-        //获取值
-        List<List<Object>> values = series.getValues();
+
         //解析数据
-        List<E> dates = new ArrayList<>();
-        for (List<Object> value : values) {
+        List<E> dates = series.getValues().parallelStream().map(value -> {
             try {
                 //实列化数据
                 E data = clazz.getDeclaredConstructor().newInstance();
@@ -202,14 +208,22 @@ public class InfluxDbUtil {
                     } else fieldEntry.getValue().set(data, item);
                 }
                 //存入数据
-                dates.add(data);
+                return data;
             } catch (InstantiationException | IllegalAccessException | InvocationTargetException |
                      NoSuchMethodException e) {
                 LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR, mBizType, mStrClassName
                         , String.format("构建数据失败 error:%s", e)
                 );
+                return null;
             }
-        }
+        }).filter(Objects::nonNull).collect(Collectors.toList());
+
+        LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName
+                , String.format(
+                        "解析InfluxDb成功,数据量:%s"
+                        , series.getValues().size()
+                )
+        );
 
         return dates;
     }