|
|
@@ -18,10 +18,7 @@ import org.influxdb.dto.Point;
|
|
|
import org.springframework.http.HttpMethod;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
-import java.util.ArrayList;
|
|
|
-import java.util.Collections;
|
|
|
-import java.util.List;
|
|
|
-import java.util.Map;
|
|
|
+import java.util.*;
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
/**
|
|
|
@@ -67,39 +64,35 @@ public class DataCollector {
|
|
|
//请求地址
|
|
|
String url = endpoint.getUrl() + ScadaPlatformMetadata.Uri.REAL_TIME_DATA;
|
|
|
|
|
|
- //数据集合
|
|
|
- List<ScadaPlatformData> allItems = new ArrayList<>();
|
|
|
//设备
|
|
|
List<Device> devices = deviceRegistry.findAll(ScadaPlatformMetadata.NAME);
|
|
|
-
|
|
|
//遍历设备,并请求数据
|
|
|
- for (Device device : devices) {
|
|
|
+ List<ScadaPlatformData> dates = devices.parallelStream().map(device -> {
|
|
|
LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, BIZ_TYPE, CLASS_NAME
|
|
|
, String.format("开始拉去数据,采集标签:%s",
|
|
|
device.getTags().stream().map(DeviceTag::getTag).collect(Collectors.joining(","))));
|
|
|
|
|
|
//参数
|
|
|
Map<String, Object> params = ScadaPlatformMetadata.getRealTimeDataParams(Collections.singletonList(device));
|
|
|
+
|
|
|
//请求结果项
|
|
|
- List<ScadaPlatformData> items;
|
|
|
try {
|
|
|
- items = callingUtil.requestList(url, HttpMethod.GET, params, Collections.emptyList(),
|
|
|
+ List<ScadaPlatformData> items = callingUtil.requestList(url, HttpMethod.GET, params, Collections.emptyList(),
|
|
|
new TypeReference<ScadaPlatformDataResult<List<ScadaPlatformData>>>() {
|
|
|
});
|
|
|
LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, BIZ_TYPE, CLASS_NAME
|
|
|
, String.format("拉取数据成功,数据量:%d", items.size()));
|
|
|
+ return items;
|
|
|
} catch (SelfException e) {
|
|
|
LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR, BIZ_TYPE, CLASS_NAME
|
|
|
, String.format("拉取数据失败 error:%s", e)
|
|
|
);
|
|
|
- continue;
|
|
|
+ return new ArrayList<ScadaPlatformData>();
|
|
|
}
|
|
|
-
|
|
|
- allItems.addAll(items);
|
|
|
- }
|
|
|
+ }).flatMap(Collection::stream).collect(Collectors.toList());
|
|
|
|
|
|
//构建influxdb
|
|
|
- List<Point> points = allItems.parallelStream().map(d -> d.toPoint(devices)).collect(Collectors.toList());
|
|
|
+ List<Point> points = dates.parallelStream().map(d -> d.toPoint(devices)).collect(Collectors.toList());
|
|
|
//写入influxDb
|
|
|
influxDbUtil.write(points);
|
|
|
|