|
@@ -1,22 +1,24 @@
|
|
|
package com.shkpr.service.alambizplugin.bizservice;
|
|
|
|
|
|
+import com.fasterxml.jackson.core.type.TypeReference;
|
|
|
import com.global.base.log.LogLevelFlag;
|
|
|
import com.global.base.log.LogPrintMgr;
|
|
|
import com.shkpr.service.alambizplugin.apiparam.GisSurveyCheckParams;
|
|
|
import com.shkpr.service.alambizplugin.commproperties.AsyncTaskProperties;
|
|
|
-import com.shkpr.service.alambizplugin.commproperties.GisSurveySystemCheckProperties;
|
|
|
-import com.shkpr.service.alambizplugin.components.GisSurveySystemCheckResultManager;
|
|
|
+import com.shkpr.service.alambizplugin.components.AsyncResultManager;
|
|
|
import com.shkpr.service.alambizplugin.components.GisSurveySystemChecker;
|
|
|
import com.shkpr.service.alambizplugin.constants.CommAsyncStatusEnum;
|
|
|
import com.shkpr.service.alambizplugin.constants.GisSurveyCheckTypeEnum;
|
|
|
import com.shkpr.service.alambizplugin.constants.LogFlagBusiType;
|
|
|
import com.shkpr.service.alambizplugin.dbdao.services.intef.GisSurveyJobInfoService;
|
|
|
import com.shkpr.service.alambizplugin.dbdao.services.intef.GisSurveyProjectInfoService;
|
|
|
+import com.shkpr.service.alambizplugin.dbdao.services.intef.GisSurveySystemCheckDefineService;
|
|
|
+import com.shkpr.service.alambizplugin.dto.CommAsyncCache;
|
|
|
+import com.shkpr.service.alambizplugin.dto.CommAsyncResult;
|
|
|
import com.shkpr.service.alambizplugin.dto.GisSurveySystemCheckId;
|
|
|
-import com.shkpr.service.alambizplugin.dto.GisSurveySystemCheckResult;
|
|
|
-import com.shkpr.service.alambizplugin.dto.GisSurveySystemCheckSubtask;
|
|
|
+import com.shkpr.service.alambizplugin.dto.GisSurveySystemCheckResultDetail;
|
|
|
+import org.apache.commons.collections4.CollectionUtils;
|
|
|
import org.springframework.beans.factory.annotation.Qualifier;
|
|
|
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
|
|
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
import org.springframework.util.concurrent.ListenableFuture;
|
|
@@ -24,10 +26,12 @@ import org.springframework.util.concurrent.ListenableFuture;
|
|
|
import java.time.Duration;
|
|
|
import java.time.Instant;
|
|
|
import java.time.LocalDateTime;
|
|
|
+import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Objects;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.concurrent.ExecutionException;
|
|
|
+import java.util.stream.Collectors;
|
|
|
|
|
|
/**
|
|
|
* 系统检查管理service
|
|
@@ -36,21 +40,21 @@ import java.util.concurrent.ExecutionException;
|
|
|
* @since 1.0.0
|
|
|
*/
|
|
|
@Component
|
|
|
-@EnableConfigurationProperties({GisSurveySystemCheckProperties.class})
|
|
|
public class GisSurveySystemCheckBizService {
|
|
|
/**
|
|
|
- * 任务缓存
|
|
|
+ * 任务缓存{@code <系统检查id,异步任务<异步结果<系统检查key,结果详情>>>}
|
|
|
*/
|
|
|
- private final static Map<GisSurveySystemCheckId, ListenableFuture<GisSurveySystemCheckResult>> TASK_CACHE = new ConcurrentHashMap<>();
|
|
|
+ private final static Map<GisSurveySystemCheckId, ListenableFuture<CommAsyncResult<Map<String, GisSurveySystemCheckResultDetail>>>>
|
|
|
+ TASK_CACHE = new ConcurrentHashMap<>();
|
|
|
/**
|
|
|
- * 子任务缓存
|
|
|
+ * 子任务缓存{@code <系统检查id,异步任务<异步结果<结果详情>>>}
|
|
|
*/
|
|
|
- private final static Map<GisSurveySystemCheckId, GisSurveySystemCheckSubtask> SUBTASK_CACHE = new ConcurrentHashMap<>();
|
|
|
-
|
|
|
+ private final static Map<GisSurveySystemCheckId, Map<String, ListenableFuture<GisSurveySystemCheckResultDetail>>>
|
|
|
+ SUBTASK_CACHE = new ConcurrentHashMap<>();
|
|
|
/**
|
|
|
- * 开始时间缓存
|
|
|
+ * 信息缓存
|
|
|
*/
|
|
|
- private final static Map<GisSurveySystemCheckId, LocalDateTime> TIME_CACHE = new ConcurrentHashMap<>();
|
|
|
+ private final static Map<GisSurveySystemCheckId, CommAsyncCache> INFO_CACHE = new ConcurrentHashMap<>();
|
|
|
/**
|
|
|
* log
|
|
|
*/
|
|
@@ -58,29 +62,29 @@ public class GisSurveySystemCheckBizService {
|
|
|
private final String mBizType;
|
|
|
|
|
|
private final AsyncTaskProperties asyncTaskProperties;
|
|
|
- private final GisSurveySystemCheckProperties systemCheckProperties;
|
|
|
private final ThreadPoolTaskScheduler taskScheduler;
|
|
|
- private final GisSurveySystemCheckResultManager systemCheckFileManager;
|
|
|
+ private final AsyncResultManager asyncResultManager;
|
|
|
private final GisSurveySystemChecker systemChecker;
|
|
|
private final GisSurveyProjectInfoService projectInfoService;
|
|
|
private final GisSurveyJobInfoService jobInfoService;
|
|
|
+ private final GisSurveySystemCheckDefineService systemCheckDefineService;
|
|
|
|
|
|
public GisSurveySystemCheckBizService(AsyncTaskProperties asyncTaskProperties
|
|
|
- , GisSurveySystemCheckProperties systemCheckProperties
|
|
|
, @Qualifier("timeThreadPoolTaskScheduler") ThreadPoolTaskScheduler taskScheduler
|
|
|
- , GisSurveySystemCheckResultManager systemCheckFileManager
|
|
|
+ , AsyncResultManager asyncResultManager
|
|
|
, GisSurveySystemChecker systemChecker
|
|
|
, GisSurveyProjectInfoService projectInfoService
|
|
|
- , GisSurveyJobInfoService jobInfoService) {
|
|
|
+ , GisSurveyJobInfoService jobInfoService
|
|
|
+ , GisSurveySystemCheckDefineService systemCheckDefineService) {
|
|
|
mStrClassName = "GisSurveySystemCheckBizService";
|
|
|
mBizType = LogFlagBusiType.BUSI_GIS_SURVEY.toStrValue();
|
|
|
this.asyncTaskProperties = asyncTaskProperties;
|
|
|
- this.systemCheckProperties = systemCheckProperties;
|
|
|
this.taskScheduler = taskScheduler;
|
|
|
- this.systemCheckFileManager = systemCheckFileManager;
|
|
|
+ this.asyncResultManager = asyncResultManager;
|
|
|
this.systemChecker = systemChecker;
|
|
|
this.projectInfoService = projectInfoService;
|
|
|
this.jobInfoService = jobInfoService;
|
|
|
+ this.systemCheckDefineService = systemCheckDefineService;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -89,64 +93,127 @@ public class GisSurveySystemCheckBizService {
|
|
|
* @param params 系统检查参数
|
|
|
* @return 检查状态
|
|
|
*/
|
|
|
- public GisSurveySystemCheckResult sysCheckFun(GisSurveyCheckParams params) {
|
|
|
+ public CommAsyncResult<Map<String, GisSurveySystemCheckResultDetail>> sysCheckFun(GisSurveyCheckParams params) {
|
|
|
//系统检查id
|
|
|
GisSurveySystemCheckId systemCheckId = GisSurveySystemCheckId.generateId(params);
|
|
|
- if (systemCheckId == null) return GisSurveySystemCheckResult.notExists(params);
|
|
|
- //获取已存在的任务
|
|
|
- ListenableFuture<GisSurveySystemCheckResult> previousFuture = TASK_CACHE.get(systemCheckId);
|
|
|
- //如任务已完成,则检查历史失败
|
|
|
- if (!params.getIgnoreFail() && previousFuture != null && previousFuture.isDone()) {
|
|
|
- try {
|
|
|
- //获取结果,并检查失败
|
|
|
- GisSurveySystemCheckResult gisSurveySystemCheckResult = previousFuture.get();
|
|
|
- if (Objects.equals(gisSurveySystemCheckResult.getCheckStatus(), CommAsyncStatusEnum.FAIL.getCode()))
|
|
|
- return gisSurveySystemCheckResult;
|
|
|
- } catch (InterruptedException | ExecutionException e) {
|
|
|
- //打印报错信息(不太可能走到这)
|
|
|
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR, mBizType, mStrClassName
|
|
|
- , String.format("检查历史失败异常 系统检查id:%s msg:%s", systemCheckId, e.getMessage())
|
|
|
- );
|
|
|
- }
|
|
|
+ if (systemCheckId == null) return CommAsyncResult.fail(null);
|
|
|
|
|
|
- }
|
|
|
+ //获取上次结果
|
|
|
+ CommAsyncResult<Map<String, GisSurveySystemCheckResultDetail>> lastResult = getTempResult(systemCheckId);
|
|
|
+ //失败/进行中处理
|
|
|
+ if (lastResult != null && !Objects.equals(CommAsyncStatusEnum.SUCCESS.getCode(), lastResult.getStatus()))
|
|
|
+ return lastResult;
|
|
|
|
|
|
- //获取已存在的结果
|
|
|
- GisSurveySystemCheckResult result = systemCheckFileManager.getResult(systemCheckId);
|
|
|
- //获取元素更新时间
|
|
|
+ //文件结果flag
|
|
|
+ final String FLAG = systemCheckId.getFlag();
|
|
|
+ //获取文件结果
|
|
|
+ CommAsyncResult<Map<String, GisSurveySystemCheckResultDetail>> fileResult = asyncResultManager.getResult(FLAG, new TypeReference<CommAsyncResult<Map<String, GisSurveySystemCheckResultDetail>>>() {
|
|
|
+ });
|
|
|
+ //获取数据更新时间
|
|
|
LocalDateTime refreshTime = getRefreshTimeDuration(systemCheckId);
|
|
|
//回填元素更新时间
|
|
|
params.setRefreshTime(refreshTime);
|
|
|
+ //获取支持检查项
|
|
|
+ List<String> supportedKeys = systemCheckDefineService.findKeys();
|
|
|
+ //对入参取交集
|
|
|
+ params.getSubitemKeys().retainAll(supportedKeys);
|
|
|
+
|
|
|
//判断结果是否存在
|
|
|
- if (result != null) {
|
|
|
- //判断时间间隔,如滞后未到临界,则直接返回结果
|
|
|
- if (result.getRefreshTime() != null && refreshTime != null) {
|
|
|
- //结果滞后数据库的时间
|
|
|
- Duration lags = Duration.between(result.getRefreshTime(), refreshTime);
|
|
|
- //规定滞后时间大于真实滞后时间,直接返回结果
|
|
|
- if (systemCheckProperties.getResultLagDuration().compareTo(lags) >= 0) {
|
|
|
- return result;
|
|
|
+ if (fileResult != null && refreshTime != null) {
|
|
|
+ //过滤启动key
|
|
|
+ List<String> startKeys = params.getSubitemKeys().stream().filter(key -> {
|
|
|
+ //文件结果包含数据库时间
|
|
|
+ if (fileResult.getRefreshTimes() != null && fileResult.getRefreshTimes().containsKey(key)
|
|
|
+ //文件结果数据包含当前项
|
|
|
+ && fileResult.getData() != null && fileResult.getData().containsKey(key)) {
|
|
|
+
|
|
|
+ //文件结果落后数据间隔
|
|
|
+ Duration lags = Duration.between(fileResult.getRefreshTimes().get(key), refreshTime);
|
|
|
+ //规定落后时间小于落后数据间隔(数据落后),需要启动
|
|
|
+ return asyncTaskProperties.getSystemCheckResultLag().compareTo(lags) < 0;
|
|
|
}
|
|
|
- }
|
|
|
+ return true;
|
|
|
+ }).collect(Collectors.toList());
|
|
|
+ params.setSubitemKeys(startKeys);
|
|
|
}
|
|
|
|
|
|
- //系统检查子任务
|
|
|
- GisSurveySystemCheckSubtask checkSubtask = SUBTASK_CACHE.get(systemCheckId);
|
|
|
- //进行中判断(未完成且未清除)
|
|
|
- if (previousFuture != null && !previousFuture.isDone() && !previousFuture.isCancelled())
|
|
|
- return GisSurveySystemCheckResult.inProgress(params, checkSubtask, TIME_CACHE.get(systemCheckId));
|
|
|
//启动检查任务
|
|
|
- if (Boolean.TRUE.equals(params.getCheckStart())) {
|
|
|
+ if (Boolean.TRUE.equals(params.getCheckStart()) && CollectionUtils.isNotEmpty(params.getSubitemKeys())) {
|
|
|
startTask(systemCheckId, params);
|
|
|
//如需要返回上次结果,则直接返回结果
|
|
|
- if (Boolean.TRUE.equals(params.getReturnLastResult())) {
|
|
|
- return result;
|
|
|
- }
|
|
|
+ if (Boolean.TRUE.equals(params.getReturnLastResult())) return fileResult;
|
|
|
//返回进行中
|
|
|
- return GisSurveySystemCheckResult.inProgress(params, checkSubtask, LocalDateTime.now());
|
|
|
+ return CommAsyncResult.inProgress(systemCheckId.toString(), LocalDateTime.now(), params.getOperator());
|
|
|
}
|
|
|
//未启动返回结果
|
|
|
- return result;
|
|
|
+ return fileResult;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 获取缓存结果
|
|
|
+ *
|
|
|
+ * @param systemCheckId 系统检查id
|
|
|
+ * @return 缓存结果
|
|
|
+ */
|
|
|
+ private CommAsyncResult<Map<String, GisSurveySystemCheckResultDetail>> getTempResult(GisSurveySystemCheckId systemCheckId) {
|
|
|
+ //获取已存在的任务
|
|
|
+ ListenableFuture<CommAsyncResult<Map<String, GisSurveySystemCheckResultDetail>>> previousFuture
|
|
|
+ = TASK_CACHE.get(systemCheckId);
|
|
|
+
|
|
|
+ //判断完成
|
|
|
+ if (previousFuture != null && previousFuture.isDone()) {
|
|
|
+ try {
|
|
|
+ //获取结果
|
|
|
+ CommAsyncResult<Map<String, GisSurveySystemCheckResultDetail>> result = previousFuture.get();
|
|
|
+ //如成功/失败,则直接返回
|
|
|
+ if (Objects.equals(result.getStatus(), CommAsyncStatusEnum.FAIL.getCode()) ||
|
|
|
+ Objects.equals(result.getStatus(), CommAsyncStatusEnum.SUCCESS.getCode()))
|
|
|
+ removeCache(systemCheckId);
|
|
|
+ return result;
|
|
|
+ } catch (InterruptedException | ExecutionException e) {
|
|
|
+ //打印报错信息
|
|
|
+ LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR, mBizType, mStrClassName
|
|
|
+ , String.format("查询系统检查异常 异步任务id:%s error:%s", systemCheckId, e)
|
|
|
+ );
|
|
|
+ return CommAsyncResult.fail(systemCheckId.toString());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ //进行中判断(未完成且未清除)
|
|
|
+ if (previousFuture != null && !previousFuture.isDone() && !previousFuture.isCancelled()) {
|
|
|
+ //获取缓存信息
|
|
|
+ CommAsyncCache asyncCache = INFO_CACHE.get(systemCheckId);
|
|
|
+ //设置进行中信息,并返回
|
|
|
+ if (asyncCache != null) {
|
|
|
+ CommAsyncResult<Map<String, GisSurveySystemCheckResultDetail>> tempResult = CommAsyncResult.inProgress(
|
|
|
+ systemCheckId.toString(), asyncCache.getStartTime(), asyncCache.getOperator());
|
|
|
+ tempResult.setSubitemKeys(asyncCache.getSubitemKeys());
|
|
|
+ tempResult.setData(getProgressData(systemCheckId));
|
|
|
+ return tempResult;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ //取消判断
|
|
|
+ if (previousFuture != null && previousFuture.isCancelled()) removeCache(systemCheckId);
|
|
|
+
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 获取进行中数据
|
|
|
+ *
|
|
|
+ * @return 进行中详情
|
|
|
+ */
|
|
|
+ private Map<String, GisSurveySystemCheckResultDetail> getProgressData(GisSurveySystemCheckId systemCheckId) {
|
|
|
+ return SUBTASK_CACHE.get(systemCheckId).entrySet().stream()
|
|
|
+ .collect(Collectors.toMap(
|
|
|
+ Map.Entry::getKey,
|
|
|
+ entry -> {
|
|
|
+ GisSurveySystemCheckResultDetail resultDetail = new GisSurveySystemCheckResultDetail();
|
|
|
+ resultDetail.setDone(entry.getValue().isDone());
|
|
|
+ return resultDetail;
|
|
|
+ }
|
|
|
+ ));
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -155,12 +222,12 @@ public class GisSurveySystemCheckBizService {
|
|
|
* @param params 系统检查参数
|
|
|
* @return 检查结果(可能进行中)
|
|
|
*/
|
|
|
- public GisSurveySystemCheckResult cancelCheck(GisSurveyCheckParams params) {
|
|
|
+ public int cancelCheck(GisSurveyCheckParams params) {
|
|
|
//任务标识
|
|
|
GisSurveySystemCheckId systemCheckId = GisSurveySystemCheckId.generateId(params);
|
|
|
- if (systemCheckId == null) return GisSurveySystemCheckResult.fail(params);
|
|
|
+ if (systemCheckId == null) return CommAsyncStatusEnum.NOT_EXISTS.getCode();
|
|
|
//如无缓存,则直接返回不存在
|
|
|
- if (!TASK_CACHE.containsKey(systemCheckId)) return GisSurveySystemCheckResult.notExists(params);
|
|
|
+ if (!TASK_CACHE.containsKey(systemCheckId)) return CommAsyncStatusEnum.NOT_EXISTS.getCode();
|
|
|
//关闭检查任务
|
|
|
if (stopTask(TASK_CACHE.get(systemCheckId))) {
|
|
|
removeCache(systemCheckId);
|
|
@@ -171,9 +238,9 @@ public class GisSurveySystemCheckBizService {
|
|
|
, systemCheckId
|
|
|
)
|
|
|
);
|
|
|
- return GisSurveySystemCheckResult.success(params);
|
|
|
+ return CommAsyncStatusEnum.SUCCESS.getCode();
|
|
|
}
|
|
|
- return GisSurveySystemCheckResult.fail(params);
|
|
|
+ return CommAsyncStatusEnum.FAIL.getCode();
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -201,12 +268,12 @@ public class GisSurveySystemCheckBizService {
|
|
|
*/
|
|
|
private void startTask(GisSurveySystemCheckId systemCheckId, GisSurveyCheckParams params) {
|
|
|
//获取已存在的任务
|
|
|
- ListenableFuture<GisSurveySystemCheckResult> previousFuture = TASK_CACHE.get(systemCheckId);
|
|
|
+ ListenableFuture<CommAsyncResult<Map<String, GisSurveySystemCheckResultDetail>>> previousFuture = TASK_CACHE.get(systemCheckId);
|
|
|
//已结束判断,删除缓存
|
|
|
if (previousFuture != null && (previousFuture.isDone() || previousFuture.isCancelled()))
|
|
|
removeCache(systemCheckId);
|
|
|
//异步执行系统检查任务
|
|
|
- ListenableFuture<GisSurveySystemCheckResult> future = systemChecker.systemCheckTask(params,
|
|
|
+ ListenableFuture<CommAsyncResult<Map<String, GisSurveySystemCheckResultDetail>>> future = systemChecker.systemCheckTask(params,
|
|
|
//缓存子任务句柄
|
|
|
subtask -> SUBTASK_CACHE.put(systemCheckId, subtask),
|
|
|
//删除子任务句柄
|
|
@@ -227,7 +294,7 @@ public class GisSurveySystemCheckBizService {
|
|
|
//缓存任务句柄
|
|
|
TASK_CACHE.put(systemCheckId, future);
|
|
|
//缓存时间
|
|
|
- TIME_CACHE.put(systemCheckId, LocalDateTime.now());
|
|
|
+ INFO_CACHE.put(systemCheckId, new CommAsyncCache(LocalDateTime.now(), params.getOperator(), params.getSubitemKeys()));
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -251,6 +318,6 @@ public class GisSurveySystemCheckBizService {
|
|
|
private void removeCache(GisSurveySystemCheckId systemCheckId) {
|
|
|
TASK_CACHE.remove(systemCheckId);
|
|
|
SUBTASK_CACHE.remove(systemCheckId);
|
|
|
- TIME_CACHE.remove(systemCheckId);
|
|
|
+ INFO_CACHE.remove(systemCheckId);
|
|
|
}
|
|
|
}
|