您的位置:首页 > 房产 > 建筑 > 我的ip地址查询_javascript网页设计_武汉网络推广外包公司_百度推广登陆入口

我的ip地址查询_javascript网页设计_武汉网络推广外包公司_百度推广登陆入口

2025/1/24 8:54:31 来源:https://blog.csdn.net/a273967581/article/details/143083862  浏览:    关键词:我的ip地址查询_javascript网页设计_武汉网络推广外包公司_百度推广登陆入口
我的ip地址查询_javascript网页设计_武汉网络推广外包公司_百度推广登陆入口

1 公共部分

1.1 请求、响应对象

@Data
public class RpcRequest {private String serviceName;private String methodName;private Class<?>[] parameterTypes;private Object[] parameters;
}
@Data
public class RpcResponse {private int code;private String msg;private Object data;private String ex;
}

1.2 rpc协议

@Data
public class RpcProtocol {private int length;private byte[] content;}

1.3 简易注册中心,保存服务名和地址的映射

public class ServiceRegister {private Map<String, List<String>> register = new HashMap<>();public ServiceRegister() {register.put(RpcService.class.getName(), new ArrayList<>(List.of("localhost:1733")));}public List<String> findService(String serviceName) {return register.get(serviceName);}}

1.4 rpc上下文,用来获取单例的ServiceRegister

public class RpcContext {public static ServiceRegister register() {return RpcRegisterHodler.REGISTER;}private static class RpcRegisterHodler {private static final ServiceRegister REGISTER = new ServiceRegister();}}

1.7 帧解码器

// 帧解码器,要配置在ChannelPipeline的第一个,这样才能解决入站数据的粘包和半包
public class RpcFrameDecoder extends LengthFieldBasedFrameDecoder {public RpcFrameDecoder() {super(1024, 0, 4);}
}
// rpc协议的编解码器
public class RpcProtocolCodec extends ByteToMessageCodec<RpcProtocol> {// 将rpc协议对象编码成字节流@Overrideprotected void encode(ChannelHandlerContext ctx, RpcProtocol msg, ByteBuf out) throws Exception {out.writeInt(msg.getLength());out.writeBytes(msg.getContent());}// 将字节流解码成rpc协议对象@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in,List<Object> out) throws Exception {int length = in.readInt();byte[] content = new byte[length];in.readBytes(content);RpcProtocol protocol = new RpcProtocol();protocol.setLength(length);protocol.setContent(content);out.add(protocol);}
}
// rpc请求对象的编解码器
public class RpcRequestCodec extends MessageToMessageCodec<RpcProtocol, 
RpcRequest> {// 将请求对象编码成rpc协议对象@Overrideprotected void encode(ChannelHandlerContext ctx, RpcRequest msg, List<Object> out) throws Exception {byte[] content = JSON.toJSONBytes(msg);int length = content.length;RpcProtocol rpcProtocol = new RpcProtocol();rpcProtocol.setLength(length);rpcProtocol.setContent(content);out.add(rpcProtocol);}// 将rpc协议对象解码成请求对象@Overrideprotected void decode(ChannelHandlerContext ctx, RpcProtocol msg, List<Object> out) throws Exception {RpcRequest request = JSON.parseObject(msg.getData(),RpcRequest.class,JSONReader.Feature.SupportClassForName);out.add(request);}
}
// rpc响应对象的编解码器
public class RpcResponseCodec extends MessageToMessageCodec<RpcProtocol, 
RpcResponse> {// 将响应对象编码成rpc协议对象@Overrideprotected void encode(ChannelHandlerContext ctx, RpcResponse msg, List<Object> out) throws Exception {byte[] content = JSON.toJSONBytes(msg);int length = content.length;RpcProtocol rpcProtocol = new RpcProtocol();rpcProtocol.setLength(length);rpcProtocol.setContent(content);out.add(rpcProtocol);}// 将rpc协议对象解码成响应对象@Overrideprotected void decode(ChannelHandlerContext ctx, RpcProtocol msg, List<Object> out) throws Exception {RpcResponse response = JSON.parseObject(msg.getContent(), RpcResponse.class);out.add(response);}
}

1.6 服务接口

public interface RpcService {String hello(String name);}

2 服务端

2.1 接口实现类

@Slf4j
public class RpcServiceImpl implements RpcService {@Overridepublic String hello(String name) {log.info("service received: {} ", name);return "hello " + name;}
}

2.2 接口名和实现类的对象映射,通过接口名查找对应的实现类对象

public class ServiceMapping {private Map<String, RpcService> mappings = new HashMap<>();public ServiceMapping() {mappings.put(RpcService.class.getName(), new RpcServiceImpl());}public void registerMapping(String serviceName, RpcService service) {mappings.put(serviceName, service);}public RpcService findMapping(String serviceName) {return mappings.get(serviceName);}}

2.2 服务端rpc上下文,用来获取单例的ServiceMapping

public class RpcServerContext {public static ServiceMapping mapping() {return RpcMappingrHodler.MAPPING;}private static class RpcMappingrHodler {private static final ServiceMapping MAPPING = new ServiceMapping();}}

2.3 业务处理器handler

@Slf4j
public class RpcServerHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {RpcRequest request = (RpcRequest) msg;RpcResponse response = invoke(request);ctx.writeAndFlush(response);}private RpcResponse invoke(RpcRequest request) {RpcResponse response = new RpcResponse();try {ServiceMapping register = RpcServerContext.mapping();RpcService rpcService = register.findMapping(request.getServiceName());String methodName = request.getMethodName();Class<?>[] parameterTypes = request.getParameterTypes();Object[] parameters = request.getParameters();// invokeMethod method = RpcService.class.getDeclaredMethod(methodName, parameterTypes);Object result = method.invoke(rpcService, parameters);//response.setCode(200);response.setMsg("ok");response.setData(result);} catch (Exception e) {response.setCode(500);response.setMsg("error");response.setEx(e.getMessage());}return response;}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {log.info("channelInactive :{}", ctx.channel().remoteAddress());ctx.close();}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.error("exceptionCaught :{}", ctx.channel().remoteAddress(), cause);ctx.close();}
}

2.4 启动类

public class RpcServer {public static void main(String[] args) {NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);NioEventLoopGroup workerGroup = new NioEventLoopGroup();try {ChannelFuture channelFuture = new ServerBootstrap().group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ch.pipeline().addLast(new RpcFrameDecoder());ch.pipeline().addLast(new RpcProtocolCodec());ch.pipeline().addLast(new RpcRequestCodec());ch.pipeline().addLast(new RpcResponseCodec());
//                            ch.pipeline().addLast(new LoggingHandler());ch.pipeline().addLast(new RpcServerHandler());}}).bind(1733);channelFuture.sync();channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {throw new RuntimeException(e);} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}}

3 客户端

3.2 客户端rpc上下文,用来处理channel的响应数据

public class RpcClientContext {private Map<Channel, Promise<Object>> promises = new HashMap<>();public Promise<Object> getPromise(Channel channel) {return promises.remove(channel);}public void setPromise(Channel channel, Promise<Object> promise) {promises.put(channel, promise);}}

3.2 业务处理器handler

@Slf4j
public class RpcClientHandler extends ChannelInboundHandlerAdapter {private final RpcClientContext rpcClientContext;public RpcClientHandler(RpcClientContext rpcClientContext) {this.rpcClientContext = rpcClientContext;}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.info("rpc invoke response: {}", msg);RpcResponse response = (RpcResponse) msg;//Promise<Object> promise = rpcClientContext.getPromise(ctx.channel());//if (response.getEx() != null)promise.setFailure(new RuntimeException(response.getEx()));elsepromise.setSuccess(response.getData());}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {log.info("channelInactive :{}", ctx.channel().remoteAddress());ctx.close();}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.error("exceptionCaught :{}", ctx.channel().remoteAddress(), cause);ctx.close();}
}

3.3 启动类

@Slf4j
public class RpcClient {private final Map<String, NioSocketChannel> nioSocketChannels = new HashMap<>();private final RpcClientContext rpcClientContext = new RpcClientContext();public RpcService rpcService() {String serviceName = RpcService.class.getName();List<String> services = RpcContext.register().findService(serviceName);String url = services.get(0);if (!nioSocketChannels.containsKey(url)) {NioSocketChannel nioSocketChannel = createNioSocketChannel(url);nioSocketChannels.put(url, nioSocketChannel);log.info("create a new channel: {}", nioSocketChannel);}final NioSocketChannel nioSocketChannel = nioSocketChannels.get(url);return (RpcService) Proxy.newProxyInstance(RpcClient.class.getClassLoader(), new Class[]{RpcService.class},(proxy, method, args) -> {RpcRequest request = new RpcRequest();request.setServiceName(RpcService.class.getName());request.setMethodName(method.getName());request.setParameterTypes(method.getParameterTypes());request.setParameters(args);nioSocketChannel.writeAndFlush(request);// wait responseDefaultPromise<Object> promise = new DefaultPromise<>(nioSocketChannel.eventLoop());rpcClientContext.setPromise(nioSocketChannel, promise);promise.await();if (!promise.isSuccess())throw new RuntimeException(promise.cause());return promise.getNow();});}private NioSocketChannel createNioSocketChannel(String url) {//String host = url.substring(0, url.indexOf(":"));int port = Integer.parseInt(url.substring(url.indexOf(":") + 1));//EventLoopGroup group = new NioEventLoopGroup();try {ChannelFuture channelFuture = new Bootstrap().group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ch.pipeline().addLast(new RpcFrameDecoder());ch.pipeline().addLast(new RpcProtocolCodec());ch.pipeline().addLast(new RpcResponseCodec());ch.pipeline().addLast(new RpcRequestCodec());
//                            ch.pipeline().addLast(new LoggingHandler());ch.pipeline().addLast(new RpcClientHandler(rpcClientContext));}}).connect(host, port);channelFuture.sync();channelFuture.channel().closeFuture().addListener(future -> {nioSocketChannels.remove(RpcService.class.getName());group.shutdownGracefully();});//return (NioSocketChannel) channelFuture.channel();} catch (InterruptedException e) {throw new RuntimeException(e);}}private void close() {nioSocketChannels.values().forEach(NioSocketChannel::close);}public static void main(String[] args) {RpcClient rpcClient = new RpcClient();RpcService rpcService = rpcClient.rpcService();String netty = rpcService.hello("netty");System.out.println(netty);String world = rpcService.hello("world");System.out.println(world);String java = rpcService.hello("java");System.out.println(java);rpcClient.close();}}

4 总结

这样就实现了简单的rpc服务,通过公共部分的接口、注册中心、编解码器、服务端的服务映射,客户端就能进行远程过程调用了。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com