123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218 |
- package com.shkpr.service.alambizplugin.bizservice;
- import com.global.base.log.LogLevelFlag;
- import com.global.base.log.LogPrintMgr;
- import com.shkpr.service.alambizplugin.apiparam.GisSurveyThirdImportParams;
- import com.shkpr.service.alambizplugin.components.GisSurveyThirdImporter;
- import com.shkpr.service.alambizplugin.constants.GisSurveyImportStatusEnum;
- import com.shkpr.service.alambizplugin.constants.LogFlagBusiType;
- import com.shkpr.service.alambizplugin.dto.GisSurveyThirdImportResult;
- import com.shkpr.service.alambizplugin.dto.GisSurveyThirdImportSubtask;
- import org.springframework.stereotype.Component;
- import org.springframework.util.concurrent.ListenableFuture;
- import org.springframework.web.multipart.MultipartFile;
- import java.io.IOException;
- import java.io.InputStream;
- import java.time.Duration;
- import java.time.LocalDateTime;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.Map;
- import java.util.Objects;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.concurrent.ExecutionException;
- import java.util.stream.Collectors;
- /**
- * 第三方导入管理service
- *
- * @author 欧阳劲驰
- * @since 1.0.0
- */
- @Component
- public class GisSurveyThirdImportBizService {
- /**
- * 任务缓存
- */
- private final static Map<String, ListenableFuture<GisSurveyThirdImportResult>> TASK_CACHE = new ConcurrentHashMap<>();
- /**
- * 子任务缓存
- */
- private final static Map<String, GisSurveyThirdImportSubtask> SUBTASK_CACHE = new ConcurrentHashMap<>();
- /**
- * 开始时间缓存
- */
- private final static Map<String, LocalDateTime> TIME_CACHE = new ConcurrentHashMap<>();
- /**
- * log
- */
- private final String mStrClassName;
- private final String mBizType;
- private final GisSurveyThirdImporter thirdImporter;
- public GisSurveyThirdImportBizService(GisSurveyThirdImporter thirdImporter) {
- mStrClassName = "GisSurveySystemCheckBizService";
- mBizType = LogFlagBusiType.BUSI_GIS_SURVEY.toStrValue();
- this.thirdImporter = thirdImporter;
- }
- /**
- * 第三方导入
- *
- * @param params 第三方导入参数
- * @return 导入状态
- */
- public GisSurveyThirdImportResult thirdImport(GisSurveyThirdImportParams params) {
- //获取已存在的任务
- ListenableFuture<GisSurveyThirdImportResult> previousFuture = TASK_CACHE.get(params.getJobId());
- //判断完成
- if (previousFuture != null && previousFuture.isDone()) {
- try {
- //获取结果
- GisSurveyThirdImportResult thirdImportResult = previousFuture.get();
- //如不忽略失败,且失败,则返回失败
- if (!params.getIgnoreFail() && Objects.equals(thirdImportResult.getImportStatus(), GisSurveyImportStatusEnum.FAIL.getCode()))
- return thirdImportResult;
- else if (Objects.equals(thirdImportResult.getImportStatus(), GisSurveyImportStatusEnum.SUCCESS.getCode())) {
- //如成功,返回成功,并清除缓存
- removeCache(params.getJobId());
- return thirdImportResult;
- }
- } catch (InterruptedException | ExecutionException e) {
- //打印报错信息(不太可能走到这)
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR, mBizType, mStrClassName
- , String.format("检查历史失败异常 任务id:%s msg:%s", params.getJobId(), e.getMessage())
- );
- }
- }
- //系统检查子任务
- GisSurveyThirdImportSubtask importSubtask = SUBTASK_CACHE.get(params.getJobId());
- //进行中判断(未完成且未清除)
- if (previousFuture != null && !previousFuture.isDone() && !previousFuture.isCancelled())
- return GisSurveyThirdImportResult.inProgress(params, importSubtask, TIME_CACHE.get(params.getJobId()));
- //获取文件流
- List<InputStream> inputStreams = new ArrayList<>();
- for (MultipartFile file : params.getFiles()) {
- try {
- inputStreams.add(file.getInputStream());
- } catch (IOException e) {
- return GisSurveyThirdImportResult.fail(params);
- }
- }
- //启动检查任务
- startTask(params, inputStreams);
- //返回进行中
- return GisSurveyThirdImportResult.inProgress(params, importSubtask, LocalDateTime.now());
- }
- /**
- * 取消检查
- *
- * @param jobId 任务id
- * @return 取消结果
- */
- public int cancelCheck(String jobId) {
- //如无缓存,则直接返回不存在
- if (!TASK_CACHE.containsKey(jobId)) return GisSurveyImportStatusEnum.NOT_EXISTS.getCode();
- //关闭检查任务
- return stopTask(jobId) ? GisSurveyImportStatusEnum.SUCCESS.getCode() : GisSurveyImportStatusEnum.FAIL.getCode();
- }
- /**
- * 过期任务
- * <p>用于检查和使任务过期</p>
- */
- public void expireResult(Duration ttl) {
- //获取超时的id
- List<String> jobIds = TIME_CACHE.entrySet().stream()
- .filter(entry ->
- Duration.between(entry.getValue(), LocalDateTime.now()).compareTo(ttl) > 0
- )
- .map(Map.Entry::getKey)
- .collect(Collectors.toList());
- //停止超时的任务并删除任务缓存
- for (String jobid : jobIds) {
- //如任务不存在,则删除时间缓存
- if (!TASK_CACHE.containsKey(jobid)) TIME_CACHE.remove(jobid);
- //停用缓存
- if (stopTask(jobid)) TIME_CACHE.remove(jobid);
- else {
- //打印报错信息
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR, mBizType, mStrClassName
- , String.format(
- "过期任务停止失败 ttl:%s 任务id:%s"
- , ttl
- , jobid
- )
- );
- }
- }
- }
- /**
- * 启动任务
- *
- * @param params 导入参数
- * @param inputStreams 文件输入流
- */
- private void startTask(GisSurveyThirdImportParams params, List<InputStream> inputStreams) {
- String jobId = params.getJobId();
- //获取已存在的任务
- ListenableFuture<GisSurveyThirdImportResult> previousFuture = TASK_CACHE.get(jobId);
- //已结束判断,删除缓存
- if (previousFuture != null && (previousFuture.isDone() || previousFuture.isCancelled())) {
- removeCache(params.getJobId());
- }
- //异步执行第三方导入查任务
- ListenableFuture<GisSurveyThirdImportResult> checkFuture = thirdImporter.thirdImportTask(params, inputStreams,
- //缓存子任务句柄
- subtask -> SUBTASK_CACHE.put(jobId, subtask),
- //删除子任务句柄
- subtaskSystemCheckId -> SUBTASK_CACHE.remove(jobId)
- );
- //缓存任务句柄
- TASK_CACHE.put(jobId, checkFuture);
- //缓存时间
- TIME_CACHE.put(jobId, LocalDateTime.now());
- }
- /**
- * 停止任务
- *
- * @param jobId 任务id
- * @return 关闭状态
- */
- private Boolean stopTask(String jobId) {
- ListenableFuture<GisSurveyThirdImportResult> future = TASK_CACHE.get(jobId);
- //完成判断,完成删除缓存
- if (future.isCancelled() || future.isDone()) {
- removeCache(jobId);
- return true;
- }
- //尝试清除任务
- boolean cancel = future.cancel(true);
- //清除成功,删除缓存
- if (cancel) {
- removeCache(jobId);
- return true;
- }
- return false;
- }
- /**
- * 清除缓存
- *
- * @param jobId 任务id
- */
- private void removeCache(String jobId) {
- TASK_CACHE.remove(jobId);
- SUBTASK_CACHE.remove(jobId);
- TIME_CACHE.remove(jobId);
- }
- }
|