// GisSurveyThirdImportBizService.java (7.9 KB)
  1. package com.shkpr.service.alambizplugin.bizservice;
  2. import com.global.base.log.LogLevelFlag;
  3. import com.global.base.log.LogPrintMgr;
  4. import com.shkpr.service.alambizplugin.apiparam.GisSurveyThirdImportParams;
  5. import com.shkpr.service.alambizplugin.components.GisSurveyThirdImporter;
  6. import com.shkpr.service.alambizplugin.constants.GisSurveyImportStatusEnum;
  7. import com.shkpr.service.alambizplugin.constants.LogFlagBusiType;
  8. import com.shkpr.service.alambizplugin.dto.GisSurveyThirdImportResult;
  9. import com.shkpr.service.alambizplugin.dto.GisSurveyThirdImportSubtask;
  10. import org.springframework.stereotype.Component;
  11. import org.springframework.util.concurrent.ListenableFuture;
  12. import org.springframework.web.multipart.MultipartFile;
  13. import java.io.IOException;
  14. import java.io.InputStream;
  15. import java.time.Duration;
  16. import java.time.LocalDateTime;
  17. import java.util.ArrayList;
  18. import java.util.List;
  19. import java.util.Map;
  20. import java.util.Objects;
  21. import java.util.concurrent.ConcurrentHashMap;
  22. import java.util.concurrent.ExecutionException;
  23. import java.util.stream.Collectors;
  24. /**
  25. * 第三方导入管理service
  26. *
  27. * @author 欧阳劲驰
  28. * @since 1.0.0
  29. */
  30. @Component
  31. public class GisSurveyThirdImportBizService {
  32. /**
  33. * 任务缓存
  34. */
  35. private final static Map<String, ListenableFuture<GisSurveyThirdImportResult>> TASK_CACHE = new ConcurrentHashMap<>();
  36. /**
  37. * 子任务缓存
  38. */
  39. private final static Map<String, GisSurveyThirdImportSubtask> SUBTASK_CACHE = new ConcurrentHashMap<>();
  40. /**
  41. * 开始时间缓存
  42. */
  43. private final static Map<String, LocalDateTime> TIME_CACHE = new ConcurrentHashMap<>();
  44. /**
  45. * log
  46. */
  47. private final String mStrClassName;
  48. private final String mBizType;
  49. private final GisSurveyThirdImporter thirdImporter;
  50. public GisSurveyThirdImportBizService(GisSurveyThirdImporter thirdImporter) {
  51. mStrClassName = "GisSurveySystemCheckBizService";
  52. mBizType = LogFlagBusiType.BUSI_GIS_SURVEY.toStrValue();
  53. this.thirdImporter = thirdImporter;
  54. }
  55. /**
  56. * 第三方导入
  57. *
  58. * @param params 第三方导入参数
  59. * @return 导入状态
  60. */
  61. public GisSurveyThirdImportResult thirdImport(GisSurveyThirdImportParams params) {
  62. //获取已存在的任务
  63. ListenableFuture<GisSurveyThirdImportResult> previousFuture = TASK_CACHE.get(params.getJobId());
  64. //判断完成
  65. if (previousFuture != null && previousFuture.isDone()) {
  66. try {
  67. //获取结果
  68. GisSurveyThirdImportResult thirdImportResult = previousFuture.get();
  69. //如不忽略失败,且失败,则返回失败
  70. if (!params.getIgnoreFail() && Objects.equals(thirdImportResult.getImportStatus(), GisSurveyImportStatusEnum.FAIL.getCode()))
  71. return thirdImportResult;
  72. else if (Objects.equals(thirdImportResult.getImportStatus(), GisSurveyImportStatusEnum.SUCCESS.getCode())) {
  73. //如成功,返回成功,并清除缓存
  74. removeCache(params.getJobId());
  75. return thirdImportResult;
  76. }
  77. } catch (InterruptedException | ExecutionException e) {
  78. //打印报错信息(不太可能走到这)
  79. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR, mBizType, mStrClassName
  80. , String.format("检查历史失败异常 任务id:%s msg:%s", params.getJobId(), e.getMessage())
  81. );
  82. }
  83. }
  84. //系统检查子任务
  85. GisSurveyThirdImportSubtask importSubtask = SUBTASK_CACHE.get(params.getJobId());
  86. //进行中判断(未完成且未清除)
  87. if (previousFuture != null && !previousFuture.isDone() && !previousFuture.isCancelled())
  88. return GisSurveyThirdImportResult.inProgress(params, importSubtask, TIME_CACHE.get(params.getJobId()));
  89. //获取文件流
  90. List<InputStream> inputStreams = new ArrayList<>();
  91. for (MultipartFile file : params.getFiles()) {
  92. try {
  93. inputStreams.add(file.getInputStream());
  94. } catch (IOException e) {
  95. return GisSurveyThirdImportResult.fail(params);
  96. }
  97. }
  98. //启动检查任务
  99. startTask(params, inputStreams);
  100. //返回进行中
  101. return GisSurveyThirdImportResult.inProgress(params, importSubtask, LocalDateTime.now());
  102. }
  103. /**
  104. * 取消检查
  105. *
  106. * @param jobId 任务id
  107. * @return 取消结果
  108. */
  109. public int cancelCheck(String jobId) {
  110. //如无缓存,则直接返回不存在
  111. if (!TASK_CACHE.containsKey(jobId)) return GisSurveyImportStatusEnum.NOT_EXISTS.getCode();
  112. //关闭检查任务
  113. return stopTask(jobId) ? GisSurveyImportStatusEnum.SUCCESS.getCode() : GisSurveyImportStatusEnum.FAIL.getCode();
  114. }
  115. /**
  116. * 过期任务
  117. * <p>用于检查和使任务过期</p>
  118. */
  119. public void expireResult(Duration ttl) {
  120. //获取超时的id
  121. List<String> jobIds = TIME_CACHE.entrySet().stream()
  122. .filter(entry ->
  123. Duration.between(entry.getValue(), LocalDateTime.now()).compareTo(ttl) > 0
  124. )
  125. .map(Map.Entry::getKey)
  126. .collect(Collectors.toList());
  127. //停止超时的任务并删除任务缓存
  128. for (String jobid : jobIds) {
  129. //如任务不存在,则删除时间缓存
  130. if (!TASK_CACHE.containsKey(jobid)) TIME_CACHE.remove(jobid);
  131. //停用缓存
  132. if (stopTask(jobid)) TIME_CACHE.remove(jobid);
  133. else {
  134. //打印报错信息
  135. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR, mBizType, mStrClassName
  136. , String.format(
  137. "过期任务停止失败 ttl:%s 任务id:%s"
  138. , ttl
  139. , jobid
  140. )
  141. );
  142. }
  143. }
  144. }
  145. /**
  146. * 启动任务
  147. *
  148. * @param params 导入参数
  149. * @param inputStreams 文件输入流
  150. */
  151. private void startTask(GisSurveyThirdImportParams params, List<InputStream> inputStreams) {
  152. String jobId = params.getJobId();
  153. //获取已存在的任务
  154. ListenableFuture<GisSurveyThirdImportResult> previousFuture = TASK_CACHE.get(jobId);
  155. //已结束判断,删除缓存
  156. if (previousFuture != null && (previousFuture.isDone() || previousFuture.isCancelled())) {
  157. removeCache(params.getJobId());
  158. }
  159. //异步执行第三方导入查任务
  160. ListenableFuture<GisSurveyThirdImportResult> checkFuture = thirdImporter.thirdImportTask(params, inputStreams,
  161. //缓存子任务句柄
  162. subtask -> SUBTASK_CACHE.put(jobId, subtask),
  163. //删除子任务句柄
  164. subtaskSystemCheckId -> SUBTASK_CACHE.remove(jobId)
  165. );
  166. //缓存任务句柄
  167. TASK_CACHE.put(jobId, checkFuture);
  168. //缓存时间
  169. TIME_CACHE.put(jobId, LocalDateTime.now());
  170. }
  171. /**
  172. * 停止任务
  173. *
  174. * @param jobId 任务id
  175. * @return 关闭状态
  176. */
  177. private Boolean stopTask(String jobId) {
  178. ListenableFuture<GisSurveyThirdImportResult> future = TASK_CACHE.get(jobId);
  179. //完成判断,完成删除缓存
  180. if (future.isCancelled() || future.isDone()) {
  181. removeCache(jobId);
  182. return true;
  183. }
  184. //尝试清除任务
  185. boolean cancel = future.cancel(true);
  186. //清除成功,删除缓存
  187. if (cancel) {
  188. removeCache(jobId);
  189. return true;
  190. }
  191. return false;
  192. }
  193. /**
  194. * 清除缓存
  195. *
  196. * @param jobId 任务id
  197. */
  198. private void removeCache(String jobId) {
  199. TASK_CACHE.remove(jobId);
  200. SUBTASK_CACHE.remove(jobId);
  201. TIME_CACHE.remove(jobId);
  202. }
  203. }