ソースを参照

优化influxDb采集数据解析

欧阳劲驰 3 ヶ月 前
コミット
8a5b7ce2e7

+ 4 - 7
src/main/java/com/shkpr/service/alambizplugin/bizservice/PipeBurstDataBizService.java

@@ -11,7 +11,9 @@ import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.PostConstruct;
+import java.util.Comparator;
 import java.util.List;
+import java.util.stream.Collectors;
 
 /**
  * 爆管数据service
@@ -56,13 +58,8 @@ public class PipeBurstDataBizService {
         long begin = System.currentTimeMillis();
 
         //读取数据
-        List<PipeBurstDataPressCur> dates = influxDbUtil.query(InfluxdbMetadata.SQL.READ_PRESS_CUR_SQL, PipeBurstDataPressCur.class);
-        LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName
-                , String.format(
-                        "读取InfluxDb成功,数据量:%s"
-                        , dates.size()
-                )
-        );
+        List<PipeBurstDataPressCur> dates = influxDbUtil.query(InfluxdbMetadata.SQL.READ_PRESS_CUR_SQL, PipeBurstDataPressCur.class)
+                .stream().sorted(Comparator.comparing(PipeBurstDataPressCur::getTime)).collect(Collectors.toList());
 
         //批量合并
         Boolean upserted = dataPressCurService.upsertAll(dates);

+ 21 - 7
src/main/java/com/shkpr/service/alambizplugin/commtools/InfluxDbUtil.java

@@ -20,6 +20,7 @@ import java.lang.reflect.InvocationTargetException;
 import java.time.LocalDateTime;
 import java.time.ZonedDateTime;
 import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -155,6 +156,13 @@ public class InfluxDbUtil {
         QueryResult queryResult = influxDb.query(new Query(sql, properties.getDatabase()));
         QueryResult.Series series = getSeries(queryResult);
         if (series == null) return Collections.emptyList();
+        LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName
+                , String.format(
+                        "读取InfluxDb成功,数据量:%s,开始解析"
+                        , series.getValues().size()
+                )
+        );
+
         //获取字段
         List<String> columns = series.getColumns();
         //字段映射
@@ -174,13 +182,11 @@ public class InfluxDbUtil {
                                     .findFirst().orElse(-1);
                         }, Function.identity(),
                         (it1, it2) -> it2,
-                        HashMap::new
+                        ConcurrentHashMap::new
                 ));
-        //获取值
-        List<List<Object>> values = series.getValues();
+
         //解析数据
-        List<E> dates = new ArrayList<>();
-        for (List<Object> value : values) {
+        List<E> dates = series.getValues().parallelStream().map(value -> {
             try {
                 //实列化数据
                 E data = clazz.getDeclaredConstructor().newInstance();
@@ -202,14 +208,22 @@ public class InfluxDbUtil {
                     } else fieldEntry.getValue().set(data, item);
                 }
                 //存入数据
-                dates.add(data);
+                return data;
             } catch (InstantiationException | IllegalAccessException | InvocationTargetException |
                      NoSuchMethodException e) {
                 LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR, mBizType, mStrClassName
                         , String.format("构建数据失败 error:%s", e)
                 );
+                return null;
             }
-        }
+        }).filter(Objects::nonNull).collect(Collectors.toList());
+
+        LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName
+                , String.format(
+                        "解析InfluxDb成功,数据量:%s"
+                        , series.getValues().size()
+                )
+        );
 
         return dates;
     }