瀏覽代碼

伊宁版本更新:
a.新增sse服务器代码逻辑:全局以会话id为key存储各自会话的通讯链接,心跳包以及登出指令都以当前会话id为基础单位发送指令
b.修改伊宁mainView首页,增加跳转参数clientId用以sse通讯
c.新增sse请求地址获取接口
d.增加weblog前端日志接口
e.更改伊宁逻辑,修改登录缓存逻辑
f.优化sse链接代码

1037015548@qq.com 6 天之前
父節點
當前提交
f61f9a57c0

+ 125 - 98
ruoyi-admin/src/main/java/com/ruoyi/web/controller/system/SseServlet.java

@@ -26,9 +26,7 @@ import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.ResponseBody;
 
 import javax.annotation.PostConstruct;
-import javax.servlet.AsyncContext;
-import javax.servlet.ServletException;
-import javax.servlet.ServletInputStream;
+import javax.servlet.*;
 import javax.servlet.annotation.WebServlet;
 import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
@@ -49,98 +47,126 @@ public class SseServlet extends BaseController {
     private static final Logger log = LoggerFactory.getLogger(SseServlet.class);
 
     //reqid,连接对象
-    public static final  ConcurrentHashMap<String,ConcurrentHashMap<String, PrintWriter>> countConnections = new ConcurrentHashMap<>();
+    // reqid -> (clientId -> AsyncContext)
+    private static final ConcurrentHashMap<String, ConcurrentHashMap<String, AsyncContext>> countConnections = new ConcurrentHashMap<>();
+    // reqid -> 心跳Timer
+    private static final ConcurrentHashMap<String, Timer> timerArray = new ConcurrentHashMap<>();
 
-    public static final ConcurrentHashMap<String,Timer> timerArray = new ConcurrentHashMap<>();
 
     @GetMapping("/sse/subscribe")
     @CrossOrigin(origins = "*")
-    protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
-        response.setContentType("text/event-stream");
-        response.setCharacterEncoding("UTF-8");
-        response.setHeader("Cache-Control", "no-cache");
-        response.setHeader("Connection", "keep-alive");
+    public void subscribe(HttpServletRequest request, HttpServletResponse response){
+        try {
+            response.setContentType("text/event-stream");
+            response.setCharacterEncoding("UTF-8");
+            response.setHeader("Cache-Control", "no-cache");
+            response.setHeader("Connection", "keep-alive");
 
-        String clientId = request.getParameter("clientId");
-        if (clientId == null || clientId.isEmpty()) {
-            response.setStatus(HttpServletResponse.SC_BAD_REQUEST);
-            return;
-        }
-        String reqid = clientId.split("___")[0];
-        if (reqid == null || reqid.isEmpty()) {
-            response.setStatus(HttpServletResponse.SC_BAD_REQUEST);
-            return;
-        }
+            String clientId = request.getParameter("clientId");
+            if (clientId == null || clientId.isEmpty()) {
+                response.setStatus(HttpServletResponse.SC_BAD_REQUEST);
+                return;
+            }
+            String reqid = clientId.split("___")[0];
+            if (reqid == null || reqid.isEmpty()) {
+                response.setStatus(HttpServletResponse.SC_BAD_REQUEST);
+                return;
+            }
 
-        AsyncContext asyncContext = request.startAsync();
-        PrintWriter writer = response.getWriter();
+            countConnections.putIfAbsent(reqid, new ConcurrentHashMap<>());
+            ConcurrentHashMap<String, AsyncContext> group = countConnections.get(reqid);
 
-        // 1. 确保连接唯一,重复时主动关闭老连接并清理资源
-        countConnections.putIfAbsent(reqid, new ConcurrentHashMap<>());
-        ConcurrentHashMap<String, PrintWriter> connections = countConnections.get(reqid);
+            // Start async context
+            AsyncContext asyncContext = request.startAsync();
+            asyncContext.setTimeout(0); // 永不超时
 
-        PrintWriter oldWriter = connections.put(clientId, writer);
-        if (oldWriter != null) {
-            try { oldWriter.close(); } catch (Exception e) { }
-            log.info("重复连接已回收: clientId={}", clientId);
-        }
+            // 并发下,需同步保证同一clientId只有一个活跃连接
+            synchronized (group) {
+                AsyncContext oldCtx = group.put(clientId, asyncContext);
+                if (oldCtx != null) {
+                    try {
+                        PrintWriter oldWriter = oldCtx.getResponse().getWriter();
+                        oldWriter.print("event: close\ndata: duplicate connection closed\n\n");
+                        oldWriter.flush();
+                    } catch (Exception e) {
+                        // ignore
+                    }
+                    try {
+                        oldCtx.complete();
+                    } catch (Exception ignore) {
+                    }
+                    // 日志
+                    log.info("重复连接已回收: clientId={}", clientId);
+                }
+            }
+
+            PrintWriter writer = asyncContext.getResponse().getWriter();
+            writer.print("data: Connected\n\n");
+            writer.flush();
 
-        // 2. 持久化reqid对应的timer和心跳
-        timerArray.computeIfAbsent(reqid, k -> {
-            Timer timer = new Timer();
-            timer.schedule(new TimerTask() {
+            // 添加连接监听
+            asyncContext.addListener(new AsyncListener() {
                 @Override
-                public void run() {
-                    broadcastHeart(reqid);
+                public void onComplete(AsyncEvent event) {
+                    cleanup(reqid, clientId);
                 }
-            }, 2000, 2000);
-            return timer;
-        });
 
-        asyncContext.setTimeout(0);
+                @Override
+                public void onTimeout(AsyncEvent event) {
+                    cleanup(reqid, clientId);
+                }
 
-        // 3. 发送连接成功消息
-        try {
-            writer.print("data: Connected\n\n");
-            writer.flush();
-        } catch (Exception e) {
-            // 连接失效立即清理(理论上不会)
-            cleanupConnection(reqid, clientId);
-            asyncContext.complete();
-            return;
-        }
+                @Override
+                public void onError(AsyncEvent event) {
+                    cleanup(reqid, clientId);
+                }
 
-        asyncContext.addListener(new javax.servlet.AsyncListener() {
-            @Override public void onComplete(javax.servlet.AsyncEvent asyncEvent) { cleanupConnection(reqid, clientId); }
-            @Override public void onTimeout(javax.servlet.AsyncEvent asyncEvent) { cleanupConnection(reqid, clientId); }
-            @Override public void onError(javax.servlet.AsyncEvent asyncEvent) { cleanupConnection(reqid, clientId); }
-            @Override public void onStartAsync(javax.servlet.AsyncEvent asyncEvent) { log.info("开启链接: {}", clientId); }
-        });
+                @Override
+                public void onStartAsync(AsyncEvent event) {
+                }
+            });
 
-        log.info("建立链接成功: clientId={}", clientId);
+            // 心跳定时
+            Timer timer = timerArray.computeIfAbsent(reqid, key -> {
+                log.info("为组{}新建心跳timer", reqid);
+                Timer t = new Timer();
+                t.schedule(new TimerTask() {
+                    @Override
+                    public void run() {
+                        broadcastHeart(reqid);
+                    }
+                }, 2000, 2000);
+                return t;
+            });
+
+            log.info("建立链接成功: clientId={}", clientId);
+        }catch(Exception ex){
+            log.error("sse推送异常"+ex.getLocalizedMessage());
+        }
     }
 
     // 工具:清理连接和Timer
-    private static void cleanupConnection(String reqid, String clientId) {
-        try {
-            ConcurrentHashMap<String, PrintWriter> connections = countConnections.get(reqid);
-            if (connections != null) {
-                PrintWriter w = connections.remove(clientId);
-                if (w != null) {
-                    try { w.close(); } catch (Exception e) { }
+    private static void cleanup(String reqid, String clientId) {
+        ConcurrentHashMap<String, AsyncContext> group = countConnections.get(reqid);
+        if (group != null) {
+            synchronized (group) {
+                AsyncContext ctx = group.remove(clientId);
+                if (ctx != null) {
+                    try { ctx.getResponse().getWriter().close(); } catch (Exception ignore) {}
+                    try { ctx.complete(); } catch (Exception ignore) {}
                 }
-                // 如果分组都清空,移除reqid和timer,彻底释放内存
-                if (connections.isEmpty()) {
+                // 组内没人了,清理定时器
+                if (group.isEmpty()) {
                     countConnections.remove(reqid);
-                    Timer t = timerArray.remove(reqid);
-                    if (t != null) t.cancel();
+                    Timer timer = timerArray.remove(reqid);
+                    if (timer != null) { timer.cancel(); }
+                    log.info("组{}已清空,timer被销毁", reqid);
                 }
             }
-        } catch (Exception ex) { }
+        }
     }
 
     @PostMapping("/sse/weblog")
-    @ResponseBody
     @CrossOrigin(origins = "*") // 允许所有来源
     protected void doGetLog(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
         response.setContentType("text/event-stream");
@@ -158,7 +184,11 @@ public class SseServlet extends BaseController {
         try {
             String body = getJsonBodyStr(request);
             JSONArray jsonArray = JSONArray.parseArray(body);
-            broadcastLogout(jsonArray);
+            if(jsonArray!=null&&jsonArray.size()>0) {
+                for (Object object : jsonArray) {
+                    broadcastLogout(object.toString());
+                }
+            }
             return AjaxResult.success();
         }catch(Exception ex){
             return AjaxResult.error();
@@ -213,43 +243,40 @@ public class SseServlet extends BaseController {
 
     //TODO 广播
     //TODO 心跳广播
-    public void broadcastHeart(String reqid) {
-        ConcurrentHashMap<String, PrintWriter> writerMap = countConnections.get(reqid);
-        if (writerMap == null || writerMap.isEmpty()) return;
-
-        for (Map.Entry<String, PrintWriter> entry : writerMap.entrySet()) {
-            PrintWriter writer = entry.getValue();
+    private void broadcastHeart(String reqid) {
+        ConcurrentHashMap<String, AsyncContext> group = countConnections.get(reqid);
+        if (group == null) { return; }
+        for (Map.Entry<String, AsyncContext> entry : group.entrySet()) {
+            String clientId = entry.getKey();
+            AsyncContext ctx = entry.getValue();
             try {
+                PrintWriter writer = ctx.getResponse().getWriter();
                 writer.print("event: heart\n");
                 writer.print("data: heart is running\n\n");
                 writer.flush();
-            } catch (Exception ex) {
-                // 写失败主动移除连接
-                cleanupConnection(reqid, entry.getKey());
+            } catch (Exception e) {
+                cleanup(reqid, clientId);
             }
         }
     }
 
+
     //TODO jsonArray是要退出的clientId集合
-    public void broadcastLogout(JSONArray jsonArray) {
-        if (CollectionUtils.isEmpty(countConnections) || jsonArray == null) return;
-        for (Object obj : jsonArray) {
-            String targetReqid = obj.toString();
-            ConcurrentHashMap<String, PrintWriter> writerMap = countConnections.get(targetReqid);
-            if (writerMap != null) {
-                for (Map.Entry<String, PrintWriter> entry : writerMap.entrySet()) {
-                    try {
-                        PrintWriter writer = entry.getValue();
-                        writer.print("event: logout\n");
-                        writer.print("data: User is logging out\n\n");
-                        writer.flush();
-                    } catch (Exception ex) { }
-                }
-                // 确认通知完毕就清理
-                for (String clientId : writerMap.keySet()) {
-                    cleanupConnection(targetReqid, clientId);
-                }
+    public void broadcastLogout(String reqid) {
+        ConcurrentHashMap<String, AsyncContext> group = countConnections.get(reqid);
+        if (group != null) {
+            for (Map.Entry<String, AsyncContext> entry : group.entrySet()) {
+                String clientId = entry.getKey();
+                AsyncContext ctx = entry.getValue();
+                try {
+                    PrintWriter writer = ctx.getResponse().getWriter();
+                    writer.print("event: logout\n");
+                    writer.print("data: bye\n\n");
+                    writer.flush();
+                } catch (Exception ignore) {}
+                cleanup(reqid, clientId);
             }
         }
     }
+
 }

+ 6 - 3
ruoyi-admin/src/main/resources/templates/mainYiningView.html

@@ -389,9 +389,12 @@
 		}
 
 	//TODO sse测试
-    /*const clientId = 'uniqueClientId123'; // This should be unique for each client
+    const clientId = 'uniqueClientId123'; // This should be unique for each client
     const eventSource = new EventSource("/sse/subscribe?clientId="+clientId+"&reqid="+sessionId);
-
+    for (let i = 0; i < 0; i++) {
+        let eventSourceNNN = new EventSource("/sse/subscribe?clientId="+clientId+"&reqid="+sessionId);
+        console.log("重连次数:"+i);
+    }
     eventSource.onmessage = function(event) {
         const newElement = document.createElement("div");
         newElement.textContent = "Message: "+event.data;
@@ -414,7 +417,7 @@
         const newElement = document.createElement("div");
         newElement.textContent = "An error occurred!";
         console.log(newElement.textContent);
-    };*/
+    };
 </script>
 </body>
 </html>