123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200 |
- package com.shkpr.service.alambizplugin.bizservice;
- import com.fasterxml.jackson.core.type.TypeReference;
- import com.global.base.log.LogLevelFlag;
- import com.global.base.log.LogPrintMgr;
- import com.shkpr.service.alambizplugin.commproperties.AsyncTaskProperties;
- import com.shkpr.service.alambizplugin.components.AsyncResultManager;
- import com.shkpr.service.alambizplugin.components.GisSurveyThirdExporter;
- import com.shkpr.service.alambizplugin.constants.CommAsyncStatusEnum;
- import com.shkpr.service.alambizplugin.constants.FileTypeEnum;
- import com.shkpr.service.alambizplugin.constants.GisSurveConvertStatusEnum;
- import com.shkpr.service.alambizplugin.constants.LogFlagBusiType;
- import com.shkpr.service.alambizplugin.dbdao.services.intef.GisSurveyJobInfoService;
- import com.shkpr.service.alambizplugin.dto.CommAsyncResult;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
- import org.springframework.stereotype.Component;
- import org.springframework.util.concurrent.ListenableFuture;
- import java.time.Duration;
- import java.time.Instant;
- import java.time.LocalDateTime;
- import java.util.Map;
- import java.util.Objects;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.concurrent.ExecutionException;
- /**
- * 第三方导出管理service
- *
- * @author 欧阳劲驰
- * @since 1.0.0
- */
- @Component
- public class GisSurveyThirdExportBizService {
- /**
- * 任务缓存
- */
- private final static Map<String, ListenableFuture<CommAsyncResult<Map<String, String>>>> TASK_CACHE = new ConcurrentHashMap<>();
- /**
- * 开始时间缓存
- */
- private final static Map<String, LocalDateTime> TIME_CACHE = new ConcurrentHashMap<>();
- /**
- * log
- */
- private final String mStrClassName;
- private final String mBizType;
- private final AsyncTaskProperties asyncTaskProperties;
- private final AsyncResultManager asyncResultManager;
- private final ThreadPoolTaskScheduler taskScheduler;
- private final GisSurveyThirdExporter thirdExporter;
- private final GisSurveyJobInfoService jobInfoService;
- public GisSurveyThirdExportBizService(AsyncTaskProperties asyncTaskProperties, AsyncResultManager asyncResultManager
- , @Qualifier("timeThreadPoolTaskScheduler") ThreadPoolTaskScheduler taskScheduler
- , GisSurveyThirdExporter thirdExporter, GisSurveyJobInfoService jobInfoService) {
- mStrClassName = "GisSurveyThirdExportBizService";
- mBizType = LogFlagBusiType.BUSI_GIS_SURVEY.toStrValue();
- this.asyncTaskProperties = asyncTaskProperties;
- this.asyncResultManager = asyncResultManager;
- this.taskScheduler = taskScheduler;
- this.thirdExporter = thirdExporter;
- this.jobInfoService = jobInfoService;
- }
- /**
- * 第三方导出
- *
- * @param jobId 任务id
- * @param fileType 导出文件类型
- * @return 转换结果
- */
- public CommAsyncResult<Map<String, String>> thirdExport(String jobId, FileTypeEnum fileType) {
- //获取上次结果
- CommAsyncResult<Map<String, String>> lastResult = getResult(jobId);
- //失败/进行中处理
- if (lastResult != null && !Objects.equals(CommAsyncStatusEnum.SUCCESS.getCode(), lastResult.getStatus()))
- return lastResult;
- //文件结果flag
- final String FLAG = GisSurveyThirdExporter.RESULT_PREFIX + jobId;
- //获取文件结果
- CommAsyncResult<Map<String, String>> fileResult = asyncResultManager.getResult(FLAG, new TypeReference<CommAsyncResult<Map<String, String>>>() {
- });
- //获取数据更新时间
- LocalDateTime refreshTime = jobInfoService.findRefreshTimeByUid(jobId);
- //直接返回文件结果判断:结果\刷新时间\数据不为空,结果数据包含当前项,文件结果落后数据库
- if (fileResult != null && refreshTime != null
- //文件结果包含数据库时间
- && fileResult.getRefreshTimes() != null && fileResult.getRefreshTimes().containsKey(fileType.getName())
- //文件结果数据包含当前项
- && fileResult.getData() != null && fileResult.getData().containsKey(fileType.getName())) {
- //文件结果落后数据间隔
- Duration lags = Duration.between(fileResult.getRefreshTimes().get(fileType.getName()), refreshTime);
- //超出间隔,直接返回
- if (asyncTaskProperties.getThirdExportResultLag().compareTo(lags) >= 0) return fileResult;
- }
- //启动任务
- startTask(jobId, fileType);
- //返回进行中
- return CommAsyncResult.inProgress(jobId, LocalDateTime.now());
- }
- /**
- * 获取结果
- *
- * @param jobId 任务id
- * @return 结果
- */
- public CommAsyncResult<Map<String, String>> getResult(String jobId) {
- //获取已存在的任务
- ListenableFuture<CommAsyncResult<Map<String, String>>> previousFuture = TASK_CACHE.get(jobId);
- //判断完成
- if (previousFuture != null && previousFuture.isDone()) {
- try {
- //获取结果
- CommAsyncResult<Map<String, String>> result = previousFuture.get();
- //如成功/失败,则直接返回
- if (Objects.equals(result.getStatus(), CommAsyncStatusEnum.FAIL.getCode()) ||
- Objects.equals(result.getStatus(), GisSurveConvertStatusEnum.SUCCESS.getCode()))
- removeCache(jobId);
- return result;
- } catch (InterruptedException | ExecutionException e) {
- //打印报错信息
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR, mBizType, mStrClassName
- , String.format("查询转换导出异常 任务id:%s error:%s", jobId, e)
- );
- return CommAsyncResult.fail(jobId);
- }
- }
- //进行中判断(未完成且未清除)
- if (previousFuture != null && !previousFuture.isDone() && !previousFuture.isCancelled())
- return CommAsyncResult.inProgress(jobId, TIME_CACHE.get(jobId));
- return null;
- }
- /**
- * 启动任务
- *
- * @param jobId 任务id
- * @param fileType 导出文件类型
- */
- private void startTask(String jobId, FileTypeEnum fileType) {
- //获取已存在的任务
- ListenableFuture<CommAsyncResult<Map<String, String>>> previousFuture = TASK_CACHE.get(jobId);
- //已结束判断,删除缓存
- if (previousFuture != null && (previousFuture.isDone() || previousFuture.isCancelled())) removeCache(jobId);
- //执行异步任务
- ListenableFuture<CommAsyncResult<Map<String, String>>> future = thirdExporter.thirdExportTask(jobId, fileType);
- //任务超时
- taskScheduler.schedule(() -> {
- if (!future.isCancelled() && !future.isDone() && stopTask(future)) {
- removeCache(jobId);
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName
- , String.format(
- "第三方导出超时,成功停止任务;任务id: %s"
- , jobId
- )
- );
- }
- }, Instant.now().plusMillis(asyncTaskProperties.getThirdExportTimeout().toMillis()));
- //缓存任务句柄
- TASK_CACHE.put(jobId, future);
- //缓存时间
- TIME_CACHE.put(jobId, LocalDateTime.now());
- }
- /**
- * 停止任务
- *
- * @param future 任务id
- * @return 关闭状态
- */
- private Boolean stopTask(ListenableFuture<?> future) {
- //完成判断
- if (future.isCancelled() || future.isDone()) return true;
- //尝试清除任务
- return future.cancel(true);
- }
- /**
- * 清除缓存
- *
- * @param jobId 任务id
- */
- private void removeCache(String jobId) {
- TASK_CACHE.remove(jobId);
- TIME_CACHE.remove(jobId);
- }
- }
|