吞吐量大的 spring-5-reactive-websockets
一、服务端搭建
1、pom依赖
<!-- Fragment of pom.xml: parent, project coordinates, dependencies and build plugins. -->
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.7.7</version>
    <relativePath/>
</parent>

<groupId>com.test</groupId>
<artifactId>demo</artifactId>
<version>0.0.1-SNAPSHOT</version>

<dependencies>
    <!--starter-websocket-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>
    <!--lombok-->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <!--fastjson-->
    <!-- 1.2.68 is affected by publicly disclosed autoType deserialization issues; 1.2.83 is the patched 1.x release. -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.83</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
            <configuration>
                <!-- Lombok is compile-time only, so keep it out of the repackaged jar. -->
                <excludes>
                    <exclude>
                        <groupId>org.projectlombok</groupId>
                        <artifactId>lombok</artifactId>
                    </exclude>
                </excludes>
            </configuration>
        </plugin>
    </plugins>
</build>
2、WebSocketConfig
package com.test.websocket;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * Spring configuration that enables annotation-driven WebSocket endpoints.
 */
@Configuration
public class WebSocketConfig {

    /**
     * Exposes a {@link ServerEndpointExporter} bean, which registers the WebSocket
     * endpoints declared with {@code @ServerEndpoint}.
     *
     * @return a new exporter instance
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }
}
3、CustomConfigurator(仅当需要获取请求的 Header 信息时才需要添加)
package com.test.websocket;

import org.springframework.util.StringUtils;

import javax.websocket.HandshakeResponse;
import javax.websocket.server.HandshakeRequest;
import javax.websocket.server.ServerEndpointConfig;
import java.net.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Handshake configurator that copies request data into the endpoint's user properties:
 * the first {@code Authorization} header value (key {@code "Authorization"}) and the
 * user id (key {@code "sn"}).
 *
 * <p>The user id is taken from the {@code sn} query parameter
 * ({@code /ws/websocket?sn=xxxx}) and, if that is absent or empty, from the last path
 * segment ({@code /ws/websocket/xxxx}).
 */
public class CustomConfigurator extends ServerEndpointConfig.Configurator {

    private static final String AUTHORIZATION_KEY = "Authorization";
    private static final String USER_ID_KEY = "sn";
    /** Trailing slash included so that a bare "/ws/websocket" path never yields "websocket" as an id. */
    private static final String ENDPOINT_PATH_PREFIX = "/ws/websocket/";

    /**
     * Stores the token and user id of the incoming handshake in {@code config.getUserProperties()}.
     *
     * @param config   endpoint configuration whose user properties receive the values
     * @param request  the handshake request (headers, query string and URI are read)
     * @param response the handshake response (not modified)
     */
    @Override
    public void modifyHandshake(ServerEndpointConfig config, HandshakeRequest request, HandshakeResponse response) {
        Map<String, Object> userProperties = config.getUserProperties();

        // Token from the request headers; only the first value is kept.
        List<String> tokenList = request.getHeaders().get(AUTHORIZATION_KEY);
        if (tokenList != null && !tokenList.isEmpty()) {
            userProperties.put(AUTHORIZATION_KEY, tokenList.get(0));
        }

        // Query parameter first, path segment as fallback.
        String userId = parseQueryParams(request.getQueryString()).get(USER_ID_KEY);
        if (!StringUtils.hasLength(userId)) {
            userId = extractUserIdFromPath(request.getRequestURI());
        }

        // Skip the put when nothing was found.
        // NOTE(review): the user-properties map may be null-hostile in some containers - confirm.
        if (StringUtils.hasLength(userId)) {
            userProperties.put(USER_ID_KEY, userId);
        }
    }

    /**
     * Parses a raw query string ({@code a=1&b=2}) into a map. Values are not URL-decoded.
     * Pairs without {@code =} are ignored.
     */
    private static Map<String, String> parseQueryParams(String query) {
        Map<String, String> queryParams = new HashMap<>();
        if (query == null) {
            return queryParams;
        }
        for (String param : query.split("&")) {
            // Limit 2 keeps any further '=' characters inside the value instead of truncating it.
            String[] entry = param.split("=", 2);
            if (entry.length > 1) {
                queryParams.put(entry[0], entry[1]);
            }
        }
        return queryParams;
    }

    /**
     * Returns the last path segment when the path contains the endpoint prefix,
     * or {@code null} when it does not.
     */
    private static String extractUserIdFromPath(URI requestURI) {
        if (requestURI == null) {
            return null;
        }
        String path = requestURI.getPath();
        if (path == null || !path.contains(ENDPOINT_PATH_PREFIX)) {
            return null;
        }
        return path.substring(path.lastIndexOf('/') + 1);
    }
}
4、WebSocketServer
package com.test.websocket;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.net.URI;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * WebSocket endpoint for {@code /ws/websocket/{userId}}.
 *
 * <p>Each open connection is registered in a static map under its user id, so the
 * static helpers can push messages to one user or to everyone. A later connection
 * with the same user id replaces the earlier one in the map.
 */
@Component
@Slf4j
@ServerEndpoint(value = "/ws/websocket/{userId}", configurator = CustomConfigurator.class)
public class WebSocketServer {

    /** Heartbeat request and reply. */
    private static final String PING = "ping";
    private static final String PONG = "pong";

    /** Registered endpoint instance per user id. */
    private static final ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();

    /** Session of this connection; used to send data to the client. */
    private Session session;

    /** User id of this connection; stays empty until {@link #onOpen} succeeds. */
    private String userId = "";

    /**
     * Called when a connection is established. Resolves the user id from the path
     * parameter, falling back to the {@code "sn"} user property, then registers this
     * instance. A connection without a user id is closed instead of being left open
     * and unregistered.
     *
     * @param session the new session
     * @param userId  value of the {@code {userId}} path parameter
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("userId") String userId) {
        if (!StringUtils.hasLength(userId)) {
            userId = (String) session.getUserProperties().get("sn");
        }
        if (!StringUtils.hasLength(userId)) {
            URI requestURI = session.getRequestURI();
            log.info("请求连接未传用户id,请求路径:{}", requestURI.getPath());
            rejectSession(session);
            return;
        }

        // TODO: the token is only read here; no validation is performed yet.
        String token = (String) session.getUserProperties().get("Authorization");
        // Log a masked placeholder rather than the credential itself.
        log.info("===>>>WebSocketServer.open token:{}", token == null ? null : "******");

        this.session = session;
        this.userId = userId;
        webSocketMap.put(userId, this);
        log.info("新用户上线:" + userId + ", 当前在线人数为:" + getOnlineCount());
    }

    /**
     * Called when the connection is closed. Unregisters this instance only if it is
     * still the one mapped to its user id, so a stale connection closing late cannot
     * evict a newer connection of the same user.
     */
    @OnClose
    public void onClose() {
        if (webSocketMap.remove(userId, this)) {
            log.info("用户下线:" + userId + ", 当前在线人数为:" + getOnlineCount());
        }
    }

    /**
     * Called for every text message from the client; answers {@code "ping"} with {@code "pong"}.
     *
     * @param message the received text
     * @param session the session the message arrived on
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        log.info("===>>>接收到用户:{} 发送的消息:{}", userId, message);
        if (PING.equals(message)) {
            try {
                this.sendMessage(PONG);
            } catch (IOException e) {
                log.error("向用户:{} 回复心跳失败", userId, e);
            }
        }
    }

    /**
     * Called when an error occurs on the connection.
     *
     * @param session the affected session
     * @param error   the error raised
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("发生错误", error);
    }

    /**
     * Sends a text message to this connection's client.
     *
     * @param message text to send
     * @throws IOException if the send fails
     */
    private void sendMessage(String message) throws IOException {
        log.info("===>>>向用户:{} 发送消息:{}", this.userId, message);
        // The static push methods can be called from any thread; serialise writes per session
        // so that two blocking sends never overlap on the same remote endpoint.
        synchronized (this.session) {
            this.session.getBasicRemote().sendText(message);
        }
    }

    /**
     * Sends a message to every registered user. A failure for one user no longer stops
     * delivery to the others; the first failure is rethrown after the loop with any
     * later failures attached as suppressed exceptions.
     *
     * @param message text to send
     * @throws IOException if sending to at least one user failed
     */
    public static void sendMessageToAll(String message) throws IOException {
        IOException firstFailure = null;
        for (Map.Entry<String, WebSocketServer> entry : webSocketMap.entrySet()) {
            try {
                entry.getValue().sendMessage(message);
            } catch (IOException e) {
                log.error("向用户:{} 群发消息失败", entry.getKey(), e);
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Sends a message to a single user, or logs an error if that user is not registered here.
     *
     * @param toUserId id of the recipient
     * @param message  text to send
     * @throws IOException if the send fails
     */
    public static void sendMessageToUser(String toUserId, String message) throws IOException {
        // One lookup instead of containsKey + get: the entry may disappear between the two calls.
        WebSocketServer target = toUserId == null ? null : webSocketMap.get(toUserId);
        if (target != null) {
            target.sendMessage(message);
        } else {
            log.error("请求的 userId:" + toUserId + "不在该服务器上");
        }
    }

    /**
     * Returns the ids of the registered users.
     *
     * @return a new set; changing it does not affect the registry
     */
    public static Set<String> getOnlineUsers() {
        return new HashSet<>(webSocketMap.keySet());
    }

    /**
     * Returns the number of registered users.
     *
     * @return current size of the registry
     */
    public static int getOnlineCount() {
        return webSocketMap.size();
    }

    /**
     * Tells whether a user is registered.
     *
     * @param userId id to look up; {@code null} yields {@code false}
     * @return {@code true} if the user has a registered connection
     */
    public static Boolean isOnline(String userId) {
        return userId != null && webSocketMap.containsKey(userId);
    }

    /** Closes a session that cannot be registered; a failure to close is only logged. */
    private static void rejectSession(Session session) {
        try {
            session.close(new CloseReason(CloseReason.CloseCodes.CANNOT_ACCEPT, "userId is required"));
        } catch (IOException e) {
            log.warn("关闭未携带用户id的连接失败", e);
        }
    }
}
二、连接测试
方法1:使用浏览器连接测试
F12打开浏览器控制台,在console面板粘贴下面内容即连接成功
// If the console refuses the paste, type "allow paste" as the browser prompts.
// The original instructions were to paste the first statement alone, wait for the
// connection, then paste the rest. That is no longer required: the message is sent
// only once the socket is open, so the whole snippet can be pasted at once.
// The path must match the server endpoint /ws/websocket/{userId}.
var ws = new WebSocket('ws://localhost:8080/ws/websocket/user1');
ws.onopen = function (data) {
    console.log(data);
    // Calling send() while the socket is still CONNECTING throws InvalidStateError.
    ws.send('msg');
};
ws.onmessage = function (data) { console.log(data); };
ws.onclose = function (data) { console.log(data); };
ws.onerror = function (data) { console.log(data); };
方法2:自己写个html连接测试
<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <!-- The title text needs a <title> element; bare text inside <head> is invalid markup. -->
    <title>Netty WebSocket</title>
</head>
<body>
<br/>
<br>
<script type="text/javascript">
    // Test client: connects on page load and appends every server message to the textarea.
    var socket;
    if (!window.WebSocket) {
        window.WebSocket = window.MozWebSocket;
    }
    if (window.WebSocket) {
        // Port 8080 matches the console test in method 1; adjust it to the server's actual port.
        socket = new WebSocket("ws://localhost:8080/ws/websocket/11111");
        socket.onmessage = function (event) {
            // Pass the event as a separate argument; concatenating it prints "[object MessageEvent]".
            console.log("xxx", event);
            var ta = document.getElementById('responseText');
            ta.value = ta.value + '\n' + event.data;
        };
        socket.onopen = function (event) {
            var ta = document.getElementById('responseText');
            ta.value = "打开WebSocket服务正常,浏览器支持WebSocket!";
        };
        socket.onclose = function (event) {
            var ta = document.getElementById('responseText');
            ta.value = "WebSocket 关闭!";
        };
        socket.onerror = function (event) {
            // Log connection failures to the console.
            console.error(event);
        };
    } else {
        alert("抱歉,您的浏览器不支持WebSocket协议!");
    }

    // Sends the text if the connection is open, otherwise tells the user it is not.
    function send(message) {
        if (!window.WebSocket) {
            return;
        }
        if (socket.readyState == WebSocket.OPEN) {
            socket.send(message);
        } else {
            alert("WebSocket连接没有建立成功!");
        }
    }
</script>
<form onsubmit="return false;">
    <input type="text" name="message" value="输入要发送的消息">
    <br/><br/>
    <input type="button" value="发送消息" onclick="send(this.form.message.value)">
    <hr color="blue"/>
    <h3>服务端返回的应答消息</h3>
    <textarea id="responseText" style="width:500px; height: 300px;"></textarea>
</form>
</body>
</html>
方法3:在线测试网址(没有token校验)
http://www.easyswoole.com/wstool.html
http://www.websocket-test.com/