|
@@ -3,19 +3,26 @@ package io.github.pnoker.gateway.bizmgr;
|
|
|
import com.alibaba.fastjson.JSONObject;
|
|
|
import io.github.pnoker.gateway.SpringContextUtil;
|
|
|
import io.github.pnoker.gateway.bizmgr.baseInit.KprBaseInitFun;
|
|
|
+import io.github.pnoker.gateway.comtool.TimeTool;
|
|
|
import io.github.pnoker.gateway.comtool.jiangjinThread.KafkaConsumer;
|
|
|
+import io.github.pnoker.gateway.dbdao.DBMgrProxy;
|
|
|
import io.github.pnoker.gateway.utils.InfulxDbUtil;
|
|
|
import io.github.pnoker.gateway.utils.InfulxJiangjinDbUtil;
|
|
|
import org.influxdb.dto.Point;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
+import org.springframework.util.CollectionUtils;
|
|
|
|
|
|
import java.math.BigDecimal;
|
|
|
import java.time.*;
|
|
|
+import java.time.format.DateTimeFormatter;
|
|
|
+import java.time.format.DateTimeFormatterBuilder;
|
|
|
+import java.time.temporal.ChronoField;
|
|
|
import java.time.temporal.ChronoUnit;
|
|
|
import java.util.Arrays;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
+import java.util.Optional;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
@@ -34,9 +41,9 @@ public class KprJiangjinWaterBizfun {
|
|
|
|
|
|
public static InfulxJiangjinDbUtil infulxJiangjinDbUtil = null;//infulx工具类对象
|
|
|
|
|
|
- public static Point createPointFromJson(String deviceType, JSONObject jsonObject,String cmCode,List<String> params) {
|
|
|
+ public static Point createPointFromJson(String deviceType,String standardCode,Map<String,Object> mapEntity,String cmCode,List<String> params) {
|
|
|
// 获取时间戳
|
|
|
- long nanoTimestamp = convertToNanoTimestamp(jsonObject.getLong("collectionTime"));
|
|
|
+ long nanoTimestamp = convertToNanoTimestamp(mapEntity.get("QCQUISITION_TIME").toString());
|
|
|
if(nanoTimestamp==0L){
|
|
|
return null;
|
|
|
}
|
|
@@ -46,12 +53,8 @@ public class KprJiangjinWaterBizfun {
|
|
|
.time(nanoTimestamp, TimeUnit.NANOSECONDS);
|
|
|
|
|
|
// 遍历 JSON 对象的键值对,并添加到 Point.Builder 中
|
|
|
- for (Map.Entry<String, Object> entry : jsonObject.entrySet()) {
|
|
|
- String key = entry.getKey();
|
|
|
- if (!key.equals("collectionTime")) {
|
|
|
- processField(pointBuilder,key, entry.getValue(),params);
|
|
|
- }
|
|
|
- }
|
|
|
+
|
|
|
+ processField(pointBuilder,standardCode, mapEntity.get("VAL"),params);
|
|
|
if(!pointBuilder.hasFields()){
|
|
|
return null;
|
|
|
}
|
|
@@ -103,31 +106,168 @@ public class KprJiangjinWaterBizfun {
|
|
|
|
|
|
|
|
|
//获取CST 纳秒时间戳
|
|
|
- private static long convertToNanoTimestamp(Long dateTimeString) {
|
|
|
+ private static long convertToNanoTimestamp(String dateTimeString) {
|
|
|
+ // 定义时间格式,支持可选的小数秒部分
|
|
|
+ DateTimeFormatter formatter = new DateTimeFormatterBuilder()
|
|
|
+ .appendPattern("yyyy-MM-dd HH:mm:ss")
|
|
|
+ .optionalStart()
|
|
|
+ .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true)
|
|
|
+ .optionalEnd()
|
|
|
+ .toFormatter();
|
|
|
+
|
|
|
// 解析字符串为 LocalDateTime 对象
|
|
|
- LocalDateTime dateTime = Instant.ofEpochMilli(dateTimeString)
|
|
|
- .atZone(ZoneId.systemDefault())
|
|
|
- .toLocalDateTime();
|
|
|
+ LocalDateTime dateTime = LocalDateTime.parse(dateTimeString, formatter);
|
|
|
+
|
|
|
// 指定时区为CST(Asia/Shanghai)
|
|
|
ZoneId shanghaiZoneId = ZoneId.of("Asia/Shanghai");
|
|
|
- // 将LocalDateTime对象转换为ZonedDateTime对象
|
|
|
+
|
|
|
+ // 将 LocalDateTime 对象转换为 ZonedDateTime 对象
|
|
|
ZonedDateTime zonedDateTime = dateTime.atZone(shanghaiZoneId);
|
|
|
- // 转换为秒级别的时间戳
|
|
|
- long epochSecond = dateTime.toEpochSecond(ZoneOffset.UTC);
|
|
|
- // 将ZonedDateTime对象转换为Instant对象(UTC时间点)
|
|
|
+
|
|
|
+ // 将 ZonedDateTime 对象转换为 Instant 对象(UTC时间点)
|
|
|
Instant instant = zonedDateTime.toInstant();
|
|
|
+
|
|
|
// 计算从1970年1月1日00:00:00 UTC以来的纳秒数
|
|
|
long nanosecondsSinceEpoch = ChronoUnit.NANOS.between(Instant.EPOCH, instant);
|
|
|
- long currentTimeNanos = System.currentTimeMillis() * 1_000_000L;
|
|
|
- if(nanosecondsSinceEpoch>currentTimeNanos){
|
|
|
- return 0L;
|
|
|
- }
|
|
|
- return nanosecondsSinceEpoch;
|
|
|
+
|
|
|
+ // 每分钟的纳秒数
|
|
|
+ long nanosPerMinute = 60L * 1_000_000_000L;
|
|
|
+
|
|
|
+ // 四舍五入到最接近的整分的纳秒值
|
|
|
+ long roundedNanos = Math.round((double) nanosecondsSinceEpoch / nanosPerMinute) * nanosPerMinute;
|
|
|
+
|
|
|
+ // 输出调试信息
|
|
|
+// System.out.println("解析的时间: " + dateTime);
|
|
|
+// System.out.println("时区转换后的时间: " + zonedDateTime);
|
|
|
+// System.out.println("计算的纳秒数: " + nanosecondsSinceEpoch);
|
|
|
+// System.out.println("最接近整分的纳秒数: " + roundedNanos);
|
|
|
+
|
|
|
+ return roundedNanos;
|
|
|
}
|
|
|
|
|
|
//TODO 处理kafka所有已存在数据
|
|
|
public static void InitHistory(){
|
|
|
- KafkaConsumer kafkaConsumerDemo = new KafkaConsumer("10.127.16.85:9092", "jiangjin");
|
|
|
- kafkaConsumerDemo.doConsume();
|
|
|
+// KafkaConsumer kafkaConsumerDemo = new KafkaConsumer("10.127.16.85:9092", "jiangjin");
|
|
|
+// kafkaConsumerDemo.doConsume();
|
|
|
+ }
|
|
|
+
|
|
|
+ //TODO 数据库视图采集
|
|
|
+ //TODO 按设备的最新时间作为视图查询条件
|
|
|
+ public static void InitRealDb(){
|
|
|
+ try {
|
|
|
+ int count = DBMgrProxy.getInstance().applyJiangjinDbApi().getTabWaterRealCount("");
|
|
|
+ if(count>0){
|
|
|
+ List<Map<String,Object>> mapList = DBMgrProxy.getInstance().applyJiangjinDbApi().getListWaterReal("");
|
|
|
+ if(!CollectionUtils.isEmpty(mapList)){
|
|
|
+ //TODO 首先进行特殊数据处理
|
|
|
+ int i = 1;
|
|
|
+ for (Map<String,Object> map:mapList){
|
|
|
+ //TODO 不存在过滤的就执行
|
|
|
+ if(!KprBaseInitFun.getInstance().closeTag.contains(map.get("TAG_CODE").toString())){
|
|
|
+ //TODO 业务过程
|
|
|
+ //TODO 根据设备号得到设定好的对应的设备类型名
|
|
|
+ Optional<String> foundKey = KprBaseInitFun.getInstance().jiangjinTypeList.entrySet().stream()
|
|
|
+ .filter(entry -> entry.getValue().contains(map.get("DEVICE_TYPE_CODE"))) // 过滤出包含特定值的 entry
|
|
|
+ .map(Map.Entry::getKey) // 提取 key
|
|
|
+ .findFirst(); // 找到第一个匹配的 key
|
|
|
+ if(foundKey.isPresent()) {
|
|
|
+ String deviceType = foundKey.get().split("_")[1];
|
|
|
+ //第三方对应的字段集
|
|
|
+ List<String> params = KprBaseInitFun.getInstance().jiangjinParams.get(deviceType);
|
|
|
+ //deviceType为表名
|
|
|
+ //time为当前数据time的纳秒时间戳,已经做过整点处理了
|
|
|
+ Point pointNanos = createPointFromJson(deviceType,
|
|
|
+ map.get("TAG_STANDARD_CODE").toString(),
|
|
|
+ map
|
|
|
+ , map.get("DEVICE_CODE").toString()
|
|
|
+ , params);
|
|
|
+ if (pointNanos != null) {
|
|
|
+ KprJiangjinWaterBizfun.infulxJiangjinDbUtil.insert(pointNanos);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ i++;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }else{
|
|
|
+ log.error(mStrClassName+";实时数据采集为空:"+ TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT)
|
|
|
+ +"InitRealDbNullError");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }catch(Exception ex){
|
|
|
+ ex.printStackTrace();
|
|
|
+ log.error(mStrClassName+";InitRealDbError:"+ex.getLocalizedMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ //TODO 历史数据,根据调用传递的开始时间作为历史数据接入的起始时间
|
|
|
+ public static void initHistoryDb(LocalDateTime startDateTime){
|
|
|
+ DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
|
|
+ System.out.println("历史数据执行:"+startDateTime.format(formatter));
|
|
|
+ if(startDateTime==null){
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ try{
|
|
|
+ //TODO 按每天六小时从起始时间循环到今天的0点
|
|
|
+ LocalDateTime todayMidnight = LocalDateTime.now().toLocalDate().atTime(1, 0);
|
|
|
+
|
|
|
+ // 循环从 startDateTime 开始,每 6 小时一次,直到今天 0 点
|
|
|
+ LocalDateTime currentDateTime = startDateTime;//每次循环查询的起始时间
|
|
|
+ while(currentDateTime.isBefore(todayMidnight)) {
|
|
|
+ String startStr = currentDateTime.format(formatter);
|
|
|
+ String endStr = currentDateTime.plusHours(6).format(formatter);
|
|
|
+ String extend = " QCQUISITION_TIME >= TO_DATE('" + startStr + "', 'YYYY-MM-DD HH24:MI:SS') " +
|
|
|
+ " AND QCQUISITION_TIME <= TO_DATE('" + endStr + "', 'YYYY-MM-DD HH24:MI:SS') ";
|
|
|
+ int count = DBMgrProxy.getInstance().applyJiangjinDbApi().getTabWaterHistoryCount(extend);
|
|
|
+ if(count>0){
|
|
|
+ int pageNum = count % 5000 == 0 ? count / 5000 : (count / 5000) + 1;//总页数
|
|
|
+
|
|
|
+ Integer limit = 5000;
|
|
|
+ if (pageNum <= 1) {
|
|
|
+ limit = count;//说明总数比第一页小
|
|
|
+ }
|
|
|
+ for (int i = 0; i < pageNum; i++) {
|
|
|
+ Integer offset = i * limit;
|
|
|
+ List<Map<String,Object>> mapList = DBMgrProxy.getInstance().applyJiangjinDbApi()
|
|
|
+ .getPageListWaterHistory(offset+limit,offset,extend);
|
|
|
+ if(!CollectionUtils.isEmpty(mapList)){
|
|
|
+ for (Map<String,Object> map:mapList){
|
|
|
+ //TODO 不存在过滤的就执行
|
|
|
+ if(!KprBaseInitFun.getInstance().closeTag.contains(map.get("TAG_CODE").toString())){
|
|
|
+ //TODO 业务过程
|
|
|
+ //TODO 根据设备号得到设定好的对应的设备类型名
|
|
|
+ Optional<String> foundKey = KprBaseInitFun.getInstance().jiangjinTypeList.entrySet().stream()
|
|
|
+ .filter(entry -> entry.getValue().contains(map.get("DEVICE_TYPE_CODE"))) // 过滤出包含特定值的 entry
|
|
|
+ .map(Map.Entry::getKey) // 提取 key
|
|
|
+ .findFirst(); // 找到第一个匹配的 key
|
|
|
+ if(foundKey.isPresent()) {
|
|
|
+ String deviceType = foundKey.get().split("_")[1];
|
|
|
+ //第三方对应的字段集
|
|
|
+ List<String> params = KprBaseInitFun.getInstance().jiangjinParams.get(deviceType);
|
|
|
+ //deviceType为表名
|
|
|
+ //time为当前数据time的纳秒时间戳,已经做过整点处理了
|
|
|
+ Point pointNanos = createPointFromJson(deviceType,
|
|
|
+ map.get("TAG_STANDARD_CODE").toString(),
|
|
|
+ map
|
|
|
+ , map.get("DEVICE_CODE").toString()
|
|
|
+ , params);
|
|
|
+ if (pointNanos != null) {
|
|
|
+ KprJiangjinWaterBizfun.infulxJiangjinDbUtil.insert(pointNanos);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }else{
|
|
|
+ log.error(mStrClassName+";历史数据采集为空:"+ TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT)
|
|
|
+ +"initHistoryDbNullError");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ System.out.println(startStr+"到"+endStr+"数据整理完成:"+count);
|
|
|
+ }
|
|
|
+ currentDateTime = currentDateTime.plusHours(6); // 增加 6 小时
|
|
|
+ }
|
|
|
+ }catch(Exception ex){
|
|
|
+ ex.printStackTrace();
|
|
|
+ log.error(mStrClassName+";initHistoryDbError:"+ex.getLocalizedMessage());
|
|
|
+ }
|
|
|
}
|
|
|
}
|