欧阳劲驰 2 тижнів тому
батько
коміт
9286ccdd6a
17 змінених файлів з 331 додано та 17 видалено
  1. 0 5
      custom-gateway-core/pom.xml
  2. 5 1
      custom-gateway-core/src/main/java/com/shkpr/service/customgateway/core/domain/po/DeviceTags.java
  3. 1 1
      custom-gateway-core/src/main/java/com/shkpr/service/customgateway/core/repository/embedded/AccessKeysRepository.java
  4. 1 1
      custom-gateway-core/src/main/java/com/shkpr/service/customgateway/core/repository/embedded/DeviceSequencesRepository.java
  5. 10 3
      custom-gateway-core/src/main/java/com/shkpr/service/customgateway/core/repository/embedded/DeviceTagsRepository.java
  6. 13 0
      custom-gateway-core/src/main/java/com/shkpr/service/customgateway/core/service/DeviceTagsService.java
  7. 2 1
      custom-gateway-core/src/main/java/com/shkpr/service/customgateway/core/service/impl/DeviceSequencesServiceImpl.java
  8. 61 0
      custom-gateway-core/src/main/java/com/shkpr/service/customgateway/core/service/impl/DeviceTagsServiceImpl.java
  9. 2 1
      custom-gateway-core/src/main/java/com/shkpr/service/customgateway/core/utils/CallingUtil.java
  10. 2 2
      custom-gateway-zhscada/src/main/java/com/shkpr/service/customgateway/zhscada/components/DataCollector.java
  11. 93 0
      custom-gateway-zhscada/src/main/java/com/shkpr/service/customgateway/zhscada/components/InfoSynchronizer.java
  12. 6 0
      custom-gateway-zhscada/src/main/java/com/shkpr/service/customgateway/zhscada/constants/ScadaPlatformMetadata.java
  13. 2 2
      custom-gateway-zhscada/src/main/java/com/shkpr/service/customgateway/zhscada/domain/ScadaPlatformResult.java
  14. 40 0
      custom-gateway-zhscada/src/main/java/com/shkpr/service/customgateway/zhscada/domain/ScadaPlatformVariables.java
  15. 48 0
      custom-gateway-zhscada/src/main/java/com/shkpr/service/customgateway/zhscada/domain/ScadaPlatformVariablesResult.java
  16. 45 0
      custom-gateway-zhscada/src/main/java/com/shkpr/service/customgateway/zhscada/manager/InfoSyncManager.java
  17. BIN
      data.mv.db

+ 0 - 5
custom-gateway-core/pom.xml

@@ -31,11 +31,6 @@
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-validation</artifactId>
         </dependency>
-        <!--spring-data-kv-->
-        <dependency>
-            <groupId>org.springframework.data</groupId>
-            <artifactId>spring-data-keyvalue</artifactId>
-        </dependency>
         <!--spring-data-->
         <dependency>
             <groupId>org.springframework.data</groupId>

+ 5 - 1
custom-gateway-core/src/main/java/com/shkpr/service/customgateway/core/domain/po/DeviceTags.java

@@ -1,6 +1,8 @@
 package com.shkpr.service.customgateway.core.domain.po;
 
+import lombok.AllArgsConstructor;
 import lombok.Data;
+import lombok.NoArgsConstructor;
 import org.springframework.data.annotation.Id;
 import org.springframework.data.relational.core.mapping.Table;
 
@@ -11,6 +13,8 @@ import org.springframework.data.relational.core.mapping.Table;
  * @since 1.0.0
  */
 @Data
+@NoArgsConstructor
+@AllArgsConstructor
 @Table("DEVICE_TAGS")
 public class DeviceTags {
     /**
@@ -25,5 +29,5 @@ public class DeviceTags {
     /**
      * 标签
      */
-    private String TAG;
+    private String tag;
 }

+ 1 - 1
custom-gateway-core/src/main/java/com/shkpr/service/customgateway/core/repository/embedded/AccessKeysRepository.java

@@ -11,6 +11,6 @@ import org.springframework.stereotype.Repository;
  * @since 1.0.0
  */
 @Repository
-public interface AccessKeysRepository extends CrudRepository<AccessKeys, String> {
+public interface AccessKeysRepository extends CrudRepository<AccessKeys, Long> {
 
 }

+ 1 - 1
custom-gateway-core/src/main/java/com/shkpr/service/customgateway/core/repository/embedded/DeviceSequencesRepository.java

@@ -16,7 +16,7 @@ import java.time.LocalDate;
  * @since 1.0.0
  */
 @Repository
-public interface DeviceSequencesRepository extends CrudRepository<DeviceSequences, LocalDate> {
+public interface DeviceSequencesRepository extends CrudRepository<DeviceSequences, Long> {
 
     /**
      * 自增序列

+ 10 - 3
custom-gateway-core/src/main/java/com/shkpr/service/customgateway/core/repository/embedded/DeviceTagsRepository.java

@@ -1,9 +1,9 @@
 package com.shkpr.service.customgateway.core.repository.embedded;
 
 import com.shkpr.service.customgateway.core.domain.po.DeviceTags;
-import org.springframework.data.repository.CrudRepository;
+import org.springframework.data.repository.PagingAndSortingRepository;
 
-import java.time.LocalDate;
+import java.util.List;
 
 /**
  * 设备标签库
@@ -11,5 +11,12 @@ import java.time.LocalDate;
  * @author 欧阳劲驰
  * @since 1.0.0
  */
-public interface DeviceTagsRepository extends CrudRepository<DeviceTags, LocalDate> {
+public interface DeviceTagsRepository extends PagingAndSortingRepository<DeviceTags, Long> {
+    /**
+     * 根据平台名称查询
+     *
+     * @param platformName 平台名称
+     * @return 实体集合
+     */
+    List<DeviceTags> findByPlatformName(String platformName);
 }

+ 13 - 0
custom-gateway-core/src/main/java/com/shkpr/service/customgateway/core/service/DeviceTagsService.java

@@ -1,5 +1,9 @@
 package com.shkpr.service.customgateway.core.service;
 
+import com.shkpr.service.customgateway.core.domain.po.DeviceTags;
+
+import java.util.List;
+
 /**
  * 设备标签service
  *
@@ -7,4 +11,13 @@ package com.shkpr.service.customgateway.core.service;
  * @since 1.0.0
  */
 public interface DeviceTagsService {
+
+    /**
+     * 同步采集标签
+     *
+     * @param platformName 平台名称
+     * @param tags         标签集合
+     * @return 同步状态
+     */
+    Boolean syncDeviceTags(String platformName, List<DeviceTags> tags);
 }

+ 2 - 1
custom-gateway-core/src/main/java/com/shkpr/service/customgateway/core/service/impl/DeviceSequencesServiceImpl.java

@@ -2,6 +2,7 @@ package com.shkpr.service.customgateway.core.service.impl;
 
 
 import com.shkpr.service.customgateway.core.constants.AreaCode;
+import com.shkpr.service.customgateway.core.constants.DataSourceNames;
 import com.shkpr.service.customgateway.core.constants.DeviceKind;
 import com.shkpr.service.customgateway.core.domain.po.DeviceSequences;
 import com.shkpr.service.customgateway.core.repository.embedded.DeviceSequencesRepository;
@@ -34,7 +35,7 @@ public class DeviceSequencesServiceImpl implements DeviceSequencesService {
      * @param date     时间
      * @return 序号
      */
-    @Transactional
+    @Transactional(transactionManager = DataSourceNames.EMBEDDED + "TransactionManager")
     public Integer getNextSequence(AreaCode areaCode, DeviceKind kind, LocalDate date) {
         //自增
         repository.incrementSeqBYDate(areaCode.getCode(), kind.getCode(), date);

+ 61 - 0
custom-gateway-core/src/main/java/com/shkpr/service/customgateway/core/service/impl/DeviceTagsServiceImpl.java

@@ -1,8 +1,19 @@
 package com.shkpr.service.customgateway.core.service.impl;
 
+import com.shkpr.service.customgateway.core.constants.DataSourceNames;
+import com.shkpr.service.customgateway.core.domain.po.DeviceTags;
 import com.shkpr.service.customgateway.core.repository.embedded.DeviceTagsRepository;
 import com.shkpr.service.customgateway.core.service.DeviceTagsService;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * 设备标签service实现
@@ -19,4 +30,54 @@ public class DeviceTagsServiceImpl implements DeviceTagsService {
         this.repository = repository;
     }
 
+    /**
+     * 同步采集标签
+     *
+     * @param platformName 平台名称
+     * @param deviceTags   标签集合
+     * @return 同步状态
+     */
+    @Override
+    @Transactional(DataSourceNames.EMBEDDED + "TransactionManager")
+    public Boolean syncDeviceTags(String platformName, List<DeviceTags> deviceTags) {
+        //参数校验
+        if (StringUtils.isEmpty(platformName)) return false;
+
+        //异常数据处理
+        if (deviceTags == null) deviceTags = Collections.emptyList();
+        deviceTags = deviceTags.stream()
+                .filter(Objects::nonNull)
+                .filter(deviceTag -> Objects.equals(platformName, deviceTag.getPlatformName()))
+                .collect(Collectors.toList());
+
+        //获取已存在的标签集合
+        List<DeviceTags> existingDeviceTags = repository.findByPlatformName(platformName);
+
+        //标签
+        Set<String> tags = deviceTags.stream()
+                .map(DeviceTags::getTag)
+                .collect(Collectors.toSet());
+        //已存在标签
+        Set<String> existingTags = existingDeviceTags.stream()
+                .map(DeviceTags::getTag)
+                .collect(Collectors.toSet());
+
+
+        //需要添加的标签
+        List<DeviceTags> toAdd = deviceTags.stream()
+                .filter(tag -> !existingTags.contains(tag.getTag()))
+                .collect(Collectors.toList());
+
+        //需要删除的标签
+        List<DeviceTags> toDelete = existingDeviceTags.stream()
+                .filter(tag -> !tags.contains(tag.getTag()))
+                .collect(Collectors.toList());
+
+        //执行添加
+        if (CollectionUtils.isNotEmpty(toAdd)) repository.saveAll(toAdd);
+        //执行删除
+        if (CollectionUtils.isNotEmpty(toDelete)) repository.deleteAll(toDelete);
+
+        return true;
+    }
 }

+ 2 - 1
custom-gateway-core/src/main/java/com/shkpr/service/customgateway/core/utils/CallingUtil.java

@@ -18,6 +18,7 @@ import org.springframework.http.HttpMethod;
 import org.springframework.stereotype.Component;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -97,7 +98,7 @@ public class CallingUtil {
                     .setHeaders(headers.toArray(new Header[0]))
                     .execute();
             //解析结果
-            R result = objectMapper.readValue(response.returnContent().toString(), resultType);
+            R result = objectMapper.readValue(response.returnContent().asString(StandardCharsets.UTF_8), resultType);
             if (!result.isOk()) throw new IOException("请求失败: " + result.getMessage());
 
             //返回数据

+ 2 - 2
custom-gateway-zhscada/src/main/java/com/shkpr/service/customgateway/zhscada/components/DataCollector.java

@@ -12,7 +12,7 @@ import com.shkpr.service.customgateway.core.utils.CallingUtil;
 import com.shkpr.service.customgateway.core.utils.InfluxDbUtil;
 import com.shkpr.service.customgateway.zhscada.constants.ScadaPlatformMetadata;
 import com.shkpr.service.customgateway.zhscada.domain.ScadaPlatformData;
-import com.shkpr.service.customgateway.zhscada.domain.ScadaPlatformResult;
+import com.shkpr.service.customgateway.zhscada.domain.ScadaPlatformDataResult;
 import org.influxdb.dto.Point;
 import org.springframework.http.HttpMethod;
 import org.springframework.stereotype.Component;
@@ -75,7 +75,7 @@ public class DataCollector {
         Map<String, Object> params = ScadaPlatformMetadata.getRealTimeDataParams(devices);
         //请求结果项
         List<ScadaPlatformData> items = callingUtil.request(url, HttpMethod.GET, params, Collections.emptyList(),
-                new TypeReference<ScadaPlatformResult<List<ScadaPlatformData>>>() {
+                new TypeReference<ScadaPlatformDataResult<List<ScadaPlatformData>>>() {
                 });
         LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, BIZ_TYPE, CLASS_NAME
                 , String.format("拉取数据成功,数据量:%d", items.size()));

+ 93 - 0
custom-gateway-zhscada/src/main/java/com/shkpr/service/customgateway/zhscada/components/InfoSynchronizer.java

@@ -0,0 +1,93 @@
+package com.shkpr.service.customgateway.zhscada.components;
+
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.global.base.log.LogLevelFlag;
+import com.global.base.log.LogPrintMgr;
+import com.shkpr.service.customgateway.core.constants.LogFlagBusiType;
+import com.shkpr.service.customgateway.core.domain.po.DeviceTags;
+import com.shkpr.service.customgateway.core.properties.CallingProperties;
+import com.shkpr.service.customgateway.core.service.DeviceTagsService;
+import com.shkpr.service.customgateway.core.utils.CallingUtil;
+import com.shkpr.service.customgateway.zhscada.constants.ScadaPlatformMetadata;
+import com.shkpr.service.customgateway.zhscada.domain.ScadaPlatformVariables;
+import com.shkpr.service.customgateway.zhscada.domain.ScadaPlatformVariablesResult;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.http.HttpMethod;
+import org.springframework.stereotype.Component;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * 信息同步器
+ *
+ * @author 欧阳劲驰
+ * @since 1.0.0
+ */
+@Component
+@Slf4j
+public class InfoSynchronizer {
+    /**
+     * log
+     */
+    private static final String BIZ_TYPE = "InfoSynchronizer";
+    private static final String CLASS_NAME = LogFlagBusiType.BUSI_ALL.toStrValue();
+
+    final
+    CallingProperties callingProperties;
+    final
+    DeviceTagsService deviceTagsService;
+    final
+    CallingUtil callingUtil;
+
+    public InfoSynchronizer(CallingProperties callingProperties, DeviceTagsService deviceTagsService, CallingUtil callingUtil) {
+        this.callingProperties = callingProperties;
+        this.deviceTagsService = deviceTagsService;
+        this.callingUtil = callingUtil;
+    }
+
+    /**
+     * 同步标签
+     */
+    public void syncDeviceTags() {
+        LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, BIZ_TYPE, CLASS_NAME
+                , "开始同步采集标签,开始拉取数据");
+        long begin = System.currentTimeMillis();
+
+        //对接点
+        CallingProperties.CallingEndpoint endpoint = callingProperties.getEndpoints().get(ScadaPlatformMetadata.NAME);
+        //请求地址
+        String url = endpoint.getUrl() + ScadaPlatformMetadata.Uri.VARIABLES;
+
+        //参数
+        Map<String, Object> params = Collections.singletonMap(ScadaPlatformMetadata.Params.PROJECT_NAME,
+                ScadaPlatformMetadata.DefaultValues.PROJECT_NAME);
+        //请求结果项
+        List<ScadaPlatformVariables> items = callingUtil.request(url, HttpMethod.GET, params, Collections.emptyList(),
+                new TypeReference<ScadaPlatformVariablesResult<List<ScadaPlatformVariables>>>() {
+                });
+        LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, BIZ_TYPE, CLASS_NAME
+                , String.format("拉取标签成功,数据量:%d", items.size()));
+
+
+        //转换采集标签对象
+        List<DeviceTags> dates = items.stream()
+                .map(var -> var.toDeviceTags(ScadaPlatformMetadata.NAME))
+                .collect(Collectors.toList());
+
+        //更新采集标签
+        Boolean upserted = deviceTagsService.syncDeviceTags(ScadaPlatformMetadata.NAME, dates);
+
+        long end = System.currentTimeMillis();
+        LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, BIZ_TYPE, CLASS_NAME
+                , String.format(
+                        "结束执行同步采集标签,同步状态:%s,用时(毫秒):%d"
+                        , upserted
+                        , (end - begin)
+                )
+        );
+    }
+}

+ 6 - 0
custom-gateway-zhscada/src/main/java/com/shkpr/service/customgateway/zhscada/constants/ScadaPlatformMetadata.java

@@ -37,6 +37,8 @@ public abstract class ScadaPlatformMetadata {
      * 接口地址
      */
     public interface Uri {
+        //获取变量列表
+        String VARIABLES = "/api/v1/variables";
         //获取实时数据
         String REAL_TIME_DATA = "/api/v1/realvardata";
     }
@@ -46,6 +48,8 @@ public abstract class ScadaPlatformMetadata {
      * 参数
      */
     public interface Params {
+        //项目名
+        String PROJECT_NAME = "projectInstanceName";
         //采集标签名
         String TAG_NAME = "TagName";
     }
@@ -54,6 +58,8 @@ public abstract class ScadaPlatformMetadata {
      * 默认值
      */
     public interface DefaultValues {
+        //项目名
+        String PROJECT_NAME = "大同数据采集服务端";
         //标签分隔符
         String TAG_DELIMITER = ",";
     }

+ 2 - 2
custom-gateway-zhscada/src/main/java/com/shkpr/service/customgateway/zhscada/domain/ScadaPlatformResult.java

@@ -7,7 +7,7 @@ import lombok.EqualsAndHashCode;
 import org.apache.http.HttpStatus;
 
 /**
- * Scada平台结果
+ * Scada平台数据结果
  *
  * @param <T> 数据类型
  * @author 欧阳劲驰
@@ -15,7 +15,7 @@ import org.apache.http.HttpStatus;
  */
 @Data
 @EqualsAndHashCode(callSuper = true)
-public class ScadaPlatformResult<T> extends ResultResponse<T> {
+public class ScadaPlatformDataResult<T> extends ResultResponse<T> {
     /**
      * 消息
      */

+ 40 - 0
custom-gateway-zhscada/src/main/java/com/shkpr/service/customgateway/zhscada/domain/ScadaPlatformVariables.java

@@ -0,0 +1,40 @@
+package com.shkpr.service.customgateway.zhscada.domain;
+
+import com.shkpr.service.customgateway.core.domain.po.DeviceTags;
+import lombok.Data;
+
+/**
+ * Scada平台变量
+ *
+ * @author 欧阳劲驰
+ * @since 1.0.0
+ */
+@Data
+public class ScadaPlatformVariables {
+    /**
+     * 描述
+     */
+    private String d;
+    /**
+     * 采集标签
+     */
+    private String n;
+    /**
+     * 其他信息
+     */
+    private Integer o;
+    /**
+     * 值类型,1-5分别表bool、int、float、double、string
+     */
+    private String t;
+
+    /**
+     * 转设备标签
+     *
+     * @param platformName 平台名称
+     * @return 设备标签
+     */
+    public DeviceTags toDeviceTags(String platformName) {
+        return new DeviceTags(null, platformName, n);
+    }
+}

+ 48 - 0
custom-gateway-zhscada/src/main/java/com/shkpr/service/customgateway/zhscada/domain/ScadaPlatformVariablesResult.java

@@ -0,0 +1,48 @@
+package com.shkpr.service.customgateway.zhscada.domain;
+
+import com.fasterxml.jackson.annotation.JsonAlias;
+import com.shkpr.service.customgateway.core.domain.ResultResponse;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.apache.http.HttpStatus;
+
+/**
+ * Scada平台变量结果
+ *
+ * @param <T> 数据类型
+ * @author 欧阳劲驰
+ * @since 1.0.0
+ */
+@Data
+@EqualsAndHashCode(callSuper = true)
+public class ScadaPlatformVariablesResult<T> extends ResultResponse<T> {
+    /**
+     * 消息
+     */
+    private String message;
+    /**
+     * 数据
+     */
+    @JsonAlias("TagValueList")
+    private T objectList;
+
+    @Override
+    public Integer getCode() {
+        return HttpStatus.SC_OK;
+    }
+
+    @Override
+    public String getMessage() {
+        return this.message;
+    }
+
+    @Override
+    public T getData() {
+        return this.objectList;
+    }
+
+    @Override
+    public Boolean isOk() {
+        return !"failed".equals(this.message);
+    }
+}

+ 45 - 0
custom-gateway-zhscada/src/main/java/com/shkpr/service/customgateway/zhscada/manager/InfoSyncManager.java

@@ -0,0 +1,45 @@
+package com.shkpr.service.customgateway.zhscada.manager;
+
+import com.shkpr.service.customgateway.zhscada.components.InfoSynchronizer;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+
+/**
+ * 信息同步管理
+ *
+ * @author 欧阳劲驰
+ * @since 1.0.0
+ */
+@Component
+public class InfoSyncManager {
+    final
+    ThreadPoolTaskExecutor taskScheduler;
+    final
+    InfoSynchronizer infoSynchronizer;
+
+    public InfoSyncManager(ThreadPoolTaskExecutor taskScheduler, InfoSynchronizer infoSynchronizer) {
+        this.taskScheduler = taskScheduler;
+        this.infoSynchronizer = infoSynchronizer;
+    }
+
+    /**
+     * 初始化
+     */
+    @PostConstruct
+    public void init() {
+        //同步用户信息
+        taskScheduler.execute(infoSynchronizer::syncDeviceTags);
+    }
+
+    /**
+     * 分钟任务
+     */
+    @Scheduled(cron = "0 */10 * * * *")
+    public void minuteTask() {
+        //同步用户信息
+        taskScheduler.execute(infoSynchronizer::syncDeviceTags);
+    }
+}