package io.github.pnoker.gateway.bizmgr;

import com.alibaba.fastjson.JSONObject;
import io.github.pnoker.gateway.SpringContextUtil;
import io.github.pnoker.gateway.bizmgr.baseInit.KprBaseInitFun;
import io.github.pnoker.gateway.comtool.TimeTool;
import io.github.pnoker.gateway.comtool.jiangjinThread.KafkaConsumer;
import io.github.pnoker.gateway.dbdao.DBMgrProxy;
import io.github.pnoker.gateway.utils.InfulxDbUtil;
import io.github.pnoker.gateway.utils.InfulxJiangjinDbUtil;
import org.influxdb.dto.Point;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;

import java.math.BigDecimal;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.*;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.ChronoField;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static io.github.pnoker.gateway.comtool.jiangjinThread.KafkaConsumer.convertCubicMetersToLiters;

/**
 * Static helpers that turn Jiangjin water-plant rows (one tag value per row) into InfluxDB
 * {@link Point}s and write them through {@link #infulxJiangjinDbUtil}.
 *
 * <p>Rows are read through {@code DBMgrProxy.getInstance().applyJiangjinDbApi()}; the mapping
 * tables (measurement names, device-type lists, field-name lists, filtered tags) come from
 * {@code KprBaseInitFun.getInstance()}.
 *
 * <p>NOTE(review): the generic type arguments in this file were reconstructed from usage
 * ({@code Map<String, Object>} rows, {@code List<String>} field lists) - confirm against the
 * DAO and {@code KprBaseInitFun} declarations.
 *
 * @author LX
 * @version V1.0 (2024/11/21)
 */
public class KprJiangjinWaterBizfun {

    private static final Logger log = LoggerFactory.getLogger(KprJiangjinWaterBizfun.class);

    private final static String mStrClassName = "KprJiangjinWaterBizfun";

    private final static String EMPTY_NULL = "NULL";

    /** InfluxDB helper used for every insert; it is not assigned anywhere in this class. */
    public static InfulxJiangjinDbUtil infulxJiangjinDbUtil = null;

    /** Column holding the acquisition time (the spelling matches the source view). */
    private static final String COL_ACQUISITION_TIME = "QCQUISITION_TIME";

    /** Zone in which acquisition times are interpreted. */
    private static final ZoneId SHANGHAI_ZONE = ZoneId.of("Asia/Shanghai");

    /** Parses {@code yyyy-MM-dd HH:mm:ss} with an optional fractional-second suffix. */
    private static final DateTimeFormatter ACQUISITION_TIME_FORMATTER = new DateTimeFormatterBuilder()
            .appendPattern("yyyy-MM-dd HH:mm:ss")
            .optionalStart()
            .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true)
            .optionalEnd()
            .toFormatter();

    /** Formats the window bounds embedded in the history query condition. */
    private static final DateTimeFormatter QUERY_TIME_FORMATTER =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Maximum number of history rows fetched per page. */
    private static final int HISTORY_PAGE_SIZE = 5000;

    /** Length, in hours, of each history query window. */
    private static final int HISTORY_WINDOW_HOURS = 3;

    /** Up to two most recent rows per device/tag key, used to derive the sampling interval. */
    private static Map<String, List<Map<String, Object>>> deviceDataMap = new HashMap<>();

    /** Sampling interval in minutes per device/tag key. */
    private static Map<String, Integer> deviceFrequencyMap = new HashMap<>();

    /**
     * Builds an InfluxDB point for a single tag value.
     *
     * @param deviceType   key used to look up the measurement name
     * @param standardCode standard tag code of the value; matched against {@code params}
     * @param mapEntity    source row; {@code QCQUISITION_TIME} and {@code VAL} are read
     * @param cmCode       device code, stored as the {@code dev_id} tag
     * @param params       field descriptors of the form {@code code} or {@code code___fieldName}
     * @return the point, or {@code null} when the row has no usable time, no measurement is
     *         configured, no field matched, or the point could not be built
     */
    public static Point createPointFromJson(String deviceType, String standardCode, Map<String, Object> mapEntity,
                                            String cmCode, List<String> params) {
        Object acquisitionTime = mapEntity == null ? null : mapEntity.get(COL_ACQUISITION_TIME);
        if (acquisitionTime == null) {
            return null;
        }
        long nanoTimestamp = convertToNanoTimestamp(acquisitionTime.toString());
        if (nanoTimestamp == 0L) {
            return null;
        }

        String measurement = KprBaseInitFun.getInstance().jiangjinMeasurementMap.get(deviceType);
        if (measurement == null) {
            log.warn("{};no measurement configured for device type {}", mStrClassName, deviceType);
            return null;
        }

        Point.Builder pointBuilder = Point.measurement(measurement)
                .tag("dev_id", cmCode)
                .time(nanoTimestamp, TimeUnit.NANOSECONDS);

        processField(pointBuilder, standardCode, mapEntity.get("VAL"), params);
        if (!pointBuilder.hasFields()) {
            return null;
        }

        try {
            return pointBuilder.build();
        } catch (Exception ex) {
            log.error(mStrClassName + ";createPointFromJsonError:" + ex.getLocalizedMessage(), ex);
            return null;
        }
    }

    /**
     * Adds {@code value} to the builder under the field name configured for {@code key}.
     *
     * <p>Each descriptor is split on {@code "___"}. The first descriptor containing {@code key}
     * wins; its second element is the field name when present, otherwise its first element.
     * Nothing is added when the value or the descriptor list is {@code null}, or when no
     * descriptor matches.
     */
    private static void processField(Point.Builder pointBuilder, String key, Object value, List<String> params) {
        if (value == null || params == null) {
            return;
        }
        for (String descriptor : params) {
            String[] parts = descriptor.split("___");
            if (!Arrays.asList(parts).contains(key)) {
                continue;
            }
            String newKey = parts.length > 1 ? parts[1] : parts[0];
            boolean flowCurrent = "flow_l_cur".equals(newKey);
            boolean flowTotal = "flow_l_total_pos".equals(newKey);
            if (flowCurrent || flowTotal) {
                addFlowFields(pointBuilder, newKey, flowCurrent ? "flow_cur" : "flow_total_pos", value);
            } else {
                addTypedField(pointBuilder, newKey, value);
            }
            break; // only the first matching descriptor is used
        }
    }

    /**
     * Writes a flow reading twice: the raw number under {@code rawKey} and the result of
     * {@code convertCubicMetersToLiters} under {@code litersKey}. An empty value is written as 0.0.
     */
    private static void addFlowFields(Point.Builder pointBuilder, String litersKey, String rawKey, Object value) {
        boolean empty = ObjectUtils.isEmpty(value);
        Double raw = Double.valueOf(empty ? "0.0" : value.toString());
        Double liters = Double.valueOf(empty
                ? "0.0"
                : convertCubicMetersToLiters(Double.valueOf(value.toString())));
        pointBuilder.addField(litersKey, liters);
        pointBuilder.addField(rawKey, raw);
    }

    /** Adds a non-flow value, choosing the field type from the runtime type of {@code value}. */
    private static void addTypedField(Point.Builder pointBuilder, String fieldName, Object value) {
        if (value instanceof Integer) {
            pointBuilder.addField(fieldName, (Integer) value);
        } else if (value instanceof Long) {
            pointBuilder.addField(fieldName, (Long) value);
        } else if (value instanceof Boolean) {
            pointBuilder.addField(fieldName, ((Boolean) value) ? 1 : 0);
        } else if (value instanceof String) {
            String text = (String) value;
            if ("01".equals(text)) {
                pointBuilder.addField(fieldName, "1");
            } else if ("00".equals(text)) {
                // Stripping every '0' used to leave an empty string here; keep the digit.
                pointBuilder.addField(fieldName, "0");
            } else {
                pointBuilder.addField(fieldName, text);
            }
        } else if (value instanceof BigDecimal) {
            pointBuilder.addField(fieldName, ((BigDecimal) value).doubleValue());
        } else {
            pointBuilder.addField(fieldName, String.valueOf(value));
        }
    }

    /**
     * Converts an acquisition time to nanoseconds since the epoch.
     *
     * <p>The time is rounded to the nearest minute, bumped to the next minute when the minute is
     * odd, and then interpreted in the Asia/Shanghai zone.
     *
     * @param dateTimeString {@code yyyy-MM-dd HH:mm:ss}, optionally followed by fractional seconds
     * @return nanoseconds since 1970-01-01T00:00:00Z, or {@code 0} when the input is {@code null}
     *         or cannot be parsed
     */
    private static long convertToNanoTimestamp(String dateTimeString) {
        if (dateTimeString == null) {
            return 0L;
        }
        LocalDateTime parsed;
        try {
            parsed = LocalDateTime.parse(dateTimeString, ACQUISITION_TIME_FORMATTER);
        } catch (DateTimeException ex) {
            log.warn("{};unparseable acquisition time '{}'", mStrClassName, dateTimeString);
            return 0L;
        }

        // Decide the rounding from the parsed seconds; after truncation they are always zero.
        LocalDateTime rounded = parsed.truncatedTo(ChronoUnit.MINUTES);
        if (parsed.getSecond() >= 30) {
            rounded = rounded.plusMinutes(1);
        }
        // Samples are aligned to even minutes.
        if (rounded.getMinute() % 2 != 0) {
            rounded = rounded.plusMinutes(1);
        }

        Instant instant = rounded.atZone(SHANGHAI_ZONE).toInstant();
        return ChronoUnit.NANOS.between(Instant.EPOCH, instant);
    }

    /**
     * Placeholder for replaying data that already exists in Kafka; currently does nothing.
     */
    public static void InitHistory() {
        // KafkaConsumer kafkaConsumerDemo = new KafkaConsumer("10.127.16.85:9092", "jiangjin");
        // kafkaConsumerDemo.doConsume();
    }

    /**
     * Derives the sampling interval, in minutes, per device and standard tag code from the
     * realtime view and stores it in {@code deviceFrequencyMap}.
     *
     * <p>For each key the two most recently seen rows with distinct acquisition times are kept;
     * the interval is the absolute difference between their times. Rows missing the device
     * code, acquisition time or standard code are skipped.
     */
    public static void InitDeviceFrequency() {
        try {
            int count = DBMgrProxy.getInstance().applyJiangjinDbApi().getTabWaterRealCount("");
            if (count <= 0) {
                return;
            }
            List<Map<String, Object>> mapList = DBMgrProxy.getInstance().applyJiangjinDbApi()
                    .getListWaterReal("");
            if (CollectionUtils.isEmpty(mapList)) {
                log.error("实时数据采集为空");
                return;
            }
            for (Map<String, Object> map : mapList) {
                recordSample(map);
            }
        } catch (Exception ex) {
            log.error("InitDeviceFrequencyError: " + ex.getLocalizedMessage(), ex);
        }
    }

    /** Records one realtime row and refreshes the sampling interval of its device/tag key. */
    private static void recordSample(Map<String, Object> map) {
        Object deviceCode = map.get("DEVICE_CODE");
        Object acquisitionTimeValue = map.get(COL_ACQUISITION_TIME);
        Object tagStandardCode = map.get("TAG_STANDARD_CODE");
        if (deviceCode == null || acquisitionTimeValue == null || tagStandardCode == null) {
            return;
        }
        String acquisitionTime = acquisitionTimeValue.toString();

        // The key must not contain the acquisition time, otherwise two samples of the same
        // tag never share a key and no interval can be computed.
        String uniqueKey = deviceCode + "_" + tagStandardCode;
        List<Map<String, Object>> recent = deviceDataMap.computeIfAbsent(uniqueKey, k -> new ArrayList<>());

        boolean alreadySeen = recent.stream()
                .anyMatch(data -> acquisitionTime.equals(String.valueOf(data.get(COL_ACQUISITION_TIME))));
        if (alreadySeen) {
            return;
        }
        if (recent.size() >= 2) {
            recent.remove(0); // keep only the two latest samples
        }
        recent.add(map);
        if (recent.size() < 2) {
            return;
        }

        try {
            Instant first = toInstant(String.valueOf(recent.get(0).get(COL_ACQUISITION_TIME)));
            Instant second = toInstant(String.valueOf(recent.get(1).get(COL_ACQUISITION_TIME)));
            // Divide before narrowing: casting the millisecond difference to int first overflows.
            long minutes = Math.abs(Duration.between(first, second).toMillis()) / 60000L;
            deviceFrequencyMap.put(uniqueKey, (int) minutes);
        } catch (DateTimeException e) {
            log.error("时间格式解析错误:" + e.getMessage(), e);
        }
    }

    /** Parses an acquisition time string and places it in the Asia/Shanghai zone. */
    private static Instant toInstant(String acquisitionTime) {
        return LocalDateTime.parse(acquisitionTime, ACQUISITION_TIME_FORMATTER)
                .atZone(SHANGHAI_ZONE)
                .toInstant();
    }

    /**
     * Reads every row of the realtime view and writes one InfluxDB point per usable row.
     *
     * <p>Failures are logged and not rethrown.
     */
    public static void InitRealDb() {
        try {
            int count = DBMgrProxy.getInstance().applyJiangjinDbApi().getTabWaterRealCount("");
            if (count <= 0) {
                return;
            }
            List<Map<String, Object>> mapList = DBMgrProxy.getInstance().applyJiangjinDbApi()
                    // .getListWaterReal(" DEVICE_CODE = '050101000021100065426' ");
                    .getListWaterReal("");
            if (CollectionUtils.isEmpty(mapList)) {
                log.error(mStrClassName + ";实时数据采集为空:"
                        + TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(), TimeTool.TIMESTAMP_FORMAT)
                        + "InitRealDbNullError");
                return;
            }
            ingestRows(mapList, true);
        } catch (Exception ex) {
            log.error(mStrClassName + ";InitRealDbError:" + ex.getLocalizedMessage(), ex);
        }
    }

    /**
     * Imports history rows in consecutive 3-hour windows, starting at {@code startDateTime} and
     * continuing while the window start is before 01:00 of the current day. Each window is read
     * in pages of at most 5000 rows.
     *
     * <p>Failures are logged and not rethrown; a failure stops the remaining windows.
     *
     * @param startDateTime start of the first window; nothing happens when {@code null}
     */
    public static void initHistoryDb(LocalDateTime startDateTime) {
        if (startDateTime == null) {
            return;
        }
        try {
            // NOTE(review): the original comment says "until today 00:00" but the code uses 01:00 - confirm.
            LocalDateTime cutoff = LocalDateTime.now().toLocalDate().atTime(1, 0);
            LocalDateTime windowStart = startDateTime;
            while (windowStart.isBefore(cutoff)) {
                LocalDateTime windowEnd = windowStart.plusHours(HISTORY_WINDOW_HOURS);
                String startStr = windowStart.format(QUERY_TIME_FORMATTER);
                String endStr = windowEnd.format(QUERY_TIME_FORMATTER);
                log.info("历史数据执行:" + startStr + "~~~" + endStr);

                // Both bounds are formatted from LocalDateTime values, not from external text.
                String extend = " QCQUISITION_TIME >= TO_DATE('" + startStr + "', 'YYYY-MM-DD HH24:MI:SS') "
                        + " AND QCQUISITION_TIME <= TO_DATE('" + endStr + "', 'YYYY-MM-DD HH24:MI:SS')";

                int count = DBMgrProxy.getInstance().applyJiangjinDbApi().getTabWaterHistoryCount(extend);
                if (count > 0) {
                    importHistoryWindow(extend, count);
                    log.info(startStr + "到" + endStr + "数据整理完成:" + count);
                }
                windowStart = windowEnd;
            }
        } catch (Exception ex) {
            log.error(mStrClassName + ";initHistoryDbError:" + ex.getLocalizedMessage(), ex);
        }
    }

    /** Pages through the {@code count} history rows matching {@code extend} and ingests them. */
    private static void importHistoryWindow(String extend, int count) {
        int pageNum = count / HISTORY_PAGE_SIZE + (count % HISTORY_PAGE_SIZE == 0 ? 0 : 1);
        // A single page is sized to the row count.
        int limit = pageNum <= 1 ? count : HISTORY_PAGE_SIZE;
        for (int page = 0; page < pageNum; page++) {
            int offset = page * limit;
            List<Map<String, Object>> mapList = DBMgrProxy.getInstance().applyJiangjinDbApi()
                    .getPageListWaterHistory(offset + limit, offset, extend);
            if (CollectionUtils.isEmpty(mapList)) {
                log.error(mStrClassName + ";历史数据采集为空:"
                        + TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(), TimeTool.TIMESTAMP_FORMAT)
                        + "initHistoryDbNullError");
                continue;
            }
            ingestRows(mapList, false);
        }
    }

    /**
     * Converts each row to a point and inserts it.
     *
     * <p>A row that cannot be converted is logged and skipped so one bad row does not discard the
     * rest of the batch. Insert failures are not caught here and reach the caller.
     */
    private static void ingestRows(List<Map<String, Object>> rows, boolean applyRealtimeOverrides) {
        for (Map<String, Object> row : rows) {
            Point point;
            try {
                point = buildPointForRow(row, applyRealtimeOverrides);
            } catch (RuntimeException ex) {
                log.warn("{};skipping malformed row {}", mStrClassName, row, ex);
                continue;
            }
            if (point != null) {
                KprJiangjinWaterBizfun.infulxJiangjinDbUtil.insert(point);
            }
        }
    }

    /**
     * Resolves the device type of a row and builds its point.
     *
     * @param applyRealtimeOverrides when {@code true}, the device type is overridden for specific
     *                               device names / tag descriptions, as the realtime import does
     * @return the point, or {@code null} when the row is filtered out, incomplete, or has no
     *         configured device type
     */
    private static Point buildPointForRow(Map<String, Object> row, boolean applyRealtimeOverrides) {
        if (row == null) {
            return null;
        }
        Object tagCode = row.get("TAG_CODE");
        Object standardCode = row.get("TAG_STANDARD_CODE");
        Object deviceCode = row.get("DEVICE_CODE");
        if (tagCode == null || standardCode == null || deviceCode == null) {
            return null;
        }
        // Tags listed in closeTag are excluded from import.
        if (KprBaseInitFun.getInstance().closeTag.contains(tagCode.toString())) {
            return null;
        }

        // First configured type entry whose value list contains the row's device type code.
        Optional<String> foundKey = KprBaseInitFun.getInstance().jiangjinTypeList.entrySet().stream()
                .filter(entry -> entry.getValue().contains(row.get("DEVICE_TYPE_CODE")))
                .map(Map.Entry::getKey)
                .findFirst();
        if (!foundKey.isPresent()) {
            return null;
        }
        // The device type is the second "_"-separated segment of the key.
        String[] keyParts = foundKey.get().split("_");
        if (keyParts.length < 2) {
            log.warn("{};device type key '{}' has no '_' separated type name", mStrClassName, foundKey.get());
            return null;
        }
        String deviceType = keyParts[1];

        if (applyRealtimeOverrides) {
            // NOTE(review): only the realtime import applies these overrides - confirm that the
            // history import is meant to differ.
            String deviceName = Objects.toString(row.get("DEVICE_NAME"), "");
            String tagDesc = Objects.toString(row.get("TAG_DESC"), "");
            if (deviceName.contains("电量表")) {
                deviceType = "VoltageSwitchgear";
            } else if (deviceName.contains("压力变送器")) {
                deviceType = "WaterMeter";
            } else if (tagDesc.contains("轴承温度")) {
                deviceType = "ElectricMotor";
            }
        }

        List<String> params = KprBaseInitFun.getInstance().jiangjinParams.get(deviceType);
        return createPointFromJson(deviceType, standardCode.toString(), row, deviceCode.toString(), params);
    }
}