GisSurveyThirdExportBizService.java 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. package com.shkpr.service.alambizplugin.bizservice;
  2. import com.fasterxml.jackson.core.type.TypeReference;
  3. import com.global.base.log.LogLevelFlag;
  4. import com.global.base.log.LogPrintMgr;
  5. import com.shkpr.service.alambizplugin.commproperties.AsyncTaskProperties;
  6. import com.shkpr.service.alambizplugin.components.AsyncResultManager;
  7. import com.shkpr.service.alambizplugin.components.GisSurveyThirdExporter;
  8. import com.shkpr.service.alambizplugin.constants.CommAsyncStatusEnum;
  9. import com.shkpr.service.alambizplugin.constants.FileTypeEnum;
  10. import com.shkpr.service.alambizplugin.constants.GisSurveConvertStatusEnum;
  11. import com.shkpr.service.alambizplugin.constants.LogFlagBusiType;
  12. import com.shkpr.service.alambizplugin.dbdao.services.intef.GisSurveyJobInfoService;
  13. import com.shkpr.service.alambizplugin.dto.CommAsyncResult;
  14. import org.springframework.beans.factory.annotation.Qualifier;
  15. import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
  16. import org.springframework.stereotype.Component;
  17. import org.springframework.util.concurrent.ListenableFuture;
  18. import java.time.Duration;
  19. import java.time.Instant;
  20. import java.time.LocalDateTime;
  21. import java.util.Map;
  22. import java.util.Objects;
  23. import java.util.concurrent.ConcurrentHashMap;
  24. import java.util.concurrent.ExecutionException;
  25. /**
  26. * 第三方导出管理service
  27. *
  28. * @author 欧阳劲驰
  29. * @since 1.0.0
  30. */
  31. @Component
  32. public class GisSurveyThirdExportBizService {
  33. /**
  34. * 任务缓存
  35. */
  36. private final static Map<String, ListenableFuture<CommAsyncResult<Map<String, String>>>> TASK_CACHE = new ConcurrentHashMap<>();
  37. /**
  38. * 开始时间缓存
  39. */
  40. private final static Map<String, LocalDateTime> TIME_CACHE = new ConcurrentHashMap<>();
  41. /**
  42. * log
  43. */
  44. private final String mStrClassName;
  45. private final String mBizType;
  46. private final AsyncTaskProperties asyncTaskProperties;
  47. private final AsyncResultManager asyncResultManager;
  48. private final ThreadPoolTaskScheduler taskScheduler;
  49. private final GisSurveyThirdExporter thirdExporter;
  50. private final GisSurveyJobInfoService jobInfoService;
  51. public GisSurveyThirdExportBizService(AsyncTaskProperties asyncTaskProperties, AsyncResultManager asyncResultManager
  52. , @Qualifier("timeThreadPoolTaskScheduler") ThreadPoolTaskScheduler taskScheduler
  53. , GisSurveyThirdExporter thirdExporter, GisSurveyJobInfoService jobInfoService) {
  54. mStrClassName = "GisSurveyThirdExportBizService";
  55. mBizType = LogFlagBusiType.BUSI_GIS_SURVEY.toStrValue();
  56. this.asyncTaskProperties = asyncTaskProperties;
  57. this.asyncResultManager = asyncResultManager;
  58. this.taskScheduler = taskScheduler;
  59. this.thirdExporter = thirdExporter;
  60. this.jobInfoService = jobInfoService;
  61. }
  62. /**
  63. * 第三方导出
  64. *
  65. * @param jobId 任务id
  66. * @param fileType 导出文件类型
  67. * @return 转换结果
  68. */
  69. public CommAsyncResult<Map<String, String>> thirdExport(String jobId, FileTypeEnum fileType) {
  70. //获取上次结果
  71. CommAsyncResult<Map<String, String>> lastResult = getResult(jobId);
  72. //失败/进行中处理
  73. if (lastResult != null && !Objects.equals(CommAsyncStatusEnum.SUCCESS.getCode(), lastResult.getStatus()))
  74. return lastResult;
  75. //文件结果flag
  76. final String FLAG = GisSurveyThirdExporter.RESULT_PREFIX + jobId;
  77. //获取文件结果
  78. CommAsyncResult<Map<String, String>> fileResult = asyncResultManager.getResult(FLAG, new TypeReference<CommAsyncResult<Map<String, String>>>() {
  79. });
  80. //获取数据更新时间
  81. LocalDateTime refreshTime = jobInfoService.findRefreshTimeByUid(jobId);
  82. //直接返回文件结果判断:结果\刷新时间\数据不为空,结果数据包含当前项,文件结果落后数据库
  83. if (fileResult != null && refreshTime != null
  84. //文件结果包含数据库时间
  85. && fileResult.getRefreshTimes() != null && fileResult.getRefreshTimes().containsKey(fileType.getName())
  86. //文件结果数据包含当前项
  87. && fileResult.getData() != null && fileResult.getData().containsKey(fileType.getName())) {
  88. //文件结果落后数据间隔
  89. Duration lags = Duration.between(fileResult.getRefreshTimes().get(fileType.getName()), refreshTime);
  90. //超出间隔,直接返回
  91. if (asyncTaskProperties.getThirdExportResultLag().compareTo(lags) >= 0) return fileResult;
  92. }
  93. //启动任务
  94. startTask(jobId, fileType);
  95. //返回进行中
  96. return CommAsyncResult.inProgress(jobId, LocalDateTime.now());
  97. }
  98. /**
  99. * 获取结果
  100. *
  101. * @param jobId 任务id
  102. * @return 结果
  103. */
  104. public CommAsyncResult<Map<String, String>> getResult(String jobId) {
  105. //获取已存在的任务
  106. ListenableFuture<CommAsyncResult<Map<String, String>>> previousFuture = TASK_CACHE.get(jobId);
  107. //判断完成
  108. if (previousFuture != null && previousFuture.isDone()) {
  109. try {
  110. //获取结果
  111. CommAsyncResult<Map<String, String>> result = previousFuture.get();
  112. //如成功/失败,则直接返回
  113. if (Objects.equals(result.getStatus(), CommAsyncStatusEnum.FAIL.getCode()) ||
  114. Objects.equals(result.getStatus(), GisSurveConvertStatusEnum.SUCCESS.getCode()))
  115. removeCache(jobId);
  116. return result;
  117. } catch (InterruptedException | ExecutionException e) {
  118. //打印报错信息
  119. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR, mBizType, mStrClassName
  120. , String.format("查询转换导出异常 任务id:%s error:%s", jobId, e)
  121. );
  122. return CommAsyncResult.fail(jobId);
  123. }
  124. }
  125. //进行中判断(未完成且未清除)
  126. if (previousFuture != null && !previousFuture.isDone() && !previousFuture.isCancelled())
  127. return CommAsyncResult.inProgress(jobId, TIME_CACHE.get(jobId));
  128. return null;
  129. }
  130. /**
  131. * 启动任务
  132. *
  133. * @param jobId 任务id
  134. * @param fileType 导出文件类型
  135. */
  136. private void startTask(String jobId, FileTypeEnum fileType) {
  137. //获取已存在的任务
  138. ListenableFuture<CommAsyncResult<Map<String, String>>> previousFuture = TASK_CACHE.get(jobId);
  139. //已结束判断,删除缓存
  140. if (previousFuture != null && (previousFuture.isDone() || previousFuture.isCancelled())) removeCache(jobId);
  141. //执行异步任务
  142. ListenableFuture<CommAsyncResult<Map<String, String>>> future = thirdExporter.thirdExportTask(jobId, fileType);
  143. //任务超时
  144. taskScheduler.schedule(() -> {
  145. if (!future.isCancelled() && !future.isDone() && stopTask(future)) {
  146. removeCache(jobId);
  147. LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName
  148. , String.format(
  149. "第三方导出超时,成功停止任务;任务id: %s"
  150. , jobId
  151. )
  152. );
  153. }
  154. }, Instant.now().plusMillis(asyncTaskProperties.getThirdExportTimeout().toMillis()));
  155. //缓存任务句柄
  156. TASK_CACHE.put(jobId, future);
  157. //缓存时间
  158. TIME_CACHE.put(jobId, LocalDateTime.now());
  159. }
  160. /**
  161. * 停止任务
  162. *
  163. * @param future 任务id
  164. * @return 关闭状态
  165. */
  166. private Boolean stopTask(ListenableFuture<?> future) {
  167. //完成判断
  168. if (future.isCancelled() || future.isDone()) return true;
  169. //尝试清除任务
  170. return future.cancel(true);
  171. }
  172. /**
  173. * 清除缓存
  174. *
  175. * @param jobId 任务id
  176. */
  177. private void removeCache(String jobId) {
  178. TASK_CACHE.remove(jobId);
  179. TIME_CACHE.remove(jobId);
  180. }
  181. }