Переглянути джерело

实现爆管分析压力计数据迁移

欧阳劲驰 4 місяців тому
батько
коміт
c9c2d0789c

+ 73 - 0
src/main/java/com/shkpr/service/alambizplugin/bizservice/PipeBurstDataBizService.java

@@ -0,0 +1,73 @@
+package com.shkpr.service.alambizplugin.bizservice;
+
+import com.global.base.log.LogLevelFlag;
+import com.global.base.log.LogPrintMgr;
+import com.shkpr.service.alambizplugin.commtools.InfluxDbUtil;
+import com.shkpr.service.alambizplugin.constants.InfluxdbMetadata;
+import com.shkpr.service.alambizplugin.constants.LogFlagBusiType;
+import com.shkpr.service.alambizplugin.dbdao.services.intef.PipeBurstDataPressCurService;
+import com.shkpr.service.alambizplugin.dto.PipeBurstDataPressCur;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+
+/**
+ * 爆管数据service
+ *
+ * @author 欧阳劲驰
+ * @since 1.0.0
+ */
+@Component
+public class PipeBurstDataBizService {
+    /**
+     * log
+     */
+    private final String mStrClassName;
+    private final String mBizType;
+
+    private final PipeBurstDataPressCurService dataPressCurService;
+    private final InfluxDbUtil influxDbUtil;
+
+    public PipeBurstDataBizService(PipeBurstDataPressCurService dataPressCurService, InfluxDbUtil influxDbUtil) {
+        mStrClassName = "PipeBurstDataPressCurServiceImpl";
+        mBizType = LogFlagBusiType.BUSI_PIPE_BURST.toStrValue();
+        this.dataPressCurService = dataPressCurService;
+        this.influxDbUtil = influxDbUtil;
+
+        migratePressCur();
+    }
+
+    /**
+     * 迁移压力计
+     * <p>每小时的05分</p>
+     */
+    @Scheduled(cron = "0 5 * * * *")
+    public void migratePressCur() {
+        LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName
+                , "开始执行压力计数据迁移任务");
+        long begin = System.currentTimeMillis();
+
+        //读取数据
+        List<PipeBurstDataPressCur> dates = influxDbUtil.query(InfluxdbMetadata.SQL.READ_PRESS_CUR_SQL, PipeBurstDataPressCur.class);
+        LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName
+                , String.format(
+                        "读取InfluxDb成功,数据量:%s"
+                        , dates.size()
+                )
+        );
+
+        //批量合并
+        Boolean upserted = dataPressCurService.upsertAll(dates);
+
+        long end = System.currentTimeMillis();
+        LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName
+                , String.format(
+                        "结束执行压力计数据迁移任务,迁移状态:%s,用时(毫秒):%d"
+                        , upserted
+                        , (end - begin)
+                )
+        );
+    }
+
+}

Різницю між файлами не показано, бо вона завелика
+ 8 - 1
src/main/java/com/shkpr/service/alambizplugin/constants/InfluxdbMetadata.java


+ 30 - 0
src/main/java/com/shkpr/service/alambizplugin/dbdao/mapper/PipeBurstDataPressCurMapper.java

@@ -0,0 +1,30 @@
+package com.shkpr.service.alambizplugin.dbdao.mapper;
+
+import com.shkpr.service.alambizplugin.dto.PipeBurstDataPressCur;
+import org.apache.ibatis.annotations.Mapper;
+
+import java.util.List;
+
+/**
+ * 爆管数据压力计mapper
+ *
+ * @author 欧阳劲驰
+ * @since 1.0.0
+ */
+@Mapper
+public interface PipeBurstDataPressCurMapper {
+    /**
+     * 合并操作
+     *
+     * @param record 对象
+     * @return 合并数量
+     */
+    int upsert(PipeBurstDataPressCur record);
+
+    /**
+     * 查询全部
+     *
+     * @return 数据集合
+     */
+    List<PipeBurstDataPressCur> selectAll();
+}

+ 99 - 0
src/main/java/com/shkpr/service/alambizplugin/dbdao/services/PipeBurstDataPressCurServiceImpl.java

@@ -0,0 +1,99 @@
+package com.shkpr.service.alambizplugin.dbdao.services;
+
+import com.global.base.log.LogLevelFlag;
+import com.global.base.log.LogPrintMgr;
+import com.shkpr.service.alambizplugin.constants.LogFlagBusiType;
+import com.shkpr.service.alambizplugin.dbdao.mapper.PipeBurstDataPressCurMapper;
+import com.shkpr.service.alambizplugin.dbdao.services.intef.PipeBurstDataPressCurService;
+import com.shkpr.service.alambizplugin.dto.PipeBurstDataPressCur;
+import org.apache.ibatis.session.ExecutorType;
+import org.apache.ibatis.session.SqlSession;
+import org.apache.ibatis.session.SqlSessionFactory;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.stereotype.Service;
+
+import java.sql.Connection;
+import java.util.List;
+
+/**
+ * 爆管数据压力计service实现
+ *
+ * @author 欧阳劲驰
+ * @since 1.0.0
+ */
+@Service
+public class PipeBurstDataPressCurServiceImpl implements PipeBurstDataPressCurService {
+    /**
+     * log
+     */
+    private final String mStrClassName;
+    private final String mBizType;
+
+    private final SqlSessionFactory mainSqlSessionFactory;
+
+
+    public PipeBurstDataPressCurServiceImpl(@Qualifier("mainSqlSessionFactory") SqlSessionFactory mainSqlSessionFactory) {
+        mStrClassName = "PipeBurstDataPressCurServiceImpl";
+        mBizType = LogFlagBusiType.BUSI_PIPE_BURST.toStrValue();
+        this.mainSqlSessionFactory = mainSqlSessionFactory;
+    }
+
+    /**
+     * 批量合并操作
+     *
+     * @param dates 数据集合
+     * @return 合并状态
+     */
+    @Override
+    public Boolean upsertAll(List<PipeBurstDataPressCur> dates) {
+        LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName
+                , String.format(
+                        "开始批量写入压力计数据,开启批处理 数据量:%s"
+                        , dates.size()
+                )
+        );
+        long begin = System.currentTimeMillis();
+
+        //开启批处理
+        try (SqlSession sqlSession = mainSqlSessionFactory.openSession(ExecutorType.BATCH, false)) {
+            try {
+                //设置手动提交
+                Connection conn = sqlSession.getConnection();
+                conn.setAutoCommit(false);
+
+                //从session获取mapper
+                PipeBurstDataPressCurMapper dataPressCurMapper = sqlSession.getMapper(PipeBurstDataPressCurMapper.class);
+
+                //批量合并
+                dates.forEach(dataPressCurMapper::upsert);
+
+                //发送sql至数据库
+                sqlSession.flushStatements();
+                //提交
+                sqlSession.commit();
+                conn.commit();
+
+                long end = System.currentTimeMillis();
+                LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName
+                        , String.format(
+                                "结束批量写入压力计数据,提交并关闭批处理 用时(毫秒):%d"
+                                , (end - begin)
+                        )
+                );
+                return true;
+            } catch (Exception e) {
+                //回滚
+                sqlSession.rollback();
+
+                LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR, mBizType, mStrClassName
+                        , String.format(
+                                "批量写入压力计数据,回滚操作 error:%s"
+                                , e
+                        )
+                );
+
+                return false;
+            }
+        }
+    }
+}

+ 21 - 0
src/main/java/com/shkpr/service/alambizplugin/dbdao/services/intef/PipeBurstDataPressCurService.java

@@ -0,0 +1,21 @@
+package com.shkpr.service.alambizplugin.dbdao.services.intef;
+
+import com.shkpr.service.alambizplugin.dto.PipeBurstDataPressCur;
+
+import java.util.List;
+
+/**
+ * 爆管数据压力计service
+ *
+ * @author 欧阳劲驰
+ * @since 1.0.0
+ */
+public interface PipeBurstDataPressCurService {
+    /**
+     * 批量合并操作
+     *
+     * @param dates 数据集合
+     * @return 合并状态
+     */
+    Boolean upsertAll(List<PipeBurstDataPressCur> dates);
+}

+ 35 - 0
src/main/java/com/shkpr/service/alambizplugin/dto/PipeBurstDataPressCur.java

@@ -0,0 +1,35 @@
+package com.shkpr.service.alambizplugin.dto;
+
+import com.shkpr.service.alambizplugin.annotation.InfluxDbMapping;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.LocalDateTime;
+
+/**
+ * 爆管压力计数据
+ *
+ * @author 欧阳劲驰
+ * @since 1.0.0
+ */
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class PipeBurstDataPressCur {
+    /**
+     * 时间
+     */
+    @InfluxDbMapping("time")
+    private LocalDateTime time;
+    /**
+     * 设备id
+     */
+    @InfluxDbMapping("dev_id")
+    private String devId;
+    /**
+     * 值
+     */
+    @InfluxDbMapping("press_cur")
+    private Double value;
+}

+ 23 - 0
src/main/resources/mapper/PipeBurstDataPressCurMapper.xml

@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="com.shkpr.service.alambizplugin.dbdao.mapper.PipeBurstDataPressCurMapper">
+    <resultMap id="BaseResultMap" type="com.shkpr.service.alambizplugin.dto.PipeBurstDataPressCur">
+        <!--@Table k1_pgn_pb_data_press_cur-->
+        <id column="time" jdbcType="TIMESTAMP" property="time"/>
+        <result column="dev_id" jdbcType="VARCHAR" property="devId"/>
+        <result column="value" jdbcType="VARCHAR" property="value"/>
+    </resultMap>
+
+    <insert id="upsert">
+        insert into k1_pgn_pb_data_press_cur (time, dev_id, value)
+        values (timezone('utc', #{time,jdbcType=TIMESTAMP} at time zone current_setting('timezone')),
+                #{devId,jdbcType=VARCHAR},
+                #{value,jdbcType=DOUBLE})
+        on conflict (time, dev_id) do update set value = excluded.value;
+    </insert>
+
+    <select id="selectAll" resultMap="BaseResultMap">
+        select id, (time at time zone 'utc')::timestamp as time, dev_id, value
+        from k1_pgn_pb_data_press_cur;
+    </select>
+</mapper>