package io.github.pnoker.gateway.comtool; import com.alibaba.fastjson.JSONObject; import io.github.pnoker.gateway.bizmgr.KprDangyangWaterBizFun; import io.github.pnoker.gateway.bizmgr.baseInit.KprBaseInitFun; import io.github.pnoker.gateway.dbdao.DBMgrProxy; import io.github.pnoker.gateway.dbdao.services.intef.DeviceKindService; import io.github.pnoker.gateway.dbdao.services.intef.TypeDefineService; import io.github.pnoker.gateway.utils.HttpUtil; import org.influxdb.dto.Point; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.ObjectUtils; import java.math.BigDecimal; import java.text.ParseException; import java.text.SimpleDateFormat; import java.time.*; import java.time.format.DateTimeFormatter; import java.time.temporal.ChronoUnit; import java.util.*; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; /** * @ClassName ThreadTask * @Description: TODO 当阳任务线程 * @Author LX * @Date 2024/9/3 * @Version V1.0 **/ public class ThreadTask implements Runnable{ private static final Logger log = LoggerFactory.getLogger(ThreadTask.class); private final String taskId; private final Integer collectionFrequency;//采集频率 private String deviceType; private List> deviceList = new ArrayList<>(); public ThreadTask(String taskId,Integer collectionFrequency,String deviceType,List> deviceList) { this.taskId = taskId; this.collectionFrequency = collectionFrequency; this.deviceType = deviceType; this.deviceList = deviceList; } @Override public void run() { try { List deviceRealtimeDataList = new ArrayList<>(); for (Map map : deviceList) { JSONObject devicerealtimeDataTotal = null; try { Map paramRealtime = new HashMap<>(); paramRealtime.put("deviceType", deviceType); paramRealtime.put("deviceCode", String.valueOf(map.get("deviceCode"))); Map headers = new HashMap<>(); headers.put("Authorization", "Bearer " + KprDangyangWaterBizFun.dangyangToken); devicerealtimeDataTotal = JSONObject.parseObject(HttpUtil.sendGet( KprDangyangWaterBizFun.realtimeDataListUrl, paramRealtime, headers)); } catch (Exception ex) { log.error("任务线程" + taskId + " 调用异常:" + ex.getLocalizedMessage()); } try { if (devicerealtimeDataTotal != null && devicerealtimeDataTotal.getInteger("code") == 0 && devicerealtimeDataTotal.getJSONObject("data") != null && devicerealtimeDataTotal.getJSONObject("data").getJSONArray("list") != null && !devicerealtimeDataTotal.getJSONObject("data").getJSONArray("list").isEmpty()) { //TODO 查询数据不为空,插入infulxdb JSONObject deviceRealtimeData = devicerealtimeDataTotal.getJSONObject("data") .getJSONArray("list").getJSONObject(0); adjustTime(deviceRealtimeData, collectionFrequency);//整点rand处理 deviceRealtimeDataList.add(deviceRealtimeData); } else { log.error("任务线程" + taskId + " 执行设备" + String.valueOf(map.get("deviceCode")) + "失败:" + (devicerealtimeDataTotal == null ? null : devicerealtimeDataTotal.toJSONString())); } } catch (Exception ex) { log.error("任务线程" + taskId + " " + String.valueOf(map.get("deviceCode")) + "查询异常:" + ex.getLocalizedMessage() + ";resJson:" + devicerealtimeDataTotal.toJSONString()); } } //TODO 开始插入数据库 //第三方对应的字段集 List params = KprBaseInitFun.getInstance().dangyangParams.get(deviceType); for(JSONObject jsonObject : deviceRealtimeDataList){ //deviceType为表名 //time为当前数据time的纳秒时间戳,已经做过整点处理了 Point pointNanos = createPointFromJson(deviceType,jsonObject,params); KprDangyangWaterBizFun.infulxDbUtil.insert(pointNanos); } }catch(Exception ex){ ex.printStackTrace(); } } public static Point createPointFromJson(String deviceType, JSONObject jsonObject,List params) { // 获取时间戳 long nanoTimestamp = convertToNanoTimestamp(jsonObject.getString("time")); // 创建 Point.Builder 对象 Point.Builder pointBuilder = Point.measurement(KprBaseInitFun.getInstance().measurementMap.get(deviceType)) .tag("dev_id",jsonObject.getString("deviceCode")) .time(nanoTimestamp, TimeUnit.NANOSECONDS); // 遍历 JSON 对象的键值对,并添加到 Point.Builder 中 for (Map.Entry entry : jsonObject.entrySet()) { String key = entry.getKey(); if (!key.equals("time")&&!key.equals("deviceCode")) { processField(pointBuilder,key, entry.getValue(),params); } } // 构建 Point 对象 try { return pointBuilder.build(); }catch(Exception ex){ } return null; } //指定列处理 private static void processField(Point.Builder pointBuilder, String key, Object value,List params) { if (value == null) { return; // 跳过空值 } //String[]长度如果为0,则说明本系统没有对应字段,则用它本身,反之用第二个元素作为字段名 List newParams = params.stream() .map(s -> s.split("___")) .collect(Collectors.toList()); for (String[] clies:newParams){ boolean exists = Arrays.asList(clies).contains(key); if(exists){ String newKey = clies.length>1?clies[1]:clies[0]; if (value instanceof Integer) { pointBuilder.addField(newKey, (Integer) value); } else if (value instanceof Long) { pointBuilder.addField(newKey, (Long) value); } else if (value instanceof Boolean) { pointBuilder.addField(newKey, (Boolean) value); } else if (value instanceof String) { pointBuilder.addField(newKey, (String) value); }else if (value instanceof BigDecimal) { pointBuilder.addField(newKey, ((BigDecimal) value).doubleValue()); }else{ pointBuilder.addField(newKey, (String.valueOf(value)) ); } break; } } // switch (key) { // case "flow": // pointBuilder.addField("flow_cur", ((BigDecimal) value).doubleValue()); // break; // default: // if (value instanceof Integer) { // pointBuilder.addField(key, (Integer) value); // } else if (value instanceof Long) { // pointBuilder.addField(key, (Long) value); // } else if (value instanceof Boolean) { // pointBuilder.addField(key, (Boolean) value); // } else if (value instanceof String) { // pointBuilder.addField(key, (String) value); // }else if (value instanceof BigDecimal) { // pointBuilder.addField(key, ((BigDecimal) value).doubleValue()); // } // else { //// System.err.println("Unsupported data type for key " + key + ": " + value.getClass().getName()); // } // break; // } } //获取CST 纳秒时间戳 private static long convertToNanoTimestamp(String dateTimeString) { // 定义日期时间格式 DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); // 解析字符串为 LocalDateTime 对象 LocalDateTime dateTime = LocalDateTime.parse(dateTimeString, formatter); // 指定时区为CST(Asia/Shanghai) ZoneId shanghaiZoneId = ZoneId.of("Asia/Shanghai"); // 将LocalDateTime对象转换为ZonedDateTime对象 ZonedDateTime zonedDateTime = dateTime.atZone(shanghaiZoneId); // 转换为秒级别的时间戳 long epochSecond = dateTime.toEpochSecond(ZoneOffset.UTC); // 将ZonedDateTime对象转换为Instant对象(UTC时间点) Instant instant = zonedDateTime.toInstant(); // 计算从1970年1月1日00:00:00 UTC以来的纳秒数 long nanosecondsSinceEpoch = ChronoUnit.NANOS.between(Instant.EPOCH, instant); return nanosecondsSinceEpoch; } private static final SimpleDateFormat outputDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); public static void adjustTime(JSONObject jsonObject, int collectionFrequency) throws ParseException { String timeStr = jsonObject.getString("time"); Long oriLong = TimeTool.convertDateStr2UTC(timeStr); Date originalDate = new Date(oriLong); // 计算最近的整分钟时间 Date roundedDate = getNearestTime(originalDate, collectionFrequency); // 格式化回字符串 String adjustedTimeStr = outputDateFormat.format(roundedDate); // 更新 JSON 对象中的时间 jsonObject.put("time", adjustedTimeStr); } private static Date getNearestTime(Date date, int collectionFrequency) { long milliseconds = date.getTime(); long minutes = milliseconds / (60 * 1000); long remainder = minutes % collectionFrequency; long adjustment = remainder < collectionFrequency / 2 ? -remainder : collectionFrequency - remainder; // 调整到最近的整分钟时间 long adjustedMinutes = minutes + adjustment; return new Date(adjustedMinutes * 60 * 1000); } }