|
@@ -5,9 +5,11 @@ import com.global.base.log.LogPrintMgr;
|
|
|
import com.shkpr.service.alambizplugin.apiparam.GisSurveyThirdImportParams;
|
|
|
import com.shkpr.service.alambizplugin.commproperties.AsyncTaskProperties;
|
|
|
import com.shkpr.service.alambizplugin.components.GisSurveyThirdImporter;
|
|
|
+import com.shkpr.service.alambizplugin.constants.CommAsyncStatusEnum;
|
|
|
import com.shkpr.service.alambizplugin.constants.GisSurveyImportStatusEnum;
|
|
|
import com.shkpr.service.alambizplugin.constants.LogFlagBusiType;
|
|
|
import com.shkpr.service.alambizplugin.dto.CommAsyncCache;
|
|
|
+import com.shkpr.service.alambizplugin.dto.CommAsyncResult;
|
|
|
import com.shkpr.service.alambizplugin.dto.GisSurveyThirdImportResult;
|
|
|
import com.shkpr.service.alambizplugin.dto.GisSurveyThirdImportSubtask;
|
|
|
import org.springframework.beans.factory.annotation.Qualifier;
|
|
@@ -36,9 +38,13 @@ import java.util.concurrent.ExecutionException;
|
|
|
@Component
|
|
|
public class GisSurveyThirdImportBizService {
|
|
|
/**
|
|
|
- * 任务缓存
|
|
|
+ * 导入任务缓存
|
|
|
*/
|
|
|
- private final static Map<String, ListenableFuture<GisSurveyThirdImportResult>> TASK_CACHE = new ConcurrentHashMap<>();
|
|
|
+ private final static Map<String, ListenableFuture<GisSurveyThirdImportResult>> IMPORT_TASK_CACHE = new ConcurrentHashMap<>();
|
|
|
+ /**
|
|
|
+ * 提交任务缓存
|
|
|
+ */
|
|
|
+ private final static Map<String, ListenableFuture<CommAsyncResult<Boolean>>> COMMIT_TASK_CACHE = new ConcurrentHashMap<>();
|
|
|
/**
|
|
|
* 子任务缓存
|
|
|
*/
|
|
@@ -47,6 +53,16 @@ public class GisSurveyThirdImportBizService {
|
|
|
* 开始时间缓存
|
|
|
*/
|
|
|
private final static Map<String, CommAsyncCache> INFO_CACHE = new ConcurrentHashMap<>();
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 导入前缀
|
|
|
+ */
|
|
|
+ private final static String IMPORT_PREFIX = "import-";
|
|
|
+ /**
|
|
|
+ * 提交前缀
|
|
|
+ */
|
|
|
+ private final static String COMMIT_PREFIX = "commit-";
|
|
|
+
|
|
|
/**
|
|
|
* log
|
|
|
*/
|
|
@@ -76,7 +92,7 @@ public class GisSurveyThirdImportBizService {
|
|
|
*/
|
|
|
public GisSurveyThirdImportResult thirdImport(GisSurveyThirdImportParams params) {
|
|
|
//获取结果
|
|
|
- GisSurveyThirdImportResult result = getResult(params);
|
|
|
+ GisSurveyThirdImportResult result = getImportResult(params);
|
|
|
if (result != null) return result;
|
|
|
|
|
|
//获取文件流
|
|
@@ -89,41 +105,61 @@ public class GisSurveyThirdImportBizService {
|
|
|
}
|
|
|
}
|
|
|
//启动检查任务
|
|
|
- startTask(params, inputStreams);
|
|
|
+ startImport(params, inputStreams);
|
|
|
//返回进行中
|
|
|
- return GisSurveyThirdImportResult.inProgress(params, SUBTASK_CACHE.get(params.getJobId()),
|
|
|
+ return GisSurveyThirdImportResult.inProgress(params, SUBTASK_CACHE.get(IMPORT_PREFIX + params.getJobId()),
|
|
|
LocalDateTime.now(),
|
|
|
params.getOperator()
|
|
|
);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * 获取结果
|
|
|
+ * 提交导入
|
|
|
+ *
|
|
|
+ * @param jobId 任务id
|
|
|
+ * @param operator 操作人
|
|
|
+ * @return 提交状态
|
|
|
+ */
|
|
|
+ public CommAsyncResult<Boolean> commitImport(String jobId, String operator) {
|
|
|
+ //获取结果
|
|
|
+ CommAsyncResult<Boolean> result = getCommitResult(jobId);
|
|
|
+ if (result != null) return result;
|
|
|
+
|
|
|
+ //启动提交任务
|
|
|
+ startCommit(jobId, operator);
|
|
|
+ //返回进行中
|
|
|
+ return CommAsyncResult.inProgress(jobId,
|
|
|
+ LocalDateTime.now(),
|
|
|
+ operator
|
|
|
+ );
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 获取导入结果
|
|
|
*
|
|
|
* @param params 第三方导入参数
|
|
|
* @return 导入结果
|
|
|
*/
|
|
|
- public GisSurveyThirdImportResult getResult(GisSurveyThirdImportParams params) {
|
|
|
+ public GisSurveyThirdImportResult getImportResult(GisSurveyThirdImportParams params) {
|
|
|
//获取已存在的任务
|
|
|
- ListenableFuture<GisSurveyThirdImportResult> previousFuture = TASK_CACHE.get(params.getJobId());
|
|
|
+ ListenableFuture<GisSurveyThirdImportResult> previousFuture = IMPORT_TASK_CACHE.get(IMPORT_PREFIX + params.getJobId());
|
|
|
//判断完成
|
|
|
if (previousFuture != null && previousFuture.isDone()) {
|
|
|
try {
|
|
|
//获取结果
|
|
|
GisSurveyThirdImportResult thirdImportResult = previousFuture.get();
|
|
|
- //如不忽略失败,且失败,则返回失败
|
|
|
- if (!params.getIgnoreFail() && Objects.equals(thirdImportResult.getImportStatus(), GisSurveyImportStatusEnum.FAIL.getCode()))
|
|
|
- return thirdImportResult;
|
|
|
- else if (Objects.equals(thirdImportResult.getImportStatus(), GisSurveyImportStatusEnum.SUCCESS.getCode()) ||
|
|
|
+ //如处理成功/失败,返回结果,并清除缓存
|
|
|
+ if (Objects.equals(thirdImportResult.getImportStatus(), GisSurveyImportStatusEnum.FAIL.getCode()) ||
|
|
|
+ Objects.equals(thirdImportResult.getImportStatus(), GisSurveyImportStatusEnum.SUCCESS.getCode()) ||
|
|
|
Objects.equals(thirdImportResult.getImportStatus(), GisSurveyImportStatusEnum.DATA_ERROR.getCode())) {
|
|
|
- //如处理成功(导入成功或数据检查成功),返回成功,并清除缓存
|
|
|
- removeCache(params.getJobId());
|
|
|
+ removeCache(IMPORT_PREFIX + params.getJobId());
|
|
|
return thirdImportResult;
|
|
|
}
|
|
|
} catch (InterruptedException | ExecutionException e) {
|
|
|
//打印报错信息
|
|
|
LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR, mBizType, mStrClassName
|
|
|
- , String.format("检查历史失败异常 任务id:%s msg:%s", params.getJobId(), e.getMessage())
|
|
|
+ , String.format("检查历史异常 任务id:%s msg:%s", params.getJobId(), e.getMessage())
|
|
|
);
|
|
|
return GisSurveyThirdImportResult.fail(params);
|
|
|
}
|
|
@@ -131,10 +167,10 @@ public class GisSurveyThirdImportBizService {
|
|
|
|
|
|
//进行中判断(未完成且未清除)
|
|
|
if (previousFuture != null && !previousFuture.isDone() && !previousFuture.isCancelled()) {
|
|
|
- CommAsyncCache asyncCache = INFO_CACHE.get(params.getJobId());
|
|
|
+ CommAsyncCache asyncCache = INFO_CACHE.get(IMPORT_PREFIX + params.getJobId());
|
|
|
if (asyncCache != null)
|
|
|
return GisSurveyThirdImportResult.inProgress(params,
|
|
|
- SUBTASK_CACHE.get(params.getJobId()),
|
|
|
+ SUBTASK_CACHE.get(IMPORT_PREFIX + params.getJobId()),
|
|
|
asyncCache.getStartTime(), asyncCache.getOperator()
|
|
|
);
|
|
|
}
|
|
@@ -143,6 +179,49 @@ public class GisSurveyThirdImportBizService {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
+ * 获取提交结果
|
|
|
+ *
|
|
|
+ * @param jobId 任务id
|
|
|
+ * @return 提交结果
|
|
|
+ */
|
|
|
+ private CommAsyncResult<Boolean> getCommitResult(String jobId) {
|
|
|
+ //获取已存在的任务
|
|
|
+ ListenableFuture<CommAsyncResult<Boolean>> previousFuture = COMMIT_TASK_CACHE.get(COMMIT_PREFIX + jobId);
|
|
|
+ //判断完成
|
|
|
+ if (previousFuture != null && previousFuture.isDone()) {
|
|
|
+ try {
|
|
|
+ //获取结果
|
|
|
+ CommAsyncResult<Boolean> commitResult = previousFuture.get();
|
|
|
+ //如处理成功/失败,返回结果,并清除缓存
|
|
|
+ if (Objects.equals(commitResult.getStatus(), CommAsyncStatusEnum.FAIL.getCode()) ||
|
|
|
+ Objects.equals(commitResult.getStatus(), CommAsyncStatusEnum.SUCCESS.getCode())) {
|
|
|
+ removeCache(COMMIT_PREFIX + jobId);
|
|
|
+ return commitResult;
|
|
|
+ }
|
|
|
+ } catch (InterruptedException | ExecutionException e) {
|
|
|
+ //打印报错信息
|
|
|
+ LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_ERROR, mBizType, mStrClassName
|
|
|
+ , String.format("检查历史异常 任务id:%s msg:%s", jobId, e.getMessage())
|
|
|
+ );
|
|
|
+ return CommAsyncResult.fail(jobId);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ //进行中判断(未完成且未清除)
|
|
|
+ if (previousFuture != null && !previousFuture.isDone() && !previousFuture.isCancelled()) {
|
|
|
+ CommAsyncCache asyncCache = INFO_CACHE.get(COMMIT_PREFIX + jobId);
|
|
|
+ if (asyncCache != null)
|
|
|
+ return CommAsyncResult.inProgress(jobId,
|
|
|
+ asyncCache.getStartTime(),
|
|
|
+ asyncCache.getOperator()
|
|
|
+ );
|
|
|
+ }
|
|
|
+
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
* 取消导入
|
|
|
*
|
|
|
* @param jobId 任务id
|
|
@@ -150,10 +229,11 @@ public class GisSurveyThirdImportBizService {
|
|
|
*/
|
|
|
public int cancelImport(String jobId) {
|
|
|
//如无缓存,则直接返回不存在
|
|
|
- if (!TASK_CACHE.containsKey(jobId)) return GisSurveyImportStatusEnum.NOT_EXISTS.getCode();
|
|
|
+ if (!IMPORT_TASK_CACHE.containsKey(IMPORT_PREFIX + jobId))
|
|
|
+ return GisSurveyImportStatusEnum.NOT_EXISTS.getCode();
|
|
|
//关闭检查任务
|
|
|
- if (stopTask(TASK_CACHE.get(jobId))) {
|
|
|
- removeCache(jobId);
|
|
|
+ if (stopTask(IMPORT_TASK_CACHE.get(IMPORT_PREFIX + jobId))) {
|
|
|
+ removeCache(IMPORT_PREFIX + jobId);
|
|
|
|
|
|
LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName
|
|
|
, String.format(
|
|
@@ -167,29 +247,29 @@ public class GisSurveyThirdImportBizService {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * 启动任务
|
|
|
+ * 启动导入
|
|
|
*
|
|
|
* @param params 导入参数
|
|
|
* @param inputStreams 文件输入流
|
|
|
*/
|
|
|
- private void startTask(GisSurveyThirdImportParams params, List<InputStream> inputStreams) {
|
|
|
+ private void startImport(GisSurveyThirdImportParams params, List<InputStream> inputStreams) {
|
|
|
String jobId = params.getJobId();
|
|
|
//获取已存在的任务
|
|
|
- ListenableFuture<GisSurveyThirdImportResult> previousFuture = TASK_CACHE.get(jobId);
|
|
|
+ ListenableFuture<GisSurveyThirdImportResult> previousFuture = IMPORT_TASK_CACHE.get(IMPORT_PREFIX + jobId);
|
|
|
//已结束判断,删除缓存
|
|
|
if (previousFuture != null && (previousFuture.isDone() || previousFuture.isCancelled()))
|
|
|
- removeCache(params.getJobId());
|
|
|
+ removeCache(IMPORT_PREFIX + params.getJobId());
|
|
|
//异步执行第三方导入查任务
|
|
|
ListenableFuture<GisSurveyThirdImportResult> future = thirdImporter.thirdImportTask(params, inputStreams,
|
|
|
//缓存子任务句柄
|
|
|
- subtask -> SUBTASK_CACHE.put(jobId, subtask),
|
|
|
+ subtask -> SUBTASK_CACHE.put(IMPORT_PREFIX + jobId, subtask),
|
|
|
//删除子任务句柄
|
|
|
- subtaskSystemCheckId -> SUBTASK_CACHE.remove(jobId)
|
|
|
+ subtaskSystemCheckId -> SUBTASK_CACHE.remove(IMPORT_PREFIX + jobId)
|
|
|
);
|
|
|
//任务超时
|
|
|
taskScheduler.schedule(() -> {
|
|
|
if (!future.isCancelled() && !future.isDone() && stopTask(future)) {
|
|
|
- removeCache(jobId);
|
|
|
+ removeCache(IMPORT_PREFIX + jobId);
|
|
|
LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName
|
|
|
, String.format(
|
|
|
"第三方导入超时,成功停止任务;任务id: %s"
|
|
@@ -199,9 +279,41 @@ public class GisSurveyThirdImportBizService {
|
|
|
}
|
|
|
}, Instant.now().plusMillis(asyncTaskProperties.getThirdImportTimeout().toMillis()));
|
|
|
//缓存任务句柄
|
|
|
- TASK_CACHE.put(jobId, future);
|
|
|
+ IMPORT_TASK_CACHE.put(IMPORT_PREFIX + jobId, future);
|
|
|
+ //缓存时间
|
|
|
+ INFO_CACHE.put(IMPORT_PREFIX + jobId, new CommAsyncCache(LocalDateTime.now(), params.getOperator()));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 启动提交
|
|
|
+ *
|
|
|
+ * @param jobId 任务id
|
|
|
+ * @param operator 操作人
|
|
|
+ */
|
|
|
+ private void startCommit(String jobId, String operator) {
|
|
|
+ //获取已存在的任务
|
|
|
+ ListenableFuture<CommAsyncResult<Boolean>> previousFuture = COMMIT_TASK_CACHE.get(COMMIT_PREFIX + jobId);
|
|
|
+ //已结束判断,删除缓存
|
|
|
+ if (previousFuture != null && (previousFuture.isDone() || previousFuture.isCancelled()))
|
|
|
+ removeCache(COMMIT_PREFIX + jobId);
|
|
|
+ //异步执行第三方导入查任务
|
|
|
+ ListenableFuture<CommAsyncResult<Boolean>> future = thirdImporter.commitImportTask(jobId, operator);
|
|
|
+ //任务超时
|
|
|
+ taskScheduler.schedule(() -> {
|
|
|
+ if (!future.isCancelled() && !future.isDone() && stopTask(future)) {
|
|
|
+ removeCache(COMMIT_PREFIX + jobId);
|
|
|
+ LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName
|
|
|
+ , String.format(
|
|
|
+ "提交导入超时,成功停止任务;任务id: %s"
|
|
|
+ , jobId
|
|
|
+ )
|
|
|
+ );
|
|
|
+ }
|
|
|
+ }, Instant.now().plusMillis(asyncTaskProperties.getCommitImportTimeout().toMillis()));
|
|
|
+ //缓存任务句柄
|
|
|
+ COMMIT_TASK_CACHE.put(COMMIT_PREFIX + jobId, future);
|
|
|
//缓存时间
|
|
|
- INFO_CACHE.put(jobId, new CommAsyncCache(LocalDateTime.now(), params.getOperator()));
|
|
|
+ INFO_CACHE.put(COMMIT_PREFIX + jobId, new CommAsyncCache(LocalDateTime.now(), operator));
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -220,11 +332,12 @@ public class GisSurveyThirdImportBizService {
|
|
|
/**
|
|
|
* 清除缓存
|
|
|
*
|
|
|
- * @param jobId 任务id
|
|
|
+ * @param key 缓存key
|
|
|
*/
|
|
|
- private void removeCache(String jobId) {
|
|
|
- TASK_CACHE.remove(jobId);
|
|
|
- SUBTASK_CACHE.remove(jobId);
|
|
|
- INFO_CACHE.remove(jobId);
|
|
|
+ private void removeCache(String key) {
|
|
|
+ IMPORT_TASK_CACHE.remove(key);
|
|
|
+ COMMIT_TASK_CACHE.remove(key);
|
|
|
+ SUBTASK_CACHE.remove(key);
|
|
|
+ INFO_CACHE.remove(key);
|
|
|
}
|
|
|
}
|