Просмотр исходного кода

晚上站点时段数据小时分析的部分逻辑

andyliu недель назад: 3
Родитель
Сommit
a7cdde36b4

+ 41 - 80
src/main/java/com/shkpr/service/warncore/bizhandler/SiteDataHandler.java

@@ -4,18 +4,16 @@ import com.global.base.tools.CastUtil;
 import com.global.base.tools.FastJsonUtil;
 import com.shkpr.service.warncore.commtools.CommTool;
 import com.shkpr.service.warncore.commtools.TimeTool;
-import com.shkpr.service.warncore.components.locks.CountDownLatchEx;
-import com.shkpr.service.warncore.components.locks.OrdWarnPlanLockMgr;
 import com.shkpr.service.warncore.constants.CommFieldStatus;
 import com.shkpr.service.warncore.constants.FrequencyUnit;
 import com.shkpr.service.warncore.dbdao.DBMgrProxy;
 import com.shkpr.service.warncore.dbdao.services.intef.OrdWarnPlanInfoDBService;
 import com.shkpr.service.warncore.dbdao.services.intef.OrdWarnPlanRulesDBService;
 import com.shkpr.service.warncore.dbdao.tables.OrdWarnPlanInfoTable;
-import com.shkpr.service.warncore.dbdao.tables.OrdWarnPlanRulesTable;
 import com.shkpr.service.warncore.dto.*;
 import com.shkpr.service.warncore.jsonbean.JPGetRegionData;
-import com.shkpr.service.warncore.jsonbean.JPStrIdsSK;
+import com.shkpr.service.warncore.jsonbean.JPGetRegionReport;
+import com.shkpr.service.warncore.jsonbean.JPGetRegionReportItem;
 import com.shkpr.service.warncore.services.ServiceMgrProxy;
 import org.springframework.util.StringUtils;
 
@@ -24,7 +22,7 @@ import java.util.*;
 /**
  * 站点数据处理
  */
-public class SiteDataHandler {
+public class SiteDataWarnHandler {
     private static OrdWarnPlanInfoDBService getWarnPlanInfoDBService(){
         return DBMgrProxy.getInstance().applyXXXApi(OrdWarnPlanInfoDBService.class);
     }
@@ -33,67 +31,7 @@ public class SiteDataHandler {
         return DBMgrProxy.getInstance().applyXXXApi(OrdWarnPlanRulesDBService.class);
     }
 
-    //先获取方案信息,然后规则列表,再加锁
-    public void handlerData(String planId){
-        ResponseCode code = ResponseCode.RESULT_NORMAL;
-        String step = "Start Get Plan Info";
-        List<OrdWarnPlanRules> rules = null;
-        OrdWarnPlanDetail planDetail = null;
-        do {
-            Map<String, Object> infoDb = getWarnPlanInfoDBService().fastGetFiled("", "", planId);
-            if (infoDb == null || infoDb.size() <= 0){
-                code = ResponseCode.RESULT_BAD;
-                step = "Get Warn Plan Info Failed";
-                break;
-            }
-
-            planDetail = FastJsonUtil.map2Obj(infoDb, OrdWarnPlanDetail.class,true);
-            if (planDetail == null || planDetail.getStatus() != CommFieldStatus.ENABLE){
-                code = ResponseCode.RESULT_BAD;
-                step = "Warn Plan Info Invalid";
-                break;
-            }
-            infoDb.clear();
-            List<Map<String, Object>> rulesDb = getWarnPlanInfoDBService().batchQueryWithsEx("", ""
-                    , new HashMap<String, Object>(){{put(OrdWarnPlanRulesTable.R_INFO.PLAN_ID, planId);}}
-                    , null, OrdWarnPlanRulesTable.R_INFO.DEFAULT_ORDER, "");
-            if (CommTool.listSize(rulesDb) <= 0){
-                code = ResponseCode.RESULT_BAD;
-                step = "Warn Plan Rules Invalid";
-                break;
-            }
-
-            rules = OrdWarnPlanRules.batchFromJsonEx(rulesDb);
-            rulesDb.clear();
-        }while (false);
-
-        if (code == ResponseCode.RESULT_NORMAL){
-            CountDownLatchEx latchEx = null;
-            int nRetry = 3;
-            do {
-                try {
-                    latchEx = OrdWarnPlanLockMgr.tryLatchForPlan(planId, 2000);
-                }catch (Exception e){
-                    latchEx = null;
-                }
-
-                if (latchEx != null){
-                    handlerDataFun(planDetail, rules);
-                    break;
-                }
-            }while ((--nRetry) > 0);
-
-            if (latchEx != null){
-                latchEx.countDown();
-            }else{
-                step = "Request Lock For Analysis Failed";
-                code = ResponseCode.RESULT_BAD;
-                //handlerAnalysisForPlan(planDetail, siteDataResult);
-            }
-        }
-    }
-
-    private ResponseCode handlerDataFun(OrdWarnPlanDetail planDetail, List<OrdWarnPlanRules> rules){
+    public static ResponseCode handlerDataFun(OrdWarnPlanDetail planDetail, List<OrdWarnPlanRules> rules){
         ResponseCode code = ResponseCode.RESULT_NORMAL;
         String step = "Start to pre analysis.";
         if (getWarnPlanInfoDBService().existsLineEx(OrdWarnPlanInfoTable.R_INFO.TABLE
@@ -195,13 +133,13 @@ public class SiteDataHandler {
                 //并赋值给thisTempStep
             }
 
-            if (planDetail.getItemKey().endsWith("miss")){//数据缺失
-
+            if (planDetail.getItemKey().endsWith("empty")){//无数据
+                return analysisDataForReportByHour(planDetail, rules, thisStepTempRes);
             }else if (planDetail.getItemKey().endsWith("swo")){//小时水量
-                return analysisDataForWO(planDetail, rules, thisStepTempRes);
+                return analysisDataForReportByHour(planDetail, rules, thisStepTempRes);
             }else {
                 if ("set".equals(planDetail.getDoPickWay())){//时段曲线
-                    return analysisDataBySet(planDetail, rules, thisStepTempRes);
+                    return analysisDataForRealBySet(planDetail, rules, thisStepTempRes);
                 }
             }
         }while (false);
@@ -214,8 +152,8 @@ public class SiteDataHandler {
      * @param condition
      * @return -1: 未知错误;0 -- 未匹配;1 -- 已匹配
      */
-    private int matchedToRuleItem(String originValue, OrdWarnRuleCondition condition){
-        if (StringUtils.isEmpty(originValue))
+    private static int matchedToRuleItem(String originValue, OrdWarnRuleCondition condition){
+        if (StringUtils.isEmpty(originValue))//指定字段为空串,可表示未匹配
             return 0;
         int matched = -1;
         try {
@@ -248,8 +186,8 @@ public class SiteDataHandler {
         return matched;
     }
 
-    //处理时段样本数据
-    private ResponseCode analysisDataBySet(OrdWarnPlanDetail planDetail, List<OrdWarnPlanRules> rules, OrdWarnPlanTempStep thisTempStep){
+    //处理站点的实时样本曲线
+    private static ResponseCode analysisDataForRealBySet(OrdWarnPlanDetail planDetail, List<OrdWarnPlanRules> rules, OrdWarnPlanTempStep thisTempStep){
         ResponseCode code = ResponseCode.RESULT_NORMAL;
         String step = "Start to deep analysis";
         long curTime = TimeTool.getCurMsUTC();
@@ -291,7 +229,7 @@ public class SiteDataHandler {
             for (SiteSampleGroupItem sampleItem:siteSampleData.getData().get(0).getRecords()){
                 long sampleRecordTm = sampleItem.getTime();
                 if (sampleRecordTm <= 0L || CommTool.listSize(sampleItem.getFields()) <= 0)
-                    continue;
+                    continue;//数据条目缺失可认为,还在等待录入则跳过
                 if (planDetail.getDayTriggerUpperTimes() > 0
                         && thisTempStep.getDayTriggerTimes() >= planDetail.getDayTriggerUpperTimes()){
                     step = String.format("Event arrive at max limit");
@@ -401,8 +339,8 @@ public class SiteDataHandler {
         return code;
     }
 
-    //处理时段的用水量
-    private ResponseCode analysisDataForWO(OrdWarnPlanDetail planDetail, List<OrdWarnPlanRules> rules, OrdWarnPlanTempStep thisTempStep){
+    //处理站点时段的小时统计数据
+    private static ResponseCode analysisDataForReportByHour(OrdWarnPlanDetail planDetail, List<OrdWarnPlanRules> rules, OrdWarnPlanTempStep thisTempStep){
         ResponseCode code = ResponseCode.RESULT_NORMAL;
         String step = "Start to deep analysis";
         long curTime = TimeTool.getCurMsUTC();
@@ -417,6 +355,30 @@ public class SiteDataHandler {
         }while (tmpStepToStep < thisTempStep.getQueryEndUTC());
 
         do {
+            String[] storeTagAndBackTag = rules.get(0).getConditions().get(0).getFormat().split("@");
+            if (storeTagAndBackTag == null || storeTagAndBackTag.length < 2){
+                step = String.format("Format param valid in rules");
+                code = ResponseCode.RESULT_BAD;
+                break;
+            }
+            JPGetRegionReport queryJP = new JPGetRegionReport();
+            queryJP.setData(new ArrayList<>());
+            for (Long beginTm:tagTime){
+                JPGetRegionReportItem item = new JPGetRegionReportItem();
+                item.setUid(siteId);
+                item.setField(storeTagAndBackTag[0]);
+                item.setTag(String.valueOf(beginTm));
+                item.setBeginTime(beginTm);
+                item.setEndTime(beginTm+TimeTool.MS_ONE_HOUR);
+                queryJP.getData().add(item);
+            }
+            queryJP.setTotal(queryJP.getData().size());
+            try {
+                ResponseRes<String> resHttp = ServiceMgrProxy.getInstance().applyDataQeServiceApi().siteRegionReport(queryJP);
+                if (ResponseCode.RESULT_NORMAL.toStrCode().equals(resHttp.getRescode()))
+                    siteStatsData = FastJsonUtil.fromJSONByGson(resHttp.getResdata(), SiteStatsDataResult.class);
+            }catch (Exception e){}
+
             if (siteStatsData == null){
                 step = String.format("Get site stats data failed");
                 code = ResponseCode.RESULT_BAD;
@@ -439,7 +401,7 @@ public class SiteDataHandler {
                 if (sampleRecordTm <= 0L
                         || sampleItem.getCode() != 0
                         || CommTool.listSize(sampleItem.getReport()) <= 0)
-                    continue;
+                    continue;//数据条目缺失可认为,还在等待录入则跳过
                 if (planDetail.getDayTriggerUpperTimes() > 0
                         && thisTempStep.getDayTriggerTimes() >= planDetail.getDayTriggerUpperTimes()){
                     step = String.format("Event arrive at max limit");
@@ -461,7 +423,7 @@ public class SiteDataHandler {
 
                 for (OrdWarnRuleCondition condition:findRule.getConditions()){
                     String originValue = "";
-                    String backTag = condition.getFormat().substring(condition.getFormat().lastIndexOf("@")+1);
+                    String backTag = storeTagAndBackTag[1];
                     for (KeyValue kv:sampleItem.getReport()){
                         if (backTag.equals(kv.getKey())){
                             originValue = kv.getValue();
@@ -580,7 +542,6 @@ public class SiteDataHandler {
                     }
                 }
             }
-
         }while (false);
         return code;
     }

+ 76 - 3
src/main/java/com/shkpr/service/warncore/bizhandler/TimeCheckWarnPlanMgr.java

@@ -3,16 +3,18 @@ package com.shkpr.service.warncore.bizhandler;
 import com.global.base.tools.FastJsonUtil;
 import com.shkpr.service.warncore.commtools.CommTool;
 import com.shkpr.service.warncore.commtools.TimeTool;
+import com.shkpr.service.warncore.components.locks.CountDownLatchEx;
+import com.shkpr.service.warncore.components.locks.OrdWarnPlanLockMgr;
 import com.shkpr.service.warncore.constants.CommFieldStatus;
 import com.shkpr.service.warncore.constants.FrequencyUnit;
 import com.shkpr.service.warncore.constants.OrdWarnObjSrc;
 import com.shkpr.service.warncore.constants.TaskQueueDataTypeEx;
 import com.shkpr.service.warncore.dbdao.DBMgrProxy;
 import com.shkpr.service.warncore.dbdao.services.intef.OrdWarnPlanInfoDBService;
+import com.shkpr.service.warncore.dbdao.services.intef.OrdWarnPlanRulesDBService;
 import com.shkpr.service.warncore.dbdao.tables.OrdWarnPlanInfoTable;
-import com.shkpr.service.warncore.dto.CommTotalMinMax;
-import com.shkpr.service.warncore.dto.OrdWarnPlanCheckBean;
-import com.shkpr.service.warncore.dto.OrdWarnPlanWillDo;
+import com.shkpr.service.warncore.dbdao.tables.OrdWarnPlanRulesTable;
+import com.shkpr.service.warncore.dto.*;
 import com.shkpr.service.warncore.globalmgr.AsyncTaskQueueMgr;
 
 import java.util.*;
@@ -22,6 +24,10 @@ public class TimeCheckWarnPlanMgr {
         return DBMgrProxy.getInstance().applyXXXApi(OrdWarnPlanInfoDBService.class);
     }
 
+    private static OrdWarnPlanRulesDBService getWarnPlanRuleDBService(){
+        return DBMgrProxy.getInstance().applyXXXApi(OrdWarnPlanRulesDBService.class);
+    }
+
     public static void checkWarnPlanByMinute(int curClock, int curMinute){
         long thisDayBegin = TimeTool.getTodayBeginUTC();
         Map<String, Object> mapSel = getThisDBService().totalWillAnalysisCounts(new HashMap<String, Object>(){{
@@ -96,4 +102,71 @@ public class TimeCheckWarnPlanMgr {
             }
         }
     }
+
+    /**
+     * 先获取方案信息,然后获取规则列表,再加锁
+     * @param planWillDo
+     */
+    public void handlerPlanWillDo(OrdWarnPlanWillDo planWillDo){
+        ResponseCode code = ResponseCode.RESULT_NORMAL;
+        String step = "Start Get Plan Info";
+        List<OrdWarnPlanRules> rules = null;
+        OrdWarnPlanDetail planDetail = null;
+        do {
+            Map<String, Object> infoDb = getThisDBService().fastGetFiled("", "", planWillDo.getPlanId());
+            if (infoDb == null || infoDb.size() <= 0){
+                code = ResponseCode.RESULT_BAD;
+                step = "Get Warn Plan Info Failed";
+                break;
+            }
+
+            planDetail = FastJsonUtil.map2Obj(infoDb, OrdWarnPlanDetail.class,true);
+            if (planDetail == null || planDetail.getStatus() != CommFieldStatus.ENABLE){
+                code = ResponseCode.RESULT_BAD;
+                step = "Warn Plan Info Invalid";
+                break;
+            }
+            infoDb.clear();
+            List<Map<String, Object>> rulesDb = getWarnPlanRuleDBService().batchQueryWithsEx("", ""
+                    , new HashMap<String, Object>(){{put(OrdWarnPlanRulesTable.R_INFO.PLAN_ID, planWillDo.getPlanId());}}
+                    , null, OrdWarnPlanRulesTable.R_INFO.DEFAULT_ORDER, "");
+            if (CommTool.listSize(rulesDb) <= 0){
+                code = ResponseCode.RESULT_BAD;
+                step = "Warn Plan Rules Invalid";
+                break;
+            }
+
+            rules = OrdWarnPlanRules.batchFromJsonEx(rulesDb);
+            rulesDb.clear();
+        }while (false);
+
+        if (code == ResponseCode.RESULT_NORMAL){
+            CountDownLatchEx latchEx = null;
+            int nRetry = 3;
+            do {
+                try {
+                    latchEx = OrdWarnPlanLockMgr.tryLatchForPlan(planWillDo.getPlanId(), 2000);
+                }catch (Exception e){
+                    latchEx = null;
+                }
+
+                if (latchEx != null){
+                    if (OrdWarnObjSrc.SITE.equals(planWillDo.getObjSrc())){
+                        SiteDataWarnHandler.handlerDataFun(planDetail, rules);
+                    }else if (OrdWarnObjSrc.ZONE.equals(planWillDo.getObjSrc())){
+                    }else if (OrdWarnObjSrc.ZTYPE.equals(planWillDo.getObjSrc())){
+                    }else if (OrdWarnObjSrc.TANK.equals(planWillDo.getObjSrc())){
+                    }
+                    break;
+                }
+            }while ((--nRetry) > 0);
+
+            if (latchEx != null){
+                latchEx.countDown();
+            }else{
+                step = "Request Lock For Analysis Failed";
+                code = ResponseCode.RESULT_BAD;
+            }
+        }
+    }
 }

+ 1 - 1
src/main/java/com/shkpr/service/warncore/bizhandler/ZoneDataHandler.java

@@ -3,5 +3,5 @@ package com.shkpr.service.warncore.bizhandler;
 /**
  * 分区分析统计数据处理
  */
-public class ZoneDataHandler {
+public class ZoneDataWarnHandler {
 }

+ 21 - 0
src/main/java/com/shkpr/service/warncore/jsonbean/JPGetRegionReport.java

@@ -0,0 +1,21 @@
+package com.shkpr.service.warncore.jsonbean;
+
+import com.global.base.tools.FastJsonUtil;
+import lombok.Getter;
+import lombok.Setter;
+
+import java.util.List;
+
+@Getter
+@Setter
+public class JPGetRegionReport {
+    private int total = 0;
+    private List<JPGetRegionReportItem> data = null;
+
+    public JPGetRegionReport() {
+    }
+
+    public String toJsonStr(){
+        return FastJsonUtil.toJSON(this);
+    }
+}

+ 17 - 0
src/main/java/com/shkpr/service/warncore/jsonbean/JPGetRegionReportItem.java

@@ -0,0 +1,17 @@
+package com.shkpr.service.warncore.jsonbean;
+
+import lombok.Getter;
+import lombok.Setter;
+
+@Getter
+@Setter
+public class JPGetRegionReportItem {
+    private String uid = "";
+    private String field = "";
+    private String tag = "";
+    private long beginTime = 0L;
+    private long endTime = 0L;
+
+    public JPGetRegionReportItem() {
+    }
+}

+ 25 - 0
src/main/java/com/shkpr/service/warncore/services/CloudDataQeService.java

@@ -11,6 +11,7 @@ import com.shkpr.service.warncore.dto.ResponseCode;
 import com.shkpr.service.warncore.dto.ResponseRes;
 import com.shkpr.service.warncore.globalcache.GlobalData;
 import com.shkpr.service.warncore.jsonbean.JPGetRegionData;
+import com.shkpr.service.warncore.jsonbean.JPGetRegionReport;
 import com.shkpr.service.warncore.jsonbean.JPStrIdsSK;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
@@ -44,6 +45,7 @@ public class CloudDataQeService {
 
     private String mSiteLastMapDataPath = "";
     private String mSiteRegionDataPath = "";
+    private String mSiteRegionReportPath = "";
     private HttpHeaders headers = null;
 
     @PostConstruct
@@ -54,6 +56,7 @@ public class CloudDataQeService {
             mStrAddress += "/";
         mSiteLastMapDataPath = String.format("%s%s", mStrAddress, "v3/stats/site-data/last-map-show-group");
         mSiteRegionDataPath = String.format("%s%s", mStrAddress, "v3/stats/site-data/regions-group");
+        mSiteRegionReportPath = String.format("%s%s", mStrAddress, "v3/stats/site-data/regions-report-field");
 
         headers = new HttpHeaders();
         headers.setContentType(MediaType.APPLICATION_JSON_UTF8);
@@ -105,4 +108,26 @@ public class CloudDataQeService {
         }
         return resResult;
     }
+
+    @Retryable(value = {RemoteAccessException.class}, maxAttempts = 3, backoff = @Backoff(delay = 200L, multiplier = 1))
+    public ResponseRes<String> siteRegionReport(final JPGetRegionReport jsonParam) throws Exception{
+        ResponseRes<String> resResult = new ResponseRes<String>();
+        resResult.setRescode(ResponseCode.RESULT_BAD.toStrCode());
+        resResult.setResmsg(MSG_FAILED);
+        resResult.setResdata("");
+
+        HttpEntity<String> request = new HttpEntity<>(jsonParam.toJsonStr(), headers);
+        try {
+            ResponseEntity<String> response = restTemplate.postForEntity(mSiteRegionReportPath, request, String.class);
+            if (response.getStatusCode() == HttpStatus.OK){
+                String strBody = response.getBody();
+                return FastJsonUtil.fromJSONByGson(strBody, ResponseRes.class);
+            }
+        }catch (Exception e){
+            LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_WARN, LogFlagBusiType.BUSI_CALL_DATA_QE_AS.toStrValue(), strClassName, String.format("siteRegionReport from CloudDataQeService(url:%s) failed(%s)...", mSiteRegionReportPath, e.getMessage()));
+            //达到maxAttempts次数将返回RemoteAccessException
+            throw new RemoteAccessException("Retry...");
+        }
+        return resResult;
+    }
 }