package io.github.pnoker.gateway.bizmgr;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import io.github.pnoker.gateway.comtool.*;
import io.github.pnoker.gateway.comtool.dangyangThread.ThreadDinzhiHistoryTask;
import io.github.pnoker.gateway.comtool.dangyangThread.ThreadHistoryTask;
import io.github.pnoker.gateway.comtool.dangyangThread.ThreadPoolTaskTool;
import io.github.pnoker.gateway.comtool.dangyangThread.ThreadTask;
import io.github.pnoker.gateway.dbdao.DBMgrProxy;
import io.github.pnoker.gateway.dbdao.services.intef.DeviceKindService;
import io.github.pnoker.gateway.dbdao.services.intef.TypeDefineService;
import io.github.pnoker.gateway.utils.HttpUtil;
import io.github.pnoker.gateway.utils.InfulxDbUtil;
import org.apache.poi.ss.usermodel.Cell;
import org.apache.poi.ss.usermodel.Row;
import org.apache.poi.ss.usermodel.Sheet;
import org.apache.poi.ss.usermodel.Workbook;
import org.apache.poi.xssf.usermodel.XSSFWorkbook;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;

import java.io.FileOutputStream;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;

/**
 * Data collection for the Dangyang water platform.
 *
 * <p>All members are static. The class obtains an access token, pages through the remote
 * device list for every configured device type, groups the devices by their
 * {@code collectionFrequency} field and hands each group to a scheduled task
 * (real-time, daily history or "dingzhi" history). It also dumps the device list of
 * every type to an Excel file once per hour.
 *
 * <p>NOTE(review): the public configuration fields are plain mutable statics without
 * synchronization; callers are presumably expected to populate them before invoking any
 * {@code check*} method - confirm against the bootstrap code.
 *
 * @author LX
 * @version V1.0 (2024/9/3)
 */
public class KprDangyangWaterBizFun {

    private static final Logger log = LoggerFactory.getLogger(KprDangyangWaterBizFun.class);
    private final static String mStrClassName = "KprDangyangWaterBizFun";
    private final static String EMPTY_NULL = "NULL";

    /** Page size used when paging through the device list (200 was found to perform well). */
    private static final int PAGE_SIZE = 200;
    /** Map key holding a device's collection frequency, in seconds. */
    private static final String FREQUENCY_KEY = "collectionFrequency";
    /** Delay before the first Excel dump of a device list, in milliseconds. */
    private static final long EXCEL_FIRST_RUN_DELAY_MS = 5000L;
    /** Interval between two Excel dumps: 3600000 ms = 1 hour. */
    private static final long EXCEL_PERIOD_MS = 3600000L;

    /** Column titles of the generated Excel sheet, in column order. */
    private static final String[] EXCEL_HEADERS = {
            "设备类型", "设备编号", "设备名称", "水厂名称", "是否启动", "经度", "纬度", "采集频率/秒"
    };
    /** Device-map keys written to the Excel columns, aligned with {@link #EXCEL_HEADERS}. */
    private static final String[] EXCEL_KEYS = {
            "deviceType", "deviceCode", "deviceName", "waterworks",
            "enableFlag", "longitude", "latitude", FREQUENCY_KEY
    };

    // TODO refresh these collections periodically
    /** Device-kind dictionary, loaded by {@link #flushDefine()}. */
    public static List<Map<String, Object>> deviceKindList = new ArrayList<>();
    /** Detailed device-type dictionary, loaded by {@link #flushDefine()}. */
    public static List<Map<String, Object>> typeDefineList = new ArrayList<>();

    /** Token sent as {@code Authorization: Bearer ...} on the data API calls. */
    public static String dangyangToken = null;
    /** Influx utility object. */
    public static InfulxDbUtil infulxDbUtil = null;
    /** Configured device types; each element's {@code key} is sent as {@code deviceType}. */
    public static JSONArray deviceType = null;
    /** Account used to request the token. */
    public static String username = null;
    /** Password used to request the token. */
    public static String password = null;
    /** Token endpoint URL. */
    public static String tokenUrl = null;
    /** Real-time data endpoint URL. */
    public static String realtimeDataListUrl = null;
    /** History data endpoint URL. */
    public static String hisDataListUrl = null;
    /** Device list endpoint URL. */
    public static String listUrl = null;

    /**
     * @return the device-kind service obtained from {@link DBMgrProxy}
     */
    public static DeviceKindService deviceKindService() {
        return DBMgrProxy.getInstance().applyDeviceKineApi();
    }

    /**
     * @return the type-define service obtained from {@link DBMgrProxy}
     */
    public static TypeDefineService typeDefineService() {
        return DBMgrProxy.getInstance().applyTypeDefineApi();
    }

    /**
     * Checks that every configuration field required by the {@code check*} methods is set.
     *
     * @return {@code true} when {@code deviceType} and {@code infulxDbUtil} are non-null and
     *         the token, URLs, username and password are all non-empty
     */
    private static boolean checkApplicationParam() {
        return deviceType != null
                && infulxDbUtil != null
                && !StringUtils.isEmpty(dangyangToken)
                && !StringUtils.isEmpty(realtimeDataListUrl)
                && !StringUtils.isEmpty(hisDataListUrl)
                && !StringUtils.isEmpty(listUrl)
                && !StringUtils.isEmpty(tokenUrl)
                && !StringUtils.isEmpty(username)
                && !StringUtils.isEmpty(password);
    }

    /**
     * Reloads the dictionary collections {@link #deviceKindList} and {@link #typeDefineList}.
     * The type-define query is restricted to rows with {@code kind = 0}.
     */
    public static void flushDefine() {
        deviceKindList = deviceKindService().batchQueryWithsEx("", "", new HashMap<>(), new HashMap<>(), "", "");
        Map<String, Object> andWhere = new HashMap<>();
        andWhere.put("kind", 0);
        typeDefineList = typeDefineService().batchQueryWithsEx("", "", andWhere, new HashMap<>(), "", "");
    }

    /**
     * Requests an access token by POSTing {@link #username} and {@link #password} as JSON to
     * {@link #tokenUrl}.
     *
     * @return the {@code data} object of the response when its {@code code} is 0;
     *         {@code null} when the call fails, the code is missing or non-zero, or any
     *         exception occurs (the failure is logged)
     */
    public static JSONObject getDangyangToken() {
        try {
            Map<String, String> headers = new HashMap<>();
            headers.put("Content-type", "application/json");
            headers.put("Tenant-Id", "1");

            JSONObject paramObj = new JSONObject();
            paramObj.put("username", username);
            paramObj.put("password", password);

            String tokenStr = HttpUtil.sendPost(tokenUrl, paramObj.toJSONString(), headers);
            JSONObject resObj = JSONObject.parseObject(tokenStr);
            if (isSuccess(resObj)) {
                return resObj.getJSONObject("data");
            }
            // The original message wrongly talked about the device list.
            log.error("获取token失败:" + tokenStr);
            return null;
        } catch (Exception ex) {
            log.error("获取token数据异常:" + ex.getLocalizedMessage(), ex);
            return null;
        }
    }

    /**
     * Schedules the daily history synchronisation.
     *
     * <p>For every configured device type the device list is fetched and grouped by
     * collection frequency; each group with a positive frequency gets a
     * {@link ThreadHistoryTask} whose first run is at the next midnight and which then
     * repeats every 24 hours. A failure on one device type is logged and does not prevent
     * the remaining types from being scheduled.
     */
    public static void checkOneMonth() {
        if (!checkApplicationParam()) {
            log.error("获取配置参数错误");
            return;
        }
        Map<String, String> headers = authHeaders();
        for (int i = 0; i < deviceType.size(); i++) {
            try {
                String typeKey = deviceType.getJSONObject(i).getString("key");
                List<Map<String, Object>> totalList = fetchDeviceList(typeKey, headers);
                if (totalList.isEmpty()) {
                    continue; // nothing to schedule for this type
                }
                warnDevicesWithoutFrequency(totalList);

                Map<Integer, List<Map<String, Object>>> groupedByFrequency = groupByFrequency(totalList);
                ScheduledExecutorService scheduler =
                        ThreadPoolTaskTool.createTaskScheduler(groupedByFrequency.size());
                for (Map.Entry<Integer, List<Map<String, Object>>> group : groupedByFrequency.entrySet()) {
                    Integer groupKey = group.getKey();
                    if (groupKey <= 0) {
                        continue; // no usable frequency -> no task
                    }
                    String taskId = typeKey + "_history" + "_" + groupKey;
                    ThreadHistoryTask threadHistoryTask =
                            new ThreadHistoryTask(taskId, groupKey, typeKey, group.getValue());
                    ThreadPoolTaskTool.scheduleHistroyTask(scheduler, taskId, threadHistoryTask,
                            getInitialDelay(), 24 * 60 * 60);
                }
            } catch (Exception ex) {
                log.error(nowStamp() + "获取历史数据异常:" + ex.getLocalizedMessage(), ex);
            }
        }
    }

    /**
     * Schedules the "dingzhi" (customised) history synchronisation.
     *
     * <p>Same flow as {@link #checkOneMonth()}, but each group gets a
     * {@link ThreadDinzhiHistoryTask} that starts immediately and repeats every 30 minutes.
     * A failure on one device type is logged and does not affect the other types.
     */
    public static void checkDingzhiHistory() {
        if (!checkApplicationParam()) {
            log.error("获取配置参数错误");
            return;
        }
        Map<String, String> headers = authHeaders();
        for (int i = 0; i < deviceType.size(); i++) {
            try {
                String typeKey = deviceType.getJSONObject(i).getString("key");
                List<Map<String, Object>> totalList = fetchDeviceList(typeKey, headers);
                if (totalList.isEmpty()) {
                    continue; // nothing to schedule for this type
                }
                warnDevicesWithoutFrequency(totalList);

                Map<Integer, List<Map<String, Object>>> groupedByFrequency = groupByFrequency(totalList);
                ScheduledExecutorService scheduler =
                        ThreadPoolTaskTool.createTaskScheduler(groupedByFrequency.size());
                for (Map.Entry<Integer, List<Map<String, Object>>> group : groupedByFrequency.entrySet()) {
                    Integer groupKey = group.getKey();
                    if (groupKey <= 0) {
                        continue; // no usable frequency -> no task
                    }
                    String taskId = typeKey + "_history_dingzhi" + "_" + groupKey;
                    ThreadDinzhiHistoryTask dinzhiHistoryTask =
                            new ThreadDinzhiHistoryTask(taskId, groupKey, typeKey, group.getValue());
                    // runs every half hour
                    ThreadPoolTaskTool.scheduleDingzhiHistroyTask(scheduler, taskId, dinzhiHistoryTask,
                            0L, 30 * 60);
                }
            } catch (Exception ex) {
                log.error(nowStamp() + "获取定制历史数据异常:" + ex.getLocalizedMessage(), ex);
            }
        }
    }

    /**
     * Computes the delay from now until the next midnight.
     *
     * @return the delay in whole seconds; 0 when called exactly at midnight
     */
    private static long getInitialDelay() {
        Calendar now = Calendar.getInstance();
        Calendar nextRun = (Calendar) now.clone();
        // Start from today's 00:00:00.000 ...
        nextRun.set(Calendar.HOUR_OF_DAY, 0);
        nextRun.set(Calendar.MINUTE, 0);
        nextRun.set(Calendar.SECOND, 0);
        nextRun.set(Calendar.MILLISECOND, 0);
        // ... and move to tomorrow's midnight once today's has already passed.
        if (now.after(nextRun)) {
            nextRun.add(Calendar.DAY_OF_MONTH, 1);
        }
        return (nextRun.getTimeInMillis() - now.getTimeInMillis()) / 1000;
    }

    /**
     * Schedules the real-time data collection.
     *
     * <ol>
     *   <li>Fetch the complete device list of every configured device type.</li>
     *   <li>Start an hourly Excel backup of that list (first run after 5 seconds).</li>
     *   <li>Group the devices by collection frequency and schedule one {@link ThreadTask}
     *       per group, using the frequency (seconds) as the task interval.</li>
     * </ol>
     * A failure on one device type is logged and does not affect the other types.
     *
     * <p>NOTE(review): every call creates a new {@link Timer} and scheduler per device type;
     * presumably this method is invoked once at start-up - confirm against the caller.
     */
    public static void checkRealtimeData() {
        if (!checkApplicationParam()) {
            log.error("获取配置参数错误");
            return;
        }
        Map<String, String> headers = authHeaders();
        for (int i = 0; i < deviceType.size(); i++) {
            try {
                String typeKey = deviceType.getJSONObject(i).getString("key");
                List<Map<String, Object>> totalList = fetchDeviceList(typeKey, headers);
                if (totalList.isEmpty()) {
                    continue; // nothing to back up or schedule for this type
                }

                // Periodic backup of the device list to "./<type>.xlsx".
                scheduleHourlyTask(new Timer(), typeKey, totalList);

                Map<Integer, List<Map<String, Object>>> groupedByFrequency = groupByFrequency(totalList);
                ScheduledExecutorService scheduler =
                        ThreadPoolTaskTool.createTaskScheduler(groupedByFrequency.size());
                for (Map.Entry<Integer, List<Map<String, Object>>> group : groupedByFrequency.entrySet()) {
                    Integer groupKey = group.getKey();
                    if (groupKey <= 0) {
                        continue; // no usable frequency -> no task
                    }
                    String taskId = typeKey + "_" + groupKey;
                    ThreadTask threadTask = new ThreadTask(taskId, groupKey, typeKey, group.getValue());
                    ThreadPoolTaskTool.scheduleSecTask(scheduler, taskId, threadTask, groupKey);
                }
            } catch (Exception ex) {
                log.error(nowStamp() + "获取实时数据异常:" + ex.getLocalizedMessage(), ex);
            }
        }
    }

    /**
     * Schedules the Excel export of a device list on the given timer: first run after
     * 5 seconds, then once per hour at a fixed rate.
     *
     * @param timer     timer that owns the task
     * @param key       device type key, used as sheet and file name
     * @param totalList devices to export
     */
    private static void scheduleHourlyTask(Timer timer, String key, List<Map<String, Object>> totalList) {
        TimerTask hourlyTask = new TimerTask() {
            @Override
            public void run() {
                createExcel(key, totalList);
            }
        };
        timer.scheduleAtFixedRate(hourlyTask, EXCEL_FIRST_RUN_DELAY_MS, EXCEL_PERIOD_MS);
    }

    /**
     * Writes the device list to {@code ./<deviceType>.xlsx}, one row per device below a
     * header row. Missing values are written as the string {@code "null"}. I/O failures are
     * logged, not thrown.
     *
     * @param deviceType device type key; used as sheet name and file name
     * @param totalList  devices to export; {@code null} is treated as an empty list
     */
    public static void createExcel(String deviceType, List<Map<String, Object>> totalList) {
        List<Map<String, Object>> devices =
                totalList == null ? Collections.<Map<String, Object>>emptyList() : totalList;
        String path = "./" + deviceType + ".xlsx";

        try (Workbook workbook = new XSSFWorkbook()) {
            Sheet sheet = workbook.createSheet(deviceType);

            Row headerRow = sheet.createRow(0);
            for (int col = 0; col < EXCEL_HEADERS.length; col++) {
                Cell cell = headerRow.createCell(col);
                cell.setCellValue(EXCEL_HEADERS[col]);
            }

            int rowIndex = 1;
            for (Map<String, Object> device : devices) {
                Row row = sheet.createRow(rowIndex++);
                for (int col = 0; col < EXCEL_KEYS.length; col++) {
                    Cell cell = row.createCell(col);
                    cell.setCellValue(String.valueOf(device.get(EXCEL_KEYS[col])));
                }
            }

            // Auto-sizing is expensive: do it once per column after all rows exist,
            // instead of once per row as before.
            for (int col = 0; col < EXCEL_HEADERS.length; col++) {
                sheet.autoSizeColumn(col);
            }

            try (FileOutputStream fileOut = new FileOutputStream(path)) {
                workbook.write(fileOut);
            }
            log.info("Excel file generated successfully: {}", path);
        } catch (IOException e) {
            log.error("Excel file generation failed: " + path, e);
        }
    }

    /** Builds the {@code Authorization: Bearer <token>} header map used by the list calls. */
    private static Map<String, String> authHeaders() {
        Map<String, String> headers = new HashMap<>();
        headers.put("Authorization", "Bearer " + dangyangToken);
        return headers;
    }

    /** Current time formatted with {@code TimeTool.TIMESTAMP_FORMAT}, used as log prefix. */
    private static String nowStamp() {
        return TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(), TimeTool.TIMESTAMP_FORMAT);
    }

    /** Null-safe check that a response exists and carries {@code code == 0}. */
    private static boolean isSuccess(JSONObject res) {
        return res != null && Integer.valueOf(0).equals(res.getInteger("code"));
    }

    /** Checks that a list response succeeded and has a non-empty {@code data.list} array. */
    private static boolean hasDeviceList(JSONObject res) {
        if (!isSuccess(res)) {
            return false;
        }
        JSONObject data = res.getJSONObject("data");
        if (data == null) {
            return false;
        }
        JSONArray list = data.getJSONArray("list");
        return list != null && !list.isEmpty();
    }

    /** Builds the query parameters of one device-list page request. */
    private static Map<String, String> pageParams(int pageNo, int pageSize, String typeKey) {
        Map<String, String> params = new HashMap<>();
        params.put("pageNo", String.valueOf(pageNo));
        params.put("pageSize", String.valueOf(pageSize));
        params.put("deviceType", typeKey);
        return params;
    }

    /**
     * Fetches all devices of one type from {@link #listUrl}.
     *
     * <p>A first request with {@code pageSize=1} reads the total count; the list is then
     * fetched in pages of {@link #PAGE_SIZE} (or in one page of exactly {@code total} rows
     * when everything fits into a single page). Pages that fail are logged and skipped.
     *
     * @param typeKey device type key sent as {@code deviceType}
     * @param headers HTTP headers for the requests
     * @return the collected devices; empty when the total query fails or reports no devices
     * @throws Exception whatever the HTTP helper or JSON parsing throws
     */
    private static List<Map<String, Object>> fetchDeviceList(String typeKey, Map<String, String> headers)
            throws Exception {
        List<Map<String, Object>> totalList = new ArrayList<>();

        Map<String, String> paramsTotal = pageParams(1, 1, typeKey);
        JSONObject totalRes = JSONObject.parseObject(HttpUtil.sendGet(listUrl, paramsTotal, headers));
        if (!hasDeviceList(totalRes)) {
            log.error(nowStamp() + "获取设备列表总数失败:" + typeKey + ";"
                    + (totalRes == null ? null : totalRes.toJSONString())
                    + ";param:" + JSON.toJSONString(paramsTotal));
            return totalList;
        }

        Integer total = totalRes.getJSONObject("data").getInteger("total");
        int nTotals = total == null ? 0 : total;
        int pageNum = (nTotals + PAGE_SIZE - 1) / PAGE_SIZE; // ceiling division
        // When everything fits into one page, request exactly that many rows.
        int limit = pageNum <= 1 ? nTotals : PAGE_SIZE;

        for (int pageNo = 1; pageNo <= pageNum; pageNo++) {
            Map<String, String> paramsPage = pageParams(pageNo, limit, typeKey);
            JSONObject pageRes = JSONObject.parseObject(HttpUtil.sendGet(listUrl, paramsPage, headers));
            if (hasDeviceList(pageRes)) {
                totalList.addAll(JSON.parseObject(
                        pageRes.getJSONObject("data").getJSONArray("list").toJSONString(),
                        new TypeReference<List<Map<String, Object>>>() {
                        }));
            } else {
                log.error(nowStamp() + "获取设备列表失败:" + typeKey + ";"
                        + (pageRes == null ? null : pageRes.toJSONString())
                        + ";param:" + JSON.toJSONString(paramsPage));
            }
        }
        return totalList;
    }

    /**
     * Reads a device's collection frequency (seconds) as an integer.
     *
     * @return the frequency; 0 when it is missing or cannot be read as a number
     */
    private static Integer frequencyOf(Map<String, Object> device) {
        Object frequency = device.get(FREQUENCY_KEY);
        if (frequency == null) {
            return 0;
        }
        if (frequency instanceof Number) {
            // JSON parsing may yield Integer, Long or BigDecimal; a plain (Integer) cast
            // would throw for the latter two.
            return ((Number) frequency).intValue();
        }
        try {
            return Integer.parseInt(String.valueOf(frequency).trim());
        } catch (NumberFormatException ex) {
            log.warn("无法解析采集频率:{}", frequency);
            return 0;
        }
    }

    /** Groups devices by {@link #frequencyOf(Map)}; devices without a frequency land in group 0. */
    private static Map<Integer, List<Map<String, Object>>> groupByFrequency(List<Map<String, Object>> devices) {
        return devices.stream().collect(Collectors.groupingBy(KprDangyangWaterBizFun::frequencyOf));
    }

    /** Logs every device whose collection frequency is absent or null (it will not be scheduled). */
    private static void warnDevicesWithoutFrequency(List<Map<String, Object>> devices) {
        for (Map<String, Object> device : devices) {
            if (device.get(FREQUENCY_KEY) == null) {
                log.warn("设备未配置采集频率,不创建定时任务:{}", device);
            }
        }
    }
}