|
@@ -71,15 +71,12 @@ public class KafkaConsumer {
|
|
|
|
|
|
/**
|
|
|
* 生成消费者对象
|
|
|
- * 当前消费者对象是历史记录同步
|
|
|
*/
|
|
|
private void getConsumer(){
|
|
|
Properties props = new Properties();
|
|
|
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaAddr);
|
|
|
props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer029-group");
|
|
|
-// props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer029-group-" + System.currentTimeMillis());// 动态生成组 ID
|
|
|
-// props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
|
|
|
- props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false"); // 禁用自动提交
|
|
|
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
|
|
|
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
|
|
|
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
|
|
|
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
|
|
@@ -91,7 +88,7 @@ public class KafkaConsumer {
|
|
|
//如果重置为 earliest,消费者会重新处理所有已存在的消息。
|
|
|
/**
|
|
|
* 使用 latest 适合于不关心历史消息,只处理实时数据的场景。
|
|
|
- 使用 earliest 适合于需要完整处理所有可用数据的场景,例如数据迁移或批数据处理。
|
|
|
+ 使用 earliest 适合于需要完整处理所有可用数据的场景,例如数据迁移或批数据处理。
|
|
|
*/
|
|
|
// props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
|
|
|
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
|
@@ -100,7 +97,7 @@ public class KafkaConsumer {
|
|
|
props.put("sasl.mechanism", "SCRAM-SHA-512");
|
|
|
props.put("sasl.jaas.config",
|
|
|
"org.apache.kafka.common.security.scram.ScramLoginModule required username=\"consumer029\" password=\"SL2ASL9K\";");
|
|
|
- consumer = new org.apache.kafka.clients.consumer.KafkaConsumer(props);
|
|
|
+ consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(props);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -128,29 +125,31 @@ public class KafkaConsumer {
|
|
|
log.info("Received message: (" + record.key() + ", " + record.value() + ") at offset "
|
|
|
+ record.offset());
|
|
|
// todo 业务处理
|
|
|
- JSONObject resJson = JSONObject.parseObject(record.value());
|
|
|
+ new Thread(() -> {
|
|
|
+ JSONObject resJson = JSONObject.parseObject(record.value());
|
|
|
|
|
|
- //TODO 根据设备号得到设定好的对应的设备类型名
|
|
|
- List<String> filteredKeys = KprBaseInitFun.getInstance().jiangjinDeviceList.entrySet().stream()
|
|
|
- .filter(entry -> entry.getValue().contains(record.key()))
|
|
|
- .map(Map.Entry::getKey)
|
|
|
- .collect(Collectors.toList());
|
|
|
- if(CollectionUtils.isEmpty(filteredKeys)){
|
|
|
- continue;
|
|
|
- }
|
|
|
- String deviceType = filteredKeys.get(0).split("_")[1];
|
|
|
-// //第三方对应的字段集
|
|
|
- List<String> params = KprBaseInitFun.getInstance().jiangjinParams.get(deviceType);
|
|
|
-// //deviceType为表名
|
|
|
-// //time为当前数据time的纳秒时间戳,已经做过整点处理了
|
|
|
- Point pointNanos = createPointFromJson(deviceType,
|
|
|
- resJson.getJSONObject("content").getJSONObject("data")
|
|
|
- ,record.key()
|
|
|
- ,params);
|
|
|
- if(pointNanos!=null) {
|
|
|
- KprJiangjinWaterBizfun.infulxJiangjinDbUtil.insert(pointNanos);
|
|
|
- }
|
|
|
- count.addAndGet(1);
|
|
|
+ //TODO 根据设备号得到设定好的对应的设备类型名
|
|
|
+ List<String> filteredKeys = KprBaseInitFun.getInstance().jiangjinDeviceList.entrySet().stream()
|
|
|
+ .filter(entry -> entry.getValue().contains(record.key()))
|
|
|
+ .map(Map.Entry::getKey)
|
|
|
+ .collect(Collectors.toList());
|
|
|
+ if(CollectionUtils.isEmpty(filteredKeys)){
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ String deviceType = filteredKeys.get(0).split("_")[1];
|
|
|
+ // //第三方对应的字段集
|
|
|
+ List<String> params = KprBaseInitFun.getInstance().jiangjinParams.get(deviceType);
|
|
|
+ // //deviceType为表名
|
|
|
+ // //time为当前数据time的纳秒时间戳,已经做过整点处理了
|
|
|
+ Point pointNanos = createPointFromJson(deviceType,
|
|
|
+ resJson.getJSONObject("content").getJSONObject("data")
|
|
|
+ ,record.key()
|
|
|
+ ,params);
|
|
|
+ if(pointNanos!=null) {
|
|
|
+ KprJiangjinWaterBizfun.infulxJiangjinDbUtil.insert(pointNanos);
|
|
|
+ }
|
|
|
+ count.addAndGet(1);
|
|
|
+ }).start();
|
|
|
}
|
|
|
log.info("接收" + count + "条记录共耗时:" + stopwatch.elapsed(TimeUnit.MILLISECONDS) + "毫秒");
|
|
|
}
|