package com.shkpr.service.alambizplugin.bizservice; import com.fasterxml.jackson.core.type.TypeReference; import com.global.base.log.LogLevelFlag; import com.global.base.log.LogPrintMgr; import com.shkpr.service.alambizplugin.commproperties.AsyncTaskProperties; import com.shkpr.service.alambizplugin.components.AsyncResultManager; import com.shkpr.service.alambizplugin.components.GisSurveyThirdExporter; import com.shkpr.service.alambizplugin.constants.CommAsyncStatusEnum; import com.shkpr.service.alambizplugin.constants.FileTypeEnum; import com.shkpr.service.alambizplugin.constants.GisSurveConvertStatusEnum; import com.shkpr.service.alambizplugin.constants.LogFlagBusiType; import com.shkpr.service.alambizplugin.dbdao.services.intef.GisSurveyJobInfoService; import com.shkpr.service.alambizplugin.dto.CommAsyncResult; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.stereotype.Component; import org.springframework.util.concurrent.ListenableFuture; import java.time.Duration; import java.time.Instant; import java.time.LocalDateTime; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; /** * 第三方导出管理service * * @author 欧阳劲驰 * @since 1.0.0 */ @Component public class GisSurveyThirdExportBizService { /** * 任务缓存 */ private final static Map>>> TASK_CACHE = new ConcurrentHashMap<>(); /** * 开始时间缓存 */ private final static Map TIME_CACHE = new ConcurrentHashMap<>(); /** * log */ private final String mStrClassName; private final String mBizType; private final AsyncTaskProperties asyncTaskProperties; private final AsyncResultManager asyncResultManager; private final ThreadPoolTaskScheduler taskScheduler; private final GisSurveyThirdExporter thirdExporter; private final GisSurveyJobInfoService jobInfoService; public GisSurveyThirdExportBizService(AsyncTaskProperties asyncTaskProperties, AsyncResultManager asyncResultManager , @Qualifier("timeThreadPoolTaskScheduler") ThreadPoolTaskScheduler taskScheduler , GisSurveyThirdExporter thirdExporter, GisSurveyJobInfoService jobInfoService) { mStrClassName = "GisSurveyThirdExportBizService"; mBizType = LogFlagBusiType.BUSI_GIS_SURVEY.toStrValue(); this.asyncTaskProperties = asyncTaskProperties; this.asyncResultManager = asyncResultManager; this.taskScheduler = taskScheduler; this.thirdExporter = thirdExporter; this.jobInfoService = jobInfoService; } /** * 第三方导出 * * @param jobId 任务id * @param fileType 导出文件类型 * @return 转换结果 */ public CommAsyncResult> thirdExport(String jobId, FileTypeEnum fileType) { //获取上次结果 CommAsyncResult> lastResult = getResult(jobId); //失败/进行中处理 if (lastResult != null && !Objects.equals(CommAsyncStatusEnum.SUCCESS.getCode(), lastResult.getStatus())) return lastResult; //文件结果flag final String FLAG = GisSurveyThirdExporter.RESULT_PREFIX + jobId; //获取文件结果 CommAsyncResult> fileResult = asyncResultManager.getResult(FLAG, new TypeReference>>() { }); //获取数据更新时间 LocalDateTime refreshTime = jobInfoService.findRefreshTimeByUid(jobId); //直接返回文件结果判断:结果\刷新时间\数据不为空,结果数据包含当前项,文件结果落后数据库 if (fileResult != null && refreshTime != null //文件结果包含数据库时间 && fileResult.getRefreshTimes() != null && fileResult.getRefreshTimes().containsKey(fileType.getName()) //文件结果数据包含当前项 && fileResult.getData() != null && fileResult.getData().containsKey(fileType.getName())) { //文件结果落后数据间隔 Duration lags = Duration.between(fileResult.getRefreshTimes().get(fileType.getName()), refreshTime); //超出间隔,直接返回 if (asyncTaskProperties.getThirdExportResultLag().compareTo(lags) >= 0) return fileResult; } //启动任务 startTask(jobId, fileType); //返回进行中 return CommAsyncResult.inProgress(jobId, LocalDateTime.now()); } /** * 获取结果 * * @param jobId 任务id * @return 结果 */ public CommAsyncResult> getResult(String jobId) { //获取已存在的任务 ListenableFuture>> previousFuture = TASK_CACHE.get(jobId); //判断完成 if (previousFuture != null && previousFuture.isDone()) { try { //获取结果 CommAsyncResult> result = previousFuture.get(); //如成功/失败,则直接返回 if (Objects.equals(result.getStatus(), CommAsyncStatusEnum.FAIL.getCode()) || Objects.equals(result.getStatus(), GisSurveConvertStatusEnum.SUCCESS.getCode())) removeCache(jobId); return result; } catch (InterruptedException | ExecutionException e) { //打印报错信息 LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR, mBizType, mStrClassName , String.format("查询转换导出异常 任务id:%s error:%s", jobId, e) ); return CommAsyncResult.fail(jobId); } } //进行中判断(未完成且未清除) if (previousFuture != null && !previousFuture.isDone() && !previousFuture.isCancelled()) return CommAsyncResult.inProgress(jobId, TIME_CACHE.get(jobId)); return null; } /** * 启动任务 * * @param jobId 任务id * @param fileType 导出文件类型 */ private void startTask(String jobId, FileTypeEnum fileType) { //获取已存在的任务 ListenableFuture>> previousFuture = TASK_CACHE.get(jobId); //已结束判断,删除缓存 if (previousFuture != null && (previousFuture.isDone() || previousFuture.isCancelled())) removeCache(jobId); //执行异步任务 ListenableFuture>> future = thirdExporter.thirdExportTask(jobId, fileType); //任务超时 taskScheduler.schedule(() -> { if (!future.isCancelled() && !future.isDone() && stopTask(future)) { removeCache(jobId); LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName , String.format( "第三方导出超时,成功停止任务;任务id: %s" , jobId ) ); } }, Instant.now().plusMillis(asyncTaskProperties.getThirdExportTimeout().toMillis())); //缓存任务句柄 TASK_CACHE.put(jobId, future); //缓存时间 TIME_CACHE.put(jobId, LocalDateTime.now()); } /** * 停止任务 * * @param future 任务id * @return 关闭状态 */ private Boolean stopTask(ListenableFuture future) { //完成判断 if (future.isCancelled() || future.isDone()) return true; //尝试清除任务 return future.cancel(true); } /** * 清除缓存 * * @param jobId 任务id */ private void removeCache(String jobId) { TASK_CACHE.remove(jobId); TIME_CACHE.remove(jobId); } }