Jelajahi Sumber

并行流增加中断响应

欧阳劲驰 1 bulan lalu
induk
melakukan
433e4b7f2f

+ 80 - 51
src/main/java/com/shkpr/service/alambizplugin/components/checker/DuplicatePointsFinder.java

@@ -18,6 +18,7 @@ import com.shkpr.service.alambizplugin.dto.GisSurveySystemCheckResultDetail;
 import com.shkpr.service.alambizplugin.dto.GisSurveyThirdImportElement;
 import com.shkpr.service.alambizplugin.dto.GisSurveyThirdImportResultDetail;
 import com.shkpr.service.alambizplugin.dto.TypeDefine;
+import com.shkpr.service.alambizplugin.exception.UncheckedInterruptedException;
 import org.apache.commons.lang3.StringUtils;
 import org.locationtech.jts.geom.Coordinate;
 import org.springframework.scheduling.annotation.Async;
@@ -81,36 +82,50 @@ public class DuplicatePointsFinder {
         LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName, "开始执行寻找重复点   ======>");
         long begin = System.currentTimeMillis();
 
-        //根据code度分组
-        Map<String, List<Map<String, String>>> keyToPoints = points.parallelStream()
-                .collect(Collectors.groupingBy(point ->
-                                StringUtils.defaultIfBlank(
-                                        ThirdImportTemplateUtils.getValue(
-                                                point, pointLayerTemplates,
-                                                GisMetadataDefine.TYPE_KINE.POINT,
-                                                GisSurveyExcelDefine.TEMPLATE.NO
-                                        ),
-                                        ""
-                                ),
-                        Collectors.toList())
-                );
-        //响应中断
-        if (Thread.interrupted()) throw new InterruptedException();
-        //并行流寻找重复点号
-        List<GisSurveyThirdImportElement> groupElements = keyToPoints.entrySet().parallelStream()
-                //过滤组内大于1
-                .filter(group -> StringUtils.isNotBlank(group.getKey()) && group.getValue().size() > 1)
-                .map(entry -> new GisSurveyThirdImportElement(GisMetadataDefine.TYPE_KINE.POINT, entry.getKey(), null, null))
-                .collect(Collectors.toList());
+        try {
+            //根据code度分组
+            Map<String, List<Map<String, String>>> keyToPoints = points.parallelStream()
+                    //响应中断
+                    .peek(it -> {
+                        if (Thread.currentThread().isInterrupted())
+                            throw new UncheckedInterruptedException(new InterruptedException());
+                    })
+                    .collect(Collectors.groupingBy(point ->
+                                    StringUtils.defaultIfBlank(
+                                            ThirdImportTemplateUtils.getValue(
+                                                    point, pointLayerTemplates,
+                                                    GisMetadataDefine.TYPE_KINE.POINT,
+                                                    GisSurveyExcelDefine.TEMPLATE.NO
+                                            ),
+                                            ""
+                                    ),
+                            Collectors.toList())
+                    );
+            //响应中断
+            if (Thread.interrupted()) throw new InterruptedException();
+            //并行流寻找重复点号
+            List<GisSurveyThirdImportElement> groupElements = keyToPoints.entrySet().parallelStream()
+                    //响应中断
+                    .peek(it -> {
+                        if (Thread.currentThread().isInterrupted())
+                            throw new UncheckedInterruptedException(new InterruptedException());
+                    })
+                    //过滤组内大于1
+                    .filter(group -> StringUtils.isNotBlank(group.getKey()) && group.getValue().size() > 1)
+                    .map(entry -> new GisSurveyThirdImportElement(GisMetadataDefine.TYPE_KINE.POINT, entry.getKey(), null, null))
+                    .collect(Collectors.toList());
 
-        long end = System.currentTimeMillis();
-        LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName
-                , String.format(
-                        "结束执行寻找重复点,用时(毫秒):%d"
-                        , (end - begin)
-                )
-        );
-        return new AsyncResult<>(new GisSurveyThirdImportResultDetail<>(true, groupElements));
+            long end = System.currentTimeMillis();
+            LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName
+                    , String.format(
+                            "结束执行寻找重复点,用时(毫秒):%d"
+                            , (end - begin)
+                    )
+            );
+            return new AsyncResult<>(new GisSurveyThirdImportResultDetail<>(true, groupElements));
+        } catch (UncheckedInterruptedException e) {
+            throw e.getCause();
+        }
     }
 
     /**
@@ -132,29 +147,43 @@ public class DuplicatePointsFinder {
                 String.format("开始执行寻找重复点 经度精度:%s 纬度精度:%s   ======>", lonScale, latScale));
         long begin = System.currentTimeMillis();
 
-        //根据经纬度分组
-        Map<String, List<GisSurveyLayerApplyPoint>> keyToPoints = points.parallelStream()
-                .collect(Collectors.groupingBy(point -> {
-                    //抹去经纬度精度,并设置为分组条件
-                    Coordinate coordinate = point.getGis().getCoordinate();
-                    BigDecimal bdLon = new BigDecimal(Double.toString(coordinate.getX())).setScale(lonScale, RoundingMode.DOWN);
-                    BigDecimal bdLat = new BigDecimal(Double.toString(coordinate.getY())).setScale(latScale, RoundingMode.DOWN);
-                    return String.format("%s%s%s", bdLon.toPlainString(), bdLat.toPlainString(), point.getElevation());
-                }, Collectors.toList()));
-        //响应中断
-        if (Thread.interrupted()) throw new InterruptedException();
-        //并行流寻找重复点
-        List<List<GisSurveySystemCheckElement>> groupElements = keyToPoints.values().parallelStream()
-                //过滤组内大于1
-                .filter(group -> group.size() > 1)
-                //转为返回元素
-                .map(group -> group.stream()
-                        .map(point -> BeanUtil.copy(point, GisSurveySystemCheckElement.class))
-                        .collect(Collectors.toList())
-                )
-                .collect(Collectors.toList());
+        try {
+            //根据经纬度分组
+            Map<String, List<GisSurveyLayerApplyPoint>> keyToPoints = points.parallelStream()
+                    //响应中断
+                    .peek(it -> {
+                        if (Thread.currentThread().isInterrupted())
+                            throw new UncheckedInterruptedException(new InterruptedException());
+                    })
+                    .collect(Collectors.groupingBy(point -> {
+                        //抹去经纬度精度,并设置为分组条件
+                        Coordinate coordinate = point.getGis().getCoordinate();
+                        BigDecimal bdLon = new BigDecimal(Double.toString(coordinate.getX())).setScale(lonScale, RoundingMode.DOWN);
+                        BigDecimal bdLat = new BigDecimal(Double.toString(coordinate.getY())).setScale(latScale, RoundingMode.DOWN);
+                        return String.format("%s%s%s", bdLon.toPlainString(), bdLat.toPlainString(), point.getElevation());
+                    }, Collectors.toList()));
+            //响应中断
+            if (Thread.interrupted()) throw new InterruptedException();
+            //并行流寻找重复点
+            List<List<GisSurveySystemCheckElement>> groupElements = keyToPoints.values().parallelStream()
+                    //响应中断
+                    .peek(it -> {
+                        if (Thread.currentThread().isInterrupted())
+                            throw new UncheckedInterruptedException(new InterruptedException());
+                    })
+                    //过滤组内大于1
+                    .filter(group -> group.size() > 1)
+                    //转为返回元素
+                    .map(group -> group.stream()
+                            .map(point -> BeanUtil.copy(point, GisSurveySystemCheckElement.class))
+                            .collect(Collectors.toList())
+                    )
+                    .collect(Collectors.toList());
 
-        return new AsyncResult<>(createResult(groupElements, systemCheckId, begin));
+            return new AsyncResult<>(createResult(groupElements, systemCheckId, begin));
+        } catch (UncheckedInterruptedException e) {
+            throw e.getCause();
+        }
     }
 
     /**

+ 42 - 27
src/main/java/com/shkpr/service/alambizplugin/components/checker/IsolatedPointsFinder.java

@@ -13,6 +13,7 @@ import com.shkpr.service.alambizplugin.dto.GisSurveyLayerApplyPoint;
 import com.shkpr.service.alambizplugin.dto.GisSurveySystemCheckElement;
 import com.shkpr.service.alambizplugin.dto.GisSurveySystemCheckId;
 import com.shkpr.service.alambizplugin.dto.GisSurveySystemCheckResultDetail;
+import com.shkpr.service.alambizplugin.exception.UncheckedInterruptedException;
 import org.apache.commons.lang3.StringUtils;
 import org.springframework.scheduling.annotation.Async;
 import org.springframework.scheduling.annotation.AsyncResult;
@@ -62,39 +63,53 @@ public class IsolatedPointsFinder {
         LogPrintMgr.getInstance().printLogMsg(LogLevelFlag.LOG_INFO, mBizType, mStrClassName, "开始执行寻找孤立点========>");
         long begin = System.currentTimeMillis();
 
-        //线为空,点不为空,则全孤立点
-        if (lines.isEmpty() && !points.isEmpty()) {
+        try {
+            //线为空,点不为空,则全孤立点
+            if (lines.isEmpty() && !points.isEmpty()) {
+                List<GisSurveySystemCheckElement> elements = points.parallelStream()
+                        //响应中断
+                        .peek(it -> {
+                            if (Thread.currentThread().isInterrupted())
+                                throw new UncheckedInterruptedException(new InterruptedException());
+                        })
+                        //转为返回元素
+                        .map(point -> BeanUtil.copy(point, GisSurveySystemCheckElement.class))
+                        .collect(Collectors.toList());
+                return new AsyncResult<>(createResult(elements, systemCheckId, begin));
+            }
+            //线为空,点也为空,则无孤立点
+            if (lines.isEmpty()) return new AsyncResult<>(createResult(Collections.emptyList(), systemCheckId, begin));
+            //点为空,则无孤立点
+            if (points.isEmpty()) return new AsyncResult<>(createResult(Collections.emptyList(), systemCheckId, begin));
+            //计算预期最大容量,避免频繁扩容
+            int expectedMaxSize = (int) (((long) lines.size() << 1) / .75f + 1);
+            //联通点code
+            Set<String> connectedPoints = new HashSet<>(expectedMaxSize);
+            //遍历所有线,标记参与的点
+            for (GisSurveyLayerApplyLine line : lines) {
+                //响应中断
+                if (Thread.interrupted()) throw new InterruptedException();
+                //上下游节点判空
+                if (StringUtils.isBlank(line.getUpNode()) || StringUtils.isBlank(line.getDownNode())) continue;
+                connectedPoints.add(line.getUpNode());
+                connectedPoints.add(line.getDownNode());
+            }
+            //使用并行流找出孤立点
             List<GisSurveySystemCheckElement> elements = points.parallelStream()
+                    //响应中断
+                    .peek(it -> {
+                        if (Thread.currentThread().isInterrupted())
+                            throw new UncheckedInterruptedException(new InterruptedException());
+                    })
                     //转为返回元素
                     .map(point -> BeanUtil.copy(point, GisSurveySystemCheckElement.class))
+                    //过滤连通点
+                    .filter(element -> Objects.nonNull(element) && !connectedPoints.contains(element.getCode()))
                     .collect(Collectors.toList());
             return new AsyncResult<>(createResult(elements, systemCheckId, begin));
+        } catch (UncheckedInterruptedException e) {
+            throw e.getCause();
         }
-        //线为空,点也为空,则无孤立点
-        if (lines.isEmpty()) return new AsyncResult<>(createResult(Collections.emptyList(), systemCheckId, begin));
-        //点为空,则无孤立点
-        if (points.isEmpty()) return new AsyncResult<>(createResult(Collections.emptyList(), systemCheckId, begin));
-        //计算预期最大容量,避免频繁扩容
-        int expectedMaxSize = (int) (((long) lines.size() << 1) / .75f + 1);
-        //联通点code
-        Set<String> connectedPoints = new HashSet<>(expectedMaxSize);
-        //遍历所有线,标记参与的点
-        for (GisSurveyLayerApplyLine line : lines) {
-            //响应中断
-            if (Thread.interrupted()) throw new InterruptedException();
-            //上下游节点判空
-            if (StringUtils.isBlank(line.getUpNode()) || StringUtils.isBlank(line.getDownNode())) continue;
-            connectedPoints.add(line.getUpNode());
-            connectedPoints.add(line.getDownNode());
-        }
-        //使用并行流找出孤立点
-        List<GisSurveySystemCheckElement> elements = points.parallelStream()
-                //转为返回元素
-                .map(point -> BeanUtil.copy(point, GisSurveySystemCheckElement.class))
-                //过滤连通点
-                .filter(element -> Objects.nonNull(element) && !connectedPoints.contains(element.getCode()))
-                .collect(Collectors.toList());
-        return new AsyncResult<>(createResult(elements, systemCheckId, begin));
     }
 
     /**