GisSurveyThirdImportBizService.java 8.8 KB


  1. package com.shkpr.service.alambizplugin.bizservice;
  2. import com.global.base.log.LogLevelFlag;
  3. import com.global.base.log.LogPrintMgr;
  4. import com.shkpr.service.alambizplugin.apiparam.GisSurveyThirdImportParams;
  5. import com.shkpr.service.alambizplugin.commproperties.AsyncTaskProperties;
  6. import com.shkpr.service.alambizplugin.components.GisSurveyThirdImporter;
  7. import com.shkpr.service.alambizplugin.constants.GisSurveyImportStatusEnum;
  8. import com.shkpr.service.alambizplugin.constants.LogFlagBusiType;
  9. import com.shkpr.service.alambizplugin.dto.CommAsyncCache;
  10. import com.shkpr.service.alambizplugin.dto.GisSurveyThirdImportResult;
  11. import com.shkpr.service.alambizplugin.dto.GisSurveyThirdImportSubtask;
  12. import org.springframework.beans.factory.annotation.Qualifier;
  13. import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
  14. import org.springframework.stereotype.Component;
  15. import org.springframework.util.concurrent.ListenableFuture;
  16. import org.springframework.web.multipart.MultipartFile;
  17. import java.io.IOException;
  18. import java.io.InputStream;
  19. import java.time.Instant;
  20. import java.time.LocalDateTime;
  21. import java.util.ArrayList;
  22. import java.util.List;
  23. import java.util.Map;
  24. import java.util.Objects;
  25. import java.util.concurrent.ConcurrentHashMap;
  26. import java.util.concurrent.ExecutionException;
  27. /**
  28. * 第三方导入管理service
  29. *
  30. * @author 欧阳劲驰
  31. * @since 1.0.0
  32. */
  33. @Component
  34. public class GisSurveyThirdImportBizService {
  35. /**
  36. * 任务缓存
  37. */
  38. private final static Map<String, ListenableFuture<GisSurveyThirdImportResult>> TASK_CACHE = new ConcurrentHashMap<>();
  39. /**
  40. * 子任务缓存
  41. */
  42. private final static Map<String, GisSurveyThirdImportSubtask> SUBTASK_CACHE = new ConcurrentHashMap<>();
  43. /**
  44. * 开始时间缓存
  45. */
  46. private final static Map<String, CommAsyncCache> INFO_CACHE = new ConcurrentHashMap<>();
  47. /**
  48. * log
  49. */
  50. private final String mStrClassName;
  51. private final String mBizType;
  52. private final AsyncTaskProperties asyncTaskProperties;
  53. private final ThreadPoolTaskScheduler taskScheduler;
  54. private final GisSurveyThirdImporter thirdImporter;
  55. public GisSurveyThirdImportBizService(AsyncTaskProperties asyncTaskProperties
  56. , @Qualifier("timeThreadPoolTaskScheduler") ThreadPoolTaskScheduler taskScheduler
  57. , GisSurveyThirdImporter thirdImporter) {
  58. mStrClassName = "GisSurveySystemCheckBizService";
  59. mBizType = LogFlagBusiType.BUSI_GIS_SURVEY.toStrValue();
  60. this.asyncTaskProperties = asyncTaskProperties;
  61. this.taskScheduler = taskScheduler;
  62. this.thirdImporter = thirdImporter;
  63. }
  64. /**
  65. * 第三方导入
  66. *
  67. * @param params 第三方导入参数
  68. * @return 导入状态
  69. */
  70. public GisSurveyThirdImportResult thirdImport(GisSurveyThirdImportParams params) {
  71. //获取结果
  72. GisSurveyThirdImportResult result = getResult(params);
  73. if (result != null) return result;
  74. //获取文件流
  75. List<InputStream> inputStreams = new ArrayList<>();
  76. for (MultipartFile file : params.getFiles()) {
  77. try {
  78. inputStreams.add(file.getInputStream());
  79. } catch (IOException e) {
  80. return GisSurveyThirdImportResult.fail(params);
  81. }
  82. }
  83. //启动检查任务
  84. startTask(params, inputStreams);
  85. //返回进行中
  86. return GisSurveyThirdImportResult.inProgress(params, SUBTASK_CACHE.get(params.getJobId()),
  87. LocalDateTime.now(),
  88. params.getOperator()
  89. );
  90. }
  91. /**
  92. * 获取结果
  93. *
  94. * @param params 第三方导入参数
  95. * @return 导入结果
  96. */
  97. public GisSurveyThirdImportResult getResult(GisSurveyThirdImportParams params) {
  98. //获取已存在的任务
  99. ListenableFuture<GisSurveyThirdImportResult> previousFuture = TASK_CACHE.get(params.getJobId());
  100. //判断完成
  101. if (previousFuture != null && previousFuture.isDone()) {
  102. try {
  103. //获取结果
  104. GisSurveyThirdImportResult thirdImportResult = previousFuture.get();
  105. //如不忽略失败,且失败,则返回失败
  106. if (!params.getIgnoreFail() && Objects.equals(thirdImportResult.getImportStatus(), GisSurveyImportStatusEnum.FAIL.getCode()))
  107. return thirdImportResult;
  108. else if (Objects.equals(thirdImportResult.getImportStatus(), GisSurveyImportStatusEnum.SUCCESS.getCode()) ||
  109. Objects.equals(thirdImportResult.getImportStatus(), GisSurveyImportStatusEnum.DATA_ERROR.getCode())) {
  110. //如处理成功(导入成功或数据检查成功),返回成功,并清除缓存
  111. removeCache(params.getJobId());
  112. return thirdImportResult;
  113. }
  114. } catch (InterruptedException | ExecutionException e) {
  115. //打印报错信息
  116. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR, mBizType, mStrClassName
  117. , String.format("检查历史失败异常 任务id:%s msg:%s", params.getJobId(), e.getMessage())
  118. );
  119. return GisSurveyThirdImportResult.fail(params);
  120. }
  121. }
  122. //进行中判断(未完成且未清除)
  123. if (previousFuture != null && !previousFuture.isDone() && !previousFuture.isCancelled()) {
  124. CommAsyncCache asyncCache = INFO_CACHE.get(params.getJobId());
  125. if (asyncCache != null)
  126. return GisSurveyThirdImportResult.inProgress(params,
  127. SUBTASK_CACHE.get(params.getJobId()),
  128. asyncCache.getStartTime(), asyncCache.getOperator()
  129. );
  130. }
  131. return null;
  132. }
  133. /**
  134. * 取消导入
  135. *
  136. * @param jobId 任务id
  137. * @return 取消结果
  138. */
  139. public int cancelImport(String jobId) {
  140. //如无缓存,则直接返回不存在
  141. if (!TASK_CACHE.containsKey(jobId)) return GisSurveyImportStatusEnum.NOT_EXISTS.getCode();
  142. //关闭检查任务
  143. if (stopTask(TASK_CACHE.get(jobId))) {
  144. removeCache(jobId);
  145. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName
  146. , String.format(
  147. "第三方导入停止成功,清除缓存;任务id: %s"
  148. , jobId
  149. )
  150. );
  151. return GisSurveyImportStatusEnum.SUCCESS.getCode();
  152. }
  153. return GisSurveyImportStatusEnum.FAIL.getCode();
  154. }
  155. /**
  156. * 启动任务
  157. *
  158. * @param params 导入参数
  159. * @param inputStreams 文件输入流
  160. */
  161. private void startTask(GisSurveyThirdImportParams params, List<InputStream> inputStreams) {
  162. String jobId = params.getJobId();
  163. //获取已存在的任务
  164. ListenableFuture<GisSurveyThirdImportResult> previousFuture = TASK_CACHE.get(jobId);
  165. //已结束判断,删除缓存
  166. if (previousFuture != null && (previousFuture.isDone() || previousFuture.isCancelled()))
  167. removeCache(params.getJobId());
  168. //异步执行第三方导入查任务
  169. ListenableFuture<GisSurveyThirdImportResult> future = thirdImporter.thirdImportTask(params, inputStreams,
  170. //缓存子任务句柄
  171. subtask -> SUBTASK_CACHE.put(jobId, subtask),
  172. //删除子任务句柄
  173. subtaskSystemCheckId -> SUBTASK_CACHE.remove(jobId)
  174. );
  175. //任务超时
  176. taskScheduler.schedule(() -> {
  177. if (!future.isCancelled() && !future.isDone() && stopTask(future)) {
  178. removeCache(jobId);
  179. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName
  180. , String.format(
  181. "第三方导入超时,成功停止任务;任务id: %s"
  182. , jobId
  183. )
  184. );
  185. }
  186. }, Instant.now().plusMillis(asyncTaskProperties.getThirdImportTimeout().toMillis()));
  187. //缓存任务句柄
  188. TASK_CACHE.put(jobId, future);
  189. //缓存时间
  190. INFO_CACHE.put(jobId, new CommAsyncCache(LocalDateTime.now(), params.getOperator()));
  191. }
  192. /**
  193. * 停止任务
  194. *
  195. * @param future 任务
  196. * @return 关闭状态
  197. */
  198. private Boolean stopTask(ListenableFuture<?> future) {
  199. //完成判断,完成删除缓存
  200. if (future.isCancelled() || future.isDone()) return true;
  201. //尝试清除任务
  202. return future.cancel(true);
  203. }
  204. /**
  205. * 清除缓存
  206. *
  207. * @param jobId 任务id
  208. */
  209. private void removeCache(String jobId) {
  210. TASK_CACHE.remove(jobId);
  211. SUBTASK_CACHE.remove(jobId);
  212. INFO_CACHE.remove(jobId);
  213. }
  214. }