Browse Source

江津数据采集 更改采集逻辑和相关配置

1037015548@qq.com 8 months ago
parent
commit
4605eb5853

+ 5 - 10
dc3-gateway/src/main/java/io/github/pnoker/gateway/bizmgr/baseInit/KprBaseInitFun.java

@@ -89,14 +89,13 @@ public class KprBaseInitFun {
         List<String> VoltageSwitchgear=Arrays.asList(properties.getProperty("VoltageSwitchgear").split(","));
         List<String> WaterValve=Arrays.asList(properties.getProperty("WaterValve").split(","));
         List<String> VariableFrequencyDrive=Arrays.asList(properties.getProperty("VariableFrequencyDrive").split(","));
-        List<String> ChlorineFlowMeter=Arrays.asList(properties.getProperty("ChlorineFlowMeter").split(","));
         List<String> WaterQuality=Arrays.asList(properties.getProperty("WaterQuality").split(","));
         List<String> DifferentialPressure=Arrays.asList(properties.getProperty("DifferentialPressure").split(","));
         List<String> ElectricMotor=Arrays.asList(properties.getProperty("ElectricMotor").split(","));
         List<String> WaterPump=Arrays.asList(properties.getProperty("WaterPump").split(","));
         List<String> AirCompressor=Arrays.asList(properties.getProperty("AirCompressor").split(","));
         List<String> RootsBlower=Arrays.asList(properties.getProperty("RootsBlower").split(","));
-        List<String> thMeter=Arrays.asList(properties.getProperty("thMeter").split(","));
+        List<String> ThMeter=Arrays.asList(properties.getProperty("ThMeter").split(","));
         List<String> SludgeIntfMeter=Arrays.asList(properties.getProperty("SludgeIntfMeter").split(","));
         List<String> InformationSoftware=Arrays.asList(properties.getProperty("InformationSoftware").split(","));
         List<String> WaterMeter=Arrays.asList(properties.getProperty("WaterMeter").split(","));
@@ -107,14 +106,13 @@ public class KprBaseInitFun {
         jiangjinParams.put("VoltageSwitchgear",VoltageSwitchgear);
         jiangjinParams.put("WaterValve",WaterValve);
         jiangjinParams.put("VariableFrequencyDrive",VariableFrequencyDrive);
-        jiangjinParams.put("ChlorineFlowMeter",ChlorineFlowMeter);
         jiangjinParams.put("WaterQuality",WaterQuality);
         jiangjinParams.put("DifferentialPressure",DifferentialPressure);
         jiangjinParams.put("ElectricMotor",ElectricMotor);
         jiangjinParams.put("WaterPump",WaterPump);
         jiangjinParams.put("AirCompressor",AirCompressor);
         jiangjinParams.put("RootsBlower",RootsBlower);
-        jiangjinParams.put("thMeter",thMeter);
+        jiangjinParams.put("ThMeter",ThMeter);
         jiangjinParams.put("SludgeIntfMeter",SludgeIntfMeter);
         jiangjinParams.put("InformationSoftware",InformationSoftware);
         jiangjinParams.put("WaterMeter",WaterMeter);
@@ -125,14 +123,13 @@ public class KprBaseInitFun {
         List<String> DeviceVoltageSwitchgear=Arrays.asList(properties.getProperty("VoltageSwitchgear_device").split(","));
         List<String> DeviceWaterValve=Arrays.asList(properties.getProperty("WaterValve_device").split(","));
         List<String> DeviceVariableFrequencyDrive=Arrays.asList(properties.getProperty("VariableFrequencyDrive_device").split(","));
-        List<String> DeviceChlorineFlowMeter=Arrays.asList(properties.getProperty("ChlorineFlowMeter_device").split(","));
         List<String> DeviceWaterQuality=Arrays.asList(properties.getProperty("WaterQuality_device").split(","));
         List<String> DeviceDifferentialPressure=Arrays.asList(properties.getProperty("DifferentialPressure_device").split(","));
         List<String> DeviceElectricMotor=Arrays.asList(properties.getProperty("ElectricMotor_device").split(","));
         List<String> DeviceWaterPump=Arrays.asList(properties.getProperty("WaterPump_device").split(","));
         List<String> DeviceAirCompressor=Arrays.asList(properties.getProperty("AirCompressor_device").split(","));
         List<String> DeviceRootsBlower=Arrays.asList(properties.getProperty("RootsBlower_device").split(","));
-        List<String> DevicethMeter=Arrays.asList(properties.getProperty("thMeter_device").split(","));
+        List<String> DeviceThMeter=Arrays.asList(properties.getProperty("ThMeter_device").split(","));
         List<String> DeviceSludgeIntfMeter=Arrays.asList(properties.getProperty("SludgeIntfMeter_device").split(","));
         List<String> DeviceInformationSoftware=Arrays.asList(properties.getProperty("InformationSoftware_device").split(","));
         List<String> DeviceWaterMeter=Arrays.asList(properties.getProperty("WaterMeter_device").split(","));
@@ -143,14 +140,13 @@ public class KprBaseInitFun {
         jiangjinDeviceList.put("Device_VoltageSwitchgear",DeviceVoltageSwitchgear);
         jiangjinDeviceList.put("Device_WaterValve",DeviceWaterValve);
         jiangjinDeviceList.put("Device_VariableFrequencyDrive",DeviceVariableFrequencyDrive);
-        jiangjinDeviceList.put("Device_ChlorineFlowMeter",DeviceChlorineFlowMeter);
         jiangjinDeviceList.put("Device_WaterQuality",DeviceWaterQuality);
         jiangjinDeviceList.put("Device_DifferentialPressure",DeviceDifferentialPressure);
         jiangjinDeviceList.put("Device_ElectricMotor",DeviceElectricMotor);
         jiangjinDeviceList.put("Device_WaterPump",DeviceWaterPump);
         jiangjinDeviceList.put("Device_AirCompressor",DeviceAirCompressor);
         jiangjinDeviceList.put("Device_RootsBlower",DeviceRootsBlower);
-        jiangjinDeviceList.put("Device_thMeter",DevicethMeter);
+        jiangjinDeviceList.put("Device_ThMeter",DeviceThMeter);
         jiangjinDeviceList.put("Device_SludgeIntfMeter",DeviceSludgeIntfMeter);
         jiangjinDeviceList.put("Device_InformationSoftware",DeviceInformationSoftware);
         jiangjinDeviceList.put("Device_WaterMeter",DeviceWaterMeter);
@@ -161,14 +157,13 @@ public class KprBaseInitFun {
         jiangjinMeasurementMap.put("VoltageSwitchgear","VoltageSwitchgear");
         jiangjinMeasurementMap.put("WaterValve","WaterValve");
         jiangjinMeasurementMap.put("VariableFrequencyDrive","VariableFrequencyDrive");
-        jiangjinMeasurementMap.put("ChlorineFlowMeter","ChlorineFlowMeter");
         jiangjinMeasurementMap.put("WaterQuality","WaterQuality");
         jiangjinMeasurementMap.put("DifferentialPressure","DifferentialPressure");
         jiangjinMeasurementMap.put("ElectricMotor","ElectricMotor");
         jiangjinMeasurementMap.put("WaterPump","WaterPump");
         jiangjinMeasurementMap.put("AirCompressor","AirCompressor");
         jiangjinMeasurementMap.put("RootsBlower","RootsBlower");
-        jiangjinMeasurementMap.put("thMeter","thMeter");
+        jiangjinMeasurementMap.put("ThMeter","ThMeter");
         jiangjinMeasurementMap.put("SludgeIntfMeter","SludgeIntfMeter");
         jiangjinMeasurementMap.put("InformationSoftware","InformationSoftware");
         jiangjinMeasurementMap.put("WaterMeter","WaterMeter");

+ 37 - 26
dc3-gateway/src/main/java/io/github/pnoker/gateway/comtool/jiangjinThread/KafkaConsumer.java

@@ -77,6 +77,7 @@ public class KafkaConsumer {
         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaAddr);
         props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer029-group");
         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+//        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
         props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
         props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
@@ -124,32 +125,30 @@ public class KafkaConsumer {
             for (ConsumerRecord<String, String> record : records) {
                 log.info("Received message: (" + record.key() + ", " + record.value() + ") at offset "
                         + record.offset());
-                // todo 业务处理
-                new Thread(() -> {
-                    JSONObject resJson = JSONObject.parseObject(record.value());
+                // TODO 业务处理
+                JSONObject resJson = JSONObject.parseObject(record.value());
 
-                    //TODO 根据设备号得到设定好的对应的设备类型名
-                    List<String> filteredKeys = KprBaseInitFun.getInstance().jiangjinDeviceList.entrySet().stream()
-                            .filter(entry -> entry.getValue().contains(record.key()))
-                            .map(Map.Entry::getKey)
-                            .collect(Collectors.toList());
-                    if(CollectionUtils.isEmpty(filteredKeys)){
-                        return;
-                    }
-                    String deviceType = filteredKeys.get(0).split("_")[1];
-    //                //第三方对应的字段集
-                    List<String> params = KprBaseInitFun.getInstance().jiangjinParams.get(deviceType);
-    //                //deviceType为表名
-    //                //time为当前数据time的纳秒时间戳,已经做过整点处理了
-                    Point pointNanos = createPointFromJson(deviceType,
-                            resJson.getJSONObject("content").getJSONObject("data")
-                            ,record.key()
-                            ,params);
-                    if(pointNanos!=null) {
-                        KprJiangjinWaterBizfun.infulxJiangjinDbUtil.insert(pointNanos);
-                    }
-                    count.addAndGet(1);
-                }).start();
+                //TODO 根据设备号得到设定好的对应的设备类型名
+                List<String> filteredKeys = KprBaseInitFun.getInstance().jiangjinDeviceList.entrySet().stream()
+                        .filter(entry -> entry.getValue().contains(record.key()))
+                        .map(Map.Entry::getKey)
+                        .collect(Collectors.toList());
+                if(CollectionUtils.isEmpty(filteredKeys)){
+                    continue;
+                }
+                String deviceType = filteredKeys.get(0).split("_")[1];
+//                //第三方对应的字段集
+                List<String> params = KprBaseInitFun.getInstance().jiangjinParams.get(deviceType);
+//                //deviceType为表名
+//                //time为当前数据time的纳秒时间戳,已经做过整点处理了
+                Point pointNanos = createPointFromJson(deviceType,
+                        resJson.getJSONObject("content").getJSONObject("data")
+                        ,record.key()
+                        ,params);
+                if(pointNanos!=null) {
+                    KprJiangjinWaterBizfun.infulxJiangjinDbUtil.insert(pointNanos);
+                }
+                count.addAndGet(1);
             }
             log.info("接收" + count + "条记录共耗时:" + stopwatch.elapsed(TimeUnit.MILLISECONDS) + "毫秒");
         }
@@ -219,12 +218,24 @@ public class KafkaConsumer {
                 }
                 if(newKey.equals("flow_l_cur")||newKey.equals("flow_l_total_pos")){
                     pointBuilder.addField(newKey,  Double.valueOf(ObjectUtils.isEmpty(value)?"0.0":value.toString()));
+                    if(newKey.equals("flow_l_cur")){
+                        pointBuilder.addField(newKey,  Double.valueOf(ObjectUtils.isEmpty(value)?"0.0"
+                                :convertCubicMetersToLiters(Double.valueOf(value.toString()))));
+                        pointBuilder.addField("flow_cur",  Double.valueOf(ObjectUtils.isEmpty(value)?"0.0":value.toString()));
+                    }
+                    if(newKey.equals("flow_l_total_pos")){
+                        pointBuilder.addField(newKey,  Double.valueOf(ObjectUtils.isEmpty(value)?"0.0"
+                                :convertCubicMetersToLiters(Double.valueOf(value.toString()))));
+                        pointBuilder.addField("flow_total_pos",  Double.valueOf(ObjectUtils.isEmpty(value)?"0.0":value.toString()));
+                    }
                 }
                 break;
             }
         }
     }
-
+    public static String convertCubicMetersToLiters(double cubicMeters) {
+        return String.valueOf(cubicMeters * 1000);
+    }
 
     //获取CST 纳秒时间戳
     private static long convertToNanoTimestamp(Long dateTimeString) {