123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231 |
- package io.github.pnoker.gateway.comtool;
- import com.alibaba.fastjson.JSONObject;
- import io.github.pnoker.gateway.bizmgr.KprDangyangWaterBizFun;
- import io.github.pnoker.gateway.bizmgr.baseInit.KprBaseInitFun;
- import io.github.pnoker.gateway.dbdao.DBMgrProxy;
- import io.github.pnoker.gateway.dbdao.services.intef.DeviceKindService;
- import io.github.pnoker.gateway.dbdao.services.intef.TypeDefineService;
- import io.github.pnoker.gateway.utils.HttpUtil;
- import org.influxdb.dto.Point;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.util.ObjectUtils;
- import java.math.BigDecimal;
- import java.text.ParseException;
- import java.text.SimpleDateFormat;
- import java.time.*;
- import java.time.format.DateTimeFormatter;
- import java.time.temporal.ChronoUnit;
- import java.util.*;
- import java.util.concurrent.TimeUnit;
- import java.util.stream.Collectors;
- /**
- * @ClassName ThreadTask
- * @Description: TODO 当阳任务线程
- * @Author LX
- * @Date 2024/9/3
- * @Version V1.0
- **/
- public class ThreadTask implements Runnable{
- private static final Logger log = LoggerFactory.getLogger(ThreadTask.class);
- private final String taskId;
- private final Integer collectionFrequency;//采集频率
- private String deviceType;
- private List<Map<String,Object>> deviceList = new ArrayList<>();
- public ThreadTask(String taskId,Integer collectionFrequency,String deviceType,List<Map<String,Object>> deviceList) {
- this.taskId = taskId;
- this.collectionFrequency = collectionFrequency;
- this.deviceType = deviceType;
- this.deviceList = deviceList;
- }
- @Override
- public void run() {
- try {
- List<JSONObject> deviceRealtimeDataList = new ArrayList<>();
- for (Map<String, Object> map : deviceList) {
- JSONObject devicerealtimeDataTotal = null;
- try {
- Map<String, String> paramRealtime = new HashMap<>();
- paramRealtime.put("deviceType", deviceType);
- paramRealtime.put("deviceCode", String.valueOf(map.get("deviceCode")));
- Map<String, String> headers = new HashMap<>();
- headers.put("Authorization", "Bearer " + KprDangyangWaterBizFun.dangyangToken);
- devicerealtimeDataTotal = JSONObject.parseObject(HttpUtil.sendGet(
- KprDangyangWaterBizFun.realtimeDataListUrl, paramRealtime, headers));
- } catch (Exception ex) {
- log.error("任务线程" + taskId + " 调用异常:" + ex.getLocalizedMessage());
- }
- try {
- if (devicerealtimeDataTotal != null && devicerealtimeDataTotal.getInteger("code") == 0 &&
- devicerealtimeDataTotal.getJSONObject("data") != null
- && devicerealtimeDataTotal.getJSONObject("data").getJSONArray("list") != null
- && !devicerealtimeDataTotal.getJSONObject("data").getJSONArray("list").isEmpty()) {
- //TODO 查询数据不为空,插入infulxdb
- JSONObject deviceRealtimeData = devicerealtimeDataTotal.getJSONObject("data")
- .getJSONArray("list").getJSONObject(0);
- adjustTime(deviceRealtimeData, collectionFrequency);//整点rand处理
- deviceRealtimeDataList.add(deviceRealtimeData);
- } else {
- log.error("任务线程" + taskId + " 执行设备"
- + String.valueOf(map.get("deviceCode")) +
- "失败:" + (devicerealtimeDataTotal == null ?
- null : devicerealtimeDataTotal.toJSONString()));
- }
- } catch (Exception ex) {
- log.error("任务线程" + taskId + " " + String.valueOf(map.get("deviceCode")) + "查询异常:" + ex.getLocalizedMessage() + ";resJson:" + devicerealtimeDataTotal.toJSONString());
- }
- }
- //TODO 开始插入数据库
- //第三方对应的字段集
- List<String> params = KprBaseInitFun.getInstance().dangyangParams.get(deviceType);
- for(JSONObject jsonObject : deviceRealtimeDataList){
- //deviceType为表名
- //time为当前数据time的纳秒时间戳,已经做过整点处理了
- Point pointNanos = createPointFromJson(deviceType,jsonObject,params);
- KprDangyangWaterBizFun.infulxDbUtil.insert(pointNanos);
- }
- }catch(Exception ex){
- ex.printStackTrace();
- }
- }
- public static Point createPointFromJson(String deviceType, JSONObject jsonObject,List<String> params) {
- // 获取时间戳
- long nanoTimestamp = convertToNanoTimestamp(jsonObject.getString("time"));
- // 创建 Point.Builder 对象
- Point.Builder pointBuilder = Point.measurement(KprBaseInitFun.getInstance().measurementMap.get(deviceType))
- .tag("dev_id",jsonObject.getString("deviceCode"))
- .time(nanoTimestamp, TimeUnit.NANOSECONDS);
- // 遍历 JSON 对象的键值对,并添加到 Point.Builder 中
- for (Map.Entry<String, Object> entry : jsonObject.entrySet()) {
- String key = entry.getKey();
- if (!key.equals("time")&&!key.equals("deviceCode")) {
- processField(pointBuilder,key, entry.getValue(),params);
- }
- }
- // 构建 Point 对象
- try {
- return pointBuilder.build();
- }catch(Exception ex){
- }
- return null;
- }
- //指定列处理
- private static void processField(Point.Builder pointBuilder, String key, Object value,List<String> params) {
- if (value == null) {
- return; // 跳过空值
- }
- //String[]长度如果为0,则说明本系统没有对应字段,则用它本身,反之用第二个元素作为字段名
- List<String[]> newParams = params.stream()
- .map(s -> s.split("___"))
- .collect(Collectors.toList());
- for (String[] clies:newParams){
- boolean exists = Arrays.asList(clies).contains(key);
- if(exists){
- String newKey = clies.length>1?clies[1]:clies[0];
- if (value instanceof Integer) {
- pointBuilder.addField(newKey, (Integer) value);
- } else if (value instanceof Long) {
- pointBuilder.addField(newKey, (Long) value);
- } else if (value instanceof Boolean) {
- pointBuilder.addField(newKey, (Boolean) value);
- } else if (value instanceof String) {
- pointBuilder.addField(newKey, (String) value);
- }else if (value instanceof BigDecimal) {
- pointBuilder.addField(newKey, ((BigDecimal) value).doubleValue());
- }else{
- pointBuilder.addField(newKey, (String.valueOf(value)) );
- }
- break;
- }
- }
- // switch (key) {
- // case "flow":
- // pointBuilder.addField("flow_cur", ((BigDecimal) value).doubleValue());
- // break;
- // default:
- // if (value instanceof Integer) {
- // pointBuilder.addField(key, (Integer) value);
- // } else if (value instanceof Long) {
- // pointBuilder.addField(key, (Long) value);
- // } else if (value instanceof Boolean) {
- // pointBuilder.addField(key, (Boolean) value);
- // } else if (value instanceof String) {
- // pointBuilder.addField(key, (String) value);
- // }else if (value instanceof BigDecimal) {
- // pointBuilder.addField(key, ((BigDecimal) value).doubleValue());
- // }
- // else {
- //// System.err.println("Unsupported data type for key " + key + ": " + value.getClass().getName());
- // }
- // break;
- // }
- }
- //获取CST 纳秒时间戳
- private static long convertToNanoTimestamp(String dateTimeString) {
- // 定义日期时间格式
- DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
- // 解析字符串为 LocalDateTime 对象
- LocalDateTime dateTime = LocalDateTime.parse(dateTimeString, formatter);
- // 指定时区为CST(Asia/Shanghai)
- ZoneId shanghaiZoneId = ZoneId.of("Asia/Shanghai");
- // 将LocalDateTime对象转换为ZonedDateTime对象
- ZonedDateTime zonedDateTime = dateTime.atZone(shanghaiZoneId);
- // 转换为秒级别的时间戳
- long epochSecond = dateTime.toEpochSecond(ZoneOffset.UTC);
- // 将ZonedDateTime对象转换为Instant对象(UTC时间点)
- Instant instant = zonedDateTime.toInstant();
- // 计算从1970年1月1日00:00:00 UTC以来的纳秒数
- long nanosecondsSinceEpoch = ChronoUnit.NANOS.between(Instant.EPOCH, instant);
- return nanosecondsSinceEpoch;
- }
- private static final SimpleDateFormat outputDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- public static void adjustTime(JSONObject jsonObject, int collectionFrequency) throws ParseException {
- String timeStr = jsonObject.getString("time");
- Long oriLong = TimeTool.convertDateStr2UTC(timeStr);
- Date originalDate = new Date(oriLong);
- // 计算最近的整分钟时间
- Date roundedDate = getNearestTime(originalDate, collectionFrequency);
- // 格式化回字符串
- String adjustedTimeStr = outputDateFormat.format(roundedDate);
- // 更新 JSON 对象中的时间
- jsonObject.put("time", adjustedTimeStr);
- }
- private static Date getNearestTime(Date date, int collectionFrequency) {
- long milliseconds = date.getTime();
- long minutes = milliseconds / (60 * 1000);
- long remainder = minutes % collectionFrequency;
- long adjustment = remainder < collectionFrequency / 2 ? -remainder : collectionFrequency - remainder;
- // 调整到最近的整分钟时间
- long adjustedMinutes = minutes + adjustment;
- return new Date(adjustedMinutes * 60 * 1000);
- }
- }
|