浏览代码

实现中环Scada采集

欧阳劲驰 2 周之前
父节点
当前提交
ab1bedcb9f

+ 1 - 1
custom-gateway-app/pom.xml

@@ -76,7 +76,7 @@
                     <skipTests>true</skipTests>
                     <systemPropertyVariables>
                         <spring.datasource.config-path>../db_config.yml</spring.datasource.config-path>
-                        <device.map-path>../dev_map.yml</device.map-path>
+                        <device.map-path>../dev_${enable.module}.yml</device.map-path>
                         <spring.datasource.multi.embedded.url>jdbc:h2:file:../data
                         </spring.datasource.multi.embedded.url>
                     </systemPropertyVariables>

+ 6 - 0
custom-gateway-app/src/main/resources/application-zhscada.yml

@@ -0,0 +1,6 @@
+#对接
+calling:
+  #对接点
+  endpoints:
+    scada-platform:
+      url: http://119.96.174.191:9434

+ 1 - 1
custom-gateway-app/src/main/resources/application.yml

@@ -74,4 +74,4 @@ temp-file:
   max-age: PT4H
 #设备
 device:
-  map-path: ./dev_map.yml
+  map-path: ./dev_@enable.module@.yml

+ 93 - 0
custom-gateway-zhscada/src/main/java/com/shkpr/service/customgateway/zhscada/components/DataCollector.java

@@ -0,0 +1,93 @@
+package com.shkpr.service.customgateway.zhscada.components;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.global.base.log.LogLevelFlag;
+import com.global.base.log.LogPrintMgr;
+import com.shkpr.service.customgateway.core.components.DeviceIdGenerator;
+import com.shkpr.service.customgateway.core.components.DeviceRegistry;
+import com.shkpr.service.customgateway.core.constants.LogFlagBusiType;
+import com.shkpr.service.customgateway.core.domain.Device;
+import com.shkpr.service.customgateway.core.properties.CallingProperties;
+import com.shkpr.service.customgateway.core.utils.CallingUtil;
+import com.shkpr.service.customgateway.core.utils.InfluxDbUtil;
+import com.shkpr.service.customgateway.zhscada.constants.ScadaPlatformMetadata;
+import com.shkpr.service.customgateway.zhscada.domain.ScadaPlatformData;
+import com.shkpr.service.customgateway.zhscada.domain.ScadaPlatformResult;
+import org.influxdb.dto.Point;
+import org.springframework.http.HttpMethod;
+import org.springframework.stereotype.Component;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * 数据采集器
+ *
+ * @author 欧阳劲驰
+ * @since 1.0.0
+ */
+@Component
+public class DataCollector {
+    /**
+     * log
+     */
+    private static final String CLASS_NAME = "DataCollector";
+    private static final String BIZ_TYPE = LogFlagBusiType.BUSI_ALL.toStrValue();
+
+    final
+    CallingProperties callingProperties;
+    final
+    DeviceRegistry deviceRegistry;
+    final
+    DeviceIdGenerator deviceIdGenerator;
+    final
+    InfluxDbUtil influxDbUtil;
+    final
+    CallingUtil callingUtil;
+
+    public DataCollector(CallingProperties callingProperties, DeviceRegistry deviceRegistry, DeviceIdGenerator deviceIdGenerator, InfluxDbUtil influxDbUtil, CallingUtil callingUtil) {
+        this.callingProperties = callingProperties;
+        this.deviceRegistry = deviceRegistry;
+        this.deviceIdGenerator = deviceIdGenerator;
+        this.influxDbUtil = influxDbUtil;
+        this.callingUtil = callingUtil;
+    }
+
+    /**
+     * 采集scada
+     */
+    public void collectScada() {
+        LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, BIZ_TYPE, CLASS_NAME
+                , "开始采集Scada数据,开始拉取数据");
+        long begin = System.currentTimeMillis();
+
+        //平台对接点
+        CallingProperties.CallingEndpoint endpoint = callingProperties.getEndpoints().get(ScadaPlatformMetadata.NAME);
+        //请求地址
+        String url = endpoint.getUrl() + ScadaPlatformMetadata.Uri.REAL_TIME_DATA;
+
+        //设备
+        List<Device> devices = deviceRegistry.findAll();
+
+        //参数
+        Map<String, Object> params = ScadaPlatformMetadata.getRealTimeDataParams(devices);
+        //请求结果项
+        List<ScadaPlatformData> items = callingUtil.request(url, HttpMethod.GET, params, Collections.emptyList(),
+                new TypeReference<ScadaPlatformResult<List<ScadaPlatformData>>>() {
+                });
+        LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, BIZ_TYPE, CLASS_NAME
+                , String.format("拉取数据成功,数据量:%d", items.size()));
+
+        //构建influxdb
+        List<Point> points = items.parallelStream().map(d -> d.toPoint(devices)).collect(Collectors.toList());
+        //写入influxDb
+        influxDbUtil.write(points);
+
+        long end = System.currentTimeMillis();
+        LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, BIZ_TYPE, CLASS_NAME
+                , String.format("结束采集Scada数据 用时(毫秒):%d", (end - begin))
+        );
+    }
+}

+ 68 - 0
custom-gateway-zhscada/src/main/java/com/shkpr/service/customgateway/zhscada/constants/ScadaPlatformMetadata.java

@@ -0,0 +1,68 @@
+package com.shkpr.service.customgateway.zhscada.constants;
+
+import com.shkpr.service.customgateway.core.constants.AreaCode;
+import com.shkpr.service.customgateway.core.domain.Device;
+import com.shkpr.service.customgateway.core.domain.DeviceTag;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Scada平台集成元数据
+ *
+ * @author 欧阳劲驰
+ * @since 1.0.0
+ */
+public abstract class ScadaPlatformMetadata {
+    //系统名称
+    public static String NAME = "scadaPlatform";
+
+    /**
+     * 获取实时数据参数
+     *
+     * @param devices 设备列表
+     * @return 参数
+     */
+    public static Map<String, Object> getRealTimeDataParams(List<Device> devices) {
+        return Collections.singletonMap(Params.TAG_NAME,
+                devices.stream().flatMap(it ->
+                        it.getTags().stream()
+                                .map(DeviceTag::getTag)
+                ).collect(Collectors.joining(DefaultValues.TAG_DELIMITER)));
+    }
+
+    /**
+     * 接口地址
+     */
+    public interface Uri {
+        //获取实时数据
+        String REAL_TIME_DATA = "/api/v1/realvardata";
+    }
+
+
+    /**
+     * 参数
+     */
+    public interface Params {
+        //采集标签名
+        String TAG_NAME = "TagName";
+    }
+
+    /**
+     * 默认值
+     */
+    public interface DefaultValues {
+        //标签分隔符
+        String TAG_DELIMITER = ",";
+    }
+
+    /**
+     * 设备
+     */
+    public interface Devices {
+        //区号
+        AreaCode AREA_CODE = AreaCode.ZAO_YANG;
+    }
+}

+ 87 - 0
custom-gateway-zhscada/src/main/java/com/shkpr/service/customgateway/zhscada/domain/ScadaPlatformData.java

@@ -0,0 +1,87 @@
+package com.shkpr.service.customgateway.zhscada.domain;
+
+import com.fasterxml.jackson.annotation.JsonAlias;
+import com.fasterxml.jackson.annotation.JsonFormat;
+import com.shkpr.service.customgateway.core.constants.InfluxdbMetadata;
+import com.shkpr.service.customgateway.core.domain.Device;
+import com.shkpr.service.customgateway.core.domain.DeviceTag;
+import lombok.Data;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.math.NumberUtils;
+import org.influxdb.dto.Point;
+import org.springframework.format.annotation.DateTimeFormat;
+
+import java.time.ZonedDateTime;
+import java.time.temporal.ChronoUnit;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Scada平台数据
+ *
+ * @author 欧阳劲驰
+ * @since 1.0.0
+ */
+@Data
+public class ScadaPlatformData {
+    /**
+     * 采集标签
+     */
+    @JsonAlias("N")
+    private String n;
+    /**
+     * 质量戳(192为GOOD)
+     */
+    @JsonAlias("Q")
+    private Integer q;
+    /**
+     * 时间戳UTC时间
+     */
+    @JsonAlias("T")
+    @JsonFormat(pattern = "yyyy-MM-dd  H:m:s", locale = "zh_CN", timezone = "UTC")
+    @DateTimeFormat(pattern = "yyyy-MM-dd  H:mm:ss")
+    private ZonedDateTime t;
+    /**
+     * 实时值,值可能空
+     */
+    @JsonAlias("V")
+    private String v;
+
+    /**
+     * 转influxdb对象
+     *
+     * @param devices 设备列表
+     * @return influxdb对象
+     */
+    public Point toPoint(List<Device> devices) {
+        if (StringUtils.isAnyBlank(n, v) || t == null || !NumberUtils.isParsable(v)) return null;
+        //获取设备
+        Device device = devices.stream()
+                .filter(d -> d.getTags().stream().map(DeviceTag::getTag).anyMatch(n::equals))
+                .findFirst().orElse(null);
+        if (device == null) return null;
+        //获取标签
+        DeviceTag tag = device.getTags().stream()
+                .filter(t -> Objects.equals(n, t.getTag()))
+                .findFirst().orElse(null);
+        if (tag == null) return null;
+        //获取时间戳
+        long timestamp = this.t
+                .truncatedTo(ChronoUnit.MINUTES)
+                .toInstant()
+                .toEpochMilli();
+
+        //构建influxdb
+        return Point
+                //表名
+                .measurement(tag.getMeasurement())
+                //设备id
+                .tag(InfluxdbMetadata.Tags.DEVICE_ID, device.getDeviceId())
+                //时间
+                .time(timestamp, TimeUnit.MILLISECONDS)
+                //值
+                .addField(tag.getField(), Double.parseDouble(v))
+                .build();
+    }
+}

+ 44 - 0
custom-gateway-zhscada/src/main/java/com/shkpr/service/customgateway/zhscada/domain/ScadaPlatformResult.java

@@ -0,0 +1,44 @@
+package com.shkpr.service.customgateway.zhscada.domain;
+
+import com.fasterxml.jackson.annotation.JsonAlias;
+import com.shkpr.service.customgateway.core.domain.ResultResponse;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.apache.http.HttpStatus;
+
+/**
+ * Scada平台结果
+ *
+ * @param <T> 数据类型
+ * @author 欧阳劲驰
+ * @since 1.0.0
+ */
+@Data
+@EqualsAndHashCode(callSuper = true)
+public class ScadaPlatformResult<T> extends ResultResponse<T> {
+    /**
+     * 数据
+     */
+    @JsonAlias("TagValueList")
+    private T tagValueList;
+
+    @Override
+    public Integer getCode() {
+        return HttpStatus.SC_OK;
+    }
+
+    @Override
+    public String getMessage() {
+        return null;
+    }
+
+    @Override
+    public T getData() {
+        return this.tagValueList;
+    }
+
+    @Override
+    public Boolean isOk() {
+        return true;
+    }
+}

+ 46 - 0
custom-gateway-zhscada/src/main/java/com/shkpr/service/customgateway/zhscada/manager/DataManager.java

@@ -0,0 +1,46 @@
+package com.shkpr.service.customgateway.zhscada.manager;
+
+
+import com.shkpr.service.customgateway.zhscada.components.DataCollector;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+
+/**
+ * 物联网管理
+ *
+ * @author 欧阳劲驰
+ * @serial 1.0.0
+ */
+@Component
+public class DataManager {
+    final
+    ThreadPoolTaskExecutor taskScheduler;
+    final
+    DataCollector dataCollector;
+
+    public DataManager(ThreadPoolTaskExecutor taskScheduler, DataCollector dataCollector) {
+        this.taskScheduler = taskScheduler;
+        this.dataCollector = dataCollector;
+    }
+
+    /**
+     * 初始化
+     */
+    @PostConstruct
+    public void init() {
+        //采集实时数据
+        taskScheduler.execute(dataCollector::collectScada);
+    }
+
+    /**
+     * 小时任务
+     */
+    @Scheduled(cron = "0 */15 * * * ?")
+    public void minuteTask() {
+        //采集实时数据
+        taskScheduler.execute(dataCollector::collectScada);
+    }
+}

+ 9 - 2
db_config.yml

@@ -1,12 +1,14 @@
 spring:
-  #jdbc
+  #数据源
   datasource:
     multi:
+      #主要
       primary:
         url: jdbc:postgresql://223.75.194.87:6057/watersmart?useSSL=false&useAffectedRows=false&allowMultiQueries=true&rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai
         username: postgres
         password: kpr.23417.postgres
         driver-class-name: org.postgresql.Driver
+      #嵌入式
       embedded:
         url: jdbc:h2:file:./data
         driver-class-name: org.h2.Driver
@@ -25,4 +27,9 @@ spring:
       - url: http://119.96.165.176:8086/
         user: kpr
         password: kpr.2024@117.influxdb
-        database: iot
+        database: iot
+      #中环
+#      - url: http://127.0.0.1:8086/
+#        user: kpr
+#        password: kpr.2025@117.influxdb
+#        database: iot

+ 21 - 0
dev_zhscada.yml

@@ -0,0 +1,21 @@
+---
+- deviceId: "710022511110001"
+  deviceName: "深圳大道与机场路"
+  deviceKind: "flow"
+  createTime: "2025-11-06 17:45:44"
+  tags:
+    - tag: "gw_szddyjcl_ssll"
+      protocol: "json"
+      measurement: "WaterMeter"
+      field: "flow_cur"
+      valueType: "double"
+    - tag: "gw_sdddyjcl_zljll"
+      protocol: "json"
+      measurement: "WaterMeter"
+      field: "flow_total_pos"
+      valueType: "double"
+    - tag: "gw_szddyjcl_fljll"
+      protocol: "json"
+      measurement: "WaterMeter"
+      field: "flow_total_rev"
+      valueType: "double"

dev_map.yml → dev_zydma.yml