|
@@ -37,6 +37,7 @@ import java.io.*;
|
|
import java.nio.charset.Charset;
|
|
import java.nio.charset.Charset;
|
|
import java.time.LocalDateTime;
|
|
import java.time.LocalDateTime;
|
|
import java.time.format.DateTimeFormatter;
|
|
import java.time.format.DateTimeFormatter;
|
|
|
|
+import java.util.Map;
|
|
import java.util.Timer;
|
|
import java.util.Timer;
|
|
import java.util.TimerTask;
|
|
import java.util.TimerTask;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
@@ -53,7 +54,7 @@ public class SseServlet extends BaseController {
|
|
public static final ConcurrentHashMap<String,Timer> timerArray = new ConcurrentHashMap<>();
|
|
public static final ConcurrentHashMap<String,Timer> timerArray = new ConcurrentHashMap<>();
|
|
|
|
|
|
@GetMapping("/sse/subscribe")
|
|
@GetMapping("/sse/subscribe")
|
|
- @CrossOrigin(origins = "*") // 允许所有来源
|
|
|
|
|
|
+ @CrossOrigin(origins = "*")
|
|
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
|
|
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
|
|
response.setContentType("text/event-stream");
|
|
response.setContentType("text/event-stream");
|
|
response.setCharacterEncoding("UTF-8");
|
|
response.setCharacterEncoding("UTF-8");
|
|
@@ -61,80 +62,81 @@ public class SseServlet extends BaseController {
|
|
response.setHeader("Connection", "keep-alive");
|
|
response.setHeader("Connection", "keep-alive");
|
|
|
|
|
|
String clientId = request.getParameter("clientId");
|
|
String clientId = request.getParameter("clientId");
|
|
- if (clientId == null) {
|
|
|
|
|
|
+ if (clientId == null || clientId.isEmpty()) {
|
|
response.setStatus(HttpServletResponse.SC_BAD_REQUEST);
|
|
response.setStatus(HttpServletResponse.SC_BAD_REQUEST);
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
String reqid = clientId.split("___")[0];
|
|
String reqid = clientId.split("___")[0];
|
|
- if (reqid == null) {
|
|
|
|
|
|
+ if (reqid == null || reqid.isEmpty()) {
|
|
response.setStatus(HttpServletResponse.SC_BAD_REQUEST);
|
|
response.setStatus(HttpServletResponse.SC_BAD_REQUEST);
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
|
|
|
|
- // Start async processing
|
|
|
|
AsyncContext asyncContext = request.startAsync();
|
|
AsyncContext asyncContext = request.startAsync();
|
|
PrintWriter writer = response.getWriter();
|
|
PrintWriter writer = response.getWriter();
|
|
|
|
|
|
- ConcurrentHashMap<String, PrintWriter> connections = new ConcurrentHashMap<>();
|
|
|
|
- if(countConnections.get(reqid)!=null&&countConnections.get(reqid).size()>0){
|
|
|
|
- connections = countConnections.get(reqid);
|
|
|
|
|
|
+ // 1. 确保连接唯一,重复时主动关闭老连接并清理资源
|
|
|
|
+ countConnections.putIfAbsent(reqid, new ConcurrentHashMap<>());
|
|
|
|
+ ConcurrentHashMap<String, PrintWriter> connections = countConnections.get(reqid);
|
|
|
|
+
|
|
|
|
+ PrintWriter oldWriter = connections.put(clientId, writer);
|
|
|
|
+ if (oldWriter != null) {
|
|
|
|
+ try { oldWriter.close(); } catch (Exception e) { }
|
|
|
|
+ log.info("重复连接已回收: clientId={}", clientId);
|
|
}
|
|
}
|
|
- connections.put(clientId,writer);
|
|
|
|
- countConnections.put(reqid, connections);
|
|
|
|
|
|
|
|
- // Set timeout for async context
|
|
|
|
|
|
+ // 2. 持久化reqid对应的timer和心跳
|
|
|
|
+ timerArray.computeIfAbsent(reqid, k -> {
|
|
|
|
+ Timer timer = new Timer();
|
|
|
|
+ timer.schedule(new TimerTask() {
|
|
|
|
+ @Override
|
|
|
|
+ public void run() {
|
|
|
|
+ broadcastHeart(reqid);
|
|
|
|
+ }
|
|
|
|
+ }, 2000, 2000);
|
|
|
|
+ return timer;
|
|
|
|
+ });
|
|
|
|
+
|
|
asyncContext.setTimeout(0);
|
|
asyncContext.setTimeout(0);
|
|
- writer.println("data: Connected\n");
|
|
|
|
- writer.flush();
|
|
|
|
|
|
|
|
- //TODO 建立心跳定时包
|
|
|
|
- Timer timer2 = new Timer();
|
|
|
|
- TimerTask timerTask2 = new TimerTask() {
|
|
|
|
- @Override
|
|
|
|
- public void run() {
|
|
|
|
- broadcastHeart(reqid);
|
|
|
|
- }
|
|
|
|
- };
|
|
|
|
- timer2.schedule(timerTask2, 2000, 2000);
|
|
|
|
- timerArray.put(reqid,timer2);
|
|
|
|
|
|
+ // 3. 发送连接成功消息
|
|
|
|
+ try {
|
|
|
|
+ writer.print("data: Connected\n\n");
|
|
|
|
+ writer.flush();
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ // 连接失效立即清理(理论上不会)
|
|
|
|
+ cleanupConnection(reqid, clientId);
|
|
|
|
+ asyncContext.complete();
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
|
|
- // Handle connection close
|
|
|
|
asyncContext.addListener(new javax.servlet.AsyncListener() {
|
|
asyncContext.addListener(new javax.servlet.AsyncListener() {
|
|
- @Override
|
|
|
|
- public void onComplete(javax.servlet.AsyncEvent asyncEvent) throws IOException {
|
|
|
|
- if(!CollectionUtils.isEmpty(countConnections)) {
|
|
|
|
- if(!CollectionUtils.isEmpty(countConnections.get(reqid))) {
|
|
|
|
- countConnections.get(reqid).remove(clientId);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
|
|
+ @Override public void onComplete(javax.servlet.AsyncEvent asyncEvent) { cleanupConnection(reqid, clientId); }
|
|
|
|
+ @Override public void onTimeout(javax.servlet.AsyncEvent asyncEvent) { cleanupConnection(reqid, clientId); }
|
|
|
|
+ @Override public void onError(javax.servlet.AsyncEvent asyncEvent) { cleanupConnection(reqid, clientId); }
|
|
|
|
+ @Override public void onStartAsync(javax.servlet.AsyncEvent asyncEvent) { log.info("开启链接: {}", clientId); }
|
|
|
|
+ });
|
|
|
|
|
|
- @Override
|
|
|
|
- public void onTimeout(javax.servlet.AsyncEvent asyncEvent) throws IOException {
|
|
|
|
- if(!CollectionUtils.isEmpty(countConnections)) {
|
|
|
|
- if(!CollectionUtils.isEmpty(countConnections.get(reqid))) {
|
|
|
|
- countConnections.get(reqid).remove(clientId);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
|
|
+ log.info("建立链接成功: clientId={}", clientId);
|
|
|
|
+ }
|
|
|
|
|
|
- @Override
|
|
|
|
- public void onError(javax.servlet.AsyncEvent asyncEvent) throws IOException {
|
|
|
|
- if(!CollectionUtils.isEmpty(countConnections)) {
|
|
|
|
- if(!CollectionUtils.isEmpty(countConnections.get(reqid))) {
|
|
|
|
- countConnections.get(reqid).remove(clientId);
|
|
|
|
- }
|
|
|
|
|
|
+ // 工具:清理连接和Timer
|
|
|
|
+ private static void cleanupConnection(String reqid, String clientId) {
|
|
|
|
+ try {
|
|
|
|
+ ConcurrentHashMap<String, PrintWriter> connections = countConnections.get(reqid);
|
|
|
|
+ if (connections != null) {
|
|
|
|
+ PrintWriter w = connections.remove(clientId);
|
|
|
|
+ if (w != null) {
|
|
|
|
+ try { w.close(); } catch (Exception e) { }
|
|
|
|
+ }
|
|
|
|
+ // 如果分组都清空,移除reqid和timer,彻底释放内存
|
|
|
|
+ if (connections.isEmpty()) {
|
|
|
|
+ countConnections.remove(reqid);
|
|
|
|
+ Timer t = timerArray.remove(reqid);
|
|
|
|
+ if (t != null) t.cancel();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
-
|
|
|
|
- @Override
|
|
|
|
- public void onStartAsync(javax.servlet.AsyncEvent asyncEvent) throws IOException {
|
|
|
|
- // No action needed here
|
|
|
|
- System.out.println("开启链接");
|
|
|
|
- }
|
|
|
|
- });
|
|
|
|
-
|
|
|
|
- log.info("建立链接成功:clientId:"+clientId);
|
|
|
|
|
|
+ } catch (Exception ex) { }
|
|
}
|
|
}
|
|
|
|
|
|
@PostMapping("/sse/weblog")
|
|
@PostMapping("/sse/weblog")
|
|
@@ -212,52 +214,42 @@ public class SseServlet extends BaseController {
|
|
//TODO 广播
|
|
//TODO 广播
|
|
//TODO 心跳广播
|
|
//TODO 心跳广播
|
|
public void broadcastHeart(String reqid) {
|
|
public void broadcastHeart(String reqid) {
|
|
- if(!CollectionUtils.isEmpty(countConnections)) {
|
|
|
|
- for (ConcurrentHashMap<String, PrintWriter> writerMap : countConnections.values()) {
|
|
|
|
- try {
|
|
|
|
- //捕捉一下 如果有链接异常 避免影响其他链接循环
|
|
|
|
- for (PrintWriter writer : writerMap.values()) {
|
|
|
|
- writer.println("event: heart");
|
|
|
|
- writer.println("data: heart is running\n");
|
|
|
|
- writer.flush();
|
|
|
|
- }
|
|
|
|
- }catch(Exception ex){}
|
|
|
|
|
|
+ ConcurrentHashMap<String, PrintWriter> writerMap = countConnections.get(reqid);
|
|
|
|
+ if (writerMap == null || writerMap.isEmpty()) return;
|
|
|
|
+
|
|
|
|
+ for (Map.Entry<String, PrintWriter> entry : writerMap.entrySet()) {
|
|
|
|
+ PrintWriter writer = entry.getValue();
|
|
|
|
+ try {
|
|
|
|
+ writer.print("event: heart\n");
|
|
|
|
+ writer.print("data: heart is running\n\n");
|
|
|
|
+ writer.flush();
|
|
|
|
+ } catch (Exception ex) {
|
|
|
|
+ // 写失败主动移除连接
|
|
|
|
+ cleanupConnection(reqid, entry.getKey());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
//TODO jsonArray是要退出的clientId集合
|
|
//TODO jsonArray是要退出的clientId集合
|
|
public void broadcastLogout(JSONArray jsonArray) {
|
|
public void broadcastLogout(JSONArray jsonArray) {
|
|
- if(!CollectionUtils.isEmpty(countConnections)) {
|
|
|
|
- for (String key: countConnections.keySet()) {
|
|
|
|
- if(jsonArray!=null) {
|
|
|
|
- for (Object object : jsonArray) {
|
|
|
|
- if (key.equals(object.toString())) {
|
|
|
|
- ConcurrentHashMap<String,PrintWriter> writerMap = countConnections.get(key);
|
|
|
|
- for (PrintWriter writer:writerMap.values()) {
|
|
|
|
- writer.println("event: logout");
|
|
|
|
- writer.println("data: User is logging out\n");
|
|
|
|
- writer.flush();
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
|
|
+ if (CollectionUtils.isEmpty(countConnections) || jsonArray == null) return;
|
|
|
|
+ for (Object obj : jsonArray) {
|
|
|
|
+ String targetReqid = obj.toString();
|
|
|
|
+ ConcurrentHashMap<String, PrintWriter> writerMap = countConnections.get(targetReqid);
|
|
|
|
+ if (writerMap != null) {
|
|
|
|
+ for (Map.Entry<String, PrintWriter> entry : writerMap.entrySet()) {
|
|
|
|
+ try {
|
|
|
|
+ PrintWriter writer = entry.getValue();
|
|
|
|
+ writer.print("event: logout\n");
|
|
|
|
+ writer.print("data: User is logging out\n\n");
|
|
|
|
+ writer.flush();
|
|
|
|
+ } catch (Exception ex) { }
|
|
}
|
|
}
|
|
- }
|
|
|
|
- for (Object object : jsonArray) {
|
|
|
|
- countConnections.remove(object.toString());
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- if(!CollectionUtils.isEmpty(timerArray)){
|
|
|
|
- if(jsonArray!=null) {
|
|
|
|
- for (Object object : jsonArray) {
|
|
|
|
- Timer timerCurrent = timerArray.get(object.toString());
|
|
|
|
- if (timerCurrent!=null) {
|
|
|
|
- timerCurrent.cancel();
|
|
|
|
- timerArray.remove(object.toString());
|
|
|
|
- }
|
|
|
|
|
|
+ // 确认通知完毕就清理
|
|
|
|
+ for (String clientId : writerMap.keySet()) {
|
|
|
|
+ cleanupConnection(targetReqid, clientId);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- log.info("退出指令发送:sessionId:"+jsonArray.toJSONString());
|
|
|
|
}
|
|
}
|
|
}
|
|
}
|