ThreadTask.java 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. package io.github.pnoker.gateway.comtool;
  2. import com.alibaba.fastjson.JSONObject;
  3. import io.github.pnoker.gateway.bizmgr.KprDangyangWaterBizFun;
  4. import io.github.pnoker.gateway.bizmgr.baseInit.KprBaseInitFun;
  5. import io.github.pnoker.gateway.dbdao.DBMgrProxy;
  6. import io.github.pnoker.gateway.dbdao.services.intef.DeviceKindService;
  7. import io.github.pnoker.gateway.dbdao.services.intef.TypeDefineService;
  8. import io.github.pnoker.gateway.utils.HttpUtil;
  9. import org.influxdb.dto.Point;
  10. import org.slf4j.Logger;
  11. import org.slf4j.LoggerFactory;
  12. import org.springframework.util.ObjectUtils;
  13. import java.math.BigDecimal;
  14. import java.text.ParseException;
  15. import java.text.SimpleDateFormat;
  16. import java.time.*;
  17. import java.time.format.DateTimeFormatter;
  18. import java.time.temporal.ChronoUnit;
  19. import java.util.*;
  20. import java.util.concurrent.TimeUnit;
  21. import java.util.stream.Collectors;
  22. /**
  23. * @ClassName ThreadTask
  24. * @Description: TODO 当阳任务线程
  25. * @Author LX
  26. * @Date 2024/9/3
  27. * @Version V1.0
  28. **/
  29. public class ThreadTask implements Runnable{
  30. private static final Logger log = LoggerFactory.getLogger(ThreadTask.class);
  31. private final String taskId;
  32. private final Integer collectionFrequency;//采集频率
  33. private String deviceType;
  34. private List<Map<String,Object>> deviceList = new ArrayList<>();
  35. public ThreadTask(String taskId,Integer collectionFrequency,String deviceType,List<Map<String,Object>> deviceList) {
  36. this.taskId = taskId;
  37. this.collectionFrequency = collectionFrequency;
  38. this.deviceType = deviceType;
  39. this.deviceList = deviceList;
  40. }
  41. @Override
  42. public void run() {
  43. try {
  44. List<JSONObject> deviceRealtimeDataList = new ArrayList<>();
  45. for (Map<String, Object> map : deviceList) {
  46. JSONObject devicerealtimeDataTotal = null;
  47. try {
  48. Map<String, String> paramRealtime = new HashMap<>();
  49. paramRealtime.put("deviceType", deviceType);
  50. paramRealtime.put("deviceCode", String.valueOf(map.get("deviceCode")));
  51. Map<String, String> headers = new HashMap<>();
  52. headers.put("Authorization", "Bearer " + KprDangyangWaterBizFun.dangyangToken);
  53. devicerealtimeDataTotal = JSONObject.parseObject(HttpUtil.sendGet(
  54. KprDangyangWaterBizFun.realtimeDataListUrl, paramRealtime, headers));
  55. } catch (Exception ex) {
  56. log.error("任务线程" + taskId + " 调用异常:" + ex.getLocalizedMessage());
  57. }
  58. try {
  59. if (devicerealtimeDataTotal != null && devicerealtimeDataTotal.getInteger("code") == 0 &&
  60. devicerealtimeDataTotal.getJSONObject("data") != null
  61. && devicerealtimeDataTotal.getJSONObject("data").getJSONArray("list") != null
  62. && !devicerealtimeDataTotal.getJSONObject("data").getJSONArray("list").isEmpty()) {
  63. //TODO 查询数据不为空,插入infulxdb
  64. JSONObject deviceRealtimeData = devicerealtimeDataTotal.getJSONObject("data")
  65. .getJSONArray("list").getJSONObject(0);
  66. adjustTime(deviceRealtimeData, collectionFrequency);//整点rand处理
  67. deviceRealtimeDataList.add(deviceRealtimeData);
  68. } else {
  69. log.error("任务线程" + taskId + " 执行设备"
  70. + String.valueOf(map.get("deviceCode")) +
  71. "失败:" + (devicerealtimeDataTotal == null ?
  72. null : devicerealtimeDataTotal.toJSONString()));
  73. }
  74. } catch (Exception ex) {
  75. log.error("任务线程" + taskId + " " + String.valueOf(map.get("deviceCode")) + "查询异常:" + ex.getLocalizedMessage() + ";resJson:" + devicerealtimeDataTotal.toJSONString());
  76. }
  77. }
  78. //TODO 开始插入数据库
  79. //第三方对应的字段集
  80. List<String> params = KprBaseInitFun.getInstance().dangyangParams.get(deviceType);
  81. for(JSONObject jsonObject : deviceRealtimeDataList){
  82. //deviceType为表名
  83. //time为当前数据time的纳秒时间戳,已经做过整点处理了
  84. Point pointNanos = createPointFromJson(deviceType,jsonObject,params);
  85. KprDangyangWaterBizFun.infulxDbUtil.insert(pointNanos);
  86. }
  87. }catch(Exception ex){
  88. ex.printStackTrace();
  89. }
  90. }
  91. public static Point createPointFromJson(String deviceType, JSONObject jsonObject,List<String> params) {
  92. // 获取时间戳
  93. long nanoTimestamp = convertToNanoTimestamp(jsonObject.getString("time"));
  94. // 创建 Point.Builder 对象
  95. Point.Builder pointBuilder = Point.measurement(KprBaseInitFun.getInstance().measurementMap.get(deviceType))
  96. .tag("dev_id",jsonObject.getString("deviceCode"))
  97. .time(nanoTimestamp, TimeUnit.NANOSECONDS);
  98. // 遍历 JSON 对象的键值对,并添加到 Point.Builder 中
  99. for (Map.Entry<String, Object> entry : jsonObject.entrySet()) {
  100. String key = entry.getKey();
  101. if (!key.equals("time")&&!key.equals("deviceCode")) {
  102. processField(pointBuilder,key, entry.getValue(),params);
  103. }
  104. }
  105. // 构建 Point 对象
  106. try {
  107. return pointBuilder.build();
  108. }catch(Exception ex){
  109. }
  110. return null;
  111. }
  112. //指定列处理
  113. private static void processField(Point.Builder pointBuilder, String key, Object value,List<String> params) {
  114. if (value == null) {
  115. return; // 跳过空值
  116. }
  117. //String[]长度如果为0,则说明本系统没有对应字段,则用它本身,反之用第二个元素作为字段名
  118. List<String[]> newParams = params.stream()
  119. .map(s -> s.split("___"))
  120. .collect(Collectors.toList());
  121. for (String[] clies:newParams){
  122. boolean exists = Arrays.asList(clies).contains(key);
  123. if(exists){
  124. String newKey = clies.length>1?clies[1]:clies[0];
  125. if (value instanceof Integer) {
  126. pointBuilder.addField(newKey, (Integer) value);
  127. } else if (value instanceof Long) {
  128. pointBuilder.addField(newKey, (Long) value);
  129. } else if (value instanceof Boolean) {
  130. pointBuilder.addField(newKey, (Boolean) value);
  131. } else if (value instanceof String) {
  132. pointBuilder.addField(newKey, (String) value);
  133. }else if (value instanceof BigDecimal) {
  134. pointBuilder.addField(newKey, ((BigDecimal) value).doubleValue());
  135. }else{
  136. pointBuilder.addField(newKey, (String.valueOf(value)) );
  137. }
  138. break;
  139. }
  140. }
  141. // switch (key) {
  142. // case "flow":
  143. // pointBuilder.addField("flow_cur", ((BigDecimal) value).doubleValue());
  144. // break;
  145. // default:
  146. // if (value instanceof Integer) {
  147. // pointBuilder.addField(key, (Integer) value);
  148. // } else if (value instanceof Long) {
  149. // pointBuilder.addField(key, (Long) value);
  150. // } else if (value instanceof Boolean) {
  151. // pointBuilder.addField(key, (Boolean) value);
  152. // } else if (value instanceof String) {
  153. // pointBuilder.addField(key, (String) value);
  154. // }else if (value instanceof BigDecimal) {
  155. // pointBuilder.addField(key, ((BigDecimal) value).doubleValue());
  156. // }
  157. // else {
  158. //// System.err.println("Unsupported data type for key " + key + ": " + value.getClass().getName());
  159. // }
  160. // break;
  161. // }
  162. }
  163. //获取CST 纳秒时间戳
  164. private static long convertToNanoTimestamp(String dateTimeString) {
  165. // 定义日期时间格式
  166. DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
  167. // 解析字符串为 LocalDateTime 对象
  168. LocalDateTime dateTime = LocalDateTime.parse(dateTimeString, formatter);
  169. // 指定时区为CST(Asia/Shanghai)
  170. ZoneId shanghaiZoneId = ZoneId.of("Asia/Shanghai");
  171. // 将LocalDateTime对象转换为ZonedDateTime对象
  172. ZonedDateTime zonedDateTime = dateTime.atZone(shanghaiZoneId);
  173. // 转换为秒级别的时间戳
  174. long epochSecond = dateTime.toEpochSecond(ZoneOffset.UTC);
  175. // 将ZonedDateTime对象转换为Instant对象(UTC时间点)
  176. Instant instant = zonedDateTime.toInstant();
  177. // 计算从1970年1月1日00:00:00 UTC以来的纳秒数
  178. long nanosecondsSinceEpoch = ChronoUnit.NANOS.between(Instant.EPOCH, instant);
  179. return nanosecondsSinceEpoch;
  180. }
  181. private static final SimpleDateFormat outputDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  182. public static void adjustTime(JSONObject jsonObject, int collectionFrequency) throws ParseException {
  183. String timeStr = jsonObject.getString("time");
  184. Long oriLong = TimeTool.convertDateStr2UTC(timeStr);
  185. Date originalDate = new Date(oriLong);
  186. // 计算最近的整分钟时间
  187. Date roundedDate = getNearestTime(originalDate, collectionFrequency);
  188. // 格式化回字符串
  189. String adjustedTimeStr = outputDateFormat.format(roundedDate);
  190. // 更新 JSON 对象中的时间
  191. jsonObject.put("time", adjustedTimeStr);
  192. }
  193. private static Date getNearestTime(Date date, int collectionFrequency) {
  194. long milliseconds = date.getTime();
  195. long minutes = milliseconds / (60 * 1000);
  196. long remainder = minutes % collectionFrequency;
  197. long adjustment = remainder < collectionFrequency / 2 ? -remainder : collectionFrequency - remainder;
  198. // 调整到最近的整分钟时间
  199. long adjustedMinutes = minutes + adjustment;
  200. return new Date(adjustedMinutes * 60 * 1000);
  201. }
  202. }