package com.shkpr.service.alambizplugin.bizservice; import com.global.base.log.LogLevelFlag; import com.global.base.log.LogPrintMgr; import com.shkpr.service.alambizplugin.apiparam.GisSurveyCheckParams; import com.shkpr.service.alambizplugin.commproperties.GisSurveySystemCheckProperties; import com.shkpr.service.alambizplugin.components.GisSurveySystemCheckResultManager; import com.shkpr.service.alambizplugin.components.GisSurveySystemChecker; import com.shkpr.service.alambizplugin.constants.GisSurveyCheckStatusEnum; import com.shkpr.service.alambizplugin.constants.GisSurveyCheckTypeEnum; import com.shkpr.service.alambizplugin.constants.LogFlagBusiType; import com.shkpr.service.alambizplugin.dbdao.services.intef.GisSurveyJobInfoService; import com.shkpr.service.alambizplugin.dbdao.services.intef.GisSurveyProjectInfoService; import com.shkpr.service.alambizplugin.dto.GisSurveySystemCheckId; import com.shkpr.service.alambizplugin.dto.GisSurveySystemCheckResult; import com.shkpr.service.alambizplugin.dto.GisSurveySystemCheckSubtask; import org.springframework.stereotype.Component; import org.springframework.util.concurrent.ListenableFuture; import java.time.Duration; import java.time.LocalDateTime; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; /** * 系统检查管理service * * @author 欧阳劲驰 * @since 1.0.0 */ @Component public class GisSurveySystemCheckBizService { /** * 任务缓存 */ private final static Map> TASK_CACHE = new ConcurrentHashMap<>(); /** * 子任务缓存 */ private final static Map SUBTASK_CACHE = new ConcurrentHashMap<>(); /** * 开始时间缓存 */ private final static Map TIME_CACHE = new ConcurrentHashMap<>(); /** * log */ private final String mStrClassName; private final String mBizType; private final GisSurveySystemCheckProperties systemCheckProperties; private final GisSurveySystemCheckResultManager systemCheckFileManager; private final GisSurveySystemChecker systemChecker; private final GisSurveyProjectInfoService projectInfoService; private final GisSurveyJobInfoService jobInfoService; public GisSurveySystemCheckBizService(GisSurveySystemCheckProperties systemCheckProperties, GisSurveySystemCheckResultManager systemCheckFileManager , GisSurveySystemChecker systemChecker, GisSurveyProjectInfoService projectInfoService, GisSurveyJobInfoService jobInfoService) { mStrClassName = "GisSurveySystemCheckBizService"; mBizType = LogFlagBusiType.BUSI_GIS_SURVEY.toStrValue(); this.systemCheckProperties = systemCheckProperties; this.systemCheckFileManager = systemCheckFileManager; this.systemChecker = systemChecker; this.projectInfoService = projectInfoService; this.jobInfoService = jobInfoService; } /** * 系统检查 * * @param params 系统检查参数 * @return 检查状态 */ public GisSurveySystemCheckResult sysCheckFun(GisSurveyCheckParams params) { //系统检查id GisSurveySystemCheckId systemCheckId = GisSurveySystemCheckId.generateId(params); if (systemCheckId == null) return GisSurveySystemCheckResult.notExists(params); //获取已存在的任务 ListenableFuture previousFuture = TASK_CACHE.get(systemCheckId); //如任务已完成,则检查历史失败 if (!params.getIgnoreFail() && previousFuture != null && previousFuture.isDone()) { try { //获取结果,并检查失败 GisSurveySystemCheckResult gisSurveySystemCheckResult = previousFuture.get(); if (Objects.equals(gisSurveySystemCheckResult.getCheckStatus(), GisSurveyCheckStatusEnum.FAIL.getCode())) return gisSurveySystemCheckResult; } catch (InterruptedException | ExecutionException e) { //打印报错信息(不太可能走到这) LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR, mBizType, mStrClassName , String.format("检查历史失败异常 系统检查id:%s msg:%s", systemCheckId, e.getMessage()) ); } } //获取已存在的结果 GisSurveySystemCheckResult result = systemCheckFileManager.getResult(systemCheckId); //获取元素更新时间 LocalDateTime refreshTime = getRefreshTimeDuration(systemCheckId); //回填元素更新时间 params.setRefreshTime(refreshTime); //判断结果是否存在 if (result != null) { //判断时间间隔,如滞后未到临界,则直接返回结果 if (result.getRefreshTime() != null && refreshTime != null) { //结果滞后数据库的时间 Duration lags = Duration.between(result.getRefreshTime(), refreshTime); //规定滞后时间大于真实滞后时间,直接返回结果 if (systemCheckProperties.getResultLagDuration().compareTo(lags) >= 0) { return result; } } } //系统检查子任务 GisSurveySystemCheckSubtask checkSubtask = SUBTASK_CACHE.get(systemCheckId); //进行中判断(未完成且未清除) if (previousFuture != null && !previousFuture.isDone() && !previousFuture.isCancelled()) return GisSurveySystemCheckResult.inProgress(params, checkSubtask, TIME_CACHE.get(systemCheckId)); //启动检查任务 if (Boolean.TRUE.equals(params.getCheckStart())) { startTask(systemCheckId, params); //如需要返回上次结果,则直接返回结果 if (Boolean.TRUE.equals(params.getReturnLastResult())) { return result; } //返回进行中 return GisSurveySystemCheckResult.inProgress(params, checkSubtask, LocalDateTime.now()); } //未启动返回结果 return result; } /** * 取消检查 * * @param params 系统检查参数 * @return 检查结果(可能进行中) */ public GisSurveySystemCheckResult cancelCheck(GisSurveyCheckParams params) { //任务标识 GisSurveySystemCheckId taskId = GisSurveySystemCheckId.generateId(params); if (taskId == null) return GisSurveySystemCheckResult.fail(params); //如无缓存,则直接返回不存在 if (!TASK_CACHE.containsKey(taskId)) return GisSurveySystemCheckResult.notExists(params); //关闭检查任务 return stopTask(taskId) ? GisSurveySystemCheckResult.success(params) : GisSurveySystemCheckResult.fail(params); } /** * 过期任务 *

用于检查和使任务过期

*/ public void expireResult(Duration ttl) { //获取超时的id List systemCheckIds = TIME_CACHE.entrySet().stream() .filter(entry -> Duration.between(entry.getValue(), LocalDateTime.now()).compareTo(ttl) > 0 ) .map(Map.Entry::getKey) .collect(Collectors.toList()); //停止超时的任务并删除任务缓存 for (GisSurveySystemCheckId systemCheckId : systemCheckIds) { //如任务不存在,则删除时间缓存 if (!TASK_CACHE.containsKey(systemCheckId)) TIME_CACHE.remove(systemCheckId); //停用缓存 if (stopTask(systemCheckId)) TIME_CACHE.remove(systemCheckId); else { //打印报错信息 LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR, mBizType, mStrClassName , String.format( "过期任务停止失败 ttl:%s 系统检查id:%s" , ttl , systemCheckId ) ); } } } /** * 获取更新时间 * * @param systemCheckId 系统检查id * @return 元素更新时间 */ private LocalDateTime getRefreshTimeDuration(GisSurveySystemCheckId systemCheckId) { //默认为当前时间 LocalDateTime refreshTime = LocalDateTime.now(); //根据检查维度获取数据库里的更新时间 if (systemCheckId.getCheckType() == GisSurveyCheckTypeEnum.PROJECT) refreshTime = projectInfoService.findRefreshTimeByUid(systemCheckId.getCode()); if (systemCheckId.getCheckType() == GisSurveyCheckTypeEnum.JOB) refreshTime = jobInfoService.findRefreshTimeByUid(systemCheckId.getCode()); return refreshTime; } /** * 启动任务 * * @param systemCheckId 系统检查id * @param params 检查参数 */ private void startTask(GisSurveySystemCheckId systemCheckId, GisSurveyCheckParams params) { //获取已存在的任务 ListenableFuture previousFuture = TASK_CACHE.get(systemCheckId); //已结束判断,删除缓存 if (previousFuture != null && (previousFuture.isDone() || previousFuture.isCancelled())) removeCache(systemCheckId); //异步执行系统检查任务 ListenableFuture checkFuture = systemChecker.systemCheckTask(params, //缓存子任务句柄 subtask -> SUBTASK_CACHE.put(systemCheckId, subtask), //删除子任务句柄 subtaskSystemCheckId -> SUBTASK_CACHE.remove(systemCheckId) ); //缓存任务句柄 TASK_CACHE.put(systemCheckId, checkFuture); //缓存时间 TIME_CACHE.put(systemCheckId, LocalDateTime.now()); } /** * 停止任务 * * @param systemCheckId 系统检查id * @return 关闭状态 */ private Boolean stopTask(GisSurveySystemCheckId systemCheckId) { ListenableFuture future = TASK_CACHE.get(systemCheckId); //完成判断,完成删除缓存 if (future.isCancelled() || future.isDone()) { removeCache(systemCheckId); return true; } //尝试清除任务 boolean cancel = future.cancel(true); //清除成功,删除缓存 if (cancel) { removeCache(systemCheckId); return true; } return false; } /** * 清除缓存 * * @param systemCheckId 系统检查id */ private void removeCache(GisSurveySystemCheckId systemCheckId) { TASK_CACHE.remove(systemCheckId); SUBTASK_CACHE.remove(systemCheckId); TIME_CACHE.remove(systemCheckId); } }