1. 项目背景:使用 WebSocket 与前端通信,但后端是分布式部署,各节点之间的 session 无法共享;并且 WebSocketSession 无法序列化,所以无法把 session 存到 Redis 中。
2. 基础代码:
/**
 * Spring configuration that subscribes this node to the Redis channel used to
 * fan messages out to every application instance.
 *
 * <p>The container registers the listener adapter on {@code Constants.REDIS_CHANNEL};
 * the adapter delegates to {@link RedisReceiver}.
 */
@Configuration
@Slf4j
public class RedisConfig {

    /**
     * Creates the Redis message listener container and registers the adapter on the
     * topic named by {@code Constants.REDIS_CHANNEL}.
     *
     * @param connectionFactory factory the container uses to connect to Redis
     * @param listenerAdapter   adapter that receives the published messages
     * @return the configured listener container
     */
    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                                   MessageListenerAdapter listenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        // NOTE(review): PatternTopic treats the value as a subscription pattern. If
        // Constants.REDIS_CHANNEL is a literal channel name, ChannelTopic would state
        // the intent more precisely - confirm against the constant's definition.
        container.addMessageListener(listenerAdapter, new PatternTopic(Constants.REDIS_CHANNEL));
        return container;
    }

    /**
     * Wraps the receiver in a listener adapter, naming {@code onMessage} as the
     * listener method.
     *
     * @param receiver bean that handles messages arriving from Redis
     * @return adapter delegating to {@code receiver}
     */
    @Bean
    public MessageListenerAdapter listenerAdapter(RedisReceiver receiver) {
        return new MessageListenerAdapter(receiver, "onMessage");
    }
}
@Slf4j
@Component
public class RedisReceiver implements MessageListener {@Autowiredprivate TestHandler testHandler;@Overridepublic void onMessage(Message message, byte[] pattern) {try {log.info("RedisReceiver onMessage:{}",new String(message.getBody(), StandardCharsets.UTF_8));String channel = new String(message.getChannel());String content = new String(message.getBody(), StandardCharsets.UTF_8);if (Constants.REDIS_CHANNEL.endsWith(channel)) {testHandler.sendMessageToClient(content);}} catch (Exception e) {log.error("RedisReceiver 消息处理失败",e);}}}
@Slf4j
@Component
public class TestHandler extends TextWebSocketHandler {private static final ConcurrentHashMap<String, WebSocketSession> SESSION_DISPATCH_HOLDER = new ConcurrentHashMap<>();@Resourceprivate StringRedisTemplate stringRedisTemplate;@Overridepublic void afterConnectionEstablished(@NonNull WebSocketSession session) throws Exception {super.afterConnectionEstablished(session);SESSION_DISPATCH_HOLDER.put(session.getId(), session);log.info("webSocket already connected:{}",session.getId());}@Overridepublic void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {super.afterConnectionClosed(session, status);SESSION_DISPATCH_HOLDER.remove(session.getId());log.info("webSocket disConnect:{}",session.getId());}@Overridepublic void handleMessage(WebSocketSession session, WebSocketMessage<?> message) {log.info("handleMessage enter message:{}", JsonUtils.objToJsonStr(message.getPayload()));TextMessage textMessage = new TextMessage("");try {session.sendMessage(textMessage);} catch (IOException e) {log.error("发送消息失败", e);}}public void sendMessageToClient(String contentList) {log.info("sendMessageToClient:{}", contentList);if (CollectionUtils.isEmpty(SESSION_DISPATCH_HOLDER)) {log.info("没有连接中的客户端");return;}SESSION_DISPATCH_HOLDER.values().parallelStream().forEach(session -> {try {TextMessage textMessage = new TextMessage(contentList);session.sendMessage(textMessage);log.info("websocket 消息推送成功:{}", contentList);} catch (Exception e) {log.error("发送消息失败", e);}});}}