123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230 |
- package com.shkpr.service.alambizplugin.bizservice;
- import com.global.base.log.LogLevelFlag;
- import com.global.base.log.LogPrintMgr;
- import com.shkpr.service.alambizplugin.apiparam.GisSurveyThirdImportParams;
- import com.shkpr.service.alambizplugin.commproperties.AsyncTaskProperties;
- import com.shkpr.service.alambizplugin.components.GisSurveyThirdImporter;
- import com.shkpr.service.alambizplugin.constants.GisSurveyImportStatusEnum;
- import com.shkpr.service.alambizplugin.constants.LogFlagBusiType;
- import com.shkpr.service.alambizplugin.dto.CommAsyncCache;
- import com.shkpr.service.alambizplugin.dto.GisSurveyThirdImportResult;
- import com.shkpr.service.alambizplugin.dto.GisSurveyThirdImportSubtask;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
- import org.springframework.stereotype.Component;
- import org.springframework.util.concurrent.ListenableFuture;
- import org.springframework.web.multipart.MultipartFile;
- import java.io.IOException;
- import java.io.InputStream;
- import java.time.Instant;
- import java.time.LocalDateTime;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.Map;
- import java.util.Objects;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.concurrent.ExecutionException;
- /**
- * 第三方导入管理service
- *
- * @author 欧阳劲驰
- * @since 1.0.0
- */
- @Component
- public class GisSurveyThirdImportBizService {
- /**
- * 任务缓存
- */
- private final static Map<String, ListenableFuture<GisSurveyThirdImportResult>> TASK_CACHE = new ConcurrentHashMap<>();
- /**
- * 子任务缓存
- */
- private final static Map<String, GisSurveyThirdImportSubtask> SUBTASK_CACHE = new ConcurrentHashMap<>();
- /**
- * 开始时间缓存
- */
- private final static Map<String, CommAsyncCache> INFO_CACHE = new ConcurrentHashMap<>();
- /**
- * log
- */
- private final String mStrClassName;
- private final String mBizType;
- private final AsyncTaskProperties asyncTaskProperties;
- private final ThreadPoolTaskScheduler taskScheduler;
- private final GisSurveyThirdImporter thirdImporter;
- public GisSurveyThirdImportBizService(AsyncTaskProperties asyncTaskProperties
- , @Qualifier("timeThreadPoolTaskScheduler") ThreadPoolTaskScheduler taskScheduler
- , GisSurveyThirdImporter thirdImporter) {
- mStrClassName = "GisSurveySystemCheckBizService";
- mBizType = LogFlagBusiType.BUSI_GIS_SURVEY.toStrValue();
- this.asyncTaskProperties = asyncTaskProperties;
- this.taskScheduler = taskScheduler;
- this.thirdImporter = thirdImporter;
- }
- /**
- * 第三方导入
- *
- * @param params 第三方导入参数
- * @return 导入状态
- */
- public GisSurveyThirdImportResult thirdImport(GisSurveyThirdImportParams params) {
- //获取结果
- GisSurveyThirdImportResult result = getResult(params);
- if (result != null) return result;
- //获取文件流
- List<InputStream> inputStreams = new ArrayList<>();
- for (MultipartFile file : params.getFiles()) {
- try {
- inputStreams.add(file.getInputStream());
- } catch (IOException e) {
- return GisSurveyThirdImportResult.fail(params);
- }
- }
- //启动检查任务
- startTask(params, inputStreams);
- //返回进行中
- return GisSurveyThirdImportResult.inProgress(params, SUBTASK_CACHE.get(params.getJobId()),
- LocalDateTime.now(),
- params.getOperator()
- );
- }
- /**
- * 获取结果
- *
- * @param params 第三方导入参数
- * @return 导入结果
- */
- public GisSurveyThirdImportResult getResult(GisSurveyThirdImportParams params) {
- //获取已存在的任务
- ListenableFuture<GisSurveyThirdImportResult> previousFuture = TASK_CACHE.get(params.getJobId());
- //判断完成
- if (previousFuture != null && previousFuture.isDone()) {
- try {
- //获取结果
- GisSurveyThirdImportResult thirdImportResult = previousFuture.get();
- //如不忽略失败,且失败,则返回失败
- if (!params.getIgnoreFail() && Objects.equals(thirdImportResult.getImportStatus(), GisSurveyImportStatusEnum.FAIL.getCode()))
- return thirdImportResult;
- else if (Objects.equals(thirdImportResult.getImportStatus(), GisSurveyImportStatusEnum.SUCCESS.getCode()) ||
- Objects.equals(thirdImportResult.getImportStatus(), GisSurveyImportStatusEnum.DATA_ERROR.getCode())) {
- //如处理成功(导入成功或数据检查成功),返回成功,并清除缓存
- removeCache(params.getJobId());
- return thirdImportResult;
- }
- } catch (InterruptedException | ExecutionException e) {
- //打印报错信息
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR, mBizType, mStrClassName
- , String.format("检查历史失败异常 任务id:%s msg:%s", params.getJobId(), e.getMessage())
- );
- return GisSurveyThirdImportResult.fail(params);
- }
- }
- //进行中判断(未完成且未清除)
- if (previousFuture != null && !previousFuture.isDone() && !previousFuture.isCancelled()) {
- CommAsyncCache asyncCache = INFO_CACHE.get(params.getJobId());
- if (asyncCache != null)
- return GisSurveyThirdImportResult.inProgress(params,
- SUBTASK_CACHE.get(params.getJobId()),
- asyncCache.getStartTime(), asyncCache.getOperator()
- );
- }
- return null;
- }
- /**
- * 取消导入
- *
- * @param jobId 任务id
- * @return 取消结果
- */
- public int cancelImport(String jobId) {
- //如无缓存,则直接返回不存在
- if (!TASK_CACHE.containsKey(jobId)) return GisSurveyImportStatusEnum.NOT_EXISTS.getCode();
- //关闭检查任务
- if (stopTask(TASK_CACHE.get(jobId))) {
- removeCache(jobId);
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName
- , String.format(
- "第三方导入停止成功,清除缓存;任务id: %s"
- , jobId
- )
- );
- return GisSurveyImportStatusEnum.SUCCESS.getCode();
- }
- return GisSurveyImportStatusEnum.FAIL.getCode();
- }
- /**
- * 启动任务
- *
- * @param params 导入参数
- * @param inputStreams 文件输入流
- */
- private void startTask(GisSurveyThirdImportParams params, List<InputStream> inputStreams) {
- String jobId = params.getJobId();
- //获取已存在的任务
- ListenableFuture<GisSurveyThirdImportResult> previousFuture = TASK_CACHE.get(jobId);
- //已结束判断,删除缓存
- if (previousFuture != null && (previousFuture.isDone() || previousFuture.isCancelled()))
- removeCache(params.getJobId());
- //异步执行第三方导入查任务
- ListenableFuture<GisSurveyThirdImportResult> future = thirdImporter.thirdImportTask(params, inputStreams,
- //缓存子任务句柄
- subtask -> SUBTASK_CACHE.put(jobId, subtask),
- //删除子任务句柄
- subtaskSystemCheckId -> SUBTASK_CACHE.remove(jobId)
- );
- //任务超时
- taskScheduler.schedule(() -> {
- if (!future.isCancelled() && !future.isDone() && stopTask(future)) {
- removeCache(jobId);
- LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName
- , String.format(
- "第三方导入超时,成功停止任务;任务id: %s"
- , jobId
- )
- );
- }
- }, Instant.now().plusMillis(asyncTaskProperties.getThirdImportTimeout().toMillis()));
- //缓存任务句柄
- TASK_CACHE.put(jobId, future);
- //缓存时间
- INFO_CACHE.put(jobId, new CommAsyncCache(LocalDateTime.now(), params.getOperator()));
- }
- /**
- * 停止任务
- *
- * @param future 任务
- * @return 关闭状态
- */
- private Boolean stopTask(ListenableFuture<?> future) {
- //完成判断,完成删除缓存
- if (future.isCancelled() || future.isDone()) return true;
- //尝试清除任务
- return future.cancel(true);
- }
- /**
- * 清除缓存
- *
- * @param jobId 任务id
- */
- private void removeCache(String jobId) {
- TASK_CACHE.remove(jobId);
- SUBTASK_CACHE.remove(jobId);
- INFO_CACHE.remove(jobId);
- }
- }
|