(今天就把netty笔记搬上来和更新一下,没有去整理了)
netty版本4.1.99.final
本文主要记录了netty中websocket handler的源码处理流程
1:先简要概括下netty使用websocket流程
websocket请求流程:client先发送一个http请求表示想要建立websocket连接,然后服务端解析http请求,发现这是个http请求websocket的握手请求,所以建立websocket连接,当处理完第一个http请求后,后续就是websocket请求了,所以服务端在处理完第一个http请求后就要移除pipeline中的http相关handler,同样,client在收到请求完成以后也要删除http相关handler,因为后续的协议是websocket而不是http,而httphandler是无法解析websocket协议请求的,所以要移除
2:测试代码-server和client
server:
package org.example;
public class Server {public static void main(String[] args) throws Exception {new Server().start(7070);}public void start(int port) throws InterruptedException {EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();//!!!需要添加httphandler,不用担心,websocket处理完第一个请求后会自动移除httphandlerpipeline.addLast(new HttpServerCodec());pipeline.addLast(new HttpObjectAggregator(64 * 1024));//!!!ws://localhost:7070/helloWs 可以访问server//!!!创建时会自动添加websocketHandShake handler,处理完后会自动移除pipeline.addLast(new WebSocketServerProtocolHandler("/helloWs"));pipeline.addLast(new SimpleChannelInboundHandler<WebSocketFrame>() {@Overrideprotected void channelRead0(io.netty.channel.ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {if (frame instanceof TextWebSocketFrame) {// 处理文本消息String request = ((TextWebSocketFrame) frame).text();System.out.println("收到客户端消息: " + request);// 回复客户端消息ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器收到消息: " + request));} else {System.out.println("收到非文本消息,不支持处理");}}@Overridepublic void exceptionCaught(io.netty.channel.ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}@Overridepublic void handlerAdded(io.netty.channel.ChannelHandlerContext ctx) throws Exception {System.out.println("客户端连接: " + ctx.channel().remoteAddress());}@Overridepublic void handlerRemoved(io.netty.channel.ChannelHandlerContext ctx) throws Exception {System.out.println("客户端断开连接: " + ctx.channel().remoteAddress());}});}});ChannelFuture f = b.bind(port).sync();System.out.println("服务器启动,监听端口: " + port);f.channel().closeFuture().sync();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}
client:
package org.example;public class Client {final CountDownLatch latch = new CountDownLatch(1);public static void main(String[] args) throws Exception {Client client = new Client();client.test();}public void test() throws Exception {Channel dest = dest();latch.await();dest.writeAndFlush(new TextWebSocketFrame("CountDownLatch完成后发送的消息"));}public Channel dest() throws Exception {final URI webSocketURL = new URI("ws://127.0.0.1:7070/helloWs");EventLoopGroup group = new NioEventLoopGroup();Bootstrap boot = new Bootstrap();boot.option(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.TCP_NODELAY, true).group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel sc) throws Exception {ChannelPipeline pipeline = sc.pipeline();//!!!需要添加httphandler,不用担心,websocket处理完第一个请求后会自动移除httphandlerpipeline.addLast(new HttpClientCodec());pipeline.addLast(new ChunkedWriteHandler());pipeline.addLast(new HttpObjectAggregator(64 * 1024));//!!!ws://localhost:7070/helloWs 可以访问server//!!!创建时会自动添加websocketHandShake handler,处理完后会自动移除pipeline.addLast(new WebSocketClientProtocolHandler(WebSocketClientHandshakerFactory.newHandshaker(webSocketURL, WebSocketVersion.V13, null, false, new DefaultHttpHeaders())));pipeline.addLast(new SimpleChannelInboundHandler<TextWebSocketFrame>() {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg)throws Exception {System.err.println(" 客户端收到消息======== " + msg.text());}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE.equals(evt)) {System.out.println(ctx.name()+" 握手完成!");latch.countDown();send(ctx.channel());}super.userEventTriggered(ctx, evt);}});}});ChannelFuture cf = boot.connect(webSocketURL.getHost(), webSocketURL.getPort()).sync();return cf.channel();}public static void send(Channel channel) {final String textMsg = "握手完成后直接发送的消息";if (channel != null && channel.isActive()) {TextWebSocketFrame frame = new TextWebSocketFrame(textMsg);channel.writeAndFlush(frame).addListener((ChannelFutureListener) channelFuture -> {if (channelFuture.isDone() && channelFuture.isSuccess()) {System.out.println(" ================= 发送成功.");} else {channelFuture.channel().close();System.out.println(" ================= 发送失败. cause = " + channelFuture.cause());channelFuture.cause().printStackTrace();}});} else {System.out.println("消息发送失败! textMsg = " + textMsg);}}}
2024/10/11 哎,今天没事干。。。不怎么想上班。。看看源码摸摸鱼算了
2:源码笔记
!!!注意:每个连接都会有一套pipeline,即pipeline是专属的,而不是共用的
server的websocket handler详解:
.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel sc) throws Exception {ChannelPipeline pipeline = ch.pipeline();//1:httpHandler,处理websocket的第一个http请求,处理完后会被删除 pipeline.addLast(new HttpServerCodec()); #httpHandlerpipeline.addLast(new HttpObjectAggregator(64 * 1024)); #httpHandler//2:处理websocket协议pipeline.addLast(new WebSocketServerProtocolHandler("/helloWs")) #websocket协议handler,做2件事#1:channel建立时自动添加2个websocket handler#2:这些添加的handler在处理完第一个http请求后会自动移除httpHandlerWebSocketServerProtocolHandshakeHandler.handlerAdded #handlerAdded:当channel建立后netty会给channel添加handler#当添加完以后就会调用handler的handlerAdded方法ChannelPipeline.addBefore(new WebSocketServerProtocolHandshakeHandler) ##添加websocket handleShake handler即握手处理器#当握手完成后即处理完第一个http请求后会移除自己和其他httpHandler#因为websocket只有第一个请求才是http请求WebSocketServerProtocolHandshakeHandler.channelRead #WebSocketServerProtocolHandshakeHandler chanelRead只处理http请求if httpObject instanceof HttpRequest: #就是处理器第一个http请求,处理后再移除httphandler和自己handler=WebSocketServerHandshakerFactory.newHandshaker #如果第一个请求是http请求,那么就创建webSocket HandShakerWebSocketServerProtocolHandler.setHandshaker(handler)ctx.pipeline().remove(this) #这里就是WebSocketServerProtocolHandshakeHandler移除自己WebSocketServerHandshake.handsShake #处理握手请求ChannelPipeline.remove(HttpObjectAggregator.class) #移除HttpObjectAggregator HandlerChannelPipeline.remove(HttpContentCompressor.class)#移除htpContentCompressor handlerChannelPipeline.addBefore(newWebSocketEncoder()) #添加websocket encoderChannelPipeline.addBefore(newWebsocketDecoder()) #添加websocket decoder#!!!缕一下:WebSocketServerProtocolHandler的handlerAdded#!!!在WebSocketServerProtocolHandler之前添加一个握手handler,#!!!这个handler就是WebSocketServerProtocolHandshakeHandler#!!!而WebSocketServerProtocolHandshakeHandler处理完成后会重置#!!!WebSocketServerProtocolHandshakeHandler的handler #!!!然后再移除自己,因为这是第一个http请求,处理完第一个http请求就不需要握手了#!!!handsshake负责具体的握手逻辑,握手逻辑包括两个操作:#!!!1:移除httphandler,因为后续就是websocket协议请求了,httphandler无法解析#!!!2:添加websocket 编解码器ctx.fireUserEventTriggered(HANDSHAKE_COMPLETE) #处理完握手后发送websocket握手完成事件ctx.fireUserEventTriggered(HandshakeComplete)ChannelPipeline.addBefore(new Utf8FrameValidator) #添加websocket验证handler//3:处理webscoket请求 pipeline.addLast(new MyWebSocketHandler<WebSocketFrame>()) #处理webSocket请求if (frame instanceof TextWebSocketFrame) { #捋一下:如果不是websocket请求,那么这里就会跳过#如果websocket握手阶段已完成,那么在此之前就会有一个websocket decoder#websocket decoder会输出TextWebSocketFrame对象#我们的handler检测到TextWebSocketFrame对象,就会进行处理了 ..处理websocket请求... #至此websocket流程就讲完了,client端也是一样的逻辑:#client端也是一样的逻辑:1:握手;2:握手完成后删除httpHandler}
client的websocket详解:
pipeline.addLast(new HttpClientCodec());pipeline.addLast(new ChunkedWriteHandler());pipeline.addLast(new HttpObjectAggregator(64 * 1024));pipeline.addLast(new WebSocketClientProtocolHandler("/helloWs")) #一样的逻辑:添加handshake处理握手,#处理完后删除httphandler和自己,然后添加编解码器WebSocketClientProtocolHandler.handlerAddedChannelPipeline.addBefore(WebSocketClientProtocolHandshakeHandler) #添加握手handler处理第一个http握手请求WebSocketClientProtocolHandshakeHandler.channelActive #连接建立后会发起握手,握手完成后会添加websocket 编码器WebSocketClientHandsShake.handshake #FullHttpRequest request = newHandshakeRequest() #创建http 握手请求channel.writeAndFlush(request) #发起握手addLister.operationComplete #添加listner,当握手完成后会执行ChannelPipeline.addAfter(WebSocketEncoder()) #添加websocket编码器addLister.operationComplete ctx.fireUserEventTriggered(HANDSHAKE_ISSUED) #握手完成后发送握手完成事件WebSocketClientProtocolHandshakeHandler.channelReadif !(msg instanceof FullHttpResponse): #只处理第一个http请求,如果不是http请求,则跳过ctx.fireChannelRead(msg)returnWebSocketClientHandshake.finishHandshake #完成握手...校验http请求是不是server发来的websocket响应,略... ChannelPipeline.remove(HttpContentDecompressor.class) #握手完成后移除httphandler,因为后续就是websocket请求了ChannelPipeline.remove(HttpObjectAggregator.class)ChannelPipeline.addAfter(WebsocketDecoder()) #添加websocket解码器ChannelPipeline.addBefore(new Utf8FrameValidator) #添加websocket验证handlerpipeline.addLast(new MyWebSocketHandler<WebSocketFrame>())