|
|
@@ -0,0 +1,219 @@
|
|
|
+package com.shkpr.service.customgateway.jldma.components;
|
|
|
+
|
|
|
+import com.fasterxml.jackson.core.type.TypeReference;
|
|
|
+import com.global.base.log.LogLevelFlag;
|
|
|
+import com.global.base.log.LogPrintMgr;
|
|
|
+import com.shkpr.service.customgateway.core.components.DeviceIdGenerator;
|
|
|
+import com.shkpr.service.customgateway.core.components.DeviceRegistry;
|
|
|
+import com.shkpr.service.customgateway.core.constants.LogFlagBusiType;
|
|
|
+import com.shkpr.service.customgateway.core.domain.CallingEndpoint;
|
|
|
+import com.shkpr.service.customgateway.core.domain.Device;
|
|
|
+import com.shkpr.service.customgateway.core.domain.DeviceTag;
|
|
|
+import com.shkpr.service.customgateway.core.domain.IntegrationKey;
|
|
|
+import com.shkpr.service.customgateway.core.exception.SelfException;
|
|
|
+import com.shkpr.service.customgateway.core.manager.IntegrationKeyManager;
|
|
|
+import com.shkpr.service.customgateway.core.properties.CallingProperties;
|
|
|
+import com.shkpr.service.customgateway.core.service.FunctionInfoService;
|
|
|
+import com.shkpr.service.customgateway.core.service.PersonnelInfoService;
|
|
|
+import com.shkpr.service.customgateway.core.utils.CallingUtil;
|
|
|
+import com.shkpr.service.customgateway.jldma.constants.IotPlatformMetadata;
|
|
|
+import com.shkpr.service.customgateway.jldma.domain.IotPlatformDevice;
|
|
|
+import com.shkpr.service.customgateway.jldma.domain.IotPlatformPage;
|
|
|
+import com.shkpr.service.customgateway.jldma.domain.IotPlatformResult;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.apache.commons.lang3.StringUtils;
|
|
|
+import org.apache.http.message.BasicHeader;
|
|
|
+import org.springframework.http.HttpMethod;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+
|
|
|
+import java.util.*;
|
|
|
+import java.util.function.Function;
|
|
|
+import java.util.stream.Collectors;
|
|
|
+
|
|
|
+/**
|
|
|
+ * 信息同步器
|
|
|
+ *
|
|
|
+ * @author 欧阳劲驰
|
|
|
+ * @since 1.0.0
|
|
|
+ */
|
|
|
+@Component
|
|
|
+@Slf4j
|
|
|
+public class InfoSynchronizer {
|
|
|
+ /**
|
|
|
+ * log
|
|
|
+ */
|
|
|
+ private static final String CLASS_NAME = "InfoSynchronizer";
|
|
|
+ private static final String BIZ_TYPE = LogFlagBusiType.JIAN_LI_DMA.toStrValue();
|
|
|
+
|
|
|
+ final
|
|
|
+ CallingProperties callingProperties;
|
|
|
+ final
|
|
|
+ IntegrationKeyManager integrationKeyManager;
|
|
|
+ final
|
|
|
+ DeviceRegistry deviceRegistry;
|
|
|
+ final
|
|
|
+ DeviceIdGenerator deviceIdGenerator;
|
|
|
+ final
|
|
|
+ PersonnelInfoService personnelInfoService;
|
|
|
+ final
|
|
|
+ FunctionInfoService functionInfoService;
|
|
|
+ final
|
|
|
+ CallingUtil callingUtil;
|
|
|
+
|
|
|
+ public InfoSynchronizer(CallingProperties callingProperties, IntegrationKeyManager integrationKeyManager, DeviceRegistry deviceRegistry, DeviceIdGenerator deviceIdGenerator, PersonnelInfoService personnelInfoService, FunctionInfoService functionInfoService, CallingUtil callingUtil) {
|
|
|
+ this.callingProperties = callingProperties;
|
|
|
+ this.integrationKeyManager = integrationKeyManager;
|
|
|
+ this.deviceRegistry = deviceRegistry;
|
|
|
+ this.deviceIdGenerator = deviceIdGenerator;
|
|
|
+ this.personnelInfoService = personnelInfoService;
|
|
|
+ this.functionInfoService = functionInfoService;
|
|
|
+ this.callingUtil = callingUtil;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 同步设备信息
|
|
|
+ */
|
|
|
+ public void syncDevices() {
|
|
|
+ LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, BIZ_TYPE, CLASS_NAME
|
|
|
+ , "开始同步设备信息,开始滚动拉取数据");
|
|
|
+ long begin = System.currentTimeMillis();
|
|
|
+
|
|
|
+ //对接点
|
|
|
+ CallingEndpoint endpoint = callingProperties.getEndpoint(IotPlatformMetadata.NAME);
|
|
|
+
|
|
|
+ //按设备类型遍历
|
|
|
+ for (IotPlatformMetadata.DeviceMapping deviceMapping : IotPlatformMetadata.DeviceMapping.values()) {
|
|
|
+ //请求地址
|
|
|
+ String url = endpoint.getUrl() + IotPlatformMetadata.Uri.DEVICE;
|
|
|
+ //请求获取设备
|
|
|
+ List<IotPlatformDevice> items = Collections.emptyList();
|
|
|
+ try {
|
|
|
+ items = callingUtil.requestScroll(url, HttpMethod.GET,
|
|
|
+ new TypeReference<IotPlatformResult<IotPlatformPage<IotPlatformDevice>>>() {
|
|
|
+ }, pageable -> new HashMap<String, String>() {{
|
|
|
+ put(IotPlatformMetadata.Params.PAGE, pageable.getPageNumber() + "");
|
|
|
+ put(IotPlatformMetadata.Params.SIZE, pageable.getPageSize() + "");
|
|
|
+ put(IotPlatformMetadata.Params.PRODUCT_CODE, IotPlatformMetadata.DefaultValues.PRODUCT_CODE);
|
|
|
+ put(IotPlatformMetadata.Params.DEVICE_TYPE, deviceMapping.getMapping());
|
|
|
+ }}, params -> {
|
|
|
+ //获取密钥
|
|
|
+ IntegrationKey key = integrationKeyManager.getKey(IotPlatformMetadata.NAME);
|
|
|
+ //存入请求头
|
|
|
+ return Collections.singletonList(new BasicHeader(key.getAccessKey(), key.getSecretKey()));
|
|
|
+ });
|
|
|
+ } catch (SelfException e) {
|
|
|
+ LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR, BIZ_TYPE, CLASS_NAME
|
|
|
+ , String.format("获取设备失败 error:%s", e)
|
|
|
+ );
|
|
|
+ }
|
|
|
+
|
|
|
+ //sn映射
|
|
|
+ Map<String, IotPlatformDevice> snMap = items.stream().collect(Collectors.toMap(IotPlatformDevice::getSn, Function.identity(), (it1, it2) -> it2));
|
|
|
+ //注册设备
|
|
|
+ registryDevices(deviceMapping, snMap);
|
|
|
+ //更新设备
|
|
|
+ updateDevices(deviceMapping, snMap);
|
|
|
+ }
|
|
|
+
|
|
|
+ long end = System.currentTimeMillis();
|
|
|
+ LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, BIZ_TYPE, CLASS_NAME
|
|
|
+ , String.format("结束执行同步设备信息,用时(毫秒):%d", (end - begin))
|
|
|
+ );
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 注册设备
|
|
|
+ *
|
|
|
+ * @param deviceMapping 设备类型映射
|
|
|
+ * @param snMap 数据集合
|
|
|
+ */
|
|
|
+ private void registryDevices(IotPlatformMetadata.DeviceMapping deviceMapping, Map<String, IotPlatformDevice> snMap) {
|
|
|
+ //需要添加的设备
|
|
|
+ Set<String> addedDevices = deviceRegistry.findAddedDevices(snMap.keySet());
|
|
|
+ //处理需要添加的设备:生成设备,并注册
|
|
|
+ addedDevices.stream()
|
|
|
+ //过滤无效设备
|
|
|
+ .filter(sn -> snMap.get(sn) != null && StringUtils.isNoneBlank(snMap.get(sn).getName()))
|
|
|
+ //排序
|
|
|
+ .sorted()
|
|
|
+ //生成设备
|
|
|
+ .map(sn -> deviceIdGenerator.generateDevice(
|
|
|
+ IotPlatformMetadata.Devices.AREA_CODE,
|
|
|
+ deviceMapping.getKind(),
|
|
|
+ IotPlatformMetadata.NAME,
|
|
|
+ buildTag(deviceMapping, snMap.get(sn)),
|
|
|
+ sn, snMap.get(sn).getName()
|
|
|
+ ))
|
|
|
+ //设置厂家
|
|
|
+ .peek(device -> device.setMfrs(deviceMapping.getName()))
|
|
|
+
|
|
|
+ //注册
|
|
|
+ .forEach(deviceRegistry::registerDevice);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 更新设备
|
|
|
+ *
|
|
|
+ * @param deviceMapping 设备类型映射
|
|
|
+ * @param snMap 数据集合
|
|
|
+ */
|
|
|
+ private void updateDevices(IotPlatformMetadata.DeviceMapping deviceMapping, Map<String, IotPlatformDevice> snMap) {
|
|
|
+ //构建设备信息,k:远传id,v:标签集合
|
|
|
+ Map<String, Set<String>> deviceInfo = snMap.entrySet().stream().collect(Collectors.toMap(
|
|
|
+ Map.Entry::getKey, entry -> {
|
|
|
+ //获取设备属性
|
|
|
+ IotPlatformDevice.SchemaAttr schemaAttr = entry.getValue().getSchemaAttrs().stream()
|
|
|
+ .filter(it -> Objects.equals(it.getDeviceType() + "", deviceMapping.getMapping()))
|
|
|
+ .findFirst().orElse(null);
|
|
|
+ if (schemaAttr == null) return Collections.emptySet();
|
|
|
+ //标签
|
|
|
+ Set<String> tags = schemaAttr.getData().getProperties().keySet();
|
|
|
+ //过滤存在的标签
|
|
|
+ return deviceMapping.getFieldMappings().stream()
|
|
|
+ .filter(fieldMapping -> tags.contains(fieldMapping.getMapping()))
|
|
|
+ .map(fieldMapping -> fieldMapping.getField().getKey())
|
|
|
+ .collect(Collectors.toSet());
|
|
|
+ }
|
|
|
+ ));
|
|
|
+ //需要更新的设备
|
|
|
+ Set<Device> updatedDevices = deviceRegistry.findUpdatedDevices(deviceInfo);
|
|
|
+ updatedDevices.forEach(device -> {
|
|
|
+ IotPlatformDevice item = snMap.get(device.getDeviceSn());
|
|
|
+
|
|
|
+ device.setTags(buildTag(deviceMapping, item));
|
|
|
+
|
|
|
+ deviceRegistry.updateDevice(device);
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 构建采集标签
|
|
|
+ *
|
|
|
+ * @param deviceMapping 设备映射
|
|
|
+ * @param item 结果项
|
|
|
+ * @return 采集标签集合
|
|
|
+ */
|
|
|
+ private List<DeviceTag> buildTag(IotPlatformMetadata.DeviceMapping deviceMapping, IotPlatformDevice item) {
|
|
|
+ //获取设备属性
|
|
|
+ IotPlatformDevice.SchemaAttr schemaAttr = item.getSchemaAttrs().stream().filter(it -> Objects.equals(it.getDeviceType() + "", deviceMapping.getMapping()))
|
|
|
+ .findFirst().orElse(null);
|
|
|
+ if (schemaAttr == null) return Collections.emptyList();
|
|
|
+
|
|
|
+ //标签
|
|
|
+ Set<String> tags = schemaAttr.getData().getProperties().keySet();
|
|
|
+
|
|
|
+ //构建标签
|
|
|
+ return deviceMapping.getFieldMappings().stream()
|
|
|
+ //过滤存在的标签
|
|
|
+ .filter(fieldMapping -> tags.contains(fieldMapping.getMapping()))
|
|
|
+ //转换为采集标签
|
|
|
+ .map(fieldMapping -> new DeviceTag(
|
|
|
+ fieldMapping.getMapping(),
|
|
|
+ deviceMapping.getProtocol().getKey(),
|
|
|
+ deviceMapping.getKind().getMeasurement(),
|
|
|
+ fieldMapping.getField().getKey(),
|
|
|
+ fieldMapping.getValueType().getKey()
|
|
|
+ ))
|
|
|
+ .collect(Collectors.toList());
|
|
|
+ }
|
|
|
+}
|