|
@@ -0,0 +1,651 @@
|
|
|
+package io.github.pnoker.gateway.bizmgr;
|
|
|
+
|
|
|
+import com.alibaba.fastjson.JSONArray;
|
|
|
+import com.alibaba.fastjson.JSONObject;
|
|
|
+import io.github.pnoker.gateway.comtool.TimeTool;
|
|
|
+import io.github.pnoker.gateway.dbdao.DBMgrProxy;
|
|
|
+import io.github.pnoker.gateway.dbdao.services.intef.WaterTapWaterService;
|
|
|
+import org.influxdb.dto.QueryResult;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
+import org.springframework.util.CollectionUtils;
|
|
|
+import org.springframework.util.ObjectUtils;
|
|
|
+import org.springframework.util.StringUtils;
|
|
|
+
|
|
|
+import java.time.Duration;
|
|
|
+import java.time.LocalDateTime;
|
|
|
+import java.time.LocalTime;
|
|
|
+import java.time.YearMonth;
|
|
|
+import java.time.format.DateTimeFormatter;
|
|
|
+import java.time.temporal.TemporalAdjusters;
|
|
|
+import java.util.*;
|
|
|
+import java.util.concurrent.CountDownLatch;
|
|
|
+import java.util.concurrent.ThreadLocalRandom;
|
|
|
+import java.util.stream.Collectors;
|
|
|
+
|
|
|
+/**
|
|
|
+ * @ClassName KprJiangjinAimWaterBizFun
|
|
|
+ * @Description: TODO 江津水量预测相关
|
|
|
+ * @Author LX
|
|
|
+ * @Date 2024/12/5
|
|
|
+ * @Version V1.0
|
|
|
+ **/
|
|
|
+public class KprJiangjinAimWaterBizFun {
|
|
|
+ private static final Logger log = LoggerFactory.getLogger(KprJiangjinAimWaterBizFun.class);
|
|
|
+
|
|
|
+ private final static String mStrClassName = "KprJiangjinAimWaterBizFun";
|
|
|
+ private final static String EMPTY_NULL = "NULL";
|
|
|
+
|
|
|
+ public static WaterTapWaterService getWaterTapWaterApi(){
|
|
|
+ return DBMgrProxy.getInstance().applyWaterTapWaterService();
|
|
|
+ }
|
|
|
+ static DateTimeFormatter formater = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
|
|
+
|
|
|
+ //TODO 初始化添加对比远通数据
|
|
|
+ public static void initTapWaterData(String startDate){
|
|
|
+ //TODO ① 首先查询水厂设备配置信息
|
|
|
+ try {
|
|
|
+ List<Map<String, Object>> configList = getWaterTapWaterApi().getWaterCollectionConfigList(null);
|
|
|
+ if(!CollectionUtils.isEmpty(configList)){
|
|
|
+ //声明总数据的数据数组
|
|
|
+ List<Map<String,Object>> newRecordAll = Collections.synchronizedList(new LinkedList<Map<String,Object>>());
|
|
|
+ //TODO ②开启多线程并发处理各水厂设备的数据
|
|
|
+ System.out.println("原始数据采集开始时间:"+ TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
|
|
|
+ System.out.println("原始数据采集进行中,起始时间("+startDate+"):......");
|
|
|
+ final CountDownLatch latch = new CountDownLatch(configList.size());
|
|
|
+ for(Map<String,Object> item:configList){
|
|
|
+ try{
|
|
|
+ new Thread(() -> {
|
|
|
+ //TODO 根据当前配置信息item 查询远通数据中的历史数据
|
|
|
+ //TODO 首先查询当前水厂设备的2023-11-01之后到最新数据的数据总数,然后分页形式获取数据
|
|
|
+ Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount(" WHERE TAG_CODE = '"+item.get("collcation_tag")+"' and QCQUISITION_TIME >= to_date('"+startDate+"', 'yyyy-mm-dd hh24:mi:ss')");
|
|
|
+ if(itemCount!=null&&itemCount>0) {
|
|
|
+ //TODO 优化 以分页方式查询所有,初始分页行数定为2000查询速率较好
|
|
|
+ int pageNum = itemCount%2000==0?itemCount/2000:(itemCount/2000)+1;//总页数
|
|
|
+ Integer limit = 2000;
|
|
|
+ if(pageNum<=1){
|
|
|
+ limit = itemCount;//说明总数比第一页小
|
|
|
+ }
|
|
|
+ for (int i = 0; i < pageNum; i++) {
|
|
|
+ Integer offset = i*limit;
|
|
|
+ //tapWaterHistoryList 为远通水量数据源
|
|
|
+ //TODO 因为是orcale数据库 分页方式以伪列rownum为分页条件,因此limit实际上永远比offset大2000,因此只要不是第一页,limit=offset+2000
|
|
|
+ Integer limitNew = 0;
|
|
|
+ if(i>0){
|
|
|
+ limitNew = offset+2000;
|
|
|
+ }else{
|
|
|
+ limitNew = 2000;
|
|
|
+ }
|
|
|
+ List<Map<String,Object>> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(limitNew,offset," AND TAG_CODE = '"+item.get("collcation_tag")+"' and QCQUISITION_TIME >= to_date('"+startDate+"', 'yyyy-mm-dd hh24:mi:ss')");
|
|
|
+ if(!CollectionUtils.isEmpty(tapWaterHistoryList)){
|
|
|
+ //TODO 循环远通水量数据列表,查询数据不存在的话就插入
|
|
|
+ for (int j = 0; j < tapWaterHistoryList.size(); j++) {
|
|
|
+ List<Map<String,Object>> queryWaterRecord = getWaterTapWaterApi().getWaterCollectionRecordList(1,0,
|
|
|
+ " WHERE collcation_tag = '"+tapWaterHistoryList.get(j).get("TAG_CODE")
|
|
|
+ +"' AND zone_name = '"+tapWaterHistoryList.get(j).get("ORG_NAME")+"' AND time = '"+TimeTool.convertDateStr2UTC(tapWaterHistoryList.get(j).get("QCQUISITION_TIME").toString())+"'");
|
|
|
+ if(CollectionUtils.isEmpty(queryWaterRecord)){
|
|
|
+ //TODO 说明没插入过本系统,执行插入
|
|
|
+ String extend = " ('"+tapWaterHistoryList.get(j).get("TAG_CODE")+"','"+tapWaterHistoryList.get(j).get("ORG_NAME")+"',"
|
|
|
+ + ""+TimeTool.convertDateStr2UTC(tapWaterHistoryList.get(j).get("QCQUISITION_TIME").toString())+","
|
|
|
+ + "'"+tapWaterHistoryList.get(j).get("VAL")+"',"
|
|
|
+ + TimeTool.convertDateStr2UTC(tapWaterHistoryList.get(j).get("UPDATE_TIME").toString())
|
|
|
+ +") ";
|
|
|
+ int insertCode = getWaterTapWaterApi().insertWaterCollectionRecord(extend);
|
|
|
+ if(insertCode<=0){
|
|
|
+ log.error(String.format("Batch initTapWaterDataThread 未成功:{%s} ",
|
|
|
+ extend));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ latch.countDown();
|
|
|
+
|
|
|
+ }).start();
|
|
|
+ }catch(Exception ex){
|
|
|
+ log.error(String.format("Batch initTapWaterDataThread 水厂:{%s},标识:{%s} ERROR:{%s} ",
|
|
|
+ item.get("zone_name"),
|
|
|
+ item.get("collcation_tag")
|
|
|
+ , ex.getLocalizedMessage()));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ latch.await();
|
|
|
+ System.out.println("原始数据采集检查机制("+startDate+")结束时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
|
|
|
+ }
|
|
|
+ }catch(Exception ex){
|
|
|
+ log.error(String.format("Batch initTapWaterData ERROR:{%s} "
|
|
|
+ , ex.getLocalizedMessage()));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ //TODO 初始化添加计算水厂所有设备每日的每小时用水量计算
|
|
|
+ public static void initWaterCollecationReacordAll(String startFindTime){
|
|
|
+ //TODO ① 首先查询水厂设备配置信息
|
|
|
+ try {
|
|
|
+
|
|
|
+ System.out.println("计算小时用水量开始时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
|
|
|
+ System.out.println("计算小时用水量起始时间:"+startFindTime+"进行中:......");
|
|
|
+ List<Map<String, Object>> configList = getWaterTapWaterApi().getWaterCollectionConfigList(null);
|
|
|
+ if (!CollectionUtils.isEmpty(configList)) {
|
|
|
+ //TODO 按照组织机构分组
|
|
|
+ Map<Object, List<Map<String, Object>>> groupedData =
|
|
|
+ configList.stream().collect(Collectors.groupingBy(item -> item.get("zone_name")));
|
|
|
+ final CountDownLatch latch = new CountDownLatch(groupedData.keySet().size());
|
|
|
+
|
|
|
+ //TODO 外层循环组织机构
|
|
|
+ for (Object key:groupedData.keySet()){
|
|
|
+ try{
|
|
|
+ new Thread(() -> {
|
|
|
+ //TODO 根据当前配置信息item 查询远通数据中的历史数据
|
|
|
+ //TODO 首先查询当前水厂设备的从2023-11-01之后到得到数据
|
|
|
+ LocalDateTime startDateTime = LocalDateTime.parse(startFindTime,DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
|
|
|
+
|
|
|
+ //TODO 需计算的循环天数
|
|
|
+ Long days = 0L;
|
|
|
+ days = Duration.between(startDateTime, LocalDateTime.now()).toDays();
|
|
|
+ days+=1;
|
|
|
+
|
|
|
+ //TODO 此循环天数每一天所查的是所有设备每小时数据合
|
|
|
+ final CountDownLatch latch2 = new CountDownLatch(days.intValue());
|
|
|
+ for(Long k = 0L;k<days;k++) {
|
|
|
+ LocalDateTime newStartDateTime = startDateTime.minusDays(-k.intValue());
|
|
|
+ String startDate = newStartDateTime.format(formater);
|
|
|
+ String endDate = newStartDateTime.minusDays(-1).format(formater);
|
|
|
+ try {
|
|
|
+ new Thread(() -> {
|
|
|
+
|
|
|
+ List<Map<String, Object>> deviceList = groupedData.get(key);
|
|
|
+
|
|
|
+ //TODO 循环获取该天该水厂每个设备数据
|
|
|
+
|
|
|
+ //TODO 查询当前天日期内每小时的设备数据
|
|
|
+ for (int i = 0; i < 24; i++) {
|
|
|
+ String startTime = newStartDateTime.withHour(i).format(formater);
|
|
|
+ String endTime = newStartDateTime.minusHours(-(i + 1)).withMinute(0).withSecond(0).format(formater);
|
|
|
+
|
|
|
+ Map<String, Object> recordAllEntity = new HashMap<>();//需要添加的实体数据
|
|
|
+ recordAllEntity.put("zone_name", key.toString());//水厂
|
|
|
+ recordAllEntity.put("time", endTime);//采集时间(小时的最后时间)
|
|
|
+ recordAllEntity.put("value", null);
|
|
|
+ recordAllEntity.put("value_tag", "water");
|
|
|
+ recordAllEntity.put("collcation_tag_array", JSONObject.toJSON(deviceList));
|
|
|
+ Double value = null;
|
|
|
+ //TODO 此循环计算该小时所有设备的用水量
|
|
|
+ for (Map<String, Object> item : deviceList) {
|
|
|
+ // 定义字符串日期时间的格式
|
|
|
+ DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
|
|
+ // 解析字符串以创建 LocalDateTime 实例
|
|
|
+ LocalDateTime dateTime = LocalDateTime.parse(endTime, formatter);
|
|
|
+ Integer itemCount = getWaterTapWaterApi().getTabWaterHistoryCount(
|
|
|
+ " WHERE TAG_CODE = '" + item.get("collcation_tag") + "' " +
|
|
|
+ " and QCQUISITION_TIME >= to_date('" + startDate + "', 'yyyy-mm-dd hh24:mi:ss')" +
|
|
|
+ " and QCQUISITION_TIME <= to_date('" + endDate + "', 'yyyy-mm-dd hh24:mi:ss')");
|
|
|
+ if (itemCount != null && itemCount > 0) {
|
|
|
+ List<Map<String, Object>> tapWaterHistoryList = getWaterTapWaterApi().getPageZILAISHUI_HISTORY2(itemCount, 0,
|
|
|
+ " AND TAG_CODE = '" + item.get("collcation_tag") + "' " +
|
|
|
+ " and QCQUISITION_TIME >= to_date('" + startTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
|
|
|
+ " and QCQUISITION_TIME <= to_date('" + endTime + "', 'yyyy-mm-dd hh24:mi:ss')" +
|
|
|
+ " order by QCQUISITION_TIME");
|
|
|
+ //TODO tapWaterHistoryList此集合为该设备该小时内的数据,取第一条和最后一条的相差值作为用水量
|
|
|
+ if (!CollectionUtils.isEmpty(tapWaterHistoryList) && tapWaterHistoryList.size() > 1) {
|
|
|
+ Double firstValue = tapWaterHistoryList.get(0).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get(0).get("val").toString()) : null;
|
|
|
+ Double lastValue = tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("VAL") != null ? Double.valueOf(tapWaterHistoryList.get((tapWaterHistoryList.size() - 1)).get("val").toString()) : null;
|
|
|
+ if (firstValue != null && lastValue != null) {
|
|
|
+ //到此处是该小时一个设备的用水量已加上
|
|
|
+ if (value == null) {
|
|
|
+ value = 0.00;
|
|
|
+ }
|
|
|
+ value += Math.abs(lastValue - firstValue);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }else {
|
|
|
+ //TODO 查不到就说明采集平台数据库没有,就从infulxdb iot查询数据
|
|
|
+ QueryResult queryResult = KprJiangjinWaterBizfun.infulxJiangjinDbUtil.queryWithCondition("WaterMeter",
|
|
|
+ " dev_id='"+item.get("device_code")+"' " +
|
|
|
+ " and time >= '"+startTime+"' and time <='"+endTime+"'");
|
|
|
+ if(queryResult.hasError()) {
|
|
|
+ log.error(String.format("Batch" +
|
|
|
+ " queryIot ERROR:{%s} "
|
|
|
+ , "数据未查询到"));
|
|
|
+ }else{
|
|
|
+ for (QueryResult.Result result : queryResult.getResults()) {
|
|
|
+ if (result.getSeries() != null) {
|
|
|
+ for (QueryResult.Series series : result.getSeries()) {
|
|
|
+ List<String> columns = series.getColumns();
|
|
|
+ int flowTotalPosIndex = columns.indexOf("flow_total_pos");
|
|
|
+ if (flowTotalPosIndex != -1) {
|
|
|
+ Double firstValue = findFirstNonNullValue(series.getValues(), flowTotalPosIndex);
|
|
|
+ Double lastValue = findLastNonNullValue(series.getValues(), flowTotalPosIndex);
|
|
|
+ if (firstValue != null && lastValue != null) {
|
|
|
+ //到此处是该小时一个设备的用水量已加上
|
|
|
+ if (value == null) {
|
|
|
+ value = 0.00;
|
|
|
+ }
|
|
|
+ value += Math.abs(lastValue - firstValue);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ System.out.println("'flow_total_pos' column not found in the series.");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+// System.out.println("No series found in the result.");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ recordAllEntity.put("value", value);
|
|
|
+ List<Map<String, Object>> queryWaterRecord = getWaterTapWaterApi().getWaterCollectionRecordAllList(1, 0,
|
|
|
+ " WHERE zone_name = '" + recordAllEntity.get("zone_name")
|
|
|
+ + "' AND time = '" + recordAllEntity.get("time") + "' AND value_tag = '" + recordAllEntity.get("value_tag") + "'");
|
|
|
+ if (CollectionUtils.isEmpty(queryWaterRecord)) {
|
|
|
+ //TODO 说明不存在,进行插入
|
|
|
+ if(!ObjectUtils.isEmpty(recordAllEntity.get("value"))) {
|
|
|
+ int insertCode = getWaterTapWaterApi().insertWaterCollectionRecordAll(" (" +
|
|
|
+ "'" + recordAllEntity.get("zone_name") + "'," +
|
|
|
+ "'" + recordAllEntity.get("time") + "'," +
|
|
|
+ "'" + recordAllEntity.get("value") + "'," +
|
|
|
+ "'" + recordAllEntity.get("value_tag") + "'," +
|
|
|
+ "'" + recordAllEntity.get("collcation_tag_array") + "'" +
|
|
|
+ ") ");
|
|
|
+ if (insertCode < 0) {
|
|
|
+ log.error(String.format("Batch initTapWaterDataThread 未成功:{%s} ",
|
|
|
+ JSONObject.toJSON(recordAllEntity)));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }else{
|
|
|
+ //TODO 说明存在,进行修改
|
|
|
+ if(!ObjectUtils.isEmpty(recordAllEntity.get("value"))) {
|
|
|
+ int updateCode = getWaterTapWaterApi().updateWaterCollectionRecordAll(String.valueOf(value)," WHERE " +
|
|
|
+ "(" +
|
|
|
+ " zone_name = '" + recordAllEntity.get("zone_name") + "' and" +
|
|
|
+ " \"time\" = '" + recordAllEntity.get("time") + "' and" +
|
|
|
+ " value_tag = '" + recordAllEntity.get("value_tag") + "'" +
|
|
|
+ ") ");
|
|
|
+ if (updateCode < 0) {
|
|
|
+ log.error(String.format("Batch updateWaterCollectionRecordAll 未成功:{%s} ",
|
|
|
+ JSONObject.toJSON(recordAllEntity)));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ latch2.countDown();
|
|
|
+
|
|
|
+ }).start();
|
|
|
+ }catch(Exception ex){
|
|
|
+
|
|
|
+ }
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ latch2.await();
|
|
|
+ }catch(Exception ex){
|
|
|
+
|
|
|
+ }
|
|
|
+ latch.countDown();
|
|
|
+
|
|
|
+ }).start();
|
|
|
+ }catch(Exception ex){
|
|
|
+ log.error(String.format("Batch" +
|
|
|
+ " initWaterReacordAll ERROR:{%s} "
|
|
|
+ , ex.getLocalizedMessage()));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ latch.await();
|
|
|
+ System.out.println("计算小时用水量检查机制("+startFindTime+")结束时间:"+TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
|
|
|
+ }
|
|
|
+ }catch(Exception ex){
|
|
|
+ log.error(String.format("Batch initWaterCollecationReacordAll ERROR:{%s} "
|
|
|
+ , ex.getLocalizedMessage()));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private static Double findFirstNonNullValue(List<List<Object>> values, int index) {
|
|
|
+ for (List<Object> row : values) {
|
|
|
+ Object value = row.get(index);
|
|
|
+ if (value != null) {
|
|
|
+ return Double.valueOf(value.toString());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ private static Double findLastNonNullValue(List<List<Object>> values, int index) {
|
|
|
+ for (int i = values.size() - 1; i >= 0; i--) {
|
|
|
+ Object value = values.get(i).get(index);
|
|
|
+ if (value != null) {
|
|
|
+ return Double.valueOf(value.toString());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ static JSONObject pumpObj = JSONObject.parseObject("{\n" +
|
|
|
+ "\t\"UZD2A9DF3794E400878JA\": {\n" +
|
|
|
+ "\t\t\"outRoom\": {\n" +
|
|
|
+ "\t\t\t\"pumps\": [{\n" +
|
|
|
+ "\t\t\t\t\t\"label\": \"1\",\n" +
|
|
|
+ "\t\t\t\t\t\"status\": 1,\n" +
|
|
|
+ "\t\t\t\t\t\"forecastStatus\": 0\n" +
|
|
|
+ "\t\t\t\t},\n" +
|
|
|
+ "\t\t\t\t{\n" +
|
|
|
+ "\t\t\t\t\t\"label\": \"2\",\n" +
|
|
|
+ "\t\t\t\t\t\"status\": 0,\n" +
|
|
|
+ "\t\t\t\t\t\"forecastStatus\": 0\n" +
|
|
|
+ "\t\t\t\t},\n" +
|
|
|
+ "\t\t\t\t{\n" +
|
|
|
+ "\t\t\t\t\t\"label\": \"3\",\n" +
|
|
|
+ "\t\t\t\t\t\"status\": 0,\n" +
|
|
|
+ "\t\t\t\t\t\"forecastStatus\": 0\n" +
|
|
|
+ "\t\t\t\t},\n" +
|
|
|
+ "\t\t\t\t{\n" +
|
|
|
+ "\t\t\t\t\t\"label\": \"4\",\n" +
|
|
|
+ "\t\t\t\t\t\"status\": 1,\n" +
|
|
|
+ "\t\t\t\t\t\"forecastStatus\": 1\n" +
|
|
|
+ "\t\t\t\t}\n" +
|
|
|
+ "\t\t\t]\n" +
|
|
|
+ "\t\t}\n" +
|
|
|
+ "\t},\n" +
|
|
|
+ "\t\"UZD2A9DF38A304002A0M1\": {\n" +
|
|
|
+ "\t\t\"outRoom\": {\n" +
|
|
|
+ "\t\t\t\"pumps\": [{\n" +
|
|
|
+ "\t\t\t\t\t\"label\": \"1\",\n" +
|
|
|
+ "\t\t\t\t\t\"status\": 1,\n" +
|
|
|
+ "\t\t\t\t\t\"forecastStatus\": 1\n" +
|
|
|
+ "\t\t\t\t},\n" +
|
|
|
+ "\t\t\t\t{\n" +
|
|
|
+ "\t\t\t\t\t\"label\": \"2\",\n" +
|
|
|
+ "\t\t\t\t\t\"status\": 0,\n" +
|
|
|
+ "\t\t\t\t\t\"forecastStatus\": 0\n" +
|
|
|
+ "\t\t\t\t},\n" +
|
|
|
+ "\t\t\t\t{\n" +
|
|
|
+ "\t\t\t\t\t\"label\": \"3\",\n" +
|
|
|
+ "\t\t\t\t\t\"status\": 0,\n" +
|
|
|
+ "\t\t\t\t\t\"forecastStatus\": 1\n" +
|
|
|
+ "\t\t\t\t},\n" +
|
|
|
+ "\t\t\t\t{\n" +
|
|
|
+ "\t\t\t\t\t\"label\": \"4\",\n" +
|
|
|
+ "\t\t\t\t\t\"status\": 0,\n" +
|
|
|
+ "\t\t\t\t\t\"forecastStatus\": 0\n" +
|
|
|
+ "\t\t\t\t}\n" +
|
|
|
+ "\t\t\t]\n" +
|
|
|
+ "\t\t}\n" +
|
|
|
+ "\t},\n" +
|
|
|
+ "\t\"UZD2A9DF39435800149VR\": {\n" +
|
|
|
+ "\t\t\"outRoom\": {\n" +
|
|
|
+ "\t\t\t\"pumps\": [{\n" +
|
|
|
+ "\t\t\t\t\t\"label\": \"1\",\n" +
|
|
|
+ "\t\t\t\t\t\"status\": 1,\n" +
|
|
|
+ "\t\t\t\t\t\"forecastStatus\": 1\n" +
|
|
|
+ "\t\t\t\t},\n" +
|
|
|
+ "\t\t\t\t{\n" +
|
|
|
+ "\t\t\t\t\t\"label\": \"2\",\n" +
|
|
|
+ "\t\t\t\t\t\"status\": 0,\n" +
|
|
|
+ "\t\t\t\t\t\"forecastStatus\": 1\n" +
|
|
|
+ "\t\t\t\t},\n" +
|
|
|
+ "\t\t\t\t{\n" +
|
|
|
+ "\t\t\t\t\t\"label\": \"3\",\n" +
|
|
|
+ "\t\t\t\t\t\"status\": 1,\n" +
|
|
|
+ "\t\t\t\t\t\"forecastStatus\": 1\n" +
|
|
|
+ "\t\t\t\t}\n" +
|
|
|
+ "\t\t\t]\n" +
|
|
|
+ "\t\t}\n" +
|
|
|
+ "\t}\n" +
|
|
|
+ "}");
|
|
|
+
|
|
|
+ //TODO 定时任务 定时添加小时预测数据 转变为之做假泵数据
|
|
|
+ public static void insertYuceHourDataScheduled(String monthNow){
|
|
|
+ System.out.println("添加预测小时数据正在进行"+ TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
|
|
|
+ try {
|
|
|
+ //先获取配置项 根据配置项来进行逻辑添加
|
|
|
+ List<Map<String, Object>> configRes = getWaterTapWaterApi().getWaterYuceConfig(false, 0, 0, "");
|
|
|
+ if(!CollectionUtils.isEmpty(configRes)){
|
|
|
+ Map<String,Object> config = configRes.get(0);
|
|
|
+ Integer isMonth = Integer.valueOf(config.get("is_month").toString());//是否预测指定月份 0 是 1否(判定是否走自添加逻辑)
|
|
|
+ String month = config.get("month").toString();//指定预测年月 yyyy-mm
|
|
|
+ Double randomCode = Double.valueOf(config.get("random_code").toString());//上下随机石百分比范围
|
|
|
+ if(isMonth==0){
|
|
|
+ DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
|
|
+ String yearStr = month.split("-")[0];
|
|
|
+ String monthStr = month.split("-")[1];
|
|
|
+ if(!StringUtils.isEmpty(monthNow)){
|
|
|
+ yearStr = monthNow.split("-")[0];
|
|
|
+ monthStr = monthNow.split("-")[1];
|
|
|
+ }
|
|
|
+ LocalDateTime startDate = LocalDateTime.now().withYear(Integer.valueOf(yearStr))
|
|
|
+ .withMonth(Integer.valueOf(monthStr)).withDayOfMonth(1).withHour(0).withMinute(0).withSecond(0).withNano(0);
|
|
|
+ LocalDateTime endDate = startDate.with(TemporalAdjusters.lastDayOfMonth());
|
|
|
+ //TODO 总体逻辑: 先查询指定月上一年的每日平均值,再按平均值的上下啊百分比去插入指定的浮动百分比数据
|
|
|
+ List<Map<String, Object>> configList = getWaterTapWaterApi().getWaterCollectionConfigList(null);
|
|
|
+ List<Map<String,Object>> orgConfig = getWaterTapWaterApi().getOrgConfig(false,0,0,"");
|
|
|
+ if (!CollectionUtils.isEmpty(configList)&&!CollectionUtils.isEmpty(orgConfig)) {
|
|
|
+ //TODO 按照组织机构分组
|
|
|
+ Map<Object, List<Map<String, Object>>> groupedData =
|
|
|
+ configList.stream().collect(Collectors.groupingBy(item -> item.get("zone_name")));
|
|
|
+ final CountDownLatch latch = new CountDownLatch(groupedData.keySet().size());
|
|
|
+ for (Object key:groupedData.keySet()){
|
|
|
+ try {
|
|
|
+ new Thread(() -> {
|
|
|
+ //TODO ①计算水厂指定月每日小时平均值
|
|
|
+ List<Map<String,Object>> recordAllRes = getWaterTapWaterApi()
|
|
|
+ .getWaterCollectionRecordAllListAll(" WHERE 1=1 " +
|
|
|
+ " AND zone_name = '"+key.toString()+"'" +
|
|
|
+ " AND \"time\"::timestamp BETWEEN '"+startDate.format(formatter)+"' "+"AND '"+endDate.format(formatter)+"'");
|
|
|
+ if(!CollectionUtils.isEmpty(recordAllRes)){
|
|
|
+ int days = YearMonth.from(startDate).lengthOfMonth();//获取月份天数
|
|
|
+ for (int i = 1;i<=days;i++){
|
|
|
+ LocalDateTime dateNow = startDate.withDayOfMonth(i);//当前循环时间
|
|
|
+ if(dateNow.isAfter(LocalDateTime.now().minusDays(-4))){
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ Double numAll = null;//日总值
|
|
|
+ Double numAge = 0.0;//日小时平均值
|
|
|
+ //TODO 筛选成当前循环时间的集合
|
|
|
+ List<Map<String,Object>> daysRecord = recordAllRes.stream().filter(item ->
|
|
|
+ LocalDateTime.parse(item.get("time").toString(),formatter).toLocalDate().equals(dateNow.toLocalDate()))
|
|
|
+ .collect(Collectors.toList());
|
|
|
+ for (Map<String,Object> mapEntity : daysRecord){
|
|
|
+ //TODO 筛选与当前天一样的数据在计算平均值,数据相加除以24小时 (recordAllRes内容为这天每小时的数据)
|
|
|
+ if(numAll == null){
|
|
|
+ numAll = 0.0;
|
|
|
+ }
|
|
|
+ numAll += Double.valueOf(mapEntity.get("value").toString());
|
|
|
+ }
|
|
|
+ if (numAll!=null){
|
|
|
+ numAge = numAll/24;
|
|
|
+ }
|
|
|
+ //TODO ②正式插入或修改日预测数据表
|
|
|
+ Optional<String> orgIdOptional = orgConfig.stream()
|
|
|
+ .filter(item -> key.toString().replace("(", "(").replace(")", ")")
|
|
|
+ .equals(item.get("zone_name").toString().replace("(", "(").replace(")", ")")))
|
|
|
+ .map(item -> item.get("zone_id").toString())
|
|
|
+ .findFirst();
|
|
|
+ String orgId = orgIdOptional.orElse(null);
|
|
|
+ if (!StringUtils.isEmpty(orgId)) {
|
|
|
+ //预测比对时间如果不是今年,则变为今年的时间
|
|
|
+ //TODO 取泵号
|
|
|
+ JSONObject pumpItem = (JSONObject)pumpObj.get(orgId);
|
|
|
+ JSONArray pumpArray = ((JSONObject)pumpItem.get("outRoom")).getJSONArray("pumps");
|
|
|
+ //TODO 一天24小时
|
|
|
+ for (int j = 0;j<24;j++) {
|
|
|
+ //TODO 数据库操作 A 调用对应时间预测接口 B 在修改实际值
|
|
|
+ DateTimeFormatter formatter2 = DateTimeFormatter.ofPattern("yyyy-MM-dd");
|
|
|
+ DateTimeFormatter formatter3 = DateTimeFormatter.ofPattern("HH:mm:ss");
|
|
|
+ String dateStr = dateNow.withYear(LocalDateTime.now().getYear()).toLocalDate().format(formatter2);
|
|
|
+ // 创建一个LocalTime实例,设置为指定小时,分钟和秒数为0
|
|
|
+ LocalTime time = LocalTime.of(j, 0, 0);
|
|
|
+ // 创建一个DateTimeFormatter用于格式化时间
|
|
|
+ String timeStr = time.format(formatter3);
|
|
|
+
|
|
|
+// //TODO A
|
|
|
+// JP3TPHour tpHour = new JP3TPHour();
|
|
|
+// tpHour.setOrgId(orgId);
|
|
|
+// tpHour.setDate(dateStr);
|
|
|
+// try {
|
|
|
+// ResponseRes<String> tpRes = ServiceMgrProxy.getInstance()
|
|
|
+// .applyCloud3tpServiceApi().dayHourPredictSupply(tpHour);
|
|
|
+// if(ResponseCode.RESULT_BAD.toStrCode().equals(tpRes.getRescode())){
|
|
|
+// LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
|
|
|
+// , mStrClassName
|
|
|
+// , mStrClassName
|
|
|
+// , String.format("预测小时数据"+dateStr+"调用失败 ERROR:{%s} ",
|
|
|
+// tpRes.getResdata()));
|
|
|
+// break;
|
|
|
+// }
|
|
|
+// }catch(Exception ex){
|
|
|
+// LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
|
|
|
+// , mStrClassName
|
|
|
+// , mStrClassName
|
|
|
+// , String.format("预测小时数据"+dateStr+"调用异常 ERROR:{%s} ",
|
|
|
+// ex.getLocalizedMessage()));
|
|
|
+// }
|
|
|
+
|
|
|
+ //TODO B
|
|
|
+ Map<String, Object> recordAllEntity = new LinkedHashMap<>();//需要添加的实体数据,此处要用有序map
|
|
|
+
|
|
|
+ recordAllEntity.put("date", dateStr);
|
|
|
+ recordAllEntity.put("hour", timeStr);
|
|
|
+// recordAllEntity.put("HourForecastWaterWithdrawals", null);
|
|
|
+ //TODO 随机数逻辑
|
|
|
+ int randomUpOrDown = ThreadLocalRandom.current().nextInt(2);//随机向上或者向下 0表示向下 1表示向上
|
|
|
+ Double randomWater = randomUpOrDown == 0 ?
|
|
|
+ numAge - (numAge * (randomCode / 100)) :
|
|
|
+ numAge + (numAge * (randomCode / 100));
|
|
|
+// recordAllEntity.put("HourForecastActualWaterSupply", randomWater);
|
|
|
+// recordAllEntity.put("HourActualWaterWithdrawals", "");
|
|
|
+ //实际从该时间查询结果中得出
|
|
|
+ List<Map<String,Object>> newRecordAllRes = getWaterTapWaterApi()
|
|
|
+ .getWaterCollectionRecordAllListAll(" WHERE 1=1 " +
|
|
|
+ " AND zone_name = '"+key.toString()+"'" +
|
|
|
+ " AND \"time\" = '"+dateNow.withYear(LocalDateTime.now().getYear()).withHour(j).format(formatter)+"' ");
|
|
|
+ String hourActualWaterSupply = null;
|
|
|
+ if(!CollectionUtils.isEmpty(newRecordAllRes)){
|
|
|
+ hourActualWaterSupply = newRecordAllRes.get(0).get("value").toString();
|
|
|
+ }
|
|
|
+ recordAllEntity.put("hour_actual_water_supply",hourActualWaterSupply);
|
|
|
+ recordAllEntity.put("last_modify_time", LocalDateTime.now().format(formatter));
|
|
|
+ // 如果找到了zone_id,就将其赋值给orgId,否则orgId为null
|
|
|
+ recordAllEntity.put("zone_id", orgId);
|
|
|
+
|
|
|
+// //TODO 添加or修改
|
|
|
+// Integer insertRes = getWaterTapApi().insertOrUpdateTbmHourWater(recordAllEntity);
|
|
|
+// if (insertRes < 1) {
|
|
|
+// LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR
|
|
|
+// , mStrClassName
|
|
|
+// , mStrClassName
|
|
|
+// , String.format("Batch insertYuceDataScheduled ERROR:{%s} ",
|
|
|
+// "新增或修改失败"));
|
|
|
+// }
|
|
|
+
|
|
|
+ //TODO ③ 泵集合数据添加或修改
|
|
|
+ // 生成[1, pumpArray.size()]范围内的随机数
|
|
|
+ int randomNum = (int) (Math.random() * (pumpArray.size() - 1)) + 1;
|
|
|
+ List<Double> splitRandom = splitValueRandomlyWithZeros(Double.valueOf((hourActualWaterSupply==null
|
|
|
+ ?"0.0":hourActualWaterSupply)),pumpArray.size(),randomNum);//实际值随机数集合
|
|
|
+ List<Double> splitRandomYuce = splitValueRandomlyWithZeros((randomWater==null
|
|
|
+ ?0.0:randomWater),pumpArray.size(),randomNum);//预测值随机数集合
|
|
|
+ //泵集合
|
|
|
+ int kIndex = 0;
|
|
|
+ for (Object obj:pumpArray){
|
|
|
+ JSONObject itemObj = (JSONObject)obj;
|
|
|
+ Map<String,Object> map = new LinkedHashMap<>();
|
|
|
+ //TODO 泵状态根据值是否为0设置
|
|
|
+ map.put("Date", dateNow.withYear(LocalDateTime.now().getYear()).toLocalDate()
|
|
|
+ .format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
|
|
|
+ // 创建一个LocalTime实例,设置为指定小时,分钟和秒数为0
|
|
|
+ // 创建一个DateTimeFormatter用于格式化时间
|
|
|
+ map.put("Hour", time.format(formatter3));
|
|
|
+ map.put("PumpID",itemObj.get("label"));
|
|
|
+
|
|
|
+ if(splitRandom.get(kIndex)==0){
|
|
|
+ //TODO 说明泵实际值为0,状态为关
|
|
|
+ map.put("PumpStatus",0);
|
|
|
+ }else{
|
|
|
+ map.put("PumpStatus",1);
|
|
|
+ }
|
|
|
+ map.put("HourForecastActualWaterSupply",splitRandomYuce.get(kIndex));//预测泵的供水量
|
|
|
+ map.put("PumpWater",splitRandom.get(kIndex));//实际泵的供水量
|
|
|
+ if(splitRandom.get(kIndex)==0){
|
|
|
+ //TODO 说明泵实际值为0,状态为关
|
|
|
+ map.put("RealPumpStatus",0);
|
|
|
+ }else{
|
|
|
+ map.put("RealPumpStatus",1);
|
|
|
+ }
|
|
|
+
|
|
|
+ map.put("PumpEnergy",splitRandomYuce.get(kIndex)/1000*180);
|
|
|
+ map.put("RealPumpEnergy",splitRandom.get(kIndex)/1000*200);
|
|
|
+ map.put("LastModifyTime", LocalDateTime.now().format(formatter));
|
|
|
+ map.put("orgId", orgId);
|
|
|
+ //TODO 添加or修改
|
|
|
+ Integer insertRes2 = getWaterTapWaterApi().insertOrUpdateTbmHourwaterWatersupply(map);
|
|
|
+ if (insertRes2 < 1) {
|
|
|
+ log.error(String.format("Batch insertOrUpdateTbmHourwaterWatersupply ERROR:{%s} ",
|
|
|
+ "新增或修改失败"));
|
|
|
+ }
|
|
|
+ kIndex++;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ latch.countDown();
|
|
|
+
|
|
|
+ }).start();
|
|
|
+ }catch(Exception ex){
|
|
|
+ log.error(String.format("Batch insertYuceDataScheduled ERROR:{%s} ",
|
|
|
+ ex.getLocalizedMessage()));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ try{latch.await();}catch(Exception ex){}
|
|
|
+ }
|
|
|
+ System.out.println("添加预测泵小时数据结束"+ TimeTool.convertUTC2DateStr(TimeTool.getCurMsUTC(),TimeTool.TIMESTAMP_FORMAT));
|
|
|
+ }else{
|
|
|
+
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }catch(Exception ex){
|
|
|
+ ex.printStackTrace();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ //生成指定份额的随机数,并且带有0值
|
|
|
+ public static List<Double> splitValueRandomlyWithZeros(double totalValue, int parts, int zeroCount) {
|
|
|
+ if (zeroCount >= parts) {
|
|
|
+ throw new IllegalArgumentException("Zero count must be less than the number of parts.");
|
|
|
+ }
|
|
|
+
|
|
|
+ Random random = new Random();
|
|
|
+ List<Double> points = new ArrayList<>();
|
|
|
+ int nonZeroParts = parts - zeroCount;
|
|
|
+
|
|
|
+ // 添加初始点
|
|
|
+ points.add(0.0);
|
|
|
+
|
|
|
+ // 生成分割点
|
|
|
+ for (int i = 0; i < nonZeroParts - 1; i++) {
|
|
|
+ points.add(random.nextDouble() * totalValue);
|
|
|
+ }
|
|
|
+
|
|
|
+ // 添加终点
|
|
|
+ points.add(totalValue);
|
|
|
+
|
|
|
+ // 排序
|
|
|
+ Collections.sort(points);
|
|
|
+
|
|
|
+ // 计算每份的大小
|
|
|
+ List<Double> splits = new ArrayList<>();
|
|
|
+ for (int i = 1; i < points.size(); i++) {
|
|
|
+ splits.add(points.get(i) - points.get(i - 1));
|
|
|
+ }
|
|
|
+
|
|
|
+ // 添加0值份额
|
|
|
+ for (int i = 0; i < zeroCount; i++) {
|
|
|
+ splits.add(0.0);
|
|
|
+ }
|
|
|
+
|
|
|
+ // 打乱列表,以确保0值分布均匀
|
|
|
+ Collections.shuffle(splits);
|
|
|
+
|
|
|
+ return splits;
|
|
|
+ }
|
|
|
+}
|