|
@@ -3,6 +3,7 @@ package com.shkpr.service.alambizplugin.bizservice;
|
|
|
import com.global.base.log.LogLevelFlag;
|
|
|
import com.global.base.log.LogPrintMgr;
|
|
|
import com.shkpr.service.alambizplugin.apiparam.GisSurveyCheckParams;
|
|
|
+import com.shkpr.service.alambizplugin.commproperties.AsyncTaskProperties;
|
|
|
import com.shkpr.service.alambizplugin.commproperties.GisSurveySystemCheckProperties;
|
|
|
import com.shkpr.service.alambizplugin.components.GisSurveySystemCheckResultManager;
|
|
|
import com.shkpr.service.alambizplugin.components.GisSurveySystemChecker;
|
|
@@ -14,17 +15,19 @@ import com.shkpr.service.alambizplugin.dbdao.services.intef.GisSurveyProjectInfo
|
|
|
import com.shkpr.service.alambizplugin.dto.GisSurveySystemCheckId;
|
|
|
import com.shkpr.service.alambizplugin.dto.GisSurveySystemCheckResult;
|
|
|
import com.shkpr.service.alambizplugin.dto.GisSurveySystemCheckSubtask;
|
|
|
+import org.springframework.beans.factory.annotation.Qualifier;
|
|
|
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
|
|
+import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
import org.springframework.util.concurrent.ListenableFuture;
|
|
|
|
|
|
import java.time.Duration;
|
|
|
+import java.time.Instant;
|
|
|
import java.time.LocalDateTime;
|
|
|
-import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Objects;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.concurrent.ExecutionException;
|
|
|
-import java.util.stream.Collectors;
|
|
|
|
|
|
/**
|
|
|
* 系统检查管理service
|
|
@@ -33,6 +36,7 @@ import java.util.stream.Collectors;
|
|
|
* @since 1.0.0
|
|
|
*/
|
|
|
@Component
|
|
|
+@EnableConfigurationProperties({GisSurveySystemCheckProperties.class})
|
|
|
public class GisSurveySystemCheckBizService {
|
|
|
/**
|
|
|
* 任务缓存
|
|
@@ -53,17 +57,26 @@ public class GisSurveySystemCheckBizService {
|
|
|
private final String mStrClassName;
|
|
|
private final String mBizType;
|
|
|
|
|
|
+ private final AsyncTaskProperties asyncTaskProperties;
|
|
|
private final GisSurveySystemCheckProperties systemCheckProperties;
|
|
|
+ private final ThreadPoolTaskScheduler taskScheduler;
|
|
|
private final GisSurveySystemCheckResultManager systemCheckFileManager;
|
|
|
private final GisSurveySystemChecker systemChecker;
|
|
|
private final GisSurveyProjectInfoService projectInfoService;
|
|
|
private final GisSurveyJobInfoService jobInfoService;
|
|
|
|
|
|
- public GisSurveySystemCheckBizService(GisSurveySystemCheckProperties systemCheckProperties, GisSurveySystemCheckResultManager systemCheckFileManager
|
|
|
- , GisSurveySystemChecker systemChecker, GisSurveyProjectInfoService projectInfoService, GisSurveyJobInfoService jobInfoService) {
|
|
|
+ public GisSurveySystemCheckBizService(AsyncTaskProperties asyncTaskProperties
|
|
|
+ , GisSurveySystemCheckProperties systemCheckProperties
|
|
|
+ , @Qualifier("timeThreadPoolTaskScheduler") ThreadPoolTaskScheduler taskScheduler
|
|
|
+ , GisSurveySystemCheckResultManager systemCheckFileManager
|
|
|
+ , GisSurveySystemChecker systemChecker
|
|
|
+ , GisSurveyProjectInfoService projectInfoService
|
|
|
+ , GisSurveyJobInfoService jobInfoService) {
|
|
|
mStrClassName = "GisSurveySystemCheckBizService";
|
|
|
mBizType = LogFlagBusiType.BUSI_GIS_SURVEY.toStrValue();
|
|
|
+ this.asyncTaskProperties = asyncTaskProperties;
|
|
|
this.systemCheckProperties = systemCheckProperties;
|
|
|
+ this.taskScheduler = taskScheduler;
|
|
|
this.systemCheckFileManager = systemCheckFileManager;
|
|
|
this.systemChecker = systemChecker;
|
|
|
this.projectInfoService = projectInfoService;
|
|
@@ -144,43 +157,23 @@ public class GisSurveySystemCheckBizService {
|
|
|
*/
|
|
|
public GisSurveySystemCheckResult cancelCheck(GisSurveyCheckParams params) {
|
|
|
//任务标识
|
|
|
- GisSurveySystemCheckId taskId = GisSurveySystemCheckId.generateId(params);
|
|
|
- if (taskId == null) return GisSurveySystemCheckResult.fail(params);
|
|
|
+ GisSurveySystemCheckId systemCheckId = GisSurveySystemCheckId.generateId(params);
|
|
|
+ if (systemCheckId == null) return GisSurveySystemCheckResult.fail(params);
|
|
|
//如无缓存,则直接返回不存在
|
|
|
- if (!TASK_CACHE.containsKey(taskId)) return GisSurveySystemCheckResult.notExists(params);
|
|
|
+ if (!TASK_CACHE.containsKey(systemCheckId)) return GisSurveySystemCheckResult.notExists(params);
|
|
|
//关闭检查任务
|
|
|
- return stopTask(taskId) ? GisSurveySystemCheckResult.success(params) : GisSurveySystemCheckResult.fail(params);
|
|
|
- }
|
|
|
+ if (stopTask(TASK_CACHE.get(systemCheckId))) {
|
|
|
+ removeCache(systemCheckId);
|
|
|
|
|
|
- /**
|
|
|
- * 过期任务
|
|
|
- * <p>用于检查和使任务过期</p>
|
|
|
- */
|
|
|
- public void expireResult(Duration ttl) {
|
|
|
- //获取超时的id
|
|
|
- List<GisSurveySystemCheckId> systemCheckIds = TIME_CACHE.entrySet().stream()
|
|
|
- .filter(entry ->
|
|
|
- Duration.between(entry.getValue(), LocalDateTime.now()).compareTo(ttl) > 0
|
|
|
- )
|
|
|
- .map(Map.Entry::getKey)
|
|
|
- .collect(Collectors.toList());
|
|
|
- //停止超时的任务并删除任务缓存
|
|
|
- for (GisSurveySystemCheckId systemCheckId : systemCheckIds) {
|
|
|
- //如任务不存在,则删除时间缓存
|
|
|
- if (!TASK_CACHE.containsKey(systemCheckId)) TIME_CACHE.remove(systemCheckId);
|
|
|
- //停用缓存
|
|
|
- if (stopTask(systemCheckId)) TIME_CACHE.remove(systemCheckId);
|
|
|
- else {
|
|
|
- //打印报错信息
|
|
|
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR, mBizType, mStrClassName
|
|
|
- , String.format(
|
|
|
- "过期任务停止失败 ttl:%s 系统检查id:%s"
|
|
|
- , ttl
|
|
|
- , systemCheckId
|
|
|
- )
|
|
|
- );
|
|
|
- }
|
|
|
+ LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName
|
|
|
+ , String.format(
|
|
|
+ "系统检查停止成功,清除缓存;系统检查id: %s"
|
|
|
+ , systemCheckId
|
|
|
+ )
|
|
|
+ );
|
|
|
+ return GisSurveySystemCheckResult.success(params);
|
|
|
}
|
|
|
+ return GisSurveySystemCheckResult.fail(params);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -213,14 +206,26 @@ public class GisSurveySystemCheckBizService {
|
|
|
if (previousFuture != null && (previousFuture.isDone() || previousFuture.isCancelled()))
|
|
|
removeCache(systemCheckId);
|
|
|
//异步执行系统检查任务
|
|
|
- ListenableFuture<GisSurveySystemCheckResult> checkFuture = systemChecker.systemCheckTask(params,
|
|
|
+ ListenableFuture<GisSurveySystemCheckResult> future = systemChecker.systemCheckTask(params,
|
|
|
//缓存子任务句柄
|
|
|
subtask -> SUBTASK_CACHE.put(systemCheckId, subtask),
|
|
|
//删除子任务句柄
|
|
|
subtaskSystemCheckId -> SUBTASK_CACHE.remove(systemCheckId)
|
|
|
);
|
|
|
+ //任务超时
|
|
|
+ taskScheduler.schedule(() -> {
|
|
|
+ if (!future.isCancelled() && !future.isDone() && stopTask(future)) {
|
|
|
+ removeCache(systemCheckId);
|
|
|
+ LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName
|
|
|
+ , String.format(
|
|
|
+ "系统检查超时,成功停止任务;系统检查id: %s"
|
|
|
+ , systemCheckId
|
|
|
+ )
|
|
|
+ );
|
|
|
+ }
|
|
|
+ }, Instant.now().plusMillis(asyncTaskProperties.getSystemCheckTimeout().toMillis()));
|
|
|
//缓存任务句柄
|
|
|
- TASK_CACHE.put(systemCheckId, checkFuture);
|
|
|
+ TASK_CACHE.put(systemCheckId, future);
|
|
|
//缓存时间
|
|
|
TIME_CACHE.put(systemCheckId, LocalDateTime.now());
|
|
|
}
|
|
@@ -228,24 +233,14 @@ public class GisSurveySystemCheckBizService {
|
|
|
/**
|
|
|
* 停止任务
|
|
|
*
|
|
|
- * @param systemCheckId 系统检查id
|
|
|
+ * @param future 系统检查任务
|
|
|
* @return 关闭状态
|
|
|
*/
|
|
|
- private Boolean stopTask(GisSurveySystemCheckId systemCheckId) {
|
|
|
- ListenableFuture<GisSurveySystemCheckResult> future = TASK_CACHE.get(systemCheckId);
|
|
|
+ private Boolean stopTask(ListenableFuture<?> future) {
|
|
|
//完成判断,完成删除缓存
|
|
|
- if (future.isCancelled() || future.isDone()) {
|
|
|
- removeCache(systemCheckId);
|
|
|
- return true;
|
|
|
- }
|
|
|
+ if (future.isCancelled() || future.isDone()) return true;
|
|
|
//尝试清除任务
|
|
|
- boolean cancel = future.cancel(true);
|
|
|
- //清除成功,删除缓存
|
|
|
- if (cancel) {
|
|
|
- removeCache(systemCheckId);
|
|
|
- return true;
|
|
|
- }
|
|
|
- return false;
|
|
|
+ return future.cancel(true);
|
|
|
}
|
|
|
|
|
|
/**
|