Explorar el Código

江津数据采集 更改采集逻辑和相关配置

1037015548@qq.com hace 7 meses
padre
commit
796f8d6fa9

+ 100 - 4
dc3-gateway/src/main/java/io/github/pnoker/gateway/utils/KafkaConsumerDemo.java

@@ -1,15 +1,26 @@
 package io.github.pnoker.gateway.utils;
 
 import com.alibaba.nacos.shaded.com.google.common.base.Stopwatch;
+import net.sf.jsqlparser.expression.LongValue;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.influxdb.InfluxDB;
+import org.influxdb.InfluxDBFactory;
+import org.influxdb.dto.BatchPoints;
+import org.influxdb.dto.Point;
+import org.influxdb.dto.Query;
+import org.influxdb.dto.QueryResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
 import java.util.Collections;
+import java.util.List;
 import java.util.Properties;
+import java.util.TimeZone;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -118,9 +129,94 @@ public class KafkaConsumerDemo {
         }
     }
 
-    public static void main(String[] args) {
-        // kafka的连接地址和主题请根据实际情况修改(kafka集群多个节点用逗号隔开,如10.15.0.114:9092,10.15.0.115:9092)
-        KafkaConsumerDemo kafkaConsumerDemo = new KafkaConsumerDemo("10.127.16.85:9092", "jiangjin");
-        kafkaConsumerDemo.doConsume();
+//    public static void main(String[] args) {
+////        // kafka的连接地址和主题请根据实际情况修改(kafka集群多个节点用逗号隔开,如10.15.0.114:9092,10.15.0.115:9092)
+////        KafkaConsumerDemo kafkaConsumerDemo = new KafkaConsumerDemo("10.127.16.85:9092", "jiangjin");
+////        kafkaConsumerDemo.doConsume();
+////    }
+public static void main(String[] args) {
+    // 创建InfluxDB客户端
+    InfluxDB influxDB = InfluxDBFactory.connect("http://10.101.16.13:8086");
+    String database = "iot";
+    influxDB.setDatabase(database);
+
+    // 查询数据
+    Query query = new Query("SELECT * FROM WaterMeter", database);
+    QueryResult result = influxDB.query(query);
+
+    // 处理数据
+    SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
+    dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+    int count = 0; // 计数器,用于控制批量插入
+    BatchPoints batchPoints = BatchPoints.database(database).build();
+
+    for (QueryResult.Result res : result.getResults()) {
+        if (res.getSeries() == null) continue;
+        for (QueryResult.Series series : res.getSeries()) {
+            List<List<Object>> values = series.getValues();
+            List<String> columns = series.getColumns();
+
+            for (List<Object> value : values) {
+                String timeString = (String) value.get(columns.indexOf("time"));
+                long timeNanos = 0;
+                try {
+                    // 解析时间字符串为毫秒
+                    long timeMillis = dateFormat.parse(timeString).getTime();
+                    // 转换为纳秒
+                    timeNanos = TimeUnit.MILLISECONDS.toNanos(timeMillis);
+                } catch (ParseException e) {
+                    e.printStackTrace();
+                    continue; // 跳过无法解析的时间
+                }
+
+                String devId = (String) value.get(columns.indexOf("dev_id"));
+                String devId1 = (String) value.get(columns.indexOf("dev_id_1"));
+                Double flow_cur = (Double) value.get(columns.indexOf("flow_cur"));
+                Double flow_l_cur = (Double) value.get(columns.indexOf("flow_l_cur"));
+                Double flow_l_total_pos = (Double) value.get(columns.indexOf("flow_l_total_pos"));
+                Double flow_total_net = (Double) value.get(columns.indexOf("flow_total_net"));
+                Double flow_total_pos = (Double) value.get(columns.indexOf("flow_total_pos"));
+                Double flow_total_rev = (Double) value.get(columns.indexOf("flow_total_rev"));
+                Double press_cur = (Double) value.get(columns.indexOf("press_cur"));
+
+                if (devId == null || devId.isEmpty()) {
+                    devId = devId1; // 如果 dev_id 为空,则使用 dev_id_1 的值
+                }
+
+                Point point = Point.measurement("WaterMeter_temp")
+                        .time(timeNanos, TimeUnit.NANOSECONDS)
+                        .tag("dev_id", devId)
+                        .addField("flow_cur", flow_cur)
+                        .addField("flow_l_cur", flow_l_cur)
+                        .addField("flow_l_total_pos", flow_l_total_pos)
+                        .addField("flow_total_net", flow_total_net)
+                        .addField("flow_total_pos", flow_total_pos)
+                        .addField("flow_total_rev", flow_total_rev)
+                        .addField("press_cur", press_cur)
+                        .build();
+
+                batchPoints.point(point);
+                count++;
+
+                // 当批量点数达到2000时,写入并重置
+                if (count >= 2000) {
+                    influxDB.write(batchPoints);
+                    batchPoints = BatchPoints.database(database).build(); // 重置BatchPoints
+                    count = 0; // 重置计数器
+                }
+            }
+        }
     }
+
+    // 写入剩余未满2000条的数据
+    if (count > 0) {
+        influxDB.write(batchPoints);
+    }
+
+    System.out.println("执行完毕");
+    // 关闭InfluxDB客户端
+    influxDB.close();
+}
+
 }