Browse Source

江津数据采集

1037015548@qq.com 8 months ago
parent
commit
a809b6130e

+ 14 - 0
dc3-gateway/pom.xml

@@ -176,6 +176,20 @@
             <version>5.2.3</version>
         </dependency>
 
+        <!-- Apache Kafka Clients -->
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>2.8.0</version> <!-- 请根据需要使用最新版本 -->
+        </dependency>
+
+        <!-- Google Guava -->
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>30.1.1-jre</version> <!-- 请根据需要使用最新版本 -->
+        </dependency>
+
     </dependencies>
 
     <repositories>

+ 133 - 0
dc3-gateway/src/main/java/io/github/pnoker/gateway/bizmgr/KprJiangjinWaterBizfun.java

@@ -0,0 +1,133 @@
+package io.github.pnoker.gateway.bizmgr;
+
+import com.alibaba.fastjson.JSONObject;
+import io.github.pnoker.gateway.SpringContextUtil;
+import io.github.pnoker.gateway.bizmgr.baseInit.KprBaseInitFun;
+import io.github.pnoker.gateway.comtool.jiangjinThread.KafkaConsumer;
+import io.github.pnoker.gateway.utils.InfulxDbUtil;
+import io.github.pnoker.gateway.utils.InfulxJiangjinDbUtil;
+import org.influxdb.dto.Point;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigDecimal;
+import java.time.*;
+import java.time.temporal.ChronoUnit;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * @ClassName KprJiangjinWaterBizfun
+ * @Description: TODO
+ * @Author LX
+ * @Date 2024/11/21
+ * @Version V1.0
+ **/
+public class KprJiangjinWaterBizfun {
+    private static final Logger log = LoggerFactory.getLogger(KprJiangjinWaterBizfun.class);
+
+    private final static String mStrClassName = "KprJiangjinWaterBizfun";
+    private final static String EMPTY_NULL = "NULL";
+
+    public static InfulxJiangjinDbUtil infulxJiangjinDbUtil = null;//infulx工具类对象
+
+    public static Point createPointFromJson(String deviceType, JSONObject jsonObject,String cmCode,List<String> params) {
+        // 获取时间戳
+        long nanoTimestamp = convertToNanoTimestamp(jsonObject.getLong("collectionTime"));
+        if(nanoTimestamp==0L){
+            return null;
+        }
+        // 创建 Point.Builder 对象
+        Point.Builder pointBuilder = Point.measurement(KprBaseInitFun.getInstance().jiangjinMeasurementMap.get(deviceType))
+                .tag("dev_id",cmCode)
+                .time(nanoTimestamp, TimeUnit.NANOSECONDS);
+
+        // 遍历 JSON 对象的键值对,并添加到 Point.Builder 中
+        for (Map.Entry<String, Object> entry : jsonObject.entrySet()) {
+            String key = entry.getKey();
+            if (!key.equals("collectionTime")) {
+                processField(pointBuilder,key, entry.getValue(),params);
+            }
+        }
+        if(!pointBuilder.hasFields()){
+            return null;
+        }
+
+        // 构建 Point 对象
+        try {
+            return pointBuilder.build();
+        }catch(Exception ex){
+            ex.printStackTrace();
+        }
+        return null;
+    }
+
+    //指定列处理
+    private static void processField(Point.Builder pointBuilder, String key, Object value,List<String> params) {
+        if (value == null) {
+            return; // 跳过空值
+        }
+        //String[]长度如果为0,则说明本系统没有对应字段,则用它本身,反之用第二个元素作为字段名
+        List<String[]> newParams = params.stream()
+                .map(s -> s.split("___"))
+                .collect(Collectors.toList());
+
+        for (String[] clies:newParams){
+            boolean exists = Arrays.asList(clies).contains(key);
+            if(exists){
+                String newKey = clies.length>1?clies[1]:clies[0];
+                if (value instanceof Integer) {
+                    pointBuilder.addField(newKey, (Integer) value);
+                } else if (value instanceof Long) {
+                    pointBuilder.addField(newKey, (Long) value);
+                } else if (value instanceof Boolean) {
+                    pointBuilder.addField(newKey, ((Boolean)value)?1:0);
+                } else if (value instanceof String) {
+                    if (((String)value).equals("01")||((String)value).equals("00")){
+                        pointBuilder.addField(newKey, ((String) value).replace("0",""));
+                    }else {
+                        pointBuilder.addField(newKey, (String) value);
+                    }
+                }else if (value instanceof BigDecimal) {
+                    pointBuilder.addField(newKey, ((BigDecimal) value).doubleValue());
+                }else{
+                    pointBuilder.addField(newKey, (String.valueOf(value)) );
+                }
+                break;
+            }
+        }
+    }
+
+
+    //获取CST 纳秒时间戳
+    private static long convertToNanoTimestamp(Long dateTimeString) {
+        // 解析字符串为 LocalDateTime 对象
+        LocalDateTime dateTime = Instant.ofEpochMilli(dateTimeString)
+                .atZone(ZoneId.systemDefault())
+                .toLocalDateTime();
+        // 指定时区为CST(Asia/Shanghai)
+        ZoneId shanghaiZoneId = ZoneId.of("Asia/Shanghai");
+        // 将LocalDateTime对象转换为ZonedDateTime对象
+        ZonedDateTime zonedDateTime = dateTime.atZone(shanghaiZoneId);
+        // 转换为秒级别的时间戳
+        long epochSecond = dateTime.toEpochSecond(ZoneOffset.UTC);
+        // 将ZonedDateTime对象转换为Instant对象(UTC时间点)
+        Instant instant = zonedDateTime.toInstant();
+        // 计算从1970年1月1日00:00:00 UTC以来的纳秒数
+        long nanosecondsSinceEpoch = ChronoUnit.NANOS.between(Instant.EPOCH, instant);
+        long currentTimeNanos = System.currentTimeMillis() * 1_000_000L;
+        if(nanosecondsSinceEpoch>currentTimeNanos){
+            return 0L;
+        }
+        return nanosecondsSinceEpoch;
+    }
+
+    //TODO 处理kafka所有已存在数据
+    public static void InitHistory(){
+        KafkaConsumer kafkaConsumerDemo = new KafkaConsumer("10.127.16.85:9092", "jiangjin");
+        kafkaConsumerDemo.doConsume();
+    }
+}

+ 133 - 26
dc3-gateway/src/main/java/io/github/pnoker/gateway/bizmgr/baseInit/KprBaseInitFun.java

@@ -22,6 +22,12 @@ public class KprBaseInitFun {
 
     public Map<String,String> measurementMap = new HashMap<>();
 
+    //TODO 江津相关
+    public Map<String,List<String>> jiangjinParams = new HashMap<>();
+    public Map<String,List<String>> jiangjinDeviceList = new HashMap<>();
+
+    public Map<String,String> jiangjinMeasurementMap = new HashMap<>();
+
     private static volatile KprBaseInitFun msInstance = null;
     public static KprBaseInitFun getInstance(){
         if (msInstance == null){
@@ -39,37 +45,138 @@ public class KprBaseInitFun {
 
     public void init(){
         try {
-            InputStream inputStream = getConfigStream("application.yml");
-            Properties properties = new Properties();
-            properties.load(inputStream);
-            List<String> WATER_METER= Arrays.asList(properties.getProperty("watermeter").split(","));
-            List<String> FLOW_METER=Arrays.asList(properties.getProperty("flowmeter").split(","));
-            List<String> PRESS_METER=Arrays.asList(properties.getProperty("pressmeter").split(","));
-            List<String> WATER_QUALITY=Arrays.asList(properties.getProperty("waterquality").split(","));
-            List<String> WATER_PUMP=Arrays.asList(properties.getProperty("waterpump").split(","));
-            List<String> ELEC_CTRL_VALVE=Arrays.asList(properties.getProperty("elecctrlvalve").split(","));
-            List<String> LIQUID_LEVEL_METER=Arrays.asList(properties.getProperty("liquidlevelmeter").split(","));
-
-            dangyangParams.put("WATER_METER",WATER_METER);
-            dangyangParams.put("FLOW_METER",FLOW_METER);
-            dangyangParams.put("PRESS_METER",PRESS_METER);
-            dangyangParams.put("WATER_QUALITY",WATER_QUALITY);
-            dangyangParams.put("WATER_PUMP",WATER_PUMP);
-            dangyangParams.put("ELEC_CTRL_VALVE",ELEC_CTRL_VALVE);
-            dangyangParams.put("LIQUID_LEVEL_METER",LIQUID_LEVEL_METER);
-
-            measurementMap.put("WATER_METER",properties.getProperty("WATER_METER"));
-            measurementMap.put("FLOW_METER",properties.getProperty("FLOW_METER"));
-            measurementMap.put("PRESS_METER",properties.getProperty("PRESS_METER"));
-            measurementMap.put("WATER_QUALITY",properties.getProperty("WATER_QUALITY"));
-            measurementMap.put("WATER_PUMP",properties.getProperty("WATER_PUMP"));
-            measurementMap.put("ELEC_CTRL_VALVE",properties.getProperty("ELEC_CTRL_VALVE"));
-            measurementMap.put("LIQUID_LEVEL_METER",properties.getProperty("LIQUID_LEVEL_METER"));
+            initDangyang();
+            initJiangjin();
         }catch(Exception ex){
             ex.printStackTrace();
         }
     }
 
+
+    public void initDangyang()throws Exception{
+        InputStream inputStream = getConfigStream("application.yml");
+        Properties properties = new Properties();
+        properties.load(inputStream);
+        List<String> WATER_METER= Arrays.asList(properties.getProperty("watermeter").split(","));
+        List<String> FLOW_METER=Arrays.asList(properties.getProperty("flowmeter").split(","));
+        List<String> PRESS_METER=Arrays.asList(properties.getProperty("pressmeter").split(","));
+        List<String> WATER_QUALITY=Arrays.asList(properties.getProperty("waterquality").split(","));
+        List<String> WATER_PUMP=Arrays.asList(properties.getProperty("waterpump").split(","));
+        List<String> ELEC_CTRL_VALVE=Arrays.asList(properties.getProperty("elecctrlvalve").split(","));
+        List<String> LIQUID_LEVEL_METER=Arrays.asList(properties.getProperty("liquidlevelmeter").split(","));
+
+        dangyangParams.put("WATER_METER",WATER_METER);
+        dangyangParams.put("FLOW_METER",FLOW_METER);
+        dangyangParams.put("PRESS_METER",PRESS_METER);
+        dangyangParams.put("WATER_QUALITY",WATER_QUALITY);
+        dangyangParams.put("WATER_PUMP",WATER_PUMP);
+        dangyangParams.put("ELEC_CTRL_VALVE",ELEC_CTRL_VALVE);
+        dangyangParams.put("LIQUID_LEVEL_METER",LIQUID_LEVEL_METER);
+
+        measurementMap.put("WATER_METER",properties.getProperty("WATER_METER"));
+        measurementMap.put("FLOW_METER",properties.getProperty("FLOW_METER"));
+        measurementMap.put("PRESS_METER",properties.getProperty("PRESS_METER"));
+        measurementMap.put("WATER_QUALITY",properties.getProperty("WATER_QUALITY"));
+        measurementMap.put("WATER_PUMP",properties.getProperty("WATER_PUMP"));
+        measurementMap.put("ELEC_CTRL_VALVE",properties.getProperty("ELEC_CTRL_VALVE"));
+        measurementMap.put("LIQUID_LEVEL_METER",properties.getProperty("LIQUID_LEVEL_METER"));
+    }
+    public void initJiangjin()throws Exception{
+        InputStream inputStream = getConfigStream("application-config.yml");
+        Properties properties = new Properties();
+        properties.load(inputStream);
+
+        List<String> VoltageSwitchgear=Arrays.asList(properties.getProperty("VoltageSwitchgear").split(","));
+        List<String> WaterValve=Arrays.asList(properties.getProperty("WaterValve").split(","));
+        List<String> VariableFrequencyDrive=Arrays.asList(properties.getProperty("VariableFrequencyDrive").split(","));
+        List<String> ChlorineFlowMeter=Arrays.asList(properties.getProperty("ChlorineFlowMeter").split(","));
+        List<String> WaterQuality=Arrays.asList(properties.getProperty("WaterQuality").split(","));
+        List<String> DifferentialPressure=Arrays.asList(properties.getProperty("DifferentialPressure").split(","));
+        List<String> ElectricMotor=Arrays.asList(properties.getProperty("ElectricMotor").split(","));
+        List<String> WaterPump=Arrays.asList(properties.getProperty("WaterPump").split(","));
+        List<String> AirCompressor=Arrays.asList(properties.getProperty("AirCompressor").split(","));
+        List<String> RootsBlower=Arrays.asList(properties.getProperty("RootsBlower").split(","));
+        List<String> thMeter=Arrays.asList(properties.getProperty("thMeter").split(","));
+        List<String> SludgeIntfMeter=Arrays.asList(properties.getProperty("SludgeIntfMeter").split(","));
+        List<String> InformationSoftware=Arrays.asList(properties.getProperty("InformationSoftware").split(","));
+        List<String> WaterMeter=Arrays.asList(properties.getProperty("WaterMeter").split(","));
+        List<String> SmokeDetector=Arrays.asList(properties.getProperty("SmokeDetector").split(","));
+        List<String> LiquidLevelDiffer=Arrays.asList(properties.getProperty("LiquidLevelDiffer").split(","));
+        List<String> LiquidLevel=Arrays.asList(properties.getProperty("LiquidLevel").split(","));
+
+        jiangjinParams.put("VoltageSwitchgear",VoltageSwitchgear);
+        jiangjinParams.put("WaterValve",WaterValve);
+        jiangjinParams.put("VariableFrequencyDrive",VariableFrequencyDrive);
+        jiangjinParams.put("ChlorineFlowMeter",ChlorineFlowMeter);
+        jiangjinParams.put("WaterQuality",WaterQuality);
+        jiangjinParams.put("DifferentialPressure",DifferentialPressure);
+        jiangjinParams.put("ElectricMotor",ElectricMotor);
+        jiangjinParams.put("WaterPump",WaterPump);
+        jiangjinParams.put("AirCompressor",AirCompressor);
+        jiangjinParams.put("RootsBlower",RootsBlower);
+        jiangjinParams.put("thMeter",thMeter);
+        jiangjinParams.put("SludgeIntfMeter",SludgeIntfMeter);
+        jiangjinParams.put("InformationSoftware",InformationSoftware);
+        jiangjinParams.put("WaterMeter",WaterMeter);
+        jiangjinParams.put("SmokeDetector",SmokeDetector);
+        jiangjinParams.put("LiquidLevelDiffer",LiquidLevelDiffer);
+        jiangjinParams.put("LiquidLevel",LiquidLevel);
+
+        List<String> DeviceVoltageSwitchgear=Arrays.asList(properties.getProperty("VoltageSwitchgear_device").split(","));
+        List<String> DeviceWaterValve=Arrays.asList(properties.getProperty("WaterValve_device").split(","));
+        List<String> DeviceVariableFrequencyDrive=Arrays.asList(properties.getProperty("VariableFrequencyDrive_device").split(","));
+        List<String> DeviceChlorineFlowMeter=Arrays.asList(properties.getProperty("ChlorineFlowMeter_device").split(","));
+        List<String> DeviceWaterQuality=Arrays.asList(properties.getProperty("WaterQuality_device").split(","));
+        List<String> DeviceDifferentialPressure=Arrays.asList(properties.getProperty("DifferentialPressure_device").split(","));
+        List<String> DeviceElectricMotor=Arrays.asList(properties.getProperty("ElectricMotor_device").split(","));
+        List<String> DeviceWaterPump=Arrays.asList(properties.getProperty("WaterPump_device").split(","));
+        List<String> DeviceAirCompressor=Arrays.asList(properties.getProperty("AirCompressor_device").split(","));
+        List<String> DeviceRootsBlower=Arrays.asList(properties.getProperty("RootsBlower_device").split(","));
+        List<String> DevicethMeter=Arrays.asList(properties.getProperty("thMeter_device").split(","));
+        List<String> DeviceSludgeIntfMeter=Arrays.asList(properties.getProperty("SludgeIntfMeter_device").split(","));
+        List<String> DeviceInformationSoftware=Arrays.asList(properties.getProperty("InformationSoftware_device").split(","));
+        List<String> DeviceWaterMeter=Arrays.asList(properties.getProperty("WaterMeter_device").split(","));
+        List<String> DeviceSmokeDetector=Arrays.asList(properties.getProperty("SmokeDetector_device").split(","));
+        List<String> DeviceLiquidLevelDiffer=Arrays.asList(properties.getProperty("LiquidLevelDiffer_device").split(","));
+        List<String> DeviceLiquidLevel=Arrays.asList(properties.getProperty("LiquidLevel_device").split(","));
+
+        jiangjinDeviceList.put("Device_VoltageSwitchgear",DeviceVoltageSwitchgear);
+        jiangjinDeviceList.put("Device_WaterValve",DeviceWaterValve);
+        jiangjinDeviceList.put("Device_VariableFrequencyDrive",DeviceVariableFrequencyDrive);
+        jiangjinDeviceList.put("Device_ChlorineFlowMeter",DeviceChlorineFlowMeter);
+        jiangjinDeviceList.put("Device_WaterQuality",DeviceWaterQuality);
+        jiangjinDeviceList.put("Device_DifferentialPressure",DeviceDifferentialPressure);
+        jiangjinDeviceList.put("Device_ElectricMotor",DeviceElectricMotor);
+        jiangjinDeviceList.put("Device_WaterPump",DeviceWaterPump);
+        jiangjinDeviceList.put("Device_AirCompressor",DeviceAirCompressor);
+        jiangjinDeviceList.put("Device_RootsBlower",DeviceRootsBlower);
+        jiangjinDeviceList.put("Device_thMeter",DevicethMeter);
+        jiangjinDeviceList.put("Device_SludgeIntfMeter",DeviceSludgeIntfMeter);
+        jiangjinDeviceList.put("Device_InformationSoftware",DeviceInformationSoftware);
+        jiangjinDeviceList.put("Device_WaterMeter",DeviceWaterMeter);
+        jiangjinDeviceList.put("Device_SmokeDetector",DeviceSmokeDetector);
+        jiangjinDeviceList.put("Device_LiquidLevelDiffer",DeviceLiquidLevelDiffer);
+        jiangjinDeviceList.put("Device_LiquidLevel",DeviceLiquidLevel);
+
+        jiangjinMeasurementMap.put("VoltageSwitchgear","VoltageSwitchgear");
+        jiangjinMeasurementMap.put("WaterValve","WaterValve");
+        jiangjinMeasurementMap.put("VariableFrequencyDrive","VariableFrequencyDrive");
+        jiangjinMeasurementMap.put("ChlorineFlowMeter","ChlorineFlowMeter");
+        jiangjinMeasurementMap.put("WaterQuality","WaterQuality");
+        jiangjinMeasurementMap.put("DifferentialPressure","DifferentialPressure");
+        jiangjinMeasurementMap.put("ElectricMotor","ElectricMotor");
+        jiangjinMeasurementMap.put("WaterPump","WaterPump");
+        jiangjinMeasurementMap.put("AirCompressor","AirCompressor");
+        jiangjinMeasurementMap.put("RootsBlower","RootsBlower");
+        jiangjinMeasurementMap.put("thMeter","thMeter");
+        jiangjinMeasurementMap.put("SludgeIntfMeter","SludgeIntfMeter");
+        jiangjinMeasurementMap.put("InformationSoftware","InformationSoftware");
+        jiangjinMeasurementMap.put("WaterMeter","WaterMeter");
+        jiangjinMeasurementMap.put("SmokeDetector","SmokeDetector");
+        jiangjinMeasurementMap.put("LiquidLevelDiffer","LiquidLevelDiffer");
+        jiangjinMeasurementMap.put("LiquidLevel","LiquidLevel");
+    }
+
     public InputStream getConfigStream(String configFileName) {
         InputStream inputStream = null;
         try {

+ 130 - 93
dc3-gateway/src/main/java/io/github/pnoker/gateway/comtool/ScheduleTaskMgr.java

@@ -3,8 +3,10 @@ package io.github.pnoker.gateway.comtool;
 import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
 import io.github.pnoker.gateway.bizmgr.KprDangyangWaterBizFun;
+import io.github.pnoker.gateway.bizmgr.KprJiangjinWaterBizfun;
 import io.github.pnoker.gateway.dbdao.DBMgrProxy;
 import io.github.pnoker.gateway.utils.InfulxDbUtil;
+import io.github.pnoker.gateway.utils.InfulxJiangjinDbUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Value;
@@ -93,121 +95,156 @@ public class ScheduleTaskMgr {
     @Value("${dangyang.http.list:}")
     private String listUrl = ""; // 设备列表接口地址
 
-
     /**
      *  当阳相关
      */
-    //TODO 启动后5秒初始化所有配置参数
-    @PostConstruct
-    public void initDangyangApplication(){
-        new Timer().schedule(new TimerTask() {
-            @Override
-            public void run() {
-                try {
-                    KprDangyangWaterBizFun.infulxDbUtil = infulxDbUtil;
-                    KprDangyangWaterBizFun.deviceType = JSONArray.parseArray(deviceTypeStr);
-                    KprDangyangWaterBizFun.username = username;
-                    KprDangyangWaterBizFun.password = password;
-                    KprDangyangWaterBizFun.tokenUrl = tokenUrl;
-                    KprDangyangWaterBizFun.realtimeDataListUrl = realtimeDataListUrl;
-                    KprDangyangWaterBizFun.hisDataListUrl = hisDataListUrl;
-                    KprDangyangWaterBizFun.listUrl = listUrl;
-                }catch(Exception ex){
-                    log.error("当阳启动时初始化配置参数失败:"+ex.getLocalizedMessage());
-                }
-            }
-        },5000);
-    }
-
-    //TODO 根据失效时间定时获取token
-    //TODO 启动后8秒获取第一次token,在以失效时间为基础-5分钟为下一次获取token的条件
-    private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
-    private static final long INITIAL_DELAY = 8; // 初始延迟8秒
-    private static final long BUFFER_TIME = 5 * 60 * 1000; // 5分钟的缓冲时间(以毫秒为单位)
-
-    @PostConstruct
-    public void getToken() {
-        scheduler.schedule(new TokenTask(), INITIAL_DELAY, TimeUnit.SECONDS);
-    }
-
-    private class TokenTask implements Runnable {
-        @Override
-        public void run() {
-            // 获取 accessToken 和 expiresTime
-            JSONObject data = KprDangyangWaterBizFun.getDangyangToken();
-            if (data != null) {
-                String accessToken = data.getString("accessToken");
-                Long expiresTime = data.getLong("expiresTime");
-
-                KprDangyangWaterBizFun.dangyangToken = accessToken;
-
-                // 打印获取到的 token 和过期时间
-                System.out.println("Access Token: " + accessToken);
-                System.out.println("Expires Time: " + expiresTime);
-
-                // 计算下一次获取 token 的延迟时间
-                long delayMillis = expiresTime - System.currentTimeMillis() - BUFFER_TIME;
-                long delaySeconds = delayMillis / 1000; // 转换为秒
-
-                // 调度下一次任务
-                if (delaySeconds > 0) {
-                    scheduler.schedule(this, delaySeconds, TimeUnit.SECONDS);
-                } else {
-                    // 如果计算出来的延迟时间已经过期,立即执行下一次任务
-                    scheduler.schedule(this, 0, TimeUnit.SECONDS);
-                }
-            }
-        }
-    }
-
-    //可以做动态关联,暂时不用了
+//    //TODO 启动后5秒初始化所有配置参数
 //    @PostConstruct
-//    public void initDeviceDefine(){
+//    public void initDangyangApplication(){
 //        new Timer().schedule(new TimerTask() {
 //            @Override
 //            public void run() {
-//                KprDangyangWaterBizFun.flushDefine();
+//                try {
+//                    infulxDbUtil.initInfluxDataBase();
+//                    KprDangyangWaterBizFun.infulxDbUtil = infulxDbUtil;
+//                    KprDangyangWaterBizFun.deviceType = JSONArray.parseArray(deviceTypeStr);
+//                    KprDangyangWaterBizFun.username = username;
+//                    KprDangyangWaterBizFun.password = password;
+//                    KprDangyangWaterBizFun.tokenUrl = tokenUrl;
+//                    KprDangyangWaterBizFun.realtimeDataListUrl = realtimeDataListUrl;
+//                    KprDangyangWaterBizFun.hisDataListUrl = hisDataListUrl;
+//                    KprDangyangWaterBizFun.listUrl = listUrl;
+//                }catch(Exception ex){
+//                    log.error("当阳启动时初始化配置参数失败:"+ex.getLocalizedMessage());
+//                }
 //            }
 //        },5000);
 //    }
+//
+//    //TODO 根据失效时间定时获取token
+//    //TODO 启动后8秒获取第一次token,在以失效时间为基础-5分钟为下一次获取token的条件
+//    private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+//    private static final long INITIAL_DELAY = 8; // 初始延迟8秒
+//    private static final long BUFFER_TIME = 5 * 60 * 1000; // 5分钟的缓冲时间(以毫秒为单位)
+//
+//    @PostConstruct
+//    public void getToken() {
+//        scheduler.schedule(new TokenTask(), INITIAL_DELAY, TimeUnit.SECONDS);
+//    }
+//
+//    private class TokenTask implements Runnable {
+//        @Override
+//        public void run() {
+//            // 获取 accessToken 和 expiresTime
+//            JSONObject data = KprDangyangWaterBizFun.getDangyangToken();
+//            if (data != null) {
+//                String accessToken = data.getString("accessToken");
+//                Long expiresTime = data.getLong("expiresTime");
+//
+//                KprDangyangWaterBizFun.dangyangToken = accessToken;
+//
+//                // 打印获取到的 token 和过期时间
+//                System.out.println("Access Token: " + accessToken);
+//                System.out.println("Expires Time: " + expiresTime);
+//
+//                // 计算下一次获取 token 的延迟时间
+//                long delayMillis = expiresTime - System.currentTimeMillis() - BUFFER_TIME;
+//                long delaySeconds = delayMillis / 1000; // 转换为秒
+//
+//                // 调度下一次任务
+//                if (delaySeconds > 0) {
+//                    scheduler.schedule(this, delaySeconds, TimeUnit.SECONDS);
+//                } else {
+//                    // 如果计算出来的延迟时间已经过期,立即执行下一次任务
+//                    scheduler.schedule(this, 0, TimeUnit.SECONDS);
+//                }
+//            }
+//        }
+//    }
+//
+//    //可以做动态关联,暂时不用了
+////    @PostConstruct
+////    public void initDeviceDefine(){
+////        new Timer().schedule(new TimerTask() {
+////            @Override
+////            public void run() {
+////                KprDangyangWaterBizFun.flushDefine();
+////            }
+////        },5000);
+////    }
+//
+//    @PostConstruct
+//    public void initReal(){
+//        new Timer().schedule(new TimerTask() {
+//            @Override
+//            public void run() {
+//                KprDangyangWaterBizFun.checkRealtimeData();
+//            }
+//        },10000);
+//    }
+//    @PostConstruct
+//    public void initHistory(){
+//        new Timer().schedule(new TimerTask() {
+//            @Override
+//            public void run() {
+//                KprDangyangWaterBizFun.checkOneMonth();
+//            }
+//        },11000);
+//    }
+//    @PostConstruct
+//    public void initDingzhiHistory(){
+//        new Timer().schedule(new TimerTask() {
+//            @Override
+//            public void run() {
+//                KprDangyangWaterBizFun.checkDingzhiHistory();
+//            }
+//        },12000);
+//    }
+//
+//    @Scheduled(cron = "0 */10 * * * ?")
+//    public void initToken(){
+//        JSONObject data = KprDangyangWaterBizFun.getDangyangToken();
+//        if (data != null) {
+//            String accessToken = data.getString("accessToken");
+//            Long expiresTime = data.getLong("expiresTime");
+//
+//            KprDangyangWaterBizFun.dangyangToken = accessToken;
+//        }
+//    }
 
+    //TODO 江津相关
+
+    @Resource(name = "infulxJiangjinDbUtil")
+    private InfulxJiangjinDbUtil infulxJiangjinDbUtil;
+
+    //TODO 启动后5秒初始化所有配置参数
     @PostConstruct
-    public void initReal(){
-        new Timer().schedule(new TimerTask() {
-            @Override
-            public void run() {
-                KprDangyangWaterBizFun.checkRealtimeData();
-            }
-        },10000);
-    }
-    @PostConstruct
-    public void initHistory(){
+    public void initJiangjinApplication(){
         new Timer().schedule(new TimerTask() {
             @Override
             public void run() {
-                KprDangyangWaterBizFun.checkOneMonth();
+                try {
+                    infulxJiangjinDbUtil.initInfluxDataBase();
+                    KprJiangjinWaterBizfun.infulxJiangjinDbUtil = infulxJiangjinDbUtil;
+                }catch(Exception ex){
+                    log.error("江津启动时初始化配置参数失败:"+ex.getLocalizedMessage());
+                }
             }
-        },11000);
+        },5000);
     }
+
     @PostConstruct
-    public void initDingzhiHistory(){
+    public void initJiangjinHisData(){
         new Timer().schedule(new TimerTask() {
             @Override
             public void run() {
-                KprDangyangWaterBizFun.checkDingzhiHistory();
-            }
-        },12000);
-    }
-
-    @Scheduled(cron = "0 */10 * * * ?")
-    public void initToken(){
-        JSONObject data = KprDangyangWaterBizFun.getDangyangToken();
-        if (data != null) {
-            String accessToken = data.getString("accessToken");
-            Long expiresTime = data.getLong("expiresTime");
+                try {
+                    KprJiangjinWaterBizfun.InitHistory();
+                }catch(Exception ex){
 
-            KprDangyangWaterBizFun.dangyangToken = accessToken;
-        }
+                }
+            }
+        },7000);
     }
 
 }

+ 263 - 0
dc3-gateway/src/main/java/io/github/pnoker/gateway/comtool/jiangjinThread/KafkaConsumer.java

@@ -0,0 +1,263 @@
+package io.github.pnoker.gateway.comtool.jiangjinThread;
+
+import com.alibaba.fastjson.JSONObject;
+import com.alibaba.nacos.shaded.com.google.common.base.Stopwatch;
+import io.github.pnoker.gateway.SpringContextUtil;
+import io.github.pnoker.gateway.bizmgr.KprDangyangWaterBizFun;
+import io.github.pnoker.gateway.bizmgr.KprJiangjinWaterBizfun;
+import io.github.pnoker.gateway.bizmgr.baseInit.KprBaseInitFun;
+import io.github.pnoker.gateway.utils.InfulxJiangjinDbUtil;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.influxdb.dto.Point;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.CollectionUtils;
+
+import java.math.BigDecimal;
+import java.time.*;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoUnit;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+/**
+ * @ClassName KafkaConsumer
+ * @Description: TODO
+ * @Author LX
+ * @Date 2024/11/12
+ * @Version V1.0
+ **/
+public class KafkaConsumer {
+    /**
+     * log 日志
+     */
+    private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);
+
+    /**
+     * 计数
+     */
+    private final AtomicInteger count = new AtomicInteger(0);
+
+    /**
+     * 原生消费者
+     */
+    private org.apache.kafka.clients.consumer.KafkaConsumer consumer;
+
+    /**
+     * kafka 连接地址
+     */
+    private String kafkaAddr;
+
+    /**
+     * kafka 消费主题
+     */
+    private String topic;
+
+    /**
+     * 构造函数
+     * @param kafkaAddr kafka 连接地址
+     * @param topic kafka 消费主题
+     */
+    public KafkaConsumer(String kafkaAddr, String topic) {
+        this.kafkaAddr = kafkaAddr;
+        this.topic = topic;
+    }
+
+    /**
+     * 生成消费者对象
+     */
+    private void getConsumer(){
+        Properties props = new Properties();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaAddr);
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer029-group");
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
+        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+                "org.apache.kafka.common.serialization.StringDeserializer");
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+                "org.apache.kafka.common.serialization.StringDeserializer");
+//        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
+        //如果重置为 latest,消费者会跳过所有现有消息,只处理新到达的消息。
+        //如果重置为 earliest,消费者会重新处理所有已存在的消息。
+        /**
+         * 使用 latest 适合于不关心历史消息,只处理实时数据的场景。
+            使用 earliest 适合于需要完整处理所有可用数据的场景,例如数据迁移或批数据处理。
+         */
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
+//        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        // 此处为加密配置,用户名密码需要调整为分配给你的信息
+        props.put("security.protocol", "SASL_PLAINTEXT");
+        props.put("sasl.mechanism", "SCRAM-SHA-512");
+        props.put("sasl.jaas.config",
+                "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"consumer029\" password=\"SL2ASL9K\";");
+        consumer = new org.apache.kafka.clients.consumer.KafkaConsumer(props);
+    }
+
+    /**
+     * 消费数据
+     */
+    public void doConsume() {
+        Stopwatch stopwatch = Stopwatch.createStarted();
+        if(consumer == null) {
+            getConsumer();
+        }
+        while (true) {
+            consumer.subscribe(Collections.singletonList(topic));
+            ConsumerRecords<String, String> records = consumer.poll(1000);
+            if (records.isEmpty()) {
+                try {
+                    Thread.sleep(5000);
+                    log.warn("未消费到数据,睡眠5秒...");
+                    continue;
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                    log.error("线程sleep异常",e);
+                }
+            }
+            for (ConsumerRecord<String, String> record : records) {
+                log.info("Received message: (" + record.key() + ", " + record.value() + ") at offset "
+                        + record.offset());
+                // todo 业务处理
+                JSONObject resJson = JSONObject.parseObject(record.value());
+
+                //TODO 根据设备号得到设定好的对应的设备类型名
+                List<String> filteredKeys = KprBaseInitFun.getInstance().jiangjinDeviceList.entrySet().stream()
+                        .filter(entry -> entry.getValue().contains(record.key()))
+                        .map(Map.Entry::getKey)
+                        .collect(Collectors.toList());
+                if(CollectionUtils.isEmpty(filteredKeys)){
+                    continue;
+                }
+                String deviceType = filteredKeys.get(0).split("_")[1];
+//                //第三方对应的字段集
+                List<String> params = KprBaseInitFun.getInstance().jiangjinParams.get(deviceType);
+//                //deviceType为表名
+//                //time为当前数据time的纳秒时间戳,已经做过整点处理了
+                Point pointNanos = createPointFromJson(deviceType,
+                        resJson.getJSONObject("content").getJSONObject("data")
+                        ,record.key()
+                        ,params);
+                if(pointNanos!=null) {
+                    KprJiangjinWaterBizfun.infulxJiangjinDbUtil.insert(pointNanos);
+                }
+                count.addAndGet(1);
+            }
+            log.info("接收" + count + "条记录共耗时:" + stopwatch.elapsed(TimeUnit.MILLISECONDS) + "毫秒");
+        }
+    }
+
+    public static Point createPointFromJson(String deviceType, JSONObject jsonObject,String cmCode,List<String> params) {
+        // 获取时间戳
+        long nanoTimestamp = convertToNanoTimestamp(jsonObject.getLong("collectionTime"));
+        if(nanoTimestamp==0L){
+            return null;
+        }
+        // 创建 Point.Builder 对象
+        Point.Builder pointBuilder = Point.measurement(KprBaseInitFun.getInstance().jiangjinMeasurementMap.get(deviceType))
+                .tag("dev_id",cmCode)
+                .time(nanoTimestamp, TimeUnit.NANOSECONDS);
+
+        // 遍历 JSON 对象的键值对,并添加到 Point.Builder 中
+        for (Map.Entry<String, Object> entry : jsonObject.entrySet()) {
+            String key = entry.getKey();
+            if (!key.equals("collectionTime")) {
+                processField(pointBuilder,key, entry.getValue(),params);
+            }
+        }
+        if(!pointBuilder.hasFields()){
+            return null;
+        }
+
+        // 构建 Point 对象
+        try {
+            return pointBuilder.build();
+        }catch(Exception ex){
+            ex.printStackTrace();
+        }
+        return null;
+    }
+
+    //指定列处理
+    private static void processField(Point.Builder pointBuilder, String key, Object value,List<String> params) {
+        if (value == null) {
+            return; // 跳过空值
+        }
+        //String[]长度如果为0,则说明本系统没有对应字段,则用它本身,反之用第二个元素作为字段名
+        List<String[]> newParams = params.stream()
+                .map(s -> s.split("___"))
+                .collect(Collectors.toList());
+
+        for (String[] clies:newParams){
+            boolean exists = Arrays.asList(clies).contains(key);
+            if(exists){
+                String newKey = clies.length>1?clies[1]:clies[0];
+                if (value instanceof Integer) {
+                    pointBuilder.addField(newKey, (Integer) value);
+                } else if (value instanceof Long) {
+                    pointBuilder.addField(newKey, (Long) value);
+                } else if (value instanceof Boolean) {
+                    pointBuilder.addField(newKey, ((Boolean)value)?1:0);
+                } else if (value instanceof String) {
+                    if (((String)value).equals("01")||((String)value).equals("00")){
+                        pointBuilder.addField(newKey, ((String) value).replace("0",""));
+                    }else {
+                        pointBuilder.addField(newKey, (String) value);
+                    }
+                }else if (value instanceof BigDecimal) {
+                    pointBuilder.addField(newKey, ((BigDecimal) value).doubleValue());
+                }else{
+                    pointBuilder.addField(newKey, (String.valueOf(value)) );
+                }
+                break;
+            }
+        }
+    }
+
+
+    //获取CST 纳秒时间戳
+    private static long convertToNanoTimestamp(Long dateTimeString) {
+        // 解析字符串为 LocalDateTime 对象
+        LocalDateTime dateTime = Instant.ofEpochMilli(dateTimeString)
+                .atZone(ZoneId.systemDefault())
+                .toLocalDateTime();
+        // 指定时区为CST(Asia/Shanghai)
+        ZoneId shanghaiZoneId = ZoneId.of("Asia/Shanghai");
+        // 将LocalDateTime对象转换为ZonedDateTime对象
+        ZonedDateTime zonedDateTime = dateTime.atZone(shanghaiZoneId);
+        // 转换为秒级别的时间戳
+        long epochSecond = dateTime.toEpochSecond(ZoneOffset.UTC);
+        // 将ZonedDateTime对象转换为Instant对象(UTC时间点)
+        Instant instant = zonedDateTime.toInstant();
+        // 计算从1970年1月1日00:00:00 UTC以来的纳秒数
+        long nanosecondsSinceEpoch = ChronoUnit.NANOS.between(Instant.EPOCH, instant);
+        long currentTimeNanos = System.currentTimeMillis() * 1_000_000L;
+        if(nanosecondsSinceEpoch>currentTimeNanos){
+            return 0L;
+        }
+
+        // 取整到最近的整分
+        long secondsSinceEpoch = nanosecondsSinceEpoch / 1_000_000_000L;
+        long nanosWithinSecond = nanosecondsSinceEpoch % 1_000_000_000L;
+
+        // 如果秒内的纳秒数大于等于30秒,进位到下一个整分
+        if (nanosWithinSecond >= 30 * 1_000_000_000L) {
+            secondsSinceEpoch += 60 - (secondsSinceEpoch % 60);
+        } else {
+            secondsSinceEpoch -= secondsSinceEpoch % 60;
+        }
+
+        // 返回调整后的纳秒时间戳
+        return secondsSinceEpoch * 1_000_000_000L;
+    }
+
+    public static void main(String[] args) {
+        // kafka的连接地址和主题请根据实际情况修改(kafka集群多个节点用逗号隔开,如10.15.0.114:9092,10.15.0.115:9092)
+//        KafkaConsumer kafkaConsumerDemo = new KafkaConsumer("10.127.16.85:9092", "jiangjin");
+//        kafkaConsumerDemo.doConsume();
+    }
+}

+ 5 - 11
dc3-gateway/src/main/java/io/github/pnoker/gateway/utils/InfulxDbUtil.java

@@ -37,19 +37,13 @@ public class InfulxDbUtil {
 
     private InfluxDB influxDB = null;
 
-    @PostConstruct
     public void initInfluxDataBase(){
-        new Timer().schedule(new TimerTask() {
-            @Override
-            public void run() {
-                influxDB = InfluxDBFactory.connect(serverUrl, username, password);
-                // 创建数据库
-                influxDB.query(new Query("CREATE DATABASE " + database));
+         influxDB = InfluxDBFactory.connect(serverUrl, username, password);
+         // 创建数据库
+         influxDB.query(new Query("CREATE DATABASE " + database));
 
-                // 设置要使用的数据库
-                influxDB.setDatabase(database);
-            }
-        },3000);
+         // 设置要使用的数据库
+         influxDB.setDatabase(database);
     }
 
     public ResponseCode insert(Point point){

+ 75 - 0
dc3-gateway/src/main/java/io/github/pnoker/gateway/utils/InfulxJiangjinDbUtil.java

@@ -0,0 +1,75 @@
+package io.github.pnoker.gateway.utils;
+
+import com.alibaba.cloud.commons.lang.StringUtils;
+import io.github.pnoker.gateway.comtool.ResponseCode;
+import org.influxdb.InfluxDB;
+import org.influxdb.InfluxDBFactory;
+import org.influxdb.dto.BatchPoints;
+import org.influxdb.dto.Point;
+import org.influxdb.dto.Query;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import java.util.Timer;
+import java.util.TimerTask;
+
+/**
+ * @ClassName InfulxDbUtil
+ * @Description: TODO influxDb连接工具类
+ * @Author LX
+ * @Date 2024/9/3
+ * @Version V1.0
+ **/
+@Component("infulxJiangjinDbUtil")
+public class InfulxJiangjinDbUtil {
+    @Value("${jiangjin.inluxdb.serverUrl:}")
+    private String serverUrl = ""; // InfluxDB 服务器地址
+
+    @Value("${jiangjin.inluxdb.username:}")
+    private String username = ""; // InfluxDB 用户名
+
+    @Value("${jiangjin.inluxdb.password:}")
+    private String password = ""; // InfluxDB 密码
+
+    @Value("${jiangjin.inluxdb.database:}")
+    private String database = ""; // 数据库名称
+
+    private InfluxDB influxDB = null;
+
+    public void initInfluxDataBase(){
+        try {
+            if (!StringUtils.isEmpty(username) && !StringUtils.isEmpty(password)) {
+                influxDB = InfluxDBFactory.connect(serverUrl, username, password);
+            } else {
+                influxDB = InfluxDBFactory.connect(serverUrl);
+            }
+            // 创建数据库
+            influxDB.query(new Query("CREATE DATABASE " + database));
+
+            // 设置要使用的数据库
+            influxDB.setDatabase(database);
+        }catch (Exception ex){
+            ex.printStackTrace();
+        }
+    }
+
+    public ResponseCode insert(Point point){
+        try {
+            influxDB.write(point);
+            return ResponseCode.RESULT_NORMAL;
+        }catch(Exception ex){
+            return ResponseCode.BUSINESS_DB_REQ_FAILED;
+        }
+    }
+
+    public ResponseCode insertBatch(BatchPoints point){
+        try {
+            influxDB.write(point);
+            return ResponseCode.RESULT_NORMAL;
+        }catch(Exception ex){
+            return ResponseCode.BUSINESS_DB_REQ_FAILED;
+        }
+    }
+
+}

+ 126 - 0
dc3-gateway/src/main/java/io/github/pnoker/gateway/utils/KafkaConsumerDemo.java

@@ -0,0 +1,126 @@
+package io.github.pnoker.gateway.utils;
+
+import com.alibaba.nacos.shaded.com.google.common.base.Stopwatch;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * @ClassName KafkaConsumer
+ * @Description: TODO
+ * @Author LX
+ * @Date 2024/11/12
+ * @Version V1.0
+ **/
+public class KafkaConsumerDemo {
+    /**
+     * log 日志
+     */
+    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerDemo.class);
+
+    /**
+     * 计数
+     */
+    private final AtomicInteger count = new AtomicInteger(0);
+
+    /**
+     * 原生消费者
+     */
+    private KafkaConsumer<String, String> consumer;
+
+    /**
+     * kafka 连接地址
+     */
+    private String kafkaAddr;
+
+    /**
+     * kafka 消费主题
+     */
+    private String topic;
+
+    /**
+     * 构造函数
+     * @param kafkaAddr kafka 连接地址
+     * @param topic kafka 消费主题
+     */
+    public KafkaConsumerDemo(String kafkaAddr, String topic) {
+        this.kafkaAddr = kafkaAddr;
+        this.topic = topic;
+    }
+
+    /**
+     * 生成消费者对象
+     */
+    private void getConsumer(){
+        Properties props = new Properties();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaAddr);
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer029-group");
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
+        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+                "org.apache.kafka.common.serialization.StringDeserializer");
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+                "org.apache.kafka.common.serialization.StringDeserializer");
+//        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
+        //如果重置为 latest,消费者会跳过所有现有消息,只处理新到达的消息。
+        //如果重置为 earliest,消费者会重新处理所有已存在的消息。
+        /**
+         * 使用 latest 适合于不关心历史消息,只处理实时数据的场景。
+            使用 earliest 适合于需要完整处理所有可用数据的场景,例如数据迁移或批数据处理。
+         */
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
+//        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        // 此处为加密配置,用户名密码需要调整为分配给你的信息
+        props.put("security.protocol", "SASL_PLAINTEXT");
+        props.put("sasl.mechanism", "SCRAM-SHA-512");
+        props.put("sasl.jaas.config",
+                "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"consumer029\" password=\"SL2ASL9K\";");
+        consumer = new KafkaConsumer<>(props);
+    }
+
+    /**
+     * 消费数据
+     */
+    public void doConsume() {
+        Stopwatch stopwatch = Stopwatch.createStarted();
+        if(consumer == null) {
+            getConsumer();
+        }
+        while (true) {
+            consumer.subscribe(Collections.singletonList(topic));
+            ConsumerRecords<String, String> records = consumer.poll(1000);
+            if (records.isEmpty()) {
+                try {
+                    Thread.sleep(5000);
+                    log.warn("未消费到数据,睡眠5秒...");
+                    continue;
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                    log.error("线程sleep异常",e);
+                }
+            }
+            for (ConsumerRecord<String, String> record : records) {
+                log.info("Received message: (" + record.key() + ", " + record.value() + ") at offset "
+                        + record.offset());
+                // todo 业务处理
+                count.addAndGet(1);
+            }
+            log.info("接收" + count + "条记录共耗时:" + stopwatch.elapsed(TimeUnit.MILLISECONDS) + "毫秒");
+        }
+    }
+
+    public static void main(String[] args) {
+        // kafka的连接地址和主题请根据实际情况修改(kafka集群多个节点用逗号隔开,如10.15.0.114:9092,10.15.0.115:9092)
+        KafkaConsumerDemo kafkaConsumerDemo = new KafkaConsumerDemo("10.127.16.85:9092", "jiangjin");
+        kafkaConsumerDemo.doConsume();
+    }
+}

File diff suppressed because it is too large
+ 38 - 0
dc3-gateway/src/main/resources/application-config.yml


+ 19 - 4
dc3-gateway/src/main/resources/application.yml

@@ -16,7 +16,7 @@
 
 server:
   #服务器配置
-  port: 9309
+  port: 8000
   undertow:
     threads:
       io: 2
@@ -128,7 +128,8 @@ spring:
             #   fallbackUri: 'forward:/fallback'
   datasource:
       driver-class-name: org.postgis.DriverWrapper
-      jdbc-url: jdbc:postgresql_postGIS://192.168.0.4:5432/watersmart?useSSL=false&useAffectedRows=false&allowMultiQueries=true
+#      jdbc-url: jdbc:postgresql_postGIS://192.168.0.4:5432/watersmart?useSSL=false&useAffectedRows=false&allowMultiQueries=true
+      jdbc-url: jdbc:postgresql_postGIS://140.246.183.164:5432/water_smart_develop_branch?useSSL=false&useAffectedRows=false&allowMultiQueries=true
       username: postgres
       password: kpr.23417.postgres
       data:
@@ -157,10 +158,14 @@ xuchang:
 dangyang:
 #influxdb相关配置
   inluxdb:
-    database: iot
+    database: iot_test
+#    database: iot
     serverUrl: http://192.168.0.4:8086
     username: kpr
     password: kpr.2024dy.influxdb
+#    serverUrl: http://127.0.0.1:8086
+#    username: admin
+#    password: yourpassword
 #设备数据相关接口地址
   http:
     username: shkpr
@@ -210,4 +215,14 @@ PRESS_METER: WaterMeter
 WATER_QUALITY: WaterQuality
 WATER_PUMP: WaterPump
 ELEC_CTRL_VALVE: ValveMeter
-LIQUID_LEVEL_METER: LiquidLevel
+LIQUID_LEVEL_METER: LiquidLevel
+
+#江津influxdb配置
+jiangjin:
+#influxdb相关配置
+  inluxdb:
+    database: iot_test
+#    database: iot
+    serverUrl: http://10.101.16.13:8086
+#    username: kpr
+#    password: kpr.2024dy.influxdb