package com.shkpr.service.alambizplugin.bizservice; import com.global.base.log.LogLevelFlag; import com.global.base.log.LogPrintMgr; import com.shkpr.service.alambizplugin.apiparam.GisSurveyThirdImportParams; import com.shkpr.service.alambizplugin.components.GisSurveyThirdImporter; import com.shkpr.service.alambizplugin.constants.GisSurveyImportStatusEnum; import com.shkpr.service.alambizplugin.constants.LogFlagBusiType; import com.shkpr.service.alambizplugin.dto.GisSurveyThirdImportResult; import com.shkpr.service.alambizplugin.dto.GisSurveyThirdImportSubtask; import org.springframework.stereotype.Component; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.web.multipart.MultipartFile; import java.io.IOException; import java.io.InputStream; import java.time.Duration; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; /** * 第三方导入管理service * * @author 欧阳劲驰 * @since 1.0.0 */ @Component public class GisSurveyThirdImportBizService { /** * 任务缓存 */ private final static Map> TASK_CACHE = new ConcurrentHashMap<>(); /** * 子任务缓存 */ private final static Map SUBTASK_CACHE = new ConcurrentHashMap<>(); /** * 开始时间缓存 */ private final static Map TIME_CACHE = new ConcurrentHashMap<>(); /** * log */ private final String mStrClassName; private final String mBizType; private final GisSurveyThirdImporter thirdImporter; public GisSurveyThirdImportBizService(GisSurveyThirdImporter thirdImporter) { mStrClassName = "GisSurveySystemCheckBizService"; mBizType = LogFlagBusiType.BUSI_GIS_SURVEY.toStrValue(); this.thirdImporter = thirdImporter; } /** * 第三方导入 * * @param params 第三方导入参数 * @return 导入状态 */ public GisSurveyThirdImportResult thirdImport(GisSurveyThirdImportParams params) { //获取已存在的任务 ListenableFuture previousFuture = TASK_CACHE.get(params.getJobId()); //判断完成 if (previousFuture != null && previousFuture.isDone()) { try { //获取结果 GisSurveyThirdImportResult thirdImportResult = previousFuture.get(); //如不忽略失败,且失败,则返回失败 if (!params.getIgnoreFail() && Objects.equals(thirdImportResult.getImportStatus(), GisSurveyImportStatusEnum.FAIL.getCode())) return thirdImportResult; else if (Objects.equals(thirdImportResult.getImportStatus(), GisSurveyImportStatusEnum.SUCCESS.getCode())) { //如成功,返回成功,并清除缓存 removeCache(params.getJobId()); return thirdImportResult; } } catch (InterruptedException | ExecutionException e) { //打印报错信息(不太可能走到这) LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR, mBizType, mStrClassName , String.format("检查历史失败异常 任务id:%s msg:%s", params.getJobId(), e.getMessage()) ); } } //系统检查子任务 GisSurveyThirdImportSubtask importSubtask = SUBTASK_CACHE.get(params.getJobId()); //进行中判断(未完成且未清除) if (previousFuture != null && !previousFuture.isDone() && !previousFuture.isCancelled()) return GisSurveyThirdImportResult.inProgress(params, importSubtask, TIME_CACHE.get(params.getJobId())); //获取文件流 List inputStreams = new ArrayList<>(); for (MultipartFile file : params.getFiles()) { try { inputStreams.add(file.getInputStream()); } catch (IOException e) { return GisSurveyThirdImportResult.fail(params); } } //启动检查任务 startTask(params, inputStreams); //返回进行中 return GisSurveyThirdImportResult.inProgress(params, importSubtask, LocalDateTime.now()); } /** * 取消检查 * * @param jobId 任务id * @return 取消结果 */ public int cancelCheck(String jobId) { //如无缓存,则直接返回不存在 if (!TASK_CACHE.containsKey(jobId)) return GisSurveyImportStatusEnum.NOT_EXISTS.getCode(); //关闭检查任务 return stopTask(jobId) ? GisSurveyImportStatusEnum.SUCCESS.getCode() : GisSurveyImportStatusEnum.FAIL.getCode(); } /** * 过期任务 *

用于检查和使任务过期

*/ public void expireResult(Duration ttl) { //获取超时的id List jobIds = TIME_CACHE.entrySet().stream() .filter(entry -> Duration.between(entry.getValue(), LocalDateTime.now()).compareTo(ttl) > 0 ) .map(Map.Entry::getKey) .collect(Collectors.toList()); //停止超时的任务并删除任务缓存 for (String jobid : jobIds) { //如任务不存在,则删除时间缓存 if (!TASK_CACHE.containsKey(jobid)) TIME_CACHE.remove(jobid); //停用缓存 if (stopTask(jobid)) TIME_CACHE.remove(jobid); else { //打印报错信息 LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR, mBizType, mStrClassName , String.format( "过期任务停止失败 ttl:%s 任务id:%s" , ttl , jobid ) ); } } } /** * 启动任务 * * @param params 导入参数 * @param inputStreams 文件输入流 */ private void startTask(GisSurveyThirdImportParams params, List inputStreams) { String jobId = params.getJobId(); //获取已存在的任务 ListenableFuture previousFuture = TASK_CACHE.get(jobId); //已结束判断,删除缓存 if (previousFuture != null && (previousFuture.isDone() || previousFuture.isCancelled())) { removeCache(params.getJobId()); } //异步执行第三方导入查任务 ListenableFuture checkFuture = thirdImporter.thirdImportTask(params, inputStreams, //缓存子任务句柄 subtask -> SUBTASK_CACHE.put(jobId, subtask), //删除子任务句柄 subtaskSystemCheckId -> SUBTASK_CACHE.remove(jobId) ); //缓存任务句柄 TASK_CACHE.put(jobId, checkFuture); //缓存时间 TIME_CACHE.put(jobId, LocalDateTime.now()); } /** * 停止任务 * * @param jobId 任务id * @return 关闭状态 */ private Boolean stopTask(String jobId) { ListenableFuture future = TASK_CACHE.get(jobId); //完成判断,完成删除缓存 if (future.isCancelled() || future.isDone()) { removeCache(jobId); return true; } //尝试清除任务 boolean cancel = future.cancel(true); //清除成功,删除缓存 if (cancel) { removeCache(jobId); return true; } return false; } /** * 清除缓存 * * @param jobId 任务id */ private void removeCache(String jobId) { TASK_CACHE.remove(jobId); SUBTASK_CACHE.remove(jobId); TIME_CACHE.remove(jobId); } }