GisSurveySystemCheckBizService.java 11 KB


  1. package com.shkpr.service.alambizplugin.bizservice;
  2. import com.global.base.log.LogLevelFlag;
  3. import com.global.base.log.LogPrintMgr;
  4. import com.shkpr.service.alambizplugin.apiparam.GisSurveyCheckParams;
  5. import com.shkpr.service.alambizplugin.commproperties.GisSurveySystemCheckProperties;
  6. import com.shkpr.service.alambizplugin.components.GisSurveySystemCheckResultManager;
  7. import com.shkpr.service.alambizplugin.components.GisSurveySystemChecker;
  8. import com.shkpr.service.alambizplugin.constants.GisSurveyCheckStatusEnum;
  9. import com.shkpr.service.alambizplugin.constants.GisSurveyCheckTypeEnum;
  10. import com.shkpr.service.alambizplugin.constants.LogFlagBusiType;
  11. import com.shkpr.service.alambizplugin.dbdao.services.intef.GisSurveyJobInfoService;
  12. import com.shkpr.service.alambizplugin.dbdao.services.intef.GisSurveyProjectInfoService;
  13. import com.shkpr.service.alambizplugin.dto.GisSurveySystemCheckId;
  14. import com.shkpr.service.alambizplugin.dto.GisSurveySystemCheckResult;
  15. import com.shkpr.service.alambizplugin.dto.GisSurveySystemCheckSubtask;
  16. import org.springframework.stereotype.Component;
  17. import org.springframework.util.concurrent.ListenableFuture;
  18. import java.time.Duration;
  19. import java.time.LocalDateTime;
  20. import java.util.List;
  21. import java.util.Map;
  22. import java.util.Objects;
  23. import java.util.concurrent.ConcurrentHashMap;
  24. import java.util.concurrent.ExecutionException;
  25. import java.util.stream.Collectors;
  26. /**
  27. * 系统检查管理service
  28. *
  29. * @author 欧阳劲驰
  30. * @since 1.0.0
  31. */
  32. @Component
  33. public class GisSurveySystemCheckBizService {
  34. /**
  35. * 任务缓存
  36. */
  37. private final static Map<GisSurveySystemCheckId, ListenableFuture<GisSurveySystemCheckResult>> TASK_CACHE = new ConcurrentHashMap<>();
  38. /**
  39. * 子任务缓存
  40. */
  41. private final static Map<GisSurveySystemCheckId, GisSurveySystemCheckSubtask> SUBTASK_CACHE = new ConcurrentHashMap<>();
  42. /**
  43. * 开始时间缓存
  44. */
  45. private final static Map<GisSurveySystemCheckId, LocalDateTime> TIME_CACHE = new ConcurrentHashMap<>();
  46. /**
  47. * log
  48. */
  49. private final String mStrClassName;
  50. private final String mBizType;
  51. private final GisSurveySystemCheckProperties systemCheckProperties;
  52. private final GisSurveySystemCheckResultManager systemCheckFileManager;
  53. private final GisSurveySystemChecker systemChecker;
  54. private final GisSurveyProjectInfoService projectInfoService;
  55. private final GisSurveyJobInfoService jobInfoService;
  56. public GisSurveySystemCheckBizService(GisSurveySystemCheckProperties systemCheckProperties, GisSurveySystemCheckResultManager systemCheckFileManager
  57. , GisSurveySystemChecker systemChecker, GisSurveyProjectInfoService projectInfoService, GisSurveyJobInfoService jobInfoService) {
  58. mStrClassName = "GisSurveySystemCheckBizService";
  59. mBizType = LogFlagBusiType.BUSI_GIS_SURVEY.toStrValue();
  60. this.systemCheckProperties = systemCheckProperties;
  61. this.systemCheckFileManager = systemCheckFileManager;
  62. this.systemChecker = systemChecker;
  63. this.projectInfoService = projectInfoService;
  64. this.jobInfoService = jobInfoService;
  65. }
  66. /**
  67. * 系统检查
  68. *
  69. * @param params 系统检查参数
  70. * @return 检查状态
  71. */
  72. public GisSurveySystemCheckResult sysCheckFun(GisSurveyCheckParams params) {
  73. //系统检查id
  74. GisSurveySystemCheckId systemCheckId = GisSurveySystemCheckId.generateId(params);
  75. if (systemCheckId == null) return GisSurveySystemCheckResult.notExists(params);
  76. //获取已存在的任务
  77. ListenableFuture<GisSurveySystemCheckResult> previousFuture = TASK_CACHE.get(systemCheckId);
  78. //如任务已完成,则检查历史失败
  79. if (!params.getIgnoreFail() && previousFuture != null && previousFuture.isDone()) {
  80. try {
  81. //获取结果,并检查失败
  82. GisSurveySystemCheckResult gisSurveySystemCheckResult = previousFuture.get();
  83. if (Objects.equals(gisSurveySystemCheckResult.getCheckStatus(), GisSurveyCheckStatusEnum.FAIL.getCode()))
  84. return gisSurveySystemCheckResult;
  85. } catch (InterruptedException | ExecutionException e) {
  86. //打印报错信息(不太可能走到这)
  87. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR, mBizType, mStrClassName
  88. , String.format("检查历史失败异常 系统检查id:%s msg:%s", systemCheckId, e.getMessage())
  89. );
  90. }
  91. }
  92. //获取已存在的结果
  93. GisSurveySystemCheckResult result = systemCheckFileManager.getResult(systemCheckId);
  94. //获取元素更新时间
  95. LocalDateTime refreshTime = getRefreshTimeDuration(systemCheckId);
  96. //回填元素更新时间
  97. params.setRefreshTime(refreshTime);
  98. //判断结果是否存在
  99. if (result != null) {
  100. //判断时间间隔,如滞后未到临界,则直接返回结果
  101. if (result.getRefreshTime() != null && refreshTime != null) {
  102. //结果滞后数据库的时间
  103. Duration lags = Duration.between(result.getRefreshTime(), refreshTime);
  104. //规定滞后时间大于真实滞后时间,直接返回结果
  105. if (systemCheckProperties.getResultLagDuration().compareTo(lags) >= 0) {
  106. return result;
  107. }
  108. }
  109. }
  110. //系统检查子任务
  111. GisSurveySystemCheckSubtask checkSubtask = SUBTASK_CACHE.get(systemCheckId);
  112. //进行中判断(未完成且未清除)
  113. if (previousFuture != null && !previousFuture.isDone() && !previousFuture.isCancelled())
  114. return GisSurveySystemCheckResult.inProgress(params, checkSubtask, TIME_CACHE.get(systemCheckId));
  115. //启动检查任务
  116. if (Boolean.TRUE.equals(params.getCheckStart())) {
  117. startTask(systemCheckId, params);
  118. //如需要返回上次结果,则直接返回结果
  119. if (Boolean.TRUE.equals(params.getReturnLastResult())) {
  120. return result;
  121. }
  122. //返回进行中
  123. return GisSurveySystemCheckResult.inProgress(params, checkSubtask, LocalDateTime.now());
  124. }
  125. //未启动返回结果
  126. return result;
  127. }
  128. /**
  129. * 取消检查
  130. *
  131. * @param params 系统检查参数
  132. * @return 检查结果(可能进行中)
  133. */
  134. public GisSurveySystemCheckResult cancelCheck(GisSurveyCheckParams params) {
  135. //任务标识
  136. GisSurveySystemCheckId taskId = GisSurveySystemCheckId.generateId(params);
  137. if (taskId == null) return GisSurveySystemCheckResult.fail(params);
  138. //如无缓存,则直接返回不存在
  139. if (!TASK_CACHE.containsKey(taskId)) return GisSurveySystemCheckResult.notExists(params);
  140. //关闭检查任务
  141. return stopTask(taskId) ? GisSurveySystemCheckResult.success(params) : GisSurveySystemCheckResult.fail(params);
  142. }
  143. /**
  144. * 过期任务
  145. * <p>用于检查和使任务过期</p>
  146. */
  147. public void expireResult(Duration ttl) {
  148. //获取超时的id
  149. List<GisSurveySystemCheckId> systemCheckIds = TIME_CACHE.entrySet().stream()
  150. .filter(entry ->
  151. Duration.between(entry.getValue(), LocalDateTime.now()).compareTo(ttl) > 0
  152. )
  153. .map(Map.Entry::getKey)
  154. .collect(Collectors.toList());
  155. //停止超时的任务并删除任务缓存
  156. for (GisSurveySystemCheckId systemCheckId : systemCheckIds) {
  157. //如任务不存在,则删除时间缓存
  158. if (!TASK_CACHE.containsKey(systemCheckId)) TIME_CACHE.remove(systemCheckId);
  159. //停用缓存
  160. if (stopTask(systemCheckId)) TIME_CACHE.remove(systemCheckId);
  161. else {
  162. //打印报错信息
  163. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR, mBizType, mStrClassName
  164. , String.format(
  165. "过期任务停止失败 ttl:%s 系统检查id:%s"
  166. , ttl
  167. , systemCheckId
  168. )
  169. );
  170. }
  171. }
  172. }
  173. /**
  174. * 获取更新时间
  175. *
  176. * @param systemCheckId 系统检查id
  177. * @return 元素更新时间
  178. */
  179. private LocalDateTime getRefreshTimeDuration(GisSurveySystemCheckId systemCheckId) {
  180. //默认为当前时间
  181. LocalDateTime refreshTime = LocalDateTime.now();
  182. //根据检查维度获取数据库里的更新时间
  183. if (systemCheckId.getCheckType() == GisSurveyCheckTypeEnum.PROJECT)
  184. refreshTime = projectInfoService.findRefreshTimeByUid(systemCheckId.getCode());
  185. if (systemCheckId.getCheckType() == GisSurveyCheckTypeEnum.JOB)
  186. refreshTime = jobInfoService.findRefreshTimeByUid(systemCheckId.getCode());
  187. return refreshTime;
  188. }
  189. /**
  190. * 启动任务
  191. *
  192. * @param systemCheckId 系统检查id
  193. * @param params 检查参数
  194. */
  195. private void startTask(GisSurveySystemCheckId systemCheckId, GisSurveyCheckParams params) {
  196. //获取已存在的任务
  197. ListenableFuture<GisSurveySystemCheckResult> previousFuture = TASK_CACHE.get(systemCheckId);
  198. //已结束判断,删除缓存
  199. if (previousFuture != null && (previousFuture.isDone() || previousFuture.isCancelled()))
  200. removeCache(systemCheckId);
  201. //异步执行系统检查任务
  202. ListenableFuture<GisSurveySystemCheckResult> checkFuture = systemChecker.systemCheckTask(params,
  203. //缓存子任务句柄
  204. subtask -> SUBTASK_CACHE.put(systemCheckId, subtask),
  205. //删除子任务句柄
  206. subtaskSystemCheckId -> SUBTASK_CACHE.remove(systemCheckId)
  207. );
  208. //缓存任务句柄
  209. TASK_CACHE.put(systemCheckId, checkFuture);
  210. //缓存时间
  211. TIME_CACHE.put(systemCheckId, LocalDateTime.now());
  212. }
  213. /**
  214. * 停止任务
  215. *
  216. * @param systemCheckId 系统检查id
  217. * @return 关闭状态
  218. */
  219. private Boolean stopTask(GisSurveySystemCheckId systemCheckId) {
  220. ListenableFuture<GisSurveySystemCheckResult> future = TASK_CACHE.get(systemCheckId);
  221. //完成判断,完成删除缓存
  222. if (future.isCancelled() || future.isDone()) {
  223. removeCache(systemCheckId);
  224. return true;
  225. }
  226. //尝试清除任务
  227. boolean cancel = future.cancel(true);
  228. //清除成功,删除缓存
  229. if (cancel) {
  230. removeCache(systemCheckId);
  231. return true;
  232. }
  233. return false;
  234. }
  235. /**
  236. * 清除缓存
  237. *
  238. * @param systemCheckId 系统检查id
  239. */
  240. private void removeCache(GisSurveySystemCheckId systemCheckId) {
  241. TASK_CACHE.remove(systemCheckId);
  242. SUBTASK_CACHE.remove(systemCheckId);
  243. TIME_CACHE.remove(systemCheckId);
  244. }
  245. }