package com.shkpr.service.alambizplugin.bizservice; import com.global.base.log.LogLevelFlag; import com.global.base.log.LogPrintMgr; import com.shkpr.service.alambizplugin.apiparam.GisSurveyThirdImportParams; import com.shkpr.service.alambizplugin.commproperties.AsyncTaskProperties; import com.shkpr.service.alambizplugin.components.GisSurveyThirdImporter; import com.shkpr.service.alambizplugin.constants.GisSurveyImportStatusEnum; import com.shkpr.service.alambizplugin.constants.LogFlagBusiType; import com.shkpr.service.alambizplugin.dto.CommAsyncCache; import com.shkpr.service.alambizplugin.dto.GisSurveyThirdImportResult; import com.shkpr.service.alambizplugin.dto.GisSurveyThirdImportSubtask; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.stereotype.Component; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.web.multipart.MultipartFile; import java.io.IOException; import java.io.InputStream; import java.time.Instant; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; /** * 第三方导入管理service * * @author 欧阳劲驰 * @since 1.0.0 */ @Component public class GisSurveyThirdImportBizService { /** * 任务缓存 */ private final static Map> TASK_CACHE = new ConcurrentHashMap<>(); /** * 子任务缓存 */ private final static Map SUBTASK_CACHE = new ConcurrentHashMap<>(); /** * 开始时间缓存 */ private final static Map INFO_CACHE = new ConcurrentHashMap<>(); /** * log */ private final String mStrClassName; private final String mBizType; private final AsyncTaskProperties asyncTaskProperties; private final ThreadPoolTaskScheduler taskScheduler; private final GisSurveyThirdImporter thirdImporter; public GisSurveyThirdImportBizService(AsyncTaskProperties asyncTaskProperties , @Qualifier("timeThreadPoolTaskScheduler") ThreadPoolTaskScheduler taskScheduler , GisSurveyThirdImporter thirdImporter) { mStrClassName = "GisSurveySystemCheckBizService"; mBizType = LogFlagBusiType.BUSI_GIS_SURVEY.toStrValue(); this.asyncTaskProperties = asyncTaskProperties; this.taskScheduler = taskScheduler; this.thirdImporter = thirdImporter; } /** * 第三方导入 * * @param params 第三方导入参数 * @return 导入状态 */ public GisSurveyThirdImportResult thirdImport(GisSurveyThirdImportParams params) { //获取结果 GisSurveyThirdImportResult result = getResult(params); if (result != null) return result; //获取文件流 List inputStreams = new ArrayList<>(); for (MultipartFile file : params.getFiles()) { try { inputStreams.add(file.getInputStream()); } catch (IOException e) { return GisSurveyThirdImportResult.fail(params); } } //启动检查任务 startTask(params, inputStreams); //返回进行中 return GisSurveyThirdImportResult.inProgress(params, SUBTASK_CACHE.get(params.getJobId()), LocalDateTime.now(), params.getOperator() ); } /** * 获取结果 * * @param params 第三方导入参数 * @return 导入结果 */ public GisSurveyThirdImportResult getResult(GisSurveyThirdImportParams params) { //获取已存在的任务 ListenableFuture previousFuture = TASK_CACHE.get(params.getJobId()); //判断完成 if (previousFuture != null && previousFuture.isDone()) { try { //获取结果 GisSurveyThirdImportResult thirdImportResult = previousFuture.get(); //如不忽略失败,且失败,则返回失败 if (!params.getIgnoreFail() && Objects.equals(thirdImportResult.getImportStatus(), GisSurveyImportStatusEnum.FAIL.getCode())) return thirdImportResult; else if (Objects.equals(thirdImportResult.getImportStatus(), GisSurveyImportStatusEnum.SUCCESS.getCode()) || Objects.equals(thirdImportResult.getImportStatus(), GisSurveyImportStatusEnum.DATA_ERROR.getCode())) { //如处理成功(导入成功或数据检查成功),返回成功,并清除缓存 removeCache(params.getJobId()); return thirdImportResult; } } catch (InterruptedException | ExecutionException e) { //打印报错信息 LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR, mBizType, mStrClassName , String.format("检查历史失败异常 任务id:%s msg:%s", params.getJobId(), e.getMessage()) ); return GisSurveyThirdImportResult.fail(params); } } //进行中判断(未完成且未清除) if (previousFuture != null && !previousFuture.isDone() && !previousFuture.isCancelled()) { CommAsyncCache asyncCache = INFO_CACHE.get(params.getJobId()); if (asyncCache != null) return GisSurveyThirdImportResult.inProgress(params, SUBTASK_CACHE.get(params.getJobId()), asyncCache.getStartTime(), asyncCache.getOperator() ); } return null; } /** * 取消导入 * * @param jobId 任务id * @return 取消结果 */ public int cancelImport(String jobId) { //如无缓存,则直接返回不存在 if (!TASK_CACHE.containsKey(jobId)) return GisSurveyImportStatusEnum.NOT_EXISTS.getCode(); //关闭检查任务 if (stopTask(TASK_CACHE.get(jobId))) { removeCache(jobId); LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName , String.format( "第三方导入停止成功,清除缓存;任务id: %s" , jobId ) ); return GisSurveyImportStatusEnum.SUCCESS.getCode(); } return GisSurveyImportStatusEnum.FAIL.getCode(); } /** * 启动任务 * * @param params 导入参数 * @param inputStreams 文件输入流 */ private void startTask(GisSurveyThirdImportParams params, List inputStreams) { String jobId = params.getJobId(); //获取已存在的任务 ListenableFuture previousFuture = TASK_CACHE.get(jobId); //已结束判断,删除缓存 if (previousFuture != null && (previousFuture.isDone() || previousFuture.isCancelled())) removeCache(params.getJobId()); //异步执行第三方导入查任务 ListenableFuture future = thirdImporter.thirdImportTask(params, inputStreams, //缓存子任务句柄 subtask -> SUBTASK_CACHE.put(jobId, subtask), //删除子任务句柄 subtaskSystemCheckId -> SUBTASK_CACHE.remove(jobId) ); //任务超时 taskScheduler.schedule(() -> { if (!future.isCancelled() && !future.isDone() && stopTask(future)) { removeCache(jobId); LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName , String.format( "第三方导入超时,成功停止任务;任务id: %s" , jobId ) ); } }, Instant.now().plusMillis(asyncTaskProperties.getThirdImportTimeout().toMillis())); //缓存任务句柄 TASK_CACHE.put(jobId, future); //缓存时间 INFO_CACHE.put(jobId, new CommAsyncCache(LocalDateTime.now(), params.getOperator())); } /** * 停止任务 * * @param future 任务 * @return 关闭状态 */ private Boolean stopTask(ListenableFuture future) { //完成判断,完成删除缓存 if (future.isCancelled() || future.isDone()) return true; //尝试清除任务 return future.cancel(true); } /** * 清除缓存 * * @param jobId 任务id */ private void removeCache(String jobId) { TASK_CACHE.remove(jobId); SUBTASK_CACHE.remove(jobId); INFO_CACHE.remove(jobId); } }