|
@@ -7,28 +7,34 @@ package com.ruoyi.web.controller.system;
|
|
* @Date 2025/5/15
|
|
* @Date 2025/5/15
|
|
* @Version V1.0
|
|
* @Version V1.0
|
|
**/
|
|
**/
|
|
|
|
+import com.alibaba.fastjson.JSONArray;
|
|
|
|
+import com.alibaba.fastjson.JSONObject;
|
|
import com.ruoyi.common.core.controller.BaseController;
|
|
import com.ruoyi.common.core.controller.BaseController;
|
|
import com.ruoyi.common.core.domain.AjaxResult;
|
|
import com.ruoyi.common.core.domain.AjaxResult;
|
|
import com.ruoyi.common.utils.ShiroUtils;
|
|
import com.ruoyi.common.utils.ShiroUtils;
|
|
import com.ruoyi.framework.config.ShiroConfig;
|
|
import com.ruoyi.framework.config.ShiroConfig;
|
|
import com.ruoyi.framework.shiro.realm.UserRealm;
|
|
import com.ruoyi.framework.shiro.realm.UserRealm;
|
|
|
|
+import org.apache.commons.io.IOUtils;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
import org.springframework.stereotype.Controller;
|
|
import org.springframework.stereotype.Controller;
|
|
import org.springframework.util.CollectionUtils;
|
|
import org.springframework.util.CollectionUtils;
|
|
|
|
+import org.springframework.web.bind.annotation.CrossOrigin;
|
|
import org.springframework.web.bind.annotation.GetMapping;
|
|
import org.springframework.web.bind.annotation.GetMapping;
|
|
|
|
+import org.springframework.web.bind.annotation.PostMapping;
|
|
import org.springframework.web.bind.annotation.ResponseBody;
|
|
import org.springframework.web.bind.annotation.ResponseBody;
|
|
|
|
|
|
import javax.annotation.PostConstruct;
|
|
import javax.annotation.PostConstruct;
|
|
import javax.servlet.AsyncContext;
|
|
import javax.servlet.AsyncContext;
|
|
import javax.servlet.ServletException;
|
|
import javax.servlet.ServletException;
|
|
|
|
+import javax.servlet.ServletInputStream;
|
|
import javax.servlet.annotation.WebServlet;
|
|
import javax.servlet.annotation.WebServlet;
|
|
import javax.servlet.http.HttpServlet;
|
|
import javax.servlet.http.HttpServlet;
|
|
import javax.servlet.http.HttpServletRequest;
|
|
import javax.servlet.http.HttpServletRequest;
|
|
import javax.servlet.http.HttpServletResponse;
|
|
import javax.servlet.http.HttpServletResponse;
|
|
-import java.io.IOException;
|
|
|
|
-import java.io.PrintWriter;
|
|
|
|
|
|
+import java.io.*;
|
|
|
|
+import java.nio.charset.Charset;
|
|
import java.time.LocalDateTime;
|
|
import java.time.LocalDateTime;
|
|
import java.util.Timer;
|
|
import java.util.Timer;
|
|
import java.util.TimerTask;
|
|
import java.util.TimerTask;
|
|
@@ -40,11 +46,13 @@ public class SseServlet extends BaseController {
|
|
|
|
|
|
private static final Logger log = LoggerFactory.getLogger(SseServlet.class);
|
|
private static final Logger log = LoggerFactory.getLogger(SseServlet.class);
|
|
|
|
|
|
- private static final ConcurrentHashMap<String,ConcurrentHashMap<String, PrintWriter>> countConnections = new ConcurrentHashMap<>();
|
|
|
|
|
|
+ //reqid,连接对象
|
|
|
|
+ public static final ConcurrentHashMap<String,ConcurrentHashMap<String, PrintWriter>> countConnections = new ConcurrentHashMap<>();
|
|
|
|
|
|
- private static final ConcurrentHashMap<String,Timer> timerArray = new ConcurrentHashMap<>();
|
|
|
|
|
|
+ public static final ConcurrentHashMap<String,Timer> timerArray = new ConcurrentHashMap<>();
|
|
|
|
|
|
@GetMapping("/sse/subscribe")
|
|
@GetMapping("/sse/subscribe")
|
|
|
|
+ @CrossOrigin(origins = "*") // 允许所有来源
|
|
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
|
|
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
|
|
response.setContentType("text/event-stream");
|
|
response.setContentType("text/event-stream");
|
|
response.setCharacterEncoding("UTF-8");
|
|
response.setCharacterEncoding("UTF-8");
|
|
@@ -56,17 +64,22 @@ public class SseServlet extends BaseController {
|
|
response.setStatus(HttpServletResponse.SC_BAD_REQUEST);
|
|
response.setStatus(HttpServletResponse.SC_BAD_REQUEST);
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
|
|
+ String reqid = clientId.split("___")[0];
|
|
|
|
+ if (reqid == null) {
|
|
|
|
+ response.setStatus(HttpServletResponse.SC_BAD_REQUEST);
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
|
|
// Start async processing
|
|
// Start async processing
|
|
AsyncContext asyncContext = request.startAsync();
|
|
AsyncContext asyncContext = request.startAsync();
|
|
|
|
+ PrintWriter writer = response.getWriter();
|
|
|
|
|
|
ConcurrentHashMap<String, PrintWriter> connections = new ConcurrentHashMap<>();
|
|
ConcurrentHashMap<String, PrintWriter> connections = new ConcurrentHashMap<>();
|
|
- if (countConnections.get(ShiroUtils.getSessionId())!=null&&countConnections.get(ShiroUtils.getSessionId()).size()>0){
|
|
|
|
- connections = countConnections.get(ShiroUtils.getSessionId());
|
|
|
|
|
|
+ if(countConnections.get(reqid)!=null&&countConnections.get(reqid).size()>0){
|
|
|
|
+ connections = countConnections.get(reqid);
|
|
}
|
|
}
|
|
- PrintWriter writer = response.getWriter();
|
|
|
|
- connections.put(clientId, writer);
|
|
|
|
- countConnections.put(ShiroUtils.getSessionId(),connections);
|
|
|
|
|
|
+ connections.put(clientId,writer);
|
|
|
|
+ countConnections.put(reqid, connections);
|
|
|
|
|
|
// Set timeout for async context
|
|
// Set timeout for async context
|
|
asyncContext.setTimeout(0);
|
|
asyncContext.setTimeout(0);
|
|
@@ -74,37 +87,42 @@ public class SseServlet extends BaseController {
|
|
writer.flush();
|
|
writer.flush();
|
|
|
|
|
|
//TODO 建立心跳定时包
|
|
//TODO 建立心跳定时包
|
|
- final String sessionId = ShiroUtils.getSessionId();
|
|
|
|
Timer timer2 = new Timer();
|
|
Timer timer2 = new Timer();
|
|
TimerTask timerTask2 = new TimerTask() {
|
|
TimerTask timerTask2 = new TimerTask() {
|
|
@Override
|
|
@Override
|
|
public void run() {
|
|
public void run() {
|
|
- broadcastHeart(sessionId);
|
|
|
|
|
|
+ broadcastHeart(reqid);
|
|
}
|
|
}
|
|
};
|
|
};
|
|
timer2.schedule(timerTask2, 2000, 2000);
|
|
timer2.schedule(timerTask2, 2000, 2000);
|
|
- timerArray.put(sessionId,timer2);
|
|
|
|
|
|
+ timerArray.put(reqid,timer2);
|
|
|
|
|
|
// Handle connection close
|
|
// Handle connection close
|
|
asyncContext.addListener(new javax.servlet.AsyncListener() {
|
|
asyncContext.addListener(new javax.servlet.AsyncListener() {
|
|
@Override
|
|
@Override
|
|
public void onComplete(javax.servlet.AsyncEvent asyncEvent) throws IOException {
|
|
public void onComplete(javax.servlet.AsyncEvent asyncEvent) throws IOException {
|
|
if(!CollectionUtils.isEmpty(countConnections)) {
|
|
if(!CollectionUtils.isEmpty(countConnections)) {
|
|
- countConnections.get(ShiroUtils.getSessionId()).remove(clientId);
|
|
|
|
|
|
+ if(!CollectionUtils.isEmpty(countConnections.get(reqid))) {
|
|
|
|
+ countConnections.get(reqid).remove(clientId);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public void onTimeout(javax.servlet.AsyncEvent asyncEvent) throws IOException {
|
|
public void onTimeout(javax.servlet.AsyncEvent asyncEvent) throws IOException {
|
|
if(!CollectionUtils.isEmpty(countConnections)) {
|
|
if(!CollectionUtils.isEmpty(countConnections)) {
|
|
- countConnections.get(ShiroUtils.getSessionId()).remove(clientId);
|
|
|
|
|
|
+ if(!CollectionUtils.isEmpty(countConnections.get(reqid))) {
|
|
|
|
+ countConnections.get(reqid).remove(clientId);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public void onError(javax.servlet.AsyncEvent asyncEvent) throws IOException {
|
|
public void onError(javax.servlet.AsyncEvent asyncEvent) throws IOException {
|
|
if(!CollectionUtils.isEmpty(countConnections)) {
|
|
if(!CollectionUtils.isEmpty(countConnections)) {
|
|
- countConnections.get(ShiroUtils.getSessionId()).remove(clientId);
|
|
|
|
|
|
+ if(!CollectionUtils.isEmpty(countConnections.get(reqid))) {
|
|
|
|
+ countConnections.get(reqid).remove(clientId);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -115,14 +133,52 @@ public class SseServlet extends BaseController {
|
|
}
|
|
}
|
|
});
|
|
});
|
|
|
|
|
|
- log.info("建立链接成功:sessionId:"+sessionId+";clientId:"+clientId);
|
|
|
|
|
|
+ log.info("建立链接成功:clientId:"+clientId);
|
|
}
|
|
}
|
|
|
|
|
|
- @GetMapping("/sse/logout")
|
|
|
|
|
|
+ @PostMapping("/sse/logout")
|
|
@ResponseBody
|
|
@ResponseBody
|
|
protected AjaxResult logout(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
|
|
protected AjaxResult logout(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
|
|
- broadcastLogout();
|
|
|
|
- return AjaxResult.success();
|
|
|
|
|
|
+ try {
|
|
|
|
+ String body = getJsonBodyStr(request);
|
|
|
|
+ JSONArray jsonArray = JSONArray.parseArray(body);
|
|
|
|
+ broadcastLogout(jsonArray);
|
|
|
|
+ return AjaxResult.success();
|
|
|
|
+ }catch(Exception ex){
|
|
|
|
+ return AjaxResult.error();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public static String getJsonBodyStr(HttpServletRequest request){
|
|
|
|
+ BufferedReader reader = null;
|
|
|
|
+ InputStream inputStream = null;
|
|
|
|
+ StringBuilder sb = new StringBuilder();
|
|
|
|
+ try {
|
|
|
|
+ inputStream = request.getInputStream();
|
|
|
|
+ reader = new BufferedReader(new InputStreamReader(inputStream, Charset.forName("UTF-8")));
|
|
|
|
+ String line = "";
|
|
|
|
+ while ((line = reader.readLine()) != null) {
|
|
|
|
+ sb.append(line);
|
|
|
|
+ }
|
|
|
|
+ } catch (IOException e) {
|
|
|
|
+ e.printStackTrace();
|
|
|
|
+ } finally {
|
|
|
|
+ if (inputStream != null) {
|
|
|
|
+ try {
|
|
|
|
+ inputStream.close();
|
|
|
|
+ } catch (IOException e) {
|
|
|
|
+ e.printStackTrace();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ if (reader != null) {
|
|
|
|
+ try {
|
|
|
|
+ reader.close();
|
|
|
|
+ } catch (IOException e) {
|
|
|
|
+ e.printStackTrace();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ return sb.toString();
|
|
}
|
|
}
|
|
|
|
|
|
@Value("${sseUrl}")
|
|
@Value("${sseUrl}")
|
|
@@ -141,30 +197,53 @@ public class SseServlet extends BaseController {
|
|
|
|
|
|
//TODO 广播
|
|
//TODO 广播
|
|
//TODO 心跳广播
|
|
//TODO 心跳广播
|
|
- public void broadcastHeart(String sessionId) {
|
|
|
|
|
|
+ public void broadcastHeart(String reqid) {
|
|
if(!CollectionUtils.isEmpty(countConnections)) {
|
|
if(!CollectionUtils.isEmpty(countConnections)) {
|
|
- for (PrintWriter writer : countConnections.get(sessionId).values()) {
|
|
|
|
- writer.println("event: heart");
|
|
|
|
- writer.println("data: heart is running\n");
|
|
|
|
- writer.flush();
|
|
|
|
|
|
+ for (ConcurrentHashMap<String, PrintWriter> writerMap : countConnections.values()) {
|
|
|
|
+ try {
|
|
|
|
+ //捕捉一下 如果有链接异常 避免影响其他链接循环
|
|
|
|
+ for (PrintWriter writer : writerMap.values()) {
|
|
|
|
+ writer.println("event: heart");
|
|
|
|
+ writer.println("data: heart is running\n");
|
|
|
|
+ writer.flush();
|
|
|
|
+ }
|
|
|
|
+ }catch(Exception ex){}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- public void broadcastLogout() {
|
|
|
|
|
|
+ //TODO jsonArray是要退出的clientId集合
|
|
|
|
+ public void broadcastLogout(JSONArray jsonArray) {
|
|
if(!CollectionUtils.isEmpty(countConnections)) {
|
|
if(!CollectionUtils.isEmpty(countConnections)) {
|
|
- for (PrintWriter writer : countConnections.get(ShiroUtils.getSessionId()).values()) {
|
|
|
|
- writer.println("event: logout");
|
|
|
|
- writer.println("data: User is logging out\n");
|
|
|
|
- writer.flush();
|
|
|
|
|
|
+ for (String key: countConnections.keySet()) {
|
|
|
|
+ if(jsonArray!=null) {
|
|
|
|
+ for (Object object : jsonArray) {
|
|
|
|
+ if (key.equals(object.toString())) {
|
|
|
|
+ ConcurrentHashMap<String,PrintWriter> writerMap = countConnections.get(key);
|
|
|
|
+ for (PrintWriter writer:writerMap.values()) {
|
|
|
|
+ writer.println("event: logout");
|
|
|
|
+ writer.println("data: User is logging out\n");
|
|
|
|
+ writer.flush();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ for (Object object : jsonArray) {
|
|
|
|
+ countConnections.remove(object.toString());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- countConnections.remove(ShiroUtils.getSessionId());
|
|
|
|
if(!CollectionUtils.isEmpty(timerArray)){
|
|
if(!CollectionUtils.isEmpty(timerArray)){
|
|
- Timer timerCurrent = timerArray.get(ShiroUtils.getSessionId());
|
|
|
|
- timerCurrent.cancel();
|
|
|
|
- timerArray.remove(ShiroUtils.getSessionId());
|
|
|
|
|
|
+ if(jsonArray!=null) {
|
|
|
|
+ for (Object object : jsonArray) {
|
|
|
|
+ Timer timerCurrent = timerArray.get(object.toString());
|
|
|
|
+ if (timerCurrent!=null) {
|
|
|
|
+ timerCurrent.cancel();
|
|
|
|
+ timerArray.remove(object.toString());
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
}
|
|
}
|
|
- log.info("退出指令发送:sessionId:"+ShiroUtils.getSessionId());
|
|
|
|
|
|
+ log.info("退出指令发送:sessionId:"+jsonArray.toJSONString());
|
|
}
|
|
}
|
|
}
|
|
}
|