欧阳劲驰 1 месяц назад
Родитель
Сommit
08daba68cd

+ 1 - 1
custom-gateway-app/src/main/resources/application-zhscada.yml

@@ -27,7 +27,7 @@ calling:
       url: http://119.96.174.191:9434
 #迁移
 migrate:
-  #表名
+  #模式
   schema: zh_scada
   #传感器映射文件
   sensor-map-path: classpath*:sensor-id.yml

+ 1 - 1
custom-gateway-app/src/main/resources/zhscada-backup/application-zhscada.yml

@@ -27,7 +27,7 @@ calling:
       url: http://119.96.174.191:9434
 #迁移
 migrate:
-  #表名
+  #模式
   schema: scada
   #传感器映射文件
   sensor-map-path: file:E:\kpr\kpr-custom-gateway\sensor-id.yml

+ 2 - 0
custom-gateway-core/src/main/java/com/shkpr/service/customgateway/core/utils/InfluxDbUtil.java

@@ -8,6 +8,7 @@ import com.shkpr.service.customgateway.core.constants.LogFlagBusiType;
 import com.shkpr.service.customgateway.core.domain.InfluxDbClient;
 import com.shkpr.service.customgateway.core.properties.InfluxDbProperties;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
 import org.influxdb.dto.BatchPoints;
 import org.influxdb.dto.Point;
 import org.influxdb.dto.Query;
@@ -72,6 +73,7 @@ public class InfluxDbUtil {
      */
     public void write(List<Point> points) {
         try {
+            if (CollectionUtils.isEmpty(points)) return;
             LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, BIZ_TYPE, CLASS_NAME
                     , String.format("开始批量写入InfluxDb,数据量:%d", points.size()));
             long begin = System.currentTimeMillis();

+ 2 - 0
custom-gateway-zhscada/src/main/java/com/shkpr/service/customgateway/zhscada/components/DataMigrator.java

@@ -94,6 +94,8 @@ public class DataMigrator {
                 //构建influxdb
                 List<Point> points = dates.parallelStream().map(d -> d.toPoint(devices, sensorMap))
                         .filter(Objects::nonNull).collect(Collectors.toList());
+                dates.clear();
+
                 //写入influxDb
                 influxDbUtil.write(points);
             }

+ 4 - 2
custom-gateway-zhscada/src/main/java/com/shkpr/service/customgateway/zhscada/mapper/scada/SensorCollectDataMapper.java

@@ -18,14 +18,16 @@ public interface SensorCollectDataMapper {
     /**
      * 根据传感器code查询时间范围
      *
-     * @param code 传感器code
+     * @param schema 模式
+     * @param code   传感器code
      * @return 时间范围
      */
-    SensorCollectData findTimeRangeByCode(@Param("schema") String schema,@Param("code") String code);
+    SensorCollectData findTimeRangeByCode(@Param("schema") String schema, @Param("code") String code);
 
     /**
      * 根据传感器code查询对齐值
      *
+     * @param schema       模式
      * @param startTime    开始时间
      * @param endTime      结束时间
      * @param interval     采集间隔

+ 1 - 1
custom-gateway-zhscada/src/main/java/com/shkpr/service/customgateway/zhscada/properties/MigrateProperties.java

@@ -19,7 +19,7 @@ public class MigrateProperties {
      */
     private String sensorMapPath;
     /**
-     * 表名
+     * 模式
      */
     private String schema;
     /**

+ 2 - 2
custom-gateway-zhscada/src/main/java/com/shkpr/service/customgateway/zhscada/service/SensorCollectDataService.java

@@ -15,7 +15,7 @@ public interface SensorCollectDataService {
     /**
      * 查询时间范围
      *
-     * @param schema 架构名
+     * @param schema 模式
      * @param code   传感器code
      * @return 时间范围
      */
@@ -24,7 +24,7 @@ public interface SensorCollectDataService {
     /**
      * 查询对齐值
      *
-     * @param schema       架构名
+     * @param schema       模式
      * @param startTime    开始时间
      * @param endTime      结束时间
      * @param interval     采集间隔

+ 2 - 2
custom-gateway-zhscada/src/main/java/com/shkpr/service/customgateway/zhscada/service/impl/SensorCollectDataServiceImpl.java

@@ -48,7 +48,7 @@ public class SensorCollectDataServiceImpl implements SensorCollectDataService {
     /**
      * 根据传感器code查询时间范围
      *
-     * @param schema 架构名
+     * @param schema 模式
      * @param code   传感器code
      * @return 时间范围
      */
@@ -60,7 +60,7 @@ public class SensorCollectDataServiceImpl implements SensorCollectDataService {
     /**
      * 根据传感器code查询对齐值
      *
-     * @param schema       架构名
+     * @param schema       模式
      * @param startTime    开始时间
      * @param endTime      结束时间
      * @param interval     采集间隔

+ 17 - 12
custom-gateway-zhscada/src/main/resources/mapper/SensorCollectDataMapper.xml

@@ -2,8 +2,9 @@
 <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
 <mapper namespace="com.shkpr.service.customgateway.zhscada.mapper.scada.SensorCollectDataMapper">
     <select id="findTimeRangeByCode" resultType="com.shkpr.service.customgateway.zhscada.domain.po.SensorCollectData">
-        select min(coll_time) as min_time,
-               max(coll_time) as max_time
+        select /*+ PARALLEL(8) */
+            min(coll_time) as min_time,
+            max(coll_time) as max_time
         from ${schema}.t_sersor_coll_data_day
         where sensor_code = #{code,jdbcType=VARCHAR}
     </select>
@@ -11,23 +12,27 @@
     <select id="findAlignByTimeAndCode" resultType="com.shkpr.service.customgateway.zhscada.domain.po.SensorCollectData"
             fetchSize="3000">
         with time_seq (start_time)
-                 as (select trunc(#{startTime,jdbcType=TIMESTAMP}, #{alignUnit,jdbcType=VARCHAR}) as start_time
+                 as (select /*+ PARALLEL(8) */
+                         trunc(#{startTime,jdbcType=TIMESTAMP}, #{alignUnit,jdbcType=VARCHAR}) as start_time
                      from dual
                      union all
-                     select start_time + numtodsinterval(#{interval,jdbcType=INTEGER}, #{intervalUnit,jdbcType=VARCHAR})
+                     select /*+ PARALLEL(8) */
+                         start_time + numtodsinterval(#{interval,jdbcType=INTEGER}, #{intervalUnit,jdbcType=VARCHAR})
                      from time_seq
                      where start_time +
                            numtodsinterval(#{interval,jdbcType=INTEGER}, #{intervalUnit,jdbcType=VARCHAR}) &lt;
                            #{endTime,jdbcType=TIMESTAMP})
-        select ts.start_time  as time,
-               sh.coll_value  as value,
-               sh.sensor_code as code
+        select /*+ PARALLEL(8) */
+            ts.start_time  as time,
+            sh.coll_value  as value,
+            sh.sensor_code as code
         from time_seq ts
-                 left join (select sensor_code,
-                                   coll_value,
-                                   trunc(coll_time, #{alignUnit,jdbcType=VARCHAR})                                as minute_trunc,
-                                   row_number() over (partition by trunc(coll_time, #{alignUnit,jdbcType=VARCHAR}), sensor_code
-                                       order by abs(coll_time - trunc(coll_time, #{alignUnit,jdbcType=VARCHAR}))) as rn
+                 left join (select /*+ PARALLEL(8) */
+                                sensor_code,
+                                coll_value,
+                                trunc(coll_time, #{alignUnit,jdbcType=VARCHAR})                                as minute_trunc,
+                                row_number() over (partition by trunc(coll_time, #{alignUnit,jdbcType=VARCHAR}), sensor_code
+                                    order by abs(coll_time - trunc(coll_time, #{alignUnit,jdbcType=VARCHAR}))) as rn
                             from ${schema}.t_sersor_coll_data_day
                             where sensor_code = #{code,jdbcType=VARCHAR}
                               and coll_time &gt;= #{startTime,jdbcType=TIMESTAMP}