您的位置:首页 > 游戏 > 游戏 > 基于 SpringBoot 和 Netty 实现消息推送,中心服务器推送到不同用户群体

基于 SpringBoot 和 Netty 实现消息推送,中心服务器推送到不同用户群体

2024/10/6 6:39:57 来源:https://blog.csdn.net/waiter456/article/details/140836097  浏览:    关键词:基于 SpringBoot 和 Netty 实现消息推送,中心服务器推送到不同用户群体

方案思路:

  1. 用户连接和角色管理: 维护一个映射关系,存储每个用户的连接信息和角色信息。可以使用 ConcurrentHashMap 存储,key 为 ChannelHandlerContext (Netty 连接上下文),value 为用户角色。
  2. 消息分组: 根据用户角色将用户分组,以便实现向特定角色用户推送消息。
  3. 消息推送: 当需要推送消息时,根据目标角色获取对应的用户连接,并通过 Netty 发送消息。
  4. 心跳机制: 使用心跳机制检测断开连接的用户,及时清理资源。

实现步骤:

1. 项目搭建和依赖引入:

  • 创建一个 Spring Boot 项目。
  • 在 pom.xml 中添加 Netty 和 Guava 依赖:

复制代码

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.87.Final</version></dependency><dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>31.1-jre</version></dependency>
</dependencies>

2. 创建 Netty 服务端:

  • 创建一个 NettyServer 类,使用 @Component 注解将其注册为 Spring Bean。
  • 在 @PostConstruct 方法中初始化 Netty 服务端,并在 @PreDestroy 方法中关闭服务端资源。

复制代码

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;@Component
public class NettyServer {private EventLoopGroup bossGroup;private EventLoopGroup workerGroup;private Channel channel;// 存储用户连接和角色信息private static final Map<ChannelHandlerContext, String> userChannelMap = new ConcurrentHashMap<>();@PostConstructpublic void start() throws InterruptedException {bossGroup = new NioEventLoopGroup();workerGroup = new NioEventLoopGroup();ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();// 添加 WebSocket 协议处理器pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));// 添加自定义处理器pipeline.addLast(new WebSocketFrameHandler());}});ChannelFuture future = bootstrap.bind(8081).sync();if (future.isSuccess()) {channel = future.channel();System.out.println("Netty server started on port 8081");}}@PreDestroypublic void stop() {if (channel != null) {channel.close();}bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();System.out.println("Netty server stopped");}// ... 其他代码 ...
}

3. 创建 WebSocket 处理器:

  • 创建一个 WebSocketFrameHandler 类,继承 SimpleChannelInboundHandler<TextWebSocketFrame> 处理 WebSocket 文本帧。

复制代码

// ... 其他代码 ...class WebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("New client connected: " + ctx.channel().remoteAddress());// TODO: 获取用户角色并存储到 userChannelMap 中// 例如:userChannelMap.put(ctx, getUserRoleFromToken(ctx));}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {System.out.println("Client disconnected: " + ctx.channel().remoteAddress());// TODO: 从 userChannelMap 中移除用户}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {String message = msg.text();System.out.println("Received message from client: " + message);// TODO: 处理接收到的消息,例如广播给其他用户}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}
}

4. 实现消息推送方法:

  • 在 NettyServer 中添加 sendMessage 方法,根据目标角色发送消息:

复制代码

// ... 其他代码 ...public void sendMessage(String message, String role) {userChannelMap.entrySet().stream().filter(entry -> role == null || role.equals(entry.getValue())) // 根据角色过滤.forEach(entry -> {ChannelHandlerContext ctx = entry.getKey();if (ctx.channel().isActive()) {ctx.writeAndFlush(new TextWebSocketFrame(message));} else {// 处理失效连接userChannelMap.remove(ctx);}});
}

5. 创建 Controller 提供发送消息接口:

  • 创建一个 MessageController,提供 RESTful 接口发送消息:

复制代码

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
public class MessageController {@Autowiredprivate NettyServer nettyServer;@GetMapping("/send")public String sendMessage(@RequestParam String message, @RequestParam(required = false) String role) {nettyServer.sendMessage(message, role);return "Message sent successfully.";}
}

6. 处理用户认证和角色获取:

  • 在 WebSocketFrameHandler 的 channelActive 方法中,需要添加逻辑从连接信息中获取用户角色。这部分逻辑取决于你的认证方式,例如 JWT 认证。

复制代码

// ... 其他代码 ...@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("New client connected: " + ctx.channel().remoteAddress());// 从 WebSocket 连接信息中获取 tokenString token = getJwtTokenFromHandshake(ctx.channel());// 验证 token 并获取用户角色String role = validateTokenAndGetRole(token);if (role != null) {userChannelMap.put(ctx, role);} else {// 处理未授权连接ctx.close();}
}

7. (可选) 实现心跳机制:

  • 可以使用 Netty 的 IdleStateHandler 检测空闲连接,并定期发送心跳包。
  • 客户端需要响应心跳包,以保持连接活跃。

8. 客户端实现:

  • 使用 JavaScript 或其他 WebSocket 客户端库连接到 Netty 服务端。
  • 处理服务端推送的消息。

代码示例:

  • 服务端 (SpringBoot + Netty): 参考上面步骤的代码片段。
  • 客户端 (JavaScript):

复制代码

var websocket = new WebSocket("ws://localhost:8081/ws");websocket.onopen = function(event) {console.log("WebSocket connection opened");
};websocket.onmessage = function(event) {console.log("Received message: " + event.data);// 处理接收到的消息
};websocket.onerror = function(event) {console.error("WebSocket error:", event);
};function sendMessage() {var message = document.getElementById("message").value;websocket.send(message);
}

总结:

  • 使用 Netty 可以高效地实现消息推送功能,并灵活控制消息发送目标。
  • 需要根据实际需求,结合用户认证、心跳机制等功能,完善消息推送系统。
  • 以上代码示例提供了一个基本框架,你可以根据自己的业务逻辑进行修改和扩展。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com