package com.shkpr.service.alambizplugin.bizservice; import com.global.base.log.LogLevelFlag; import com.global.base.log.LogPrintMgr; import com.shkpr.service.alambizplugin.apiparam.GisSurveyCheckParams; import com.shkpr.service.alambizplugin.commproperties.GisSurveySystemCheckProperties; import com.shkpr.service.alambizplugin.components.GisSurveySystemCheckResultManager; import com.shkpr.service.alambizplugin.components.GisSurveySystemChecker; import com.shkpr.service.alambizplugin.constants.GisSurveyCheckTypeEnum; import com.shkpr.service.alambizplugin.constants.LogFlagBusiType; import com.shkpr.service.alambizplugin.dbdao.services.intef.GisSurveyJobInfoService; import com.shkpr.service.alambizplugin.dbdao.services.intef.GisSurveyProjectInfoService; import com.shkpr.service.alambizplugin.dto.GisSurveySystemCheckId; import com.shkpr.service.alambizplugin.dto.GisSurveySystemCheckResult; import com.shkpr.service.alambizplugin.dto.GisSurveySystemCheckSubtask; import org.springframework.stereotype.Component; import org.springframework.util.concurrent.ListenableFuture; import java.time.Duration; import java.time.LocalDateTime; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; /** * 系统检查管理service * * @author 欧阳劲驰 * @since 1.0.0 */ @Component public class GisSurveyBizService { /** * 任务缓存 */ private final static Map> TASK_CACHE = new ConcurrentHashMap<>(); /** * 子任务缓存 */ private final static Map SUBTASK_CACHE = new ConcurrentHashMap<>(); /** * 开始时间缓存 */ private final static Map TIME_CACHE = new ConcurrentHashMap<>(); /** * log */ private final String mStrClassName; private final String mBizType; private final GisSurveySystemCheckProperties systemCheckProperties; private final GisSurveySystemCheckResultManager systemCheckFileManager; private final GisSurveySystemChecker systemChecker; private final GisSurveyProjectInfoService projectInfoService; private final GisSurveyJobInfoService jobInfoService; public GisSurveyBizService(GisSurveySystemCheckProperties systemCheckProperties, GisSurveySystemCheckResultManager systemCheckFileManager , GisSurveySystemChecker systemChecker, GisSurveyProjectInfoService projectInfoService, GisSurveyJobInfoService jobInfoService) { mStrClassName = "GisSurveyBizService"; mBizType = LogFlagBusiType.BUSI_GIS_SURVEY.toStrValue(); this.systemCheckProperties = systemCheckProperties; this.systemCheckFileManager = systemCheckFileManager; this.systemChecker = systemChecker; this.projectInfoService = projectInfoService; this.jobInfoService = jobInfoService; } /** * 系统检查 * * @param params 系统检查参数 * @return 检查状态 */ public GisSurveySystemCheckResult sysCheckFun(GisSurveyCheckParams params) { //系统检查id GisSurveySystemCheckId systemCheckId = GisSurveySystemCheckId.generateId(params); if (systemCheckId == null) return GisSurveySystemCheckResult.notExists(params); //获取已存在的任务 ListenableFuture previousFuture = TASK_CACHE.get(systemCheckId); //获取已存在的结果 GisSurveySystemCheckResult result = systemCheckFileManager.getResult(systemCheckId); //获取元素更新时间 LocalDateTime refreshTime = getRefreshTimeDuration(systemCheckId); //回填元素更新时间 params.setRefreshTime(refreshTime); //判断结果是否存在 if (result != null) { //判断时间间隔,如滞后未到临界,则直接返回结果 if (result.getRefreshTime() != null && refreshTime != null) { //结果滞后数据库的时间 Duration lags = Duration.between(result.getRefreshTime(), refreshTime); //规定滞后时间大于真实滞后时间,直接返回结果 if (systemCheckProperties.getResultLagDuration().compareTo(lags) >= 0) { return result; } } } //系统检查子任务 GisSurveySystemCheckSubtask checkSubtask = SUBTASK_CACHE.get(systemCheckId); //进行中判断(未完成且未清除) if (previousFuture != null && !previousFuture.isDone() && !previousFuture.isCancelled()) return GisSurveySystemCheckResult.inProgress(params, checkSubtask, TIME_CACHE.get(systemCheckId)); //启动检查任务 startTask(systemCheckId, params); //返回进行中 return GisSurveySystemCheckResult.inProgress(params, checkSubtask, LocalDateTime.now()); } /** * 取消检查 * * @param params 系统检查参数 * @return 检查结果(可能进行中) */ public GisSurveySystemCheckResult cancelCheck(GisSurveyCheckParams params) { //任务标识 GisSurveySystemCheckId taskId = GisSurveySystemCheckId.generateId(params); if (taskId == null) return GisSurveySystemCheckResult.fail(params); //如无缓存,则直接返回不存在 if (!TASK_CACHE.containsKey(taskId)) return GisSurveySystemCheckResult.notExists(params); //关闭检查任务 return stopTask(taskId) ? GisSurveySystemCheckResult.success(params) : GisSurveySystemCheckResult.fail(params); } /** * 过期任务 *

用于检查和使任务过期

*/ public void expireResult(Duration ttl) { //获取超时的id List taskIds = TIME_CACHE.entrySet().stream() .filter(entry -> Duration.between(entry.getValue(), LocalDateTime.now()).compareTo(ttl) > 0 ) .map(Map.Entry::getKey) .collect(Collectors.toList()); //停止超时的任务并删除任务缓存 for (GisSurveySystemCheckId taskId : taskIds) { //如任务不存在,则删除时间缓存 if (!TASK_CACHE.containsKey(taskId)) TIME_CACHE.remove(taskId); //停用缓存 if (stopTask(taskId)) TIME_CACHE.remove(taskId); else { //打印报错信息 LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR, mBizType, mStrClassName , String.format( "过期任务停止失败 ttl:%s taskId:%s" , ttl , taskId ) ); } } } /** * 获取更新时间 * * @param systemCheckId 系统检查id * @return 元素更新时间 */ private LocalDateTime getRefreshTimeDuration(GisSurveySystemCheckId systemCheckId) { //默认为当前时间 LocalDateTime refreshTime = LocalDateTime.now(); //根据检查维度获取数据库里的更新时间 if (systemCheckId.getCheckType() == GisSurveyCheckTypeEnum.PROJECT) refreshTime = projectInfoService.findRefreshTimeByUid(systemCheckId.getCode()); if (systemCheckId.getCheckType() == GisSurveyCheckTypeEnum.JOB) refreshTime = jobInfoService.findRefreshTimeByUid(systemCheckId.getCode()); return refreshTime; } /** * 启动任务 * * @param systemCheckId 系统检查id * @param params 检查参数 */ private void startTask(GisSurveySystemCheckId systemCheckId, GisSurveyCheckParams params) { //获取已存在的任务 ListenableFuture previousFuture = TASK_CACHE.get(systemCheckId); //已结束判断,删除缓存 if (previousFuture != null && (previousFuture.isDone() || previousFuture.isCancelled())) { TASK_CACHE.remove(systemCheckId); SUBTASK_CACHE.remove(systemCheckId); TIME_CACHE.remove(systemCheckId); } //异步执行系统检查任务 ListenableFuture checkFuture = systemChecker.systemCheckTask(params, //缓存子任务句柄 subtask -> SUBTASK_CACHE.put(systemCheckId, subtask) ); //缓存任务句柄 TASK_CACHE.put(systemCheckId, checkFuture); //缓存时间 TIME_CACHE.put(systemCheckId, LocalDateTime.now()); } /** * 停止任务 * * @param taskId 任务id * @return 关闭状态 */ private Boolean stopTask(GisSurveySystemCheckId taskId) { ListenableFuture future = TASK_CACHE.get(taskId); //完成判断,完成删除缓存 if (future.isCancelled() || future.isDone()) { TASK_CACHE.remove(taskId); TIME_CACHE.remove(taskId); return true; } //尝试清除任务 boolean cancel = future.cancel(true); //清除成功,删除缓存 if (cancel) { TASK_CACHE.remove(taskId); TIME_CACHE.remove(taskId); return true; } return false; } }