KprJiangjinWaterBizfun.java 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368
  1. package io.github.pnoker.gateway.bizmgr;
  2. import com.alibaba.fastjson.JSONObject;
  3. import io.github.pnoker.gateway.SpringContextUtil;
  4. import io.github.pnoker.gateway.bizmgr.baseInit.KprBaseInitFun;
  5. import io.github.pnoker.gateway.comtool.TimeTool;
  6. import io.github.pnoker.gateway.comtool.jiangjinThread.KafkaConsumer;
  7. import io.github.pnoker.gateway.dbdao.DBMgrProxy;
  8. import io.github.pnoker.gateway.utils.InfulxDbUtil;
  9. import io.github.pnoker.gateway.utils.InfulxJiangjinDbUtil;
  10. import org.influxdb.dto.Point;
  11. import org.slf4j.Logger;
  12. import org.slf4j.LoggerFactory;
  13. import org.springframework.util.CollectionUtils;
  14. import org.springframework.util.ObjectUtils;
  15. import java.math.BigDecimal;
  16. import java.text.ParseException;
  17. import java.text.SimpleDateFormat;
  18. import java.time.*;
  19. import java.time.format.DateTimeFormatter;
  20. import java.time.format.DateTimeFormatterBuilder;
  21. import java.time.temporal.ChronoField;
  22. import java.time.temporal.ChronoUnit;
  23. import java.util.*;
  24. import java.util.concurrent.TimeUnit;
  25. import java.util.stream.Collectors;
  26. import static io.github.pnoker.gateway.comtool.jiangjinThread.KafkaConsumer.convertCubicMetersToLiters;
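  /**
   * Static helpers that read acquisition rows through the Jiangjin DB API of {@code DBMgrProxy},
   * convert each row into an InfluxDB {@code Point} and insert it through
   * {@code infulxJiangjinDbUtil}.
   *
   * @ClassName KprJiangjinWaterBizfun
   * @Description: row-to-point conversion and import of real-time and history acquisition data
   * @Author LX
   * @Date 2024/11/21
   * @Version V1.0
   **/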
  34. public class KprJiangjinWaterBizfun {
  35. private static final Logger log = LoggerFactory.getLogger(KprJiangjinWaterBizfun.class);
  36. private final static String mStrClassName = "KprJiangjinWaterBizfun";
  37. private final static String EMPTY_NULL = "NULL";
  38. public static InfulxJiangjinDbUtil infulxJiangjinDbUtil = null;//infulx工具类对象
  39. public static Point createPointFromJson(String deviceType,String standardCode,Map<String,Object> mapEntity,String cmCode,List<String> params) {
  40. // 获取时间戳
  41. long nanoTimestamp = convertToNanoTimestamp(mapEntity.get("QCQUISITION_TIME").toString());
  42. if(nanoTimestamp==0L){
  43. return null;
  44. }
  45. String measurement = KprBaseInitFun.getInstance().jiangjinMeasurementMap.get(deviceType);
  46. // 创建 Point.Builder 对象
  47. Point.Builder pointBuilder = Point.measurement(measurement)
  48. .tag("dev_id",cmCode)
  49. .time(nanoTimestamp, TimeUnit.NANOSECONDS);
  50. // 遍历 JSON 对象的键值对,并添加到 Point.Builder 中
  51. processField(pointBuilder,standardCode, mapEntity.get("VAL"),params);
  52. if(!pointBuilder.hasFields()){
  53. return null;
  54. }
  55. // 构建 Point 对象
  56. try {
  57. return pointBuilder.build();
  58. }catch(Exception ex){
  59. ex.printStackTrace();
  60. }
  61. return null;
  62. }
  63. //指定列处理
  64. private static void processField(Point.Builder pointBuilder, String key, Object value,List<String> params) {
  65. if (value == null) {
  66. return; // 跳过空值
  67. }
  68. //String[]长度如果为0,则说明本系统没有对应字段,则用它本身,反之用第二个元素作为字段名
  69. List<String[]> newParams = params.stream()
  70. .map(s -> s.split("___"))
  71. .collect(Collectors.toList());
  72. for (String[] clies:newParams){
  73. boolean exists = Arrays.asList(clies).contains(key);
  74. if(exists){
  75. String newKey = clies.length>1?clies[1]:clies[0];
  76. if (value instanceof Integer) {
  77. pointBuilder.addField(newKey, (Integer) value);
  78. } else if (value instanceof Long) {
  79. pointBuilder.addField(newKey, (Long) value);
  80. } else if (value instanceof Boolean) {
  81. pointBuilder.addField(newKey, ((Boolean)value)?1:0);
  82. } else if (value instanceof String) {
  83. if (((String)value).equals("01")||((String)value).equals("00")){
  84. pointBuilder.addField(newKey, ((String) value).replace("0",""));
  85. }else {
  86. pointBuilder.addField(newKey, (String) value);
  87. }
  88. }else if (value instanceof BigDecimal) {
  89. pointBuilder.addField(newKey, ((BigDecimal) value).doubleValue());
  90. }else{
  91. pointBuilder.addField(newKey, (String.valueOf(value)) );
  92. }
  93. if(newKey.equals("flow_l_cur")||newKey.equals("flow_l_total_pos")){
  94. pointBuilder.addField(newKey, Double.valueOf(ObjectUtils.isEmpty(value)?"0.0":value.toString()));
  95. if(newKey.equals("flow_l_cur")){
  96. pointBuilder.addField(newKey, Double.valueOf(ObjectUtils.isEmpty(value)?"0.0"
  97. :convertCubicMetersToLiters(Double.valueOf(value.toString()))));
  98. pointBuilder.addField("flow_cur", Double.valueOf(ObjectUtils.isEmpty(value)?"0.0":value.toString()));
  99. }
  100. if(newKey.equals("flow_l_total_pos")){
  101. pointBuilder.addField(newKey, Double.valueOf(ObjectUtils.isEmpty(value)?"0.0"
  102. :convertCubicMetersToLiters(Double.valueOf(value.toString()))));
  103. pointBuilder.addField("flow_total_pos", Double.valueOf(ObjectUtils.isEmpty(value)?"0.0":value.toString()));
  104. }
  105. }
  106. break;
  107. }
  108. }
  109. }
  110. //获取CST 纳秒时间戳
  111. private static long convertToNanoTimestamp(String dateTimeString) {
  112. // 定义时间格式,支持可选的小数秒部分
  113. DateTimeFormatter formatter = new DateTimeFormatterBuilder()
  114. .appendPattern("yyyy-MM-dd HH:mm:ss")
  115. .optionalStart()
  116. .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true)
  117. .optionalEnd()
  118. .toFormatter();
  119. // 解析字符串为 LocalDateTime 对象
  120. LocalDateTime dateTime = LocalDateTime.parse(dateTimeString, formatter);
  121. // 四舍五入到最近的整分
  122. dateTime = dateTime.truncatedTo(ChronoUnit.MINUTES);
  123. if (dateTime.getSecond() >= 30) {
  124. dateTime = dateTime.plusMinutes(1);
  125. }
  126. // 确保分钟数可以被2整除
  127. int minute = dateTime.getMinute();
  128. if (minute % 2 != 0) {
  129. dateTime = dateTime.plusMinutes(1);
  130. }
  131. // 指定时区为CST(Asia/Shanghai)
  132. ZoneId shanghaiZoneId = ZoneId.of("Asia/Shanghai");
  133. // 将 LocalDateTime 对象转换为 ZonedDateTime 对象
  134. ZonedDateTime zonedDateTime = dateTime.atZone(shanghaiZoneId);
  135. // 将 ZonedDateTime 对象转换为 Instant 对象(UTC时间点)
  136. Instant instant = zonedDateTime.toInstant();
  137. // 计算从1970年1月1日00:00:00 UTC以来的纳秒数
  138. long nanosecondsSinceEpoch = ChronoUnit.NANOS.between(Instant.EPOCH, instant);
  139. // 输出调试信息
  140. // System.out.println("输入的时间(四舍五入到整分且分钟数为偶数): " + dateTime);
  141. // System.out.println("时区转换后的时间: " + zonedDateTime);
  142. // System.out.println("转换为UTC的时间: " + instant);
  143. // System.out.println("计算的纳秒数: " + nanosecondsSinceEpoch);
  144. return nanosecondsSinceEpoch;
  145. }
  146. //TODO 处理kafka所有已存在数据
  147. public static void InitHistory(){
  148. // KafkaConsumer kafkaConsumerDemo = new KafkaConsumer("10.127.16.85:9092", "jiangjin");
  149. // kafkaConsumerDemo.doConsume();
  150. }
  151. //TODO 计算采集频率
  152. // 在方法外部定义集合
  153. private static Map<String, List<Map<String, Object>>> deviceDataMap = new HashMap<>();
  154. private static Map<String, Integer> deviceFrequencyMap = new HashMap<>();
  155. public static void InitDeviceFrequency() {
  156. try {
  157. int count = DBMgrProxy.getInstance().applyJiangjinDbApi().getTabWaterRealCount("");
  158. if (count > 0) {
  159. List<Map<String, Object>> mapList = DBMgrProxy.getInstance().applyJiangjinDbApi()
  160. .getListWaterReal("");
  161. if (!mapList.isEmpty()) {
  162. for (Map<String, Object> map : mapList) {
  163. String deviceCode = map.get("DEVICE_CODE").toString();
  164. String acquisitionTime = map.get("QCQUISITION_TIME").toString(); // 采集时间为字符串
  165. String tagStandardCode = map.get("TAG_STANDARD_CODE").toString();
  166. // 使用设备编号和标准代码作为键
  167. String uniqueKey = deviceCode + "_" +acquisitionTime+ "_" + tagStandardCode;
  168. // 检查并更新设备数据集合
  169. deviceDataMap.computeIfAbsent(uniqueKey, k -> new ArrayList<>());
  170. List<Map<String, Object>> deviceDataList = deviceDataMap.get(uniqueKey);
  171. // 检查时间戳是否已存在,确保不重复添加相同时间点的数据
  172. boolean exists = deviceDataList.stream().anyMatch(data ->
  173. data.get("QCQUISITION_TIME").equals(acquisitionTime));
  174. if (!exists) {
  175. if (deviceDataList.size() >= 2) {
  176. deviceDataList.remove(0); // 保持最新的两条数据
  177. }
  178. deviceDataList.add(map);
  179. // 计算采集频率(假设时间字符串格式为"yyyy-MM-dd HH:mm:ss")
  180. if (deviceDataList.size() == 2) {
  181. String time1 = deviceDataList.get(0).get("QCQUISITION_TIME").toString();
  182. String time2 = deviceDataList.get(1).get("QCQUISITION_TIME").toString();
  183. // 转换时间字符串为时间戳
  184. SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  185. try {
  186. long timestamp1 = sdf.parse(time1).getTime();
  187. long timestamp2 = sdf.parse(time2).getTime();
  188. int frequency = (int) Math.abs(timestamp2 - timestamp1) / 60000; // 以分钟为单位
  189. deviceFrequencyMap.put(uniqueKey, frequency);
  190. } catch (ParseException e) {
  191. System.err.println("时间格式解析错误:" + e.getMessage());
  192. }
  193. }
  194. }
  195. }
  196. } else {
  197. System.err.println("实时数据采集为空");
  198. }
  199. }
  200. } catch (Exception ex) {
  201. ex.printStackTrace();
  202. System.err.println("InitRealDbError: " + ex.getLocalizedMessage());
  203. }
  204. }
  205. //TODO 数据库视图采集
  206. //TODO 按设备的最新时间作为视图查询条件
  207. public static void InitRealDb(){
  208. try {
  209. int count = DBMgrProxy.getInstance().applyJiangjinDbApi().getTabWaterRealCount("");
  210. if(count>0){
  211. List<Map<String,Object>> mapList = DBMgrProxy.getInstance().applyJiangjinDbApi()
  212. // .getListWaterReal(" DEVICE_CODE = '050101000021100065426' ");
  213. .getListWaterReal("");
  214. if(!CollectionUtils.isEmpty(mapList)){
  215. //TODO 首先进行特殊数据处理
  216. int i = 1;
  217. for (Map<String,Object> map:mapList){
  218. //TODO 不存在过滤的就执行
  219. if(!KprBaseInitFun.getInstance().closeTag.contains(map.get("TAG_CODE").toString())){
  220. //TODO 业务过程
  221. //TODO 根据设备号得到设定好的对应的设备类型名
  222. Optional<String> foundKey = KprBaseInitFun.getInstance().jiangjinTypeList.entrySet().stream()
  223. .filter(entry -> entry.getValue().contains(map.get("DEVICE_TYPE_CODE"))) // 过滤出包含特定值的 entry
  224. .map(Map.Entry::getKey) // 提取 key
  225. .findFirst(); // 找到第一个匹配的 key
  226. if(foundKey.isPresent()) {
  227. String deviceType = foundKey.get().split("_")[1];
  228. //TODO 数据特殊处理
  229. if(map.get("DEVICE_NAME").toString().contains("电量表")){
  230. deviceType = "VoltageSwitchgear";
  231. }else if(map.get("DEVICE_NAME").toString().contains("压力变送器")){
  232. deviceType = "WaterMeter";
  233. }else if(map.get("TAG_DESC").toString().contains("轴承温度")){
  234. deviceType = "ElectricMotor";
  235. }
  236. //第三方对应的字段集
  237. List<String> params = KprBaseInitFun.getInstance().jiangjinParams.get(deviceType);
  238. //deviceType为表名
  239. //time为当前数据time的纳秒时间戳,已经做过整点处理了
  240. Point pointNanos = createPointFromJson(deviceType,
  241. map.get("TAG_STANDARD_CODE").toString(),
  242. map
  243. , map.get("DEVICE_CODE").toString()
  244. , params);
  245. if (pointNanos != null) {
  246. KprJiangjinWaterBizfun.infulxJiangjinDbUtil.insert(pointNanos);
  247. }
  248. }
  249. i++;
  250. }
  251. }
  252. }else{
  253. log.error(mStrClassName+";实时数据采集为空:"+ TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT)
  254. +"InitRealDbNullError");
  255. }
  256. }
  257. }catch(Exception ex){
  258. ex.printStackTrace();
  259. log.error(mStrClassName+";InitRealDbError:"+ex.getLocalizedMessage());
  260. }
  261. }
  262. //TODO 历史数据,根据调用传递的开始时间作为历史数据接入的起始时间
  263. public static void initHistoryDb(LocalDateTime startDateTime){
  264. DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
  265. if(startDateTime==null){
  266. return;
  267. }
  268. try{
  269. //TODO
  270. LocalDateTime todayMidnight = LocalDateTime.now().toLocalDate().atTime(1, 0);
  271. // 循环从 startDateTime 开始,每 3 小时一次,直到今天 0 点
  272. LocalDateTime currentDateTime = startDateTime;//每次循环查询的起始时间
  273. while(currentDateTime.isBefore(todayMidnight)) {
  274. String startStr = currentDateTime.format(formatter);
  275. String endStr = currentDateTime.plusHours(3).format(formatter);
  276. log.info("历史数据执行:"+startStr+"~~~"+endStr);
  277. String extend = " QCQUISITION_TIME >= TO_DATE('" + startStr + "', 'YYYY-MM-DD HH24:MI:SS') " +
  278. " AND QCQUISITION_TIME <= TO_DATE('" + endStr + "', 'YYYY-MM-DD HH24:MI:SS')";
  279. // String extend = " QCQUISITION_TIME >= TO_DATE('" + startStr + "', 'YYYY-MM-DD HH24:MI:SS') " +
  280. // " AND QCQUISITION_TIME <= TO_DATE('" + endStr + "', 'YYYY-MM-DD HH24:MI:SS') and TAG_CODE = 'OPC.JJZLS.CENTER_MODBUS_SS_LJLL2'";
  281. int count = DBMgrProxy.getInstance().applyJiangjinDbApi().getTabWaterHistoryCount(extend);
  282. if(count>0){
  283. int pageNum = count % 5000 == 0 ? count / 5000 : (count / 5000) + 1;//总页数
  284. Integer limit = 5000;
  285. if (pageNum <= 1) {
  286. limit = count;//说明总数比第一页小
  287. }
  288. for (int i = 0; i < pageNum; i++) {
  289. Integer offset = i * limit;
  290. List<Map<String,Object>> mapList = DBMgrProxy.getInstance().applyJiangjinDbApi()
  291. .getPageListWaterHistory(offset+limit,offset,extend);
  292. if(!CollectionUtils.isEmpty(mapList)){
  293. for (Map<String,Object> map:mapList){
  294. //TODO 不存在过滤的就执行
  295. if(!KprBaseInitFun.getInstance().closeTag.contains(map.get("TAG_CODE").toString())){
  296. //TODO 业务过程
  297. //TODO 根据设备号得到设定好的对应的设备类型名
  298. Optional<String> foundKey = KprBaseInitFun.getInstance().jiangjinTypeList.entrySet().stream()
  299. .filter(entry -> entry.getValue().contains(map.get("DEVICE_TYPE_CODE"))) // 过滤出包含特定值的 entry
  300. .map(Map.Entry::getKey) // 提取 key
  301. .findFirst(); // 找到第一个匹配的 key
  302. if(foundKey.isPresent()) {
  303. String deviceType = foundKey.get().split("_")[1];
  304. //第三方对应的字段集
  305. List<String> params = KprBaseInitFun.getInstance().jiangjinParams.get(deviceType);
  306. //deviceType为表名
  307. //time为当前数据time的纳秒时间戳,已经做过整点处理了
  308. Point pointNanos = createPointFromJson(deviceType,
  309. map.get("TAG_STANDARD_CODE").toString(),
  310. map
  311. , map.get("DEVICE_CODE").toString()
  312. , params);
  313. if (pointNanos != null) {
  314. KprJiangjinWaterBizfun.infulxJiangjinDbUtil.insert(pointNanos);
  315. }
  316. }
  317. }
  318. }
  319. }else{
  320. log.error(mStrClassName+";历史数据采集为空:"+ TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT)
  321. +"initHistoryDbNullError");
  322. }
  323. }
  324. log.info(startStr+"到"+endStr+"数据整理完成:"+count);
  325. }
  326. currentDateTime = currentDateTime.plusHours(3); // 增加 3 小时
  327. }
  328. }catch(Exception ex){
  329. ex.printStackTrace();
  330. log.error(mStrClassName+";initHistoryDbError:"+ex.getLocalizedMessage());
  331. }
  332. }
  333. }