Forráskód Böngészése

枣阳dma增加激活状态处理,物联网平台增加密钥加载

欧阳劲驰 2 hete
szülő
commit
0c30cf6b10

+ 10 - 4
bespoke-gateway-app/src/main/resources/application-zydma.yml

@@ -24,6 +24,10 @@ spring:
         user: kpr
         password: kpr.2024@117.influxdb
         database: iot
+#认证
+security:
+  permit-pattern:
+    - ${gateway.routes.zydma}/users/ticket-exchange
 #对接
 calling:
   #对接点
@@ -36,7 +40,9 @@ calling:
       url: http://223.75.194.87:8200/pdserver
       access-key: Data
       secret-key: panda666.
-#认证
-security:
-  permit-pattern:
-    - ${gateway.routes.zydma}/users/ticket-exchange
+#任务
+scheduling:
+  activates:
+    collect-iot-platform: off
+    sync-user-info: off
+    sync-function-info: off

+ 18 - 1
bespoke-gateway-core/src/main/java/com/shkpr/service/bespokegateway/core/manager/IntegrationKeyManager.java

@@ -17,6 +17,9 @@ import javax.annotation.PostConstruct;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
 
 /**
  * 密钥管理器
@@ -126,7 +129,21 @@ public class IntegrationKeyManager {
                 String.format("开始刷新所有平台密钥,共%d个平台", keyLoaders.size()));
         long begin = System.currentTimeMillis();
 
-        for (String platformName : keyLoaders.keySet()) taskScheduler.execute(() -> refreshKey(platformName));
+        //执行刷新
+        List< Future<?>> futures = keyLoaders.keySet().stream()
+                .map(platformName -> taskScheduler.submit(() -> refreshKey(platformName)))
+                .collect(Collectors.toList());
+
+        //等待线程完成
+        for (Future<?> future : futures) {
+            try {
+                future.get();
+            } catch (InterruptedException | ExecutionException e) {
+                LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, BIZ_TYPE, CLASS_NAME
+                        , String.format("刷新所有平台密钥执行异常 error:%s", e)
+                );
+            }
+        }
 
         long end = System.currentTimeMillis();
         LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, BIZ_TYPE, CLASS_NAME

+ 6 - 2
bespoke-gateway-jldma/src/main/java/com/shkpr/service/bespokegateway/jldma/components/IotPlatformKeyLoader.java

@@ -16,10 +16,14 @@ import org.springframework.http.MediaType;
 import org.springframework.stereotype.Component;
 
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.HashMap;
 
 /**
- * 物联网平台
+ * 物联网平台密钥加载器
+ *
+ * @author 欧阳劲驰
+ * @since 1.0.0
  */
 @Component
 public class IotPlatformKeyLoader implements IntegrationKeyLoader {
@@ -70,7 +74,7 @@ public class IotPlatformKeyLoader implements IntegrationKeyLoader {
 
             return new IntegrationKey(null, token.getTokenKey(), token.getTokenValue(), null);
         } catch (IOException e) {
-            throw new RuntimeException(e);
+            throw new UncheckedIOException(e);
         }
     }
 }

+ 7 - 16
bespoke-gateway-zydma/src/main/java/com/shkpr/service/bespokegateway/zydma/components/DataCollector.java

@@ -11,6 +11,7 @@ import com.shkpr.service.bespokegateway.core.domain.Device;
 import com.shkpr.service.bespokegateway.core.domain.DeviceTag;
 import com.shkpr.service.bespokegateway.core.domain.IntegrationKey;
 import com.shkpr.service.bespokegateway.core.exception.SelfException;
+import com.shkpr.service.bespokegateway.core.manager.IntegrationKeyManager;
 import com.shkpr.service.bespokegateway.core.properties.CallingProperties;
 import com.shkpr.service.bespokegateway.core.utils.CallingUtil;
 import com.shkpr.service.bespokegateway.core.utils.InfluxDbUtil;
@@ -27,7 +28,6 @@ import org.springframework.http.HttpMethod;
 import org.springframework.http.MediaType;
 import org.springframework.stereotype.Component;
 
-import java.io.IOException;
 import java.time.LocalDateTime;
 import java.time.temporal.ChronoUnit;
 import java.util.*;
@@ -50,6 +50,8 @@ public class DataCollector {
     final
     CallingProperties callingProperties;
     final
+    IntegrationKeyManager integrationKeyManager;
+    final
     DeviceRegistry deviceRegistry;
     final
     DeviceIdGenerator deviceIdGenerator;
@@ -58,11 +60,9 @@ public class DataCollector {
     final
     CallingUtil callingUtil;
 
-    //密钥
-    public volatile IntegrationKey key = null;
-
-    public DataCollector(CallingProperties callingProperties, DeviceRegistry deviceRegistry, DeviceIdGenerator deviceIdGenerator, InfluxDbUtil influxDbUtil, CallingUtil callingUtil) {
+    public DataCollector(CallingProperties callingProperties, IntegrationKeyManager integrationKeyManager, DeviceRegistry deviceRegistry, DeviceIdGenerator deviceIdGenerator, InfluxDbUtil influxDbUtil, CallingUtil callingUtil) {
         this.callingProperties = callingProperties;
+        this.integrationKeyManager = integrationKeyManager;
         this.deviceRegistry = deviceRegistry;
         this.deviceIdGenerator = deviceIdGenerator;
         this.influxDbUtil = influxDbUtil;
@@ -82,17 +82,8 @@ public class DataCollector {
 
         //平台对接点
         CallingEndpoint endpoint = callingProperties.getEndpoint(IotPlatformMetadata.NAME);
-
-        //===================获取密钥===================
-        try {
-            if (key == null) key = IotPlatformMetadata.getKey(endpoint);
-        } catch (IOException e) {
-            LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_WARN, BIZ_TYPE, CLASS_NAME,
-                    String.format("加载设备序号数据失败,error:%s", e));
-            return;
-        }
-        if (key == null) return;
-
+        //密钥
+        IntegrationKey key = integrationKeyManager.getKey(IotPlatformMetadata.NAME);
         //请求地址
         String url = endpoint.getUrl() + IotPlatformMetadata.Uri.GET_DEVICE_HISTORY_DATA;
         //请求头

+ 73 - 0
bespoke-gateway-zydma/src/main/java/com/shkpr/service/bespokegateway/zydma/components/IotPlatformKeyLoader.java

@@ -0,0 +1,73 @@
+package com.shkpr.service.bespokegateway.zydma.components;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.shkpr.service.bespokegateway.core.domain.CallingEndpoint;
+import com.shkpr.service.bespokegateway.core.domain.IntegrationKey;
+import com.shkpr.service.bespokegateway.core.domain.IntegrationKeyLoader;
+import com.shkpr.service.bespokegateway.zydma.constants.IotPlatformMetadata;
+import org.apache.commons.lang3.ObjectUtils;
+import org.apache.http.HttpHeaders;
+import org.apache.http.client.fluent.Request;
+import org.apache.http.client.fluent.Response;
+import org.springframework.http.MediaType;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.HashMap;
+
+/**
+ * 物联网平台密钥加载器
+ *
+ * @author 欧阳劲驰
+ * @since 1.0.0
+ */
+@Component
+public class IotPlatformKeyLoader implements IntegrationKeyLoader {
+    final
+    ObjectMapper objectMapper;
+
+    public IotPlatformKeyLoader(ObjectMapper objectMapper) {
+        this.objectMapper = objectMapper;
+    }
+
+    /**
+     * 获取平台名称
+     *
+     * @return 平台名称
+     */
+    @Override
+    public String getPlatformName() {
+        return IotPlatformMetadata.NAME;
+    }
+
+    /**
+     * 加载密钥
+     *
+     * @param endpoint 对接信息
+     * @return 密钥
+     */
+    @Override
+    public IntegrationKey loadKey(CallingEndpoint endpoint) {
+        try {
+            //请求token
+            Response response = Request.Post(endpoint.getUrl() + IotPlatformMetadata.Uri.GENERATE_TOKEN)
+                    .addHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
+                    .bodyByteArray(objectMapper.writeValueAsBytes(
+                            new HashMap<String, String>() {{
+                                put(IotPlatformMetadata.Params.USERNAME, endpoint.getAccessKey());
+                                put(IotPlatformMetadata.Params.PASSWORD, endpoint.getSecretKey());
+                            }}
+                    ))
+                    .execute();
+            //解析token
+            IotPlatformMetadata.TokenResult tokenResult = objectMapper.readValue(response.returnContent().asString(), IotPlatformMetadata.TokenResult.class);
+            IotPlatformMetadata.TokenResult.TokenData data = tokenResult.getData();
+            if (!tokenResult.isOk() || !ObjectUtils.allNotNull(data, data.getUserToken(), data.getTokenType(), data.getAccessToken()))
+                return null;
+            return new IntegrationKey(null, data.getUserToken(), null, data.getTokenType() + " " + data.getAccessToken());
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+}

+ 22 - 40
bespoke-gateway-zydma/src/main/java/com/shkpr/service/bespokegateway/zydma/constants/IotPlatformMetadata.java

@@ -1,21 +1,12 @@
 package com.shkpr.service.bespokegateway.zydma.constants;
 
 import com.fasterxml.jackson.annotation.JsonAlias;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.shkpr.service.bespokegateway.core.constants.*;
-import com.shkpr.service.bespokegateway.core.domain.CallingEndpoint;
-import com.shkpr.service.bespokegateway.core.domain.IntegrationKey;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.Getter;
 import lombok.Setter;
-import org.apache.commons.lang3.ObjectUtils;
-import org.apache.http.HttpHeaders;
-import org.apache.http.client.fluent.Request;
-import org.apache.http.client.fluent.Response;
-import org.springframework.http.MediaType;
 
-import java.io.IOException;
 import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
 import java.util.*;
@@ -28,37 +19,18 @@ import java.util.stream.Collectors;
  * @since 1.0.0
  */
 public abstract class IotPlatformMetadata {
-
-    private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
-    private static final ObjectMapper objectMapper = new ObjectMapper();
-
-    //系统名称
-    public static String NAME = "zaoyang-iot";
-    //最大查询小时
-    public static int MAX_QUERY_HOU = (7 * 24) - 1;
-
     /**
-     * 获取密钥
+     * 系统名称
      */
-    public static IntegrationKey getKey(CallingEndpoint endpoint) throws IOException {
-        //请求token
-        Response response = Request.Post(endpoint.getUrl() + Uri.GENERATE_TOKEN)
-                .addHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
-                .bodyByteArray(objectMapper.writeValueAsBytes(
-                        new HashMap<String, String>() {{
-                            put(Params.USERNAME, endpoint.getAccessKey());
-                            put(Params.PASSWORD, endpoint.getSecretKey());
-                        }}
-                ))
-                .execute();
-        //解析token
-        TokenResult tokenResult = objectMapper.readValue(response.returnContent().asString(), TokenResult.class);
-        TokenResult.TokenData data = tokenResult.getData();
-        if (!tokenResult.isOk() || !ObjectUtils.allNotNull(data, data.getUserToken(), data.getTokenType(), data.getAccessToken()))
-            return null;
-
-        return new IntegrationKey(null, data.getUserToken(), null, data.getTokenType() + " " + data.getAccessToken());
-    }
+    public static final String NAME = "zaoyang-iot";
+    /**
+     * 最大查询小时
+     */
+    public static final int MAX_QUERY_HOU = (7 * 24) - 1;
+    /**
+     * 日期格式
+     */
+    private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
 
     /**
      * 获取历史数据参数
@@ -190,10 +162,20 @@ public abstract class IotPlatformMetadata {
     }
 
     /**
+     * 任务key
+     */
+    public interface SchedulingKeys {
+        /**
+         * 采集物联网平台
+         */
+        String COLLECT_IOT_PLATFORM = "collect-iot-platform";
+    }
+
+    /**
      * token结果
      */
     @Data
-    private static class TokenResult {
+    public static class TokenResult {
         private Integer code;
         private String msg;
         private TokenData data;
@@ -203,7 +185,7 @@ public abstract class IotPlatformMetadata {
         }
 
         @Data
-        private static class TokenData {
+        public static class TokenData {
             @JsonAlias("user_token")
             private String userToken;
             @JsonAlias("access_token")

+ 14 - 0
bespoke-gateway-zydma/src/main/java/com/shkpr/service/bespokegateway/zydma/constants/MiddlePlatformMetadata.java

@@ -103,4 +103,18 @@ public abstract class MiddlePlatformMetadata extends IntegrationMetadata {
         //用户id
         String USER_ID = "-1";
     }
+
+    /**
+     * 任务key
+     */
+    public interface SchedulingKeys {
+        /**
+         * 同步设备标签
+         */
+        String SYNC_USER_INFO = "sync-user-info";
+        /**
+         * 采集scada
+         */
+        String SYNC_FUNCTION_INFO = "sync-function-info";
+    }
 }

+ 10 - 3
bespoke-gateway-zydma/src/main/java/com/shkpr/service/bespokegateway/zydma/manager/DataManager.java

@@ -1,6 +1,8 @@
 package com.shkpr.service.bespokegateway.zydma.manager;
 
+import com.shkpr.service.bespokegateway.core.properties.SchedulingProperties;
 import com.shkpr.service.bespokegateway.zydma.components.DataCollector;
+import com.shkpr.service.bespokegateway.zydma.constants.IotPlatformMetadata;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.stereotype.Component;
@@ -16,11 +18,14 @@ import javax.annotation.PostConstruct;
 @Component
 public class DataManager {
     final
+    SchedulingProperties schedulingProperties;
+    final
     ThreadPoolTaskExecutor taskScheduler;
     final
     DataCollector dataCollector;
 
-    public DataManager(ThreadPoolTaskExecutor taskScheduler, DataCollector dataCollector) {
+    public DataManager(SchedulingProperties schedulingProperties, ThreadPoolTaskExecutor taskScheduler, DataCollector dataCollector) {
+        this.schedulingProperties = schedulingProperties;
         this.taskScheduler = taskScheduler;
         this.dataCollector = dataCollector;
     }
@@ -31,7 +36,8 @@ public class DataManager {
     @PostConstruct
     public void init() {
         //采集物联网平台
-        taskScheduler.execute(() -> dataCollector.collectIotPlatform(30 * 24));
+        if (schedulingProperties.isTaskActive(IotPlatformMetadata.SchedulingKeys.COLLECT_IOT_PLATFORM))
+            taskScheduler.execute(() -> dataCollector.collectIotPlatform(30 * 24));
     }
 
     /**
@@ -40,6 +46,7 @@ public class DataManager {
     @Scheduled(cron = "0 */15 0,6,12,18 * * ?")
     public void minuteTask() {
         //采集物联网平台
-        taskScheduler.execute(() -> dataCollector.collectIotPlatform(2 * 6));
+        if (schedulingProperties.isTaskActive(IotPlatformMetadata.SchedulingKeys.COLLECT_IOT_PLATFORM))
+            taskScheduler.execute(() -> dataCollector.collectIotPlatform(2 * 6));
     }
 }

+ 16 - 5
bespoke-gateway-zydma/src/main/java/com/shkpr/service/bespokegateway/zydma/manager/InfoSyncManager.java

@@ -1,6 +1,8 @@
 package com.shkpr.service.bespokegateway.zydma.manager;
 
+import com.shkpr.service.bespokegateway.core.properties.SchedulingProperties;
 import com.shkpr.service.bespokegateway.zydma.components.InfoSynchronizer;
+import com.shkpr.service.bespokegateway.zydma.constants.MiddlePlatformMetadata;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.stereotype.Component;
@@ -16,11 +18,14 @@ import javax.annotation.PostConstruct;
 @Component
 public class InfoSyncManager {
     final
+    SchedulingProperties schedulingProperties;
+    final
     ThreadPoolTaskExecutor taskScheduler;
     final
     InfoSynchronizer infoSynchronizer;
 
-    public InfoSyncManager(ThreadPoolTaskExecutor taskScheduler, InfoSynchronizer infoSynchronizer) {
+    public InfoSyncManager(SchedulingProperties schedulingProperties, ThreadPoolTaskExecutor taskScheduler, InfoSynchronizer infoSynchronizer) {
+        this.schedulingProperties = schedulingProperties;
         this.taskScheduler = taskScheduler;
         this.infoSynchronizer = infoSynchronizer;
     }
@@ -31,8 +36,11 @@ public class InfoSyncManager {
     @PostConstruct
     public void init() {
         //同步用户信息
-        taskScheduler.execute(infoSynchronizer::syncUserInfo);
-        taskScheduler.execute(infoSynchronizer::syncFunctionInfo);
+        if (schedulingProperties.isTaskActive(MiddlePlatformMetadata.SchedulingKeys.SYNC_USER_INFO))
+            taskScheduler.execute(infoSynchronizer::syncUserInfo);
+        //同步角色信息
+        if (schedulingProperties.isTaskActive(MiddlePlatformMetadata.SchedulingKeys.SYNC_FUNCTION_INFO))
+            taskScheduler.execute(infoSynchronizer::syncFunctionInfo);
     }
 
     /**
@@ -41,7 +49,10 @@ public class InfoSyncManager {
     @Scheduled(cron = "0 */10 * * * *")
     public void minuteTask() {
         //同步用户信息
-        taskScheduler.execute(infoSynchronizer::syncUserInfo);
-        taskScheduler.execute(infoSynchronizer::syncFunctionInfo);
+        if (schedulingProperties.isTaskActive(MiddlePlatformMetadata.SchedulingKeys.SYNC_USER_INFO))
+            taskScheduler.execute(infoSynchronizer::syncUserInfo);
+        //同步角色信息
+        if (schedulingProperties.isTaskActive(MiddlePlatformMetadata.SchedulingKeys.SYNC_FUNCTION_INFO))
+            taskScheduler.execute(infoSynchronizer::syncFunctionInfo);
     }
 }