Procházet zdrojové kódy

influx改为分要素插入

欧阳劲驰 před 1 měsícem
rodič
revize
6418483c44

+ 26 - 17
iot-server/iot-server-data/src/main/java/com/shkpr/iot/server/data/service/impl/DataServiceImpl.java

@@ -10,10 +10,10 @@ import org.apache.commons.collections4.CollectionUtils;
 import org.influxdb.dto.Point;
 import org.springframework.stereotype.Service;
 
+import java.time.LocalDateTime;
 import java.time.ZoneId;
 import java.time.temporal.ChronoUnit;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 
 /**
  * 数据service实现
@@ -40,21 +40,30 @@ public class DataServiceImpl implements DataService {
     public Boolean saveData(DataInfo data) {
         //排除空数据
         if (CollectionUtils.isEmpty(data.getDataValues())) return false;
-        //获取时间戳
-        long timestamp = data.getDateTime()
-                .truncatedTo(ChronoUnit.MINUTES)
-                .atZone(ZoneId.systemDefault())
-                .toInstant()
-                .toEpochMilli();
-        //构建点
-        Point point = Point.measurement("WaterQuality")
-                .tag(InfluxdbMetadata.FLAG_EQUIPMENT_CODE, data.getEquipmentCode())
-                .fields(data.getDataValues().stream()
-                        .collect(Collectors.toMap(DataValue::getKey, DataValue::getValue)))
-                .time(timestamp, TimeUnit.MILLISECONDS)
-                .build();
-        log.info("插入点位数据:{}", point);
-        //插入点
-        return influxDBUtil.insert(point);
+        //插入状态
+        Boolean result = false;
+        //遍历数据集合
+        for (DataValue dataValue : data.getDataValues()) {
+            final LocalDateTime dateTime = dataValue.getDateTime();
+            //分钟
+            int minute = dateTime.getMinute();
+            //获取时间戳
+            long timestamp = dateTime
+                    .plusMinutes(minute % 3 == 0 ? 0 : 3 - (minute % 3))
+                    .truncatedTo(ChronoUnit.MINUTES)
+                    .atZone(ZoneId.systemDefault())
+                    .toInstant()
+                    .toEpochMilli();
+            //构建点
+            Point point = Point.measurement("WaterQuality")
+                    .tag(InfluxdbMetadata.FLAG_EQUIPMENT_CODE, data.getEquipmentCode())
+                    .addField(dataValue.getKey(), (String) dataValue.getValue())
+                    .time(timestamp, TimeUnit.MILLISECONDS)
+                    .build();
+            log.info("插入点位数据:{}", point);
+            //插入点
+            result = influxDBUtil.insert(point);
+        }
+        return result;
     }
 }