Просмотр исходного кода

scada数据迁移增加并行处理

欧阳劲驰 1 месяц назад
Родитель
Сommit
f26ab9b835

+ 19 - 2
custom-gateway-zhscada/src/main/java/com/shkpr/service/customgateway/zhscada/components/DataMigrator.java

@@ -25,6 +25,10 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 
 /**
@@ -68,10 +72,13 @@ public class DataMigrator {
 
         //加载传感器配置
         loadSensors();
+        //多线程执行器
+        ExecutorService executor = Executors.newFixedThreadPool(1);
+
         //设备
         List<Device> devices = deviceRegistry.findAll(ScadaPlatformMetadata.NAME);
         //遍历设备,读取数据并写入
-        devices.forEach(device -> {
+        List<Future<?>> futures = devices.stream().map(device -> executor.submit(() -> {
             //遍历tag
             for (DeviceTag tag : device.getTags()) {
                 //传感器code
@@ -99,8 +106,18 @@ public class DataMigrator {
                 //写入influxDb
                 influxDbUtil.write(points);
             }
-        });
+        })).collect(Collectors.toList());
 
+        //等待线程完成
+        for (Future<?> future : futures) {
+            try {
+                future.get();
+            } catch (InterruptedException | ExecutionException e) {
+                LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, BIZ_TYPE, CLASS_NAME
+                        , String.format("迁移Scada数据执行异常 error:%s", e)
+                );
+            }
+        }
 
         long end = System.currentTimeMillis();
         LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, BIZ_TYPE, CLASS_NAME