GisSurveyBizService.java 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. package com.shkpr.service.alambizplugin.bizservice;
  2. import com.global.base.log.LogLevelFlag;
  3. import com.global.base.log.LogPrintMgr;
  4. import com.shkpr.service.alambizplugin.apiparam.GisSurveyCheckParams;
  5. import com.shkpr.service.alambizplugin.commproperties.GisSurveySystemCheckProperties;
  6. import com.shkpr.service.alambizplugin.components.GisSurveySystemCheckResultManager;
  7. import com.shkpr.service.alambizplugin.components.GisSurveySystemChecker;
  8. import com.shkpr.service.alambizplugin.constants.GisSurveyCheckTypeEnum;
  9. import com.shkpr.service.alambizplugin.constants.LogFlagBusiType;
  10. import com.shkpr.service.alambizplugin.dbdao.services.intef.GisSurveyJobInfoService;
  11. import com.shkpr.service.alambizplugin.dbdao.services.intef.GisSurveyProjectInfoService;
  12. import com.shkpr.service.alambizplugin.dto.GisSurveySystemCheckId;
  13. import com.shkpr.service.alambizplugin.dto.GisSurveySystemCheckResult;
  14. import com.shkpr.service.alambizplugin.dto.GisSurveySystemCheckSubtask;
  15. import org.springframework.stereotype.Component;
  16. import org.springframework.util.concurrent.ListenableFuture;
  17. import java.time.Duration;
  18. import java.time.LocalDateTime;
  19. import java.util.List;
  20. import java.util.Map;
  21. import java.util.concurrent.ConcurrentHashMap;
  22. import java.util.stream.Collectors;
  23. /**
  24. * 系统检查管理service
  25. *
  26. * @author 欧阳劲驰
  27. * @since 1.0.0
  28. */
  29. @Component
  30. public class GisSurveyBizService {
  31. /**
  32. * 任务缓存
  33. */
  34. private final static Map<GisSurveySystemCheckId, ListenableFuture<GisSurveySystemCheckResult>> TASK_CACHE = new ConcurrentHashMap<>();
  35. /**
  36. * 子任务缓存
  37. */
  38. private final static Map<GisSurveySystemCheckId, GisSurveySystemCheckSubtask> SUBTASK_CACHE = new ConcurrentHashMap<>();
  39. /**
  40. * 开始时间缓存
  41. */
  42. private final static Map<GisSurveySystemCheckId, LocalDateTime> TIME_CACHE = new ConcurrentHashMap<>();
  43. /**
  44. * log
  45. */
  46. private final String mStrClassName;
  47. private final String mBizType;
  48. private final GisSurveySystemCheckProperties systemCheckProperties;
  49. private final GisSurveySystemCheckResultManager systemCheckFileManager;
  50. private final GisSurveySystemChecker systemChecker;
  51. private final GisSurveyProjectInfoService projectInfoService;
  52. private final GisSurveyJobInfoService jobInfoService;
  53. public GisSurveyBizService(GisSurveySystemCheckProperties systemCheckProperties, GisSurveySystemCheckResultManager systemCheckFileManager
  54. , GisSurveySystemChecker systemChecker, GisSurveyProjectInfoService projectInfoService, GisSurveyJobInfoService jobInfoService) {
  55. mStrClassName = "GisSurveyBizService";
  56. mBizType = LogFlagBusiType.BUSI_GIS_SURVEY.toStrValue();
  57. this.systemCheckProperties = systemCheckProperties;
  58. this.systemCheckFileManager = systemCheckFileManager;
  59. this.systemChecker = systemChecker;
  60. this.projectInfoService = projectInfoService;
  61. this.jobInfoService = jobInfoService;
  62. }
  63. /**
  64. * 系统检查
  65. *
  66. * @param params 系统检查参数
  67. * @return 检查状态
  68. */
  69. public GisSurveySystemCheckResult sysCheckFun(GisSurveyCheckParams params) {
  70. //系统检查id
  71. GisSurveySystemCheckId systemCheckId = GisSurveySystemCheckId.generateId(params);
  72. if (systemCheckId == null) return GisSurveySystemCheckResult.notExists(params);
  73. //获取已存在的任务
  74. ListenableFuture<GisSurveySystemCheckResult> previousFuture = TASK_CACHE.get(systemCheckId);
  75. //获取已存在的结果
  76. GisSurveySystemCheckResult result = systemCheckFileManager.getResult(systemCheckId);
  77. //获取元素更新时间
  78. LocalDateTime refreshTime = getRefreshTimeDuration(systemCheckId);
  79. //回填元素更新时间
  80. params.setRefreshTime(refreshTime);
  81. //判断结果是否存在
  82. if (result != null) {
  83. //判断时间间隔,如滞后未到临界,则直接返回结果
  84. if (result.getRefreshTime() != null && refreshTime != null) {
  85. //结果滞后数据库的时间
  86. Duration lags = Duration.between(result.getRefreshTime(), refreshTime);
  87. //规定滞后时间大于真实滞后时间,直接返回结果
  88. if (systemCheckProperties.getResultLagDuration().compareTo(lags) >= 0) {
  89. return result;
  90. }
  91. }
  92. }
  93. //系统检查子任务
  94. GisSurveySystemCheckSubtask checkSubtask = SUBTASK_CACHE.get(systemCheckId);
  95. //进行中判断(未完成且未清除)
  96. if (previousFuture != null && !previousFuture.isDone() && !previousFuture.isCancelled())
  97. return GisSurveySystemCheckResult.inProgress(params, checkSubtask, TIME_CACHE.get(systemCheckId));
  98. //启动检查任务
  99. startTask(systemCheckId, params);
  100. //返回进行中
  101. return GisSurveySystemCheckResult.inProgress(params, checkSubtask, LocalDateTime.now());
  102. }
  103. /**
  104. * 取消检查
  105. *
  106. * @param params 系统检查参数
  107. * @return 检查结果(可能进行中)
  108. */
  109. public GisSurveySystemCheckResult cancelCheck(GisSurveyCheckParams params) {
  110. //任务标识
  111. GisSurveySystemCheckId taskId = GisSurveySystemCheckId.generateId(params);
  112. if (taskId == null) return GisSurveySystemCheckResult.fail(params);
  113. //如无缓存,则直接返回不存在
  114. if (!TASK_CACHE.containsKey(taskId)) return GisSurveySystemCheckResult.notExists(params);
  115. //关闭检查任务
  116. return stopTask(taskId) ? GisSurveySystemCheckResult.success(params) : GisSurveySystemCheckResult.fail(params);
  117. }
  118. /**
  119. * 过期任务
  120. * <p>用于检查和使任务过期</p>
  121. */
  122. public void expireResult(Duration ttl) {
  123. //获取超时的id
  124. List<GisSurveySystemCheckId> taskIds = TIME_CACHE.entrySet().stream()
  125. .filter(entry ->
  126. Duration.between(entry.getValue(), LocalDateTime.now()).compareTo(ttl) > 0
  127. )
  128. .map(Map.Entry::getKey)
  129. .collect(Collectors.toList());
  130. //停止超时的任务并删除任务缓存
  131. for (GisSurveySystemCheckId taskId : taskIds) {
  132. //如任务不存在,则删除时间缓存
  133. if (!TASK_CACHE.containsKey(taskId)) TIME_CACHE.remove(taskId);
  134. //停用缓存
  135. if (stopTask(taskId)) TIME_CACHE.remove(taskId);
  136. else {
  137. //打印报错信息
  138. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR, mBizType, mStrClassName
  139. , String.format(
  140. "过期任务停止失败 ttl:%s taskId:%s"
  141. , ttl
  142. , taskId
  143. )
  144. );
  145. }
  146. }
  147. }
  148. /**
  149. * 获取更新时间
  150. *
  151. * @param systemCheckId 系统检查id
  152. * @return 元素更新时间
  153. */
  154. private LocalDateTime getRefreshTimeDuration(GisSurveySystemCheckId systemCheckId) {
  155. //默认为当前时间
  156. LocalDateTime refreshTime = LocalDateTime.now();
  157. //根据检查维度获取数据库里的更新时间
  158. if (systemCheckId.getCheckType() == GisSurveyCheckTypeEnum.PROJECT)
  159. refreshTime = projectInfoService.findRefreshTimeByUid(systemCheckId.getCode());
  160. if (systemCheckId.getCheckType() == GisSurveyCheckTypeEnum.JOB)
  161. refreshTime = jobInfoService.findRefreshTimeByUid(systemCheckId.getCode());
  162. return refreshTime;
  163. }
  164. /**
  165. * 启动任务
  166. *
  167. * @param systemCheckId 系统检查id
  168. * @param params 检查参数
  169. */
  170. private void startTask(GisSurveySystemCheckId systemCheckId, GisSurveyCheckParams params) {
  171. //获取已存在的任务
  172. ListenableFuture<GisSurveySystemCheckResult> previousFuture = TASK_CACHE.get(systemCheckId);
  173. //已结束判断,删除缓存
  174. if (previousFuture != null && (previousFuture.isDone() || previousFuture.isCancelled())) {
  175. TASK_CACHE.remove(systemCheckId);
  176. SUBTASK_CACHE.remove(systemCheckId);
  177. TIME_CACHE.remove(systemCheckId);
  178. }
  179. //异步执行系统检查任务
  180. ListenableFuture<GisSurveySystemCheckResult> checkFuture = systemChecker.systemCheckTask(params,
  181. //缓存子任务句柄
  182. subtask -> SUBTASK_CACHE.put(systemCheckId, subtask)
  183. );
  184. //缓存任务句柄
  185. TASK_CACHE.put(systemCheckId, checkFuture);
  186. //缓存时间
  187. TIME_CACHE.put(systemCheckId, LocalDateTime.now());
  188. }
  189. /**
  190. * 停止任务
  191. *
  192. * @param taskId 任务id
  193. * @return 关闭状态
  194. */
  195. private Boolean stopTask(GisSurveySystemCheckId taskId) {
  196. ListenableFuture<GisSurveySystemCheckResult> future = TASK_CACHE.get(taskId);
  197. //完成判断,完成删除缓存
  198. if (future.isCancelled() || future.isDone()) {
  199. TASK_CACHE.remove(taskId);
  200. TIME_CACHE.remove(taskId);
  201. return true;
  202. }
  203. //尝试清除任务
  204. boolean cancel = future.cancel(true);
  205. //清除成功,删除缓存
  206. if (cancel) {
  207. TASK_CACHE.remove(taskId);
  208. TIME_CACHE.remove(taskId);
  209. return true;
  210. }
  211. return false;
  212. }
  213. }