|
@@ -1,5 +1,6 @@
|
|
|
package io.github.pnoker.gateway.comtool.jiangjinThread;
|
|
|
|
|
|
+import cn.hutool.core.util.ObjectUtil;
|
|
|
import com.alibaba.fastjson.JSONObject;
|
|
|
import com.alibaba.nacos.shaded.com.google.common.base.Stopwatch;
|
|
|
import io.github.pnoker.gateway.SpringContextUtil;
|
|
@@ -14,6 +15,7 @@ import org.influxdb.dto.Point;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
import org.springframework.util.CollectionUtils;
|
|
|
+import org.springframework.util.ObjectUtils;
|
|
|
|
|
|
import java.math.BigDecimal;
|
|
|
import java.time.*;
|
|
@@ -69,12 +71,15 @@ public class KafkaConsumer {
|
|
|
|
|
|
/**
|
|
|
* 生成消费者对象
|
|
|
+ * 当前消费者对象是历史记录同步
|
|
|
*/
|
|
|
private void getConsumer(){
|
|
|
Properties props = new Properties();
|
|
|
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaAddr);
|
|
|
props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer029-group");
|
|
|
- props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
|
|
|
+// props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer029-group-" + System.currentTimeMillis());// 动态生成组 ID
|
|
|
+// props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
|
|
|
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false"); // 禁用自动提交
|
|
|
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
|
|
|
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
|
|
|
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
|
|
@@ -88,8 +93,8 @@ public class KafkaConsumer {
|
|
|
* 使用 latest 适合于不关心历史消息,只处理实时数据的场景。
|
|
|
使用 earliest 适合于需要完整处理所有可用数据的场景,例如数据迁移或批数据处理。
|
|
|
*/
|
|
|
- props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
|
|
|
-// props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
|
|
+// props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
|
|
|
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
|
|
// 此处为加密配置,用户名密码需要调整为分配给你的信息
|
|
|
props.put("security.protocol", "SASL_PLAINTEXT");
|
|
|
props.put("sasl.mechanism", "SCRAM-SHA-512");
|
|
@@ -213,6 +218,9 @@ public class KafkaConsumer {
|
|
|
}else{
|
|
|
pointBuilder.addField(newKey, (String.valueOf(value)) );
|
|
|
}
|
|
|
+ if(newKey.equals("flow_l_cur")||newKey.equals("flow_l_total_pos")){
|
|
|
+ pointBuilder.addField(newKey, Double.valueOf(ObjectUtils.isEmpty(value)?"0.0":value.toString()));
|
|
|
+ }
|
|
|
break;
|
|
|
}
|
|
|
}
|