123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261 |
- package com.shkpr.service.alambizplugin.bizservice;
- import com.global.base.log.LogLevelFlag;
- import com.global.base.log.LogPrintMgr;
- import com.shkpr.service.alambizplugin.apiparam.GisSurveyCheckParams;
- import com.shkpr.service.alambizplugin.commproperties.GisSurveySystemCheckProperties;
- import com.shkpr.service.alambizplugin.components.GisSurveySystemCheckResultManager;
- import com.shkpr.service.alambizplugin.components.GisSurveySystemChecker;
- import com.shkpr.service.alambizplugin.constants.GisSurveyCheckStatusEnum;
- import com.shkpr.service.alambizplugin.constants.GisSurveyCheckTypeEnum;
- import com.shkpr.service.alambizplugin.constants.LogFlagBusiType;
- import com.shkpr.service.alambizplugin.dbdao.services.intef.GisSurveyJobInfoService;
- import com.shkpr.service.alambizplugin.dbdao.services.intef.GisSurveyProjectInfoService;
- import com.shkpr.service.alambizplugin.dto.GisSurveySystemCheckId;
- import com.shkpr.service.alambizplugin.dto.GisSurveySystemCheckResult;
- import com.shkpr.service.alambizplugin.dto.GisSurveySystemCheckSubtask;
- import org.springframework.stereotype.Component;
- import org.springframework.util.concurrent.ListenableFuture;
- import java.time.Duration;
- import java.time.LocalDateTime;
- import java.util.List;
- import java.util.Map;
- import java.util.Objects;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.concurrent.ExecutionException;
- import java.util.stream.Collectors;
- /**
- * 系统检查管理service
- *
- * @author 欧阳劲驰
- * @since 1.0.0
- */
- @Component
- public class GisSurveySystemCheckBizService {
- /**
- * 任务缓存
- */
- private final static Map<GisSurveySystemCheckId, ListenableFuture<GisSurveySystemCheckResult>> TASK_CACHE = new ConcurrentHashMap<>();
- /**
- * 子任务缓存
- */
- private final static Map<GisSurveySystemCheckId, GisSurveySystemCheckSubtask> SUBTASK_CACHE = new ConcurrentHashMap<>();
- /**
- * 开始时间缓存
- */
- private final static Map<GisSurveySystemCheckId, LocalDateTime> TIME_CACHE = new ConcurrentHashMap<>();
- /**
- * log
- */
- private final String mStrClassName;
- private final String mBizType;
- private final GisSurveySystemCheckProperties systemCheckProperties;
- private final GisSurveySystemCheckResultManager systemCheckFileManager;
- private final GisSurveySystemChecker systemChecker;
- private final GisSurveyProjectInfoService projectInfoService;
- private final GisSurveyJobInfoService jobInfoService;
- public GisSurveySystemCheckBizService(GisSurveySystemCheckProperties systemCheckProperties, GisSurveySystemCheckResultManager systemCheckFileManager
- , GisSurveySystemChecker systemChecker, GisSurveyProjectInfoService projectInfoService, GisSurveyJobInfoService jobInfoService) {
- mStrClassName = "GisSurveySystemCheckBizService";
- mBizType = LogFlagBusiType.BUSI_GIS_SURVEY.toStrValue();
- this.systemCheckProperties = systemCheckProperties;
- this.systemCheckFileManager = systemCheckFileManager;
- this.systemChecker = systemChecker;
- this.projectInfoService = projectInfoService;
- this.jobInfoService = jobInfoService;
- }
- /**
- * 系统检查
- *
- * @param params 系统检查参数
- * @return 检查状态
- */
- public GisSurveySystemCheckResult sysCheckFun(GisSurveyCheckParams params) {
- //系统检查id
- GisSurveySystemCheckId systemCheckId = GisSurveySystemCheckId.generateId(params);
- if (systemCheckId == null) return GisSurveySystemCheckResult.notExists(params);
- //获取已存在的任务
- ListenableFuture<GisSurveySystemCheckResult> previousFuture = TASK_CACHE.get(systemCheckId);
- //如任务已完成,则检查历史失败
- if (!params.getIgnoreFail() && previousFuture != null && previousFuture.isDone()) {
- try {
- //获取结果,并检查失败
- GisSurveySystemCheckResult gisSurveySystemCheckResult = previousFuture.get();
- if (Objects.equals(gisSurveySystemCheckResult.getCheckStatus(), GisSurveyCheckStatusEnum.FAIL.getCode()))
- return gisSurveySystemCheckResult;
- } catch (InterruptedException | ExecutionException e) {
- //打印报错信息(不太可能走到这)
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR, mBizType, mStrClassName
- , String.format("检查历史失败异常 系统检查id:%s msg:%s", systemCheckId, e.getMessage())
- );
- }
- }
- //获取已存在的结果
- GisSurveySystemCheckResult result = systemCheckFileManager.getResult(systemCheckId);
- //获取元素更新时间
- LocalDateTime refreshTime = getRefreshTimeDuration(systemCheckId);
- //回填元素更新时间
- params.setRefreshTime(refreshTime);
- //判断结果是否存在
- if (result != null) {
- //判断时间间隔,如滞后未到临界,则直接返回结果
- if (result.getRefreshTime() != null && refreshTime != null) {
- //结果滞后数据库的时间
- Duration lags = Duration.between(result.getRefreshTime(), refreshTime);
- //规定滞后时间大于真实滞后时间,直接返回结果
- if (systemCheckProperties.getResultLagDuration().compareTo(lags) >= 0) {
- return result;
- }
- }
- }
- //系统检查子任务
- GisSurveySystemCheckSubtask checkSubtask = SUBTASK_CACHE.get(systemCheckId);
- //进行中判断(未完成且未清除)
- if (previousFuture != null && !previousFuture.isDone() && !previousFuture.isCancelled())
- return GisSurveySystemCheckResult.inProgress(params, checkSubtask, TIME_CACHE.get(systemCheckId));
- //启动检查任务
- if (Boolean.TRUE.equals(params.getCheckStart())) {
- startTask(systemCheckId, params);
- //如需要返回上次结果,则直接返回结果
- if (Boolean.TRUE.equals(params.getReturnLastResult())) {
- return result;
- }
- //返回进行中
- return GisSurveySystemCheckResult.inProgress(params, checkSubtask, LocalDateTime.now());
- }
- //未启动返回结果
- return result;
- }
- /**
- * 取消检查
- *
- * @param params 系统检查参数
- * @return 检查结果(可能进行中)
- */
- public GisSurveySystemCheckResult cancelCheck(GisSurveyCheckParams params) {
- //任务标识
- GisSurveySystemCheckId taskId = GisSurveySystemCheckId.generateId(params);
- if (taskId == null) return GisSurveySystemCheckResult.fail(params);
- //如无缓存,则直接返回不存在
- if (!TASK_CACHE.containsKey(taskId)) return GisSurveySystemCheckResult.notExists(params);
- //关闭检查任务
- return stopTask(taskId) ? GisSurveySystemCheckResult.success(params) : GisSurveySystemCheckResult.fail(params);
- }
- /**
- * 过期任务
- * <p>用于检查和使任务过期</p>
- */
- public void expireResult(Duration ttl) {
- //获取超时的id
- List<GisSurveySystemCheckId> systemCheckIds = TIME_CACHE.entrySet().stream()
- .filter(entry ->
- Duration.between(entry.getValue(), LocalDateTime.now()).compareTo(ttl) > 0
- )
- .map(Map.Entry::getKey)
- .collect(Collectors.toList());
- //停止超时的任务并删除任务缓存
- for (GisSurveySystemCheckId systemCheckId : systemCheckIds) {
- //如任务不存在,则删除时间缓存
- if (!TASK_CACHE.containsKey(systemCheckId)) TIME_CACHE.remove(systemCheckId);
- //停用缓存
- if (stopTask(systemCheckId)) TIME_CACHE.remove(systemCheckId);
- else {
- //打印报错信息
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR, mBizType, mStrClassName
- , String.format(
- "过期任务停止失败 ttl:%s 系统检查id:%s"
- , ttl
- , systemCheckId
- )
- );
- }
- }
- }
- /**
- * 获取更新时间
- *
- * @param systemCheckId 系统检查id
- * @return 元素更新时间
- */
- private LocalDateTime getRefreshTimeDuration(GisSurveySystemCheckId systemCheckId) {
- //默认为当前时间
- LocalDateTime refreshTime = LocalDateTime.now();
- //根据检查维度获取数据库里的更新时间
- if (systemCheckId.getCheckType() == GisSurveyCheckTypeEnum.PROJECT)
- refreshTime = projectInfoService.findRefreshTimeByUid(systemCheckId.getCode());
- if (systemCheckId.getCheckType() == GisSurveyCheckTypeEnum.JOB)
- refreshTime = jobInfoService.findRefreshTimeByUid(systemCheckId.getCode());
- return refreshTime;
- }
- /**
- * 启动任务
- *
- * @param systemCheckId 系统检查id
- * @param params 检查参数
- */
- private void startTask(GisSurveySystemCheckId systemCheckId, GisSurveyCheckParams params) {
- //获取已存在的任务
- ListenableFuture<GisSurveySystemCheckResult> previousFuture = TASK_CACHE.get(systemCheckId);
- //已结束判断,删除缓存
- if (previousFuture != null && (previousFuture.isDone() || previousFuture.isCancelled()))
- removeCache(systemCheckId);
- //异步执行系统检查任务
- ListenableFuture<GisSurveySystemCheckResult> checkFuture = systemChecker.systemCheckTask(params,
- //缓存子任务句柄
- subtask -> SUBTASK_CACHE.put(systemCheckId, subtask),
- //删除子任务句柄
- subtaskSystemCheckId -> SUBTASK_CACHE.remove(systemCheckId)
- );
- //缓存任务句柄
- TASK_CACHE.put(systemCheckId, checkFuture);
- //缓存时间
- TIME_CACHE.put(systemCheckId, LocalDateTime.now());
- }
- /**
- * 停止任务
- *
- * @param systemCheckId 系统检查id
- * @return 关闭状态
- */
- private Boolean stopTask(GisSurveySystemCheckId systemCheckId) {
- ListenableFuture<GisSurveySystemCheckResult> future = TASK_CACHE.get(systemCheckId);
- //完成判断,完成删除缓存
- if (future.isCancelled() || future.isDone()) {
- removeCache(systemCheckId);
- return true;
- }
- //尝试清除任务
- boolean cancel = future.cancel(true);
- //清除成功,删除缓存
- if (cancel) {
- removeCache(systemCheckId);
- return true;
- }
- return false;
- }
- /**
- * 清除缓存
- *
- * @param systemCheckId 系统检查id
- */
- private void removeCache(GisSurveySystemCheckId systemCheckId) {
- TASK_CACHE.remove(systemCheckId);
- SUBTASK_CACHE.remove(systemCheckId);
- TIME_CACHE.remove(systemCheckId);
- }
- }
|