|
- package io.github.pnoker.gateway.bizmgr;
- import com.alibaba.fastjson.JSONObject;
- import io.github.pnoker.gateway.SpringContextUtil;
- import io.github.pnoker.gateway.bizmgr.baseInit.KprBaseInitFun;
- import io.github.pnoker.gateway.comtool.TimeTool;
- import io.github.pnoker.gateway.comtool.jiangjinThread.KafkaConsumer;
- import io.github.pnoker.gateway.dbdao.DBMgrProxy;
- import io.github.pnoker.gateway.utils.InfulxDbUtil;
- import io.github.pnoker.gateway.utils.InfulxJiangjinDbUtil;
- import org.influxdb.dto.Point;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.util.CollectionUtils;
- import org.springframework.util.ObjectUtils;
- import java.math.BigDecimal;
- import java.text.ParseException;
- import java.text.SimpleDateFormat;
- import java.time.*;
- import java.time.format.DateTimeFormatter;
- import java.time.format.DateTimeFormatterBuilder;
- import java.time.temporal.ChronoField;
- import java.time.temporal.ChronoUnit;
- import java.util.*;
- import java.util.concurrent.TimeUnit;
- import java.util.stream.Collectors;
- import static io.github.pnoker.gateway.comtool.jiangjinThread.KafkaConsumer.convertCubicMetersToLiters;
- /**
- * @ClassName KprJiangjinWaterBizfun
- * @Description: TODO
- * @Author LX
- * @Date 2024/11/21
- * @Version V1.0
- **/
- public class KprJiangjinWaterBizfun {
- private static final Logger log = LoggerFactory.getLogger(KprJiangjinWaterBizfun.class);
- private final static String mStrClassName = "KprJiangjinWaterBizfun";
- private final static String EMPTY_NULL = "NULL";
- public static InfulxJiangjinDbUtil infulxJiangjinDbUtil = null;//infulx工具类对象
- public static Point createPointFromJson(String deviceType,String standardCode,Map<String,Object> mapEntity,String cmCode,List<String> params) {
- // 获取时间戳
- long nanoTimestamp = convertToNanoTimestamp(mapEntity.get("QCQUISITION_TIME").toString());
- if(nanoTimestamp==0L){
- return null;
- }
- String measurement = KprBaseInitFun.getInstance().jiangjinMeasurementMap.get(deviceType);
- // 创建 Point.Builder 对象
- Point.Builder pointBuilder = Point.measurement(measurement)
- .tag("dev_id",cmCode)
- .time(nanoTimestamp, TimeUnit.NANOSECONDS);
- // 遍历 JSON 对象的键值对,并添加到 Point.Builder 中
- processField(pointBuilder,standardCode, mapEntity.get("VAL"),params);
- if(!pointBuilder.hasFields()){
- return null;
- }
- // 构建 Point 对象
- try {
- return pointBuilder.build();
- }catch(Exception ex){
- ex.printStackTrace();
- }
- return null;
- }
- //指定列处理
- private static void processField(Point.Builder pointBuilder, String key, Object value,List<String> params) {
- if (value == null) {
- return; // 跳过空值
- }
- //String[]长度如果为0,则说明本系统没有对应字段,则用它本身,反之用第二个元素作为字段名
- List<String[]> newParams = params.stream()
- .map(s -> s.split("___"))
- .collect(Collectors.toList());
- for (String[] clies:newParams){
- boolean exists = Arrays.asList(clies).contains(key);
- if(exists){
- String newKey = clies.length>1?clies[1]:clies[0];
- if (value instanceof Integer) {
- pointBuilder.addField(newKey, (Integer) value);
- } else if (value instanceof Long) {
- pointBuilder.addField(newKey, (Long) value);
- } else if (value instanceof Boolean) {
- pointBuilder.addField(newKey, ((Boolean)value)?1:0);
- } else if (value instanceof String) {
- if (((String)value).equals("01")||((String)value).equals("00")){
- pointBuilder.addField(newKey, ((String) value).replace("0",""));
- }else {
- pointBuilder.addField(newKey, (String) value);
- }
- }else if (value instanceof BigDecimal) {
- pointBuilder.addField(newKey, ((BigDecimal) value).doubleValue());
- }else{
- pointBuilder.addField(newKey, (String.valueOf(value)) );
- }
- if(newKey.equals("flow_l_cur")||newKey.equals("flow_l_total_pos")){
- pointBuilder.addField(newKey, Double.valueOf(ObjectUtils.isEmpty(value)?"0.0":value.toString()));
- if(newKey.equals("flow_l_cur")){
- pointBuilder.addField(newKey, Double.valueOf(ObjectUtils.isEmpty(value)?"0.0"
- :convertCubicMetersToLiters(Double.valueOf(value.toString()))));
- pointBuilder.addField("flow_cur", Double.valueOf(ObjectUtils.isEmpty(value)?"0.0":value.toString()));
- }
- if(newKey.equals("flow_l_total_pos")){
- pointBuilder.addField(newKey, Double.valueOf(ObjectUtils.isEmpty(value)?"0.0"
- :convertCubicMetersToLiters(Double.valueOf(value.toString()))));
- pointBuilder.addField("flow_total_pos", Double.valueOf(ObjectUtils.isEmpty(value)?"0.0":value.toString()));
- }
- }
- break;
- }
- }
- }
- //获取CST 纳秒时间戳
- private static long convertToNanoTimestamp(String dateTimeString) {
- // 定义时间格式,支持可选的小数秒部分
- DateTimeFormatter formatter = new DateTimeFormatterBuilder()
- .appendPattern("yyyy-MM-dd HH:mm:ss")
- .optionalStart()
- .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true)
- .optionalEnd()
- .toFormatter();
- // 解析字符串为 LocalDateTime 对象
- LocalDateTime dateTime = LocalDateTime.parse(dateTimeString, formatter);
- // 四舍五入到最近的整分
- dateTime = dateTime.truncatedTo(ChronoUnit.MINUTES);
- if (dateTime.getSecond() >= 30) {
- dateTime = dateTime.plusMinutes(1);
- }
- // 确保分钟数可以被2整除
- int minute = dateTime.getMinute();
- if (minute % 2 != 0) {
- dateTime = dateTime.plusMinutes(1);
- }
- // 指定时区为CST(Asia/Shanghai)
- ZoneId shanghaiZoneId = ZoneId.of("Asia/Shanghai");
- // 将 LocalDateTime 对象转换为 ZonedDateTime 对象
- ZonedDateTime zonedDateTime = dateTime.atZone(shanghaiZoneId);
- // 将 ZonedDateTime 对象转换为 Instant 对象(UTC时间点)
- Instant instant = zonedDateTime.toInstant();
- // 计算从1970年1月1日00:00:00 UTC以来的纳秒数
- long nanosecondsSinceEpoch = ChronoUnit.NANOS.between(Instant.EPOCH, instant);
- // 输出调试信息
- // System.out.println("输入的时间(四舍五入到整分且分钟数为偶数): " + dateTime);
- // System.out.println("时区转换后的时间: " + zonedDateTime);
- // System.out.println("转换为UTC的时间: " + instant);
- // System.out.println("计算的纳秒数: " + nanosecondsSinceEpoch);
- return nanosecondsSinceEpoch;
- }
- //TODO 处理kafka所有已存在数据
- public static void InitHistory(){
- // KafkaConsumer kafkaConsumerDemo = new KafkaConsumer("10.127.16.85:9092", "jiangjin");
- // kafkaConsumerDemo.doConsume();
- }
- //TODO 计算采集频率
- // 在方法外部定义集合
- private static Map<String, List<Map<String, Object>>> deviceDataMap = new HashMap<>();
- private static Map<String, Integer> deviceFrequencyMap = new HashMap<>();
- public static void InitDeviceFrequency() {
- try {
- int count = DBMgrProxy.getInstance().applyJiangjinDbApi().getTabWaterRealCount("");
- if (count > 0) {
- List<Map<String, Object>> mapList = DBMgrProxy.getInstance().applyJiangjinDbApi()
- .getListWaterReal("");
- if (!mapList.isEmpty()) {
- for (Map<String, Object> map : mapList) {
- String deviceCode = map.get("DEVICE_CODE").toString();
- String acquisitionTime = map.get("QCQUISITION_TIME").toString(); // 采集时间为字符串
- String tagStandardCode = map.get("TAG_STANDARD_CODE").toString();
- // 使用设备编号和标准代码作为键
- String uniqueKey = deviceCode + "_" +acquisitionTime+ "_" + tagStandardCode;
- // 检查并更新设备数据集合
- deviceDataMap.computeIfAbsent(uniqueKey, k -> new ArrayList<>());
- List<Map<String, Object>> deviceDataList = deviceDataMap.get(uniqueKey);
- // 检查时间戳是否已存在,确保不重复添加相同时间点的数据
- boolean exists = deviceDataList.stream().anyMatch(data ->
- data.get("QCQUISITION_TIME").equals(acquisitionTime));
- if (!exists) {
- if (deviceDataList.size() >= 2) {
- deviceDataList.remove(0); // 保持最新的两条数据
- }
- deviceDataList.add(map);
- // 计算采集频率(假设时间字符串格式为"yyyy-MM-dd HH:mm:ss")
- if (deviceDataList.size() == 2) {
- String time1 = deviceDataList.get(0).get("QCQUISITION_TIME").toString();
- String time2 = deviceDataList.get(1).get("QCQUISITION_TIME").toString();
- // 转换时间字符串为时间戳
- SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- try {
- long timestamp1 = sdf.parse(time1).getTime();
- long timestamp2 = sdf.parse(time2).getTime();
- int frequency = (int) Math.abs(timestamp2 - timestamp1) / 60000; // 以分钟为单位
- deviceFrequencyMap.put(uniqueKey, frequency);
- } catch (ParseException e) {
- System.err.println("时间格式解析错误:" + e.getMessage());
- }
- }
- }
- }
- } else {
- System.err.println("实时数据采集为空");
- }
- }
- } catch (Exception ex) {
- ex.printStackTrace();
- System.err.println("InitRealDbError: " + ex.getLocalizedMessage());
- }
- }
- //TODO 数据库视图采集
- //TODO 按设备的最新时间作为视图查询条件
- public static void InitRealDb(){
- try {
- int count = DBMgrProxy.getInstance().applyJiangjinDbApi().getTabWaterRealCount("");
- if(count>0){
- List<Map<String,Object>> mapList = DBMgrProxy.getInstance().applyJiangjinDbApi()
- // .getListWaterReal(" DEVICE_CODE = '050101000021100065426' ");
- .getListWaterReal("");
- if(!CollectionUtils.isEmpty(mapList)){
- //TODO 首先进行特殊数据处理
- int i = 1;
- for (Map<String,Object> map:mapList){
- //TODO 不存在过滤的就执行
- if(!KprBaseInitFun.getInstance().closeTag.contains(map.get("TAG_CODE").toString())){
- //TODO 业务过程
- //TODO 根据设备号得到设定好的对应的设备类型名
- Optional<String> foundKey = KprBaseInitFun.getInstance().jiangjinTypeList.entrySet().stream()
- .filter(entry -> entry.getValue().contains(map.get("DEVICE_TYPE_CODE"))) // 过滤出包含特定值的 entry
- .map(Map.Entry::getKey) // 提取 key
- .findFirst(); // 找到第一个匹配的 key
- if(foundKey.isPresent()) {
- String deviceType = foundKey.get().split("_")[1];
- //TODO 数据特殊处理
- if(map.get("DEVICE_NAME").toString().contains("电量表")){
- deviceType = "VoltageSwitchgear";
- }else if(map.get("DEVICE_NAME").toString().contains("压力变送器")){
- deviceType = "WaterMeter";
- }else if(map.get("TAG_DESC").toString().contains("轴承温度")){
- deviceType = "ElectricMotor";
- }
- //第三方对应的字段集
- List<String> params = KprBaseInitFun.getInstance().jiangjinParams.get(deviceType);
- //deviceType为表名
- //time为当前数据time的纳秒时间戳,已经做过整点处理了
- Point pointNanos = createPointFromJson(deviceType,
- map.get("TAG_STANDARD_CODE").toString(),
- map
- , map.get("DEVICE_CODE").toString()
- , params);
- if (pointNanos != null) {
- KprJiangjinWaterBizfun.infulxJiangjinDbUtil.insert(pointNanos);
- }
- }
- i++;
- }
- }
- }else{
- log.error(mStrClassName+";实时数据采集为空:"+ TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT)
- +"InitRealDbNullError");
- }
- }
- }catch(Exception ex){
- ex.printStackTrace();
- log.error(mStrClassName+";InitRealDbError:"+ex.getLocalizedMessage());
- }
- }
- //TODO 历史数据,根据调用传递的开始时间作为历史数据接入的起始时间
- public static void initHistoryDb(LocalDateTime startDateTime){
- DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
- if(startDateTime==null){
- return;
- }
- try{
- //TODO
- LocalDateTime todayMidnight = LocalDateTime.now().toLocalDate().atTime(1, 0);
- // 循环从 startDateTime 开始,每 3 小时一次,直到今天 0 点
- LocalDateTime currentDateTime = startDateTime;//每次循环查询的起始时间
- while(currentDateTime.isBefore(todayMidnight)) {
- String startStr = currentDateTime.format(formatter);
- String endStr = currentDateTime.plusHours(3).format(formatter);
- log.info("历史数据执行:"+startStr+"~~~"+endStr);
- String extend = " QCQUISITION_TIME >= TO_DATE('" + startStr + "', 'YYYY-MM-DD HH24:MI:SS') " +
- " AND QCQUISITION_TIME <= TO_DATE('" + endStr + "', 'YYYY-MM-DD HH24:MI:SS')";
- // String extend = " QCQUISITION_TIME >= TO_DATE('" + startStr + "', 'YYYY-MM-DD HH24:MI:SS') " +
- // " AND QCQUISITION_TIME <= TO_DATE('" + endStr + "', 'YYYY-MM-DD HH24:MI:SS') and TAG_CODE = 'OPC.JJZLS.CENTER_MODBUS_SS_LJLL2'";
- int count = DBMgrProxy.getInstance().applyJiangjinDbApi().getTabWaterHistoryCount(extend);
- if(count>0){
- int pageNum = count % 5000 == 0 ? count / 5000 : (count / 5000) + 1;//总页数
- Integer limit = 5000;
- if (pageNum <= 1) {
- limit = count;//说明总数比第一页小
- }
- for (int i = 0; i < pageNum; i++) {
- Integer offset = i * limit;
- List<Map<String,Object>> mapList = DBMgrProxy.getInstance().applyJiangjinDbApi()
- .getPageListWaterHistory(offset+limit,offset,extend);
- if(!CollectionUtils.isEmpty(mapList)){
- for (Map<String,Object> map:mapList){
- //TODO 不存在过滤的就执行
- if(!KprBaseInitFun.getInstance().closeTag.contains(map.get("TAG_CODE").toString())){
- //TODO 业务过程
- //TODO 根据设备号得到设定好的对应的设备类型名
- Optional<String> foundKey = KprBaseInitFun.getInstance().jiangjinTypeList.entrySet().stream()
- .filter(entry -> entry.getValue().contains(map.get("DEVICE_TYPE_CODE"))) // 过滤出包含特定值的 entry
- .map(Map.Entry::getKey) // 提取 key
- .findFirst(); // 找到第一个匹配的 key
- if(foundKey.isPresent()) {
- String deviceType = foundKey.get().split("_")[1];
- //第三方对应的字段集
- List<String> params = KprBaseInitFun.getInstance().jiangjinParams.get(deviceType);
- //deviceType为表名
- //time为当前数据time的纳秒时间戳,已经做过整点处理了
- Point pointNanos = createPointFromJson(deviceType,
- map.get("TAG_STANDARD_CODE").toString(),
- map
- , map.get("DEVICE_CODE").toString()
- , params);
- if (pointNanos != null) {
- KprJiangjinWaterBizfun.infulxJiangjinDbUtil.insert(pointNanos);
- }
- }
- }
- }
- }else{
- log.error(mStrClassName+";历史数据采集为空:"+ TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT)
- +"initHistoryDbNullError");
- }
- }
- log.info(startStr+"到"+endStr+"数据整理完成:"+count);
- }
- currentDateTime = currentDateTime.plusHours(3); // 增加 3 小时
- }
- }catch(Exception ex){
- ex.printStackTrace();
- log.error(mStrClassName+";initHistoryDbError:"+ex.getLocalizedMessage());
- }
- }
- }
|