|
|
@@ -27,10 +27,7 @@ import org.springframework.stereotype.Component;
|
|
|
import java.io.IOException;
|
|
|
import java.time.LocalDateTime;
|
|
|
import java.time.temporal.ChronoUnit;
|
|
|
-import java.util.Arrays;
|
|
|
-import java.util.List;
|
|
|
-import java.util.Map;
|
|
|
-import java.util.Set;
|
|
|
+import java.util.*;
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
import static com.shkpr.service.customgateway.zydma.constants.IotPlatformMetadata.*;
|
|
|
@@ -109,9 +106,9 @@ public class IotCollector {
|
|
|
LocalDateTime endTime = LocalDateTime.now().truncatedTo(ChronoUnit.HOURS);
|
|
|
|
|
|
//===================按设备类型遍历===================
|
|
|
- for (DeviceTypeMapping deviceTypeMapping : DeviceTypeMapping.values()) {
|
|
|
+ for (DeviceMapping deviceMapping : DeviceMapping.values()) {
|
|
|
//参数
|
|
|
- Map<String, Object> params = getHistoryDataParams(deviceTypeMapping, beginTime, endTime);
|
|
|
+ Map<String, Object> params = getHistoryDataParams(deviceMapping, beginTime, endTime);
|
|
|
//请求结果项
|
|
|
List<IotPlatformData> items = callingUtil.request(url, HttpMethod.POST, params, headers,
|
|
|
new TypeReference<IotPlatformResult<List<IotPlatformData>>>() {
|
|
|
@@ -123,7 +120,9 @@ public class IotCollector {
|
|
|
Map<String, List<IotPlatformData>> snGroup = items.stream()
|
|
|
.collect(Collectors.groupingBy(IotPlatformData::getCode));
|
|
|
//注册设备
|
|
|
- registryDevices(deviceTypeMapping, snGroup);
|
|
|
+ registryDevices(deviceMapping, snGroup);
|
|
|
+ //更新设备
|
|
|
+ updateDevices(deviceMapping, snGroup);
|
|
|
|
|
|
//设备列表
|
|
|
List<Device> devices = deviceRegistry.findAll();
|
|
|
@@ -163,10 +162,10 @@ public class IotCollector {
|
|
|
/**
|
|
|
* 注册设备
|
|
|
*
|
|
|
- * @param deviceTypeMapping 设备类型映射
|
|
|
- * @param snGroup 数据集合
|
|
|
+ * @param deviceMapping 设备类型映射
|
|
|
+ * @param snGroup 数据集合
|
|
|
*/
|
|
|
- private void registryDevices(DeviceTypeMapping deviceTypeMapping, Map<String, List<IotPlatformData>> snGroup) {
|
|
|
+ private void registryDevices(DeviceMapping deviceMapping, Map<String, List<IotPlatformData>> snGroup) {
|
|
|
//需要添加的设备
|
|
|
Set<String> addedDevices = deviceRegistry.findAddedDevices(snGroup.keySet());
|
|
|
//处理需要添加的设备:生成设备,并注册
|
|
|
@@ -178,11 +177,74 @@ public class IotCollector {
|
|
|
//生成设备
|
|
|
.map(sn -> deviceIdGenerator.generateDevice(
|
|
|
Devices.AREA_CODE,
|
|
|
- deviceTypeMapping.getType(),
|
|
|
- deviceTypeMapping.defaultTags(),
|
|
|
+ deviceMapping.getKind(),
|
|
|
+ buildTag(deviceMapping, snGroup.get(sn)),
|
|
|
sn, snGroup.get(sn).get(0).getName()
|
|
|
))
|
|
|
//注册
|
|
|
.forEach(deviceRegistry::registerDevice);
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 注册设备
|
|
|
+ *
|
|
|
+ * @param deviceMapping 设备类型映射
|
|
|
+ * @param snGroup 数据集合
|
|
|
+ */
|
|
|
+ private void updateDevices(DeviceMapping deviceMapping, Map<String, List<IotPlatformData>> snGroup) {
|
|
|
+ //构建设备信息,k:远传id,v:标签集合
|
|
|
+ Map<String, Set<String>> deviceInfo = snGroup.entrySet().stream().collect(Collectors.toMap(
|
|
|
+ Map.Entry::getKey, entry ->
|
|
|
+ entry.getValue().stream()
|
|
|
+ //转表集合
|
|
|
+ .map(item ->
|
|
|
+ //获取匹配的映射
|
|
|
+ deviceMapping.getFieldMappings().stream()
|
|
|
+ .filter(fieldMapping ->
|
|
|
+ Objects.equals(item.getSensorName(), fieldMapping.getMapping()))
|
|
|
+ .findFirst()
|
|
|
+ )
|
|
|
+ .filter(Optional::isPresent)
|
|
|
+ //获取字段key
|
|
|
+ .map(optional -> optional.get().getField().getKey())
|
|
|
+ .collect(Collectors.toSet())
|
|
|
+ ));
|
|
|
+ //需要添加的设备
|
|
|
+ Set<Device> updatedDevices = deviceRegistry.findUpdatedDevices(deviceInfo);
|
|
|
+ updatedDevices.forEach(device -> {
|
|
|
+ List<IotPlatformData> items = snGroup.get(device.getDeviceSn());
|
|
|
+
|
|
|
+ device.setTags(buildTag(deviceMapping, items));
|
|
|
+
|
|
|
+ deviceRegistry.updateDevice(device);
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 构建采集标签
|
|
|
+ *
|
|
|
+ * @param deviceMapping 设备映射
|
|
|
+ * @param items 结果项
|
|
|
+ * @return 采集标签集合
|
|
|
+ */
|
|
|
+ private List<DeviceTag> buildTag(DeviceMapping deviceMapping, List<IotPlatformData> items) {
|
|
|
+ //传感器名称
|
|
|
+ Set<String> sensorNames = items.stream()
|
|
|
+ .map(IotPlatformData::getSensorName)
|
|
|
+ .collect(Collectors.toSet());
|
|
|
+
|
|
|
+ //构建标签
|
|
|
+ return deviceMapping.getFieldMappings().stream()
|
|
|
+ //过滤存在的标签
|
|
|
+ .filter(fieldMapping -> sensorNames.contains(fieldMapping.getMapping()))
|
|
|
+ //转换为采集标签
|
|
|
+ .map(fieldMapping -> new DeviceTag(
|
|
|
+ fieldMapping.getMapping(),
|
|
|
+ deviceMapping.getProtocol().getKey(),
|
|
|
+ deviceMapping.getKind().getMeasurement(),
|
|
|
+ fieldMapping.getField().getKey(),
|
|
|
+ fieldMapping.getValueType().getKey()
|
|
|
+ ))
|
|
|
+ .collect(Collectors.toList());
|
|
|
+ }
|
|
|
}
|