|
|
@@ -0,0 +1,124 @@
|
|
|
+package com.shkpr.service.customgateway.zhscada.components;
|
|
|
+
|
|
|
+import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
|
|
|
+import com.global.base.log.LogLevelFlag;
|
|
|
+import com.global.base.log.LogPrintMgr;
|
|
|
+import com.shkpr.service.customgateway.core.components.DeviceRegistry;
|
|
|
+import com.shkpr.service.customgateway.core.constants.LogFlagBusiType;
|
|
|
+import com.shkpr.service.customgateway.core.domain.Device;
|
|
|
+import com.shkpr.service.customgateway.core.domain.DeviceTag;
|
|
|
+import com.shkpr.service.customgateway.core.utils.InfluxDbUtil;
|
|
|
+import com.shkpr.service.customgateway.zhscada.constants.ScadaPlatformMetadata;
|
|
|
+import com.shkpr.service.customgateway.zhscada.domain.po.SensorCollectData;
|
|
|
+import com.shkpr.service.customgateway.zhscada.properties.MigrateProperties;
|
|
|
+import com.shkpr.service.customgateway.zhscada.service.SensorCollectDataService;
|
|
|
+import org.apache.commons.lang3.StringUtils;
|
|
|
+import org.influxdb.dto.Point;
|
|
|
+import org.springframework.core.io.Resource;
|
|
|
+import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
|
|
|
+import org.springframework.core.io.support.ResourcePatternResolver;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+
|
|
|
+import java.io.IOException;
|
|
|
+import java.time.LocalDateTime;
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.Objects;
|
|
|
+import java.util.stream.Collectors;
|
|
|
+
|
|
|
+/**
|
|
|
+ * 数据迁移器
|
|
|
+ *
|
|
|
+ * @author 欧阳劲驰
|
|
|
+ * @serial 1.0.4
|
|
|
+ */
|
|
|
+@Component
|
|
|
+public class DataMigrator {
|
|
|
+ //log
|
|
|
+ private static final String CLASS_NAME = "DataCollector";
|
|
|
+ private static final String BIZ_TYPE = LogFlagBusiType.ZHONG_HUAN_SACADA.toStrValue();
|
|
|
+
|
|
|
+ final
|
|
|
+ MigrateProperties migrateProperties;
|
|
|
+ final
|
|
|
+ DeviceRegistry deviceRegistry;
|
|
|
+ final
|
|
|
+ SensorCollectDataService sensorCollectDataService;
|
|
|
+ final
|
|
|
+ InfluxDbUtil influxDbUtil;
|
|
|
+
|
|
|
+ //传感器映射,k:tag,v:code
|
|
|
+ private Map<String, String> sensorMap = new HashMap<>();
|
|
|
+
|
|
|
+ public DataMigrator(MigrateProperties migrateProperties, DeviceRegistry deviceRegistry, SensorCollectDataService sensorCollectDataService, InfluxDbUtil influxDbUtil) {
|
|
|
+ this.migrateProperties = migrateProperties;
|
|
|
+ this.deviceRegistry = deviceRegistry;
|
|
|
+ this.sensorCollectDataService = sensorCollectDataService;
|
|
|
+ this.influxDbUtil = influxDbUtil;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 迁移scada
|
|
|
+ */
|
|
|
+ public void migrateScada() {
|
|
|
+ LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, BIZ_TYPE, CLASS_NAME
|
|
|
+ , "开始迁移Scada数据");
|
|
|
+ long begin = System.currentTimeMillis();
|
|
|
+
|
|
|
+ //加载传感器配置
|
|
|
+ loadSensors();
|
|
|
+ //设备
|
|
|
+ List<Device> devices = deviceRegistry.findAll(ScadaPlatformMetadata.NAME);
|
|
|
+ //遍历设备,读取数据并写入
|
|
|
+ devices.forEach(device -> {
|
|
|
+ //遍历tag
|
|
|
+ for (DeviceTag tag : device.getTags()) {
|
|
|
+ //传感器code
|
|
|
+ String code = sensorMap.getOrDefault(tag.getTag(), null);
|
|
|
+ if (StringUtils.isBlank(code)) continue;
|
|
|
+ //起止时间
|
|
|
+ SensorCollectData timeRange = sensorCollectDataService.findTimeRange(migrateProperties.getSchema(),code);
|
|
|
+ LocalDateTime minTime = timeRange.getMinTime();
|
|
|
+ LocalDateTime maxTime = timeRange.getMaxTime();
|
|
|
+ //查询数据
|
|
|
+ List<SensorCollectData> dates = sensorCollectDataService.findAlign(migrateProperties.getSchema(),
|
|
|
+ minTime, maxTime, migrateProperties.getInterval(), migrateProperties.getIntervalUnit(),
|
|
|
+ migrateProperties.getAlignUnit(), code);
|
|
|
+
|
|
|
+ //构建influxdb
|
|
|
+ List<Point> points = dates.parallelStream().map(d -> d.toPoint(devices, sensorMap))
|
|
|
+ .filter(Objects::nonNull).collect(Collectors.toList());
|
|
|
+ //写入influxDb
|
|
|
+ influxDbUtil.write(points);
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
+
|
|
|
+ long end = System.currentTimeMillis();
|
|
|
+ LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, BIZ_TYPE, CLASS_NAME
|
|
|
+ , String.format("结束迁移Scada数据 用时(毫秒):%d", (end - begin))
|
|
|
+ );
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 加载传感器
|
|
|
+ */
|
|
|
+ private void loadSensors() {
|
|
|
+ try {
|
|
|
+ //解析文件
|
|
|
+ ResourcePatternResolver resourceResolver = new PathMatchingResourcePatternResolver();
|
|
|
+ Resource resources = resourceResolver.getResources(migrateProperties.getSensorMapPath())[0];
|
|
|
+ //解析map
|
|
|
+ ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
|
|
|
+ sensorMap = mapper.readValue(resources.getFile(), mapper.getTypeFactory()
|
|
|
+ .constructMapType(HashMap.class, String.class, String.class));
|
|
|
+ } catch (IOException e) {
|
|
|
+ LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR, BIZ_TYPE, CLASS_NAME
|
|
|
+ , String.format("传感器映射配置文件读取失败 error:%s", e)
|
|
|
+ );
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|