您的位置:首页 > 健康 > 养生 > 媒介星软文平台_创意网页设计模板_关键词优化一般收费价格_如何引流推广产品

媒介星软文平台_创意网页设计模板_关键词优化一般收费价格_如何引流推广产品

2025/2/25 22:24:23 来源:https://blog.csdn.net/okok__TXF/article/details/145779223  浏览:    关键词:媒介星软文平台_创意网页设计模板_关键词优化一般收费价格_如何引流推广产品
媒介星软文平台_创意网页设计模板_关键词优化一般收费价格_如何引流推广产品

手写Rpc框架 - 导读

git仓库-all-rpc

GTIEE:https://gitee.com/quercus-sp204/all-rpc 【参考源码 yrpc】

1. Rpc概念

RPC 即远程过程调用(Remote Procedure Call) ,就是通过网络从远程计算机程序上请求服务。

  • 本地调用抽象:允许程序像调用本地函数一样调用远程计算机上的函数。开发者无需编写复杂的网络通信代码来处理诸如建立连接、发送请求、等待响应等细节,只需关注业务逻辑的实现。例如,在一个分布式系统中,A 服务器上的程序需要调用 B 服务器上的某个函数来获取数据,使用 RPC 就可以像调用本地函数一样简单。
  • 通信协议与序列化:为了实现这种跨网络的函数调用,RPC 框架通常会使用特定的通信协议(如 TCP/IP)来传输数据,并通过序列化和反序列化技术,将调用的参数和返回值转换为适合在网络上传输的格式。比如,将参数对象转换为字节流进行传输,在接收端再将字节流还原为对象。

那么,它的应用场景有哪些呢?我们平时用到了吗?

  • 微服务架构:在微服务架构中,各个微服务之间通常需要相互通信来完成复杂的业务流程。例如,一个电商系统中,订单服务可能需要调用库存服务来检查商品库存,调用用户服务来验证用户信息等,RPC 可以高效地实现这些微服务间的通信。
  • 分布式系统:在大型分布式系统中,不同节点可能负责不同的功能模块。例如,在搜索引擎系统中,索引构建节点和查询服务节点可能分布在不同的服务器上,通过 RPC 可以实现节点之间的协同工作。

说白了,就是不同服务之间的网络通信嘛。那你可能会问了,假如我的系统有User、Order、Shipment三个服务【爪哇SpringBoot编写的嚯】,如果User想要访问Order上面的函数,我只需要将此函数以Http接口的形式暴露出去,然后User那边使用RestTemplate来访问不就好了吗?何必这么费劲还要用框架呢?

仔细想一下,确实有那么点儿卵道理,但是又仔细一想,

  • 你会发现,你使用RestTemplate的时候,需要手动处理请求的 URL 拼接、请求头设置、参数序列化和响应反序列化等操作,对了,还有异常也需要自己处理,需要自己处理调用失败的情况,如重试几次啊等等
  • HTTP 协议是一种文本协议,存在较多的头部信息,在数据传输时会带来额外的开销。而rpc框架的协议通常相比http协议,是很轻量的。
  • 如果Order部署在了多台机器上面,代码里面肯定是存了这些机器的地址,如果扩容或者缩容,都要修改代码,同时还需要我们手动选择调用哪一台机器上面的Order【需要手动实现负载均衡】

那么,rpc都可以完成这些,并且服务的地址信息那些啊,可以在rpc的注册中心拉取到,动态感知。

综上所述,一个基本的Rpc框架主要应该具有的能力是:

基础通信能力

  • 高效序列化、优化网络传输,采用高效的网络协议和传输方式

  • 稳定性与可靠性

    • 连接管理:具备完善的连接池管理机制,对连接进行复用,减少频繁建立和销毁连接的开销,同时保证连接的稳定性。例如,在高并发场景下,能自动处理连接的超时、重连等问题

服务发现与治理能力

  • 服务注册与发现: 1. 服务提供者能够在启动时自动将自己的服务信息(如服务名称、地址、端口等)注册到服务注册中心,方便服务消费者发现和调用。2.实时感知:调用方能够及时感知调用的服务信息并更新。
  • 负载均衡: 如随机、轮询、最少活跃调用数、一致性哈希
  • 服务容错
    • 熔断机制:当服务提供者出现故障或响应时间过长时,能够自动熔断对该服务的调用,避免大量请求积压,影响整个系统的稳定性。
    • 降级策略:在系统资源紧张或服务出现故障时,能够自动降级服务,提供默认的响应结果或采取其他临时措施,保证系统的基本可用性。

易用性与可扩展性

  • 简单的编程模型:让开发者能够像调用本地函数一样调用远程服务,无需关注底层的网络通信细节,降低开发难度,提高开发效率
  • 插件化与扩展性
    • SPI 机制:具备良好的插件化架构,通过服务提供者接口(SPI)机制,允许开发者根据实际需求扩展框架的功能,如自定义序列化方式、负载均衡策略、过滤器等。

在这里插入图片描述

现在就按照上面的能力,来一个一个实现,最终将其组合成一个框架。

2. 角色

一个Rpc框架的大致角色分布:

在这里插入图片描述

服务提供者将自己的数据信息【例如,端口、ip、接口等信息提供给注册中心】(服务注册)

消费者从注册中心拉取到可用的服务信息(服务发现),然后选择一个合适的服务(负载均衡),发送网络请求【请求里面封装了需要调用的接口,参数等等】,

服务提供者接收到请求之后,本地调用方法,然后通过网络把响应结果过传输给消费者

最后消费者解析响应结果。

注册中心: (本文就选择zookeeper为注册中心)

3. 注册中心的接入

zookeeper的安装与启动,就不在这里赘述了。

思考注册中心的主要能力:【服务注册,服务发现】

定义接口

Registry接口

/** 注册中心的能力: 注册服务, 拉取服务列表*/
public interface Registry {/*** 注册服务* @param serviceConfig 服务的配置内容*/void register(ServiceConfig<?> serviceConfig);/*** 从注册中心拉取服务列表* @param serviceName 服务的名称* @return 服务的地址*/List<InetSocketAddress> lookup(String serviceName, String group);
}

ServiceConfig 封装服务信息的class

/*服务信息*/
public class ServiceConfig<T> {// 接口的类型/*比如UserServiceImpl实现了UserService, 真实对象就是实现类,interfaceProvider就是UserService.class*/private Class<?> interfaceProvider;private Object ref; // 真实对象private String group = "default"; // 分组// get set.....
}

Zookeeper注册中心的实现类 ZookeeperRegistry

服务注册在zookeeper上面的节点如图所示

trpc根节点
==▶消费者节点
==▼生产者节点
====▼接口的全限定名 
======▼分组
========▼地址信息1...

在这里插入图片描述

@Slf4j
public class ZookeeperRegistry extends AbstractRegistry {// 维护一个zk实例private ZooKeeper zooKeeper;public ZookeeperRegistry() {this.zooKeeper = ZookeeperUtil.createZookeeper();}public ZookeeperRegistry(String connectString,int timeout) {this.zooKeeper = ZookeeperUtil.createZookeeper(connectString,timeout);}@Overridepublic void register(ServiceConfig<?> service) {// 服务名称的节点 ----  "/tprc-metadata/providers/接口全限定名"String parentNode = Constant.BASE_PROVIDERS_PATH +"/"+service.getInterface().getName();// 建立服务节点这个节点应该是一个持久节点if(!ZookeeperUtil.exists(zooKeeper,parentNode,null)){ZookeeperNode zookeeperNode = new ZookeeperNode(parentNode,null);ZookeeperUtil.createNode(zooKeeper, zookeeperNode, null, CreateMode.PERSISTENT);}// 建立分组节点parentNode = parentNode + "/" + service.getGroup();if(!ZookeeperUtil.exists(zooKeeper,parentNode,null)){ZookeeperNode zookeeperNode = new ZookeeperNode(parentNode,null);ZookeeperUtil.createNode(zooKeeper, zookeeperNode, null, CreateMode.PERSISTENT);}// 创建本机的临时节点, ip:port ,// 服务提供方的端口一般自己设定,我们还需要一个获取ip的方法// ip我们通常是需要一个局域网ip,不是127.0.0.1,也不是ipv6// 192.168.12.121String node = parentNode + "/" + NetUtils.getIp() + ":" + TrpcBootstrap.getInstance().getConfiguration().getPort();if(!ZookeeperUtil.exists(zooKeeper,node,null)){ZookeeperNode zookeeperNode = new ZookeeperNode(node,null);ZookeeperUtil.createNode(zooKeeper, zookeeperNode, null, CreateMode.EPHEMERAL);}log.info("服务{},注册ok",service.getInterface().getName());}/*** 拉取合适的服务列表* @param serviceName 服务名称* @return 服务列表*/@Overridepublic List<InetSocketAddress> lookup(String serviceName,String group) {// 1、找到服务对应的节点String serviceNode = Constant.BASE_PROVIDERS_PATH + "/" + serviceName + "/" +group;List<String> children = ZookeeperUtil.getChildren(zooKeeper, serviceNode, null);// 获取了所有的可用的服务列表List<InetSocketAddress> inetSocketAddresses = children.stream().map(ipString -> {String[] ipAndPort = ipString.split(":");String ip = ipAndPort[0];int port = Integer.parseInt(ipAndPort[1]);return new InetSocketAddress(ip, port);}).toList();if(inetSocketAddresses.isEmpty()){throw new DiscoveryException("未发现任何可用的服务主机.");}return inetSocketAddresses;}
}

上面的ZookeeperUtil是自定义的操作Zookeeper的工具类。-- 详情见源码里面的注释,值得说明一下,Zookeeper要先有父结点才能创建子节点,不能把路径直接写全了直接创建,故在源码里面会用createRoot方法初始化所有的父结点。

public static ZooKeeper createZookeeper(String connectPath, int timeout) {CountDownLatch countDownLatch = new CountDownLatch(1);try {.................// 连接成功就创建根节点,检查是否存在rpc根节点  /trpc-metadata/providers  &&  /trpc-metadata/consumerscreateRoot(zooKeeper);...............} catch (IOException | InterruptedException e) {log.info("创建zookeeper实例时发生异常:",e);throw new ZookeeperException("创建zookeeper实例时发生异常");}
}

至此,注册中心的两个基本方法就可以告一段落了。

4.Trpc框架启动器

既然是一个框架,那么,我们必然有一个入口来启动这一套流程。

①服务提供方

- 基本功能信息appName、registry

形如: all-tRpc-demo / demo-simple-provider / …/ProviderApplication.java 这样来启动我们的提供方。

public class ProviderApplication {public static void main(String[] args) {TrpcBootstrap.getInstance() .appName("user-provider")// 配置注册中心.registry(new RegistryConfig("zookeeper://127.0.0.1:2181"))// 扫描包下的类,然后批量发布接口.scan("com.feng.demo")// 启动服务.start();}
}

在TrpcBootstrap.java里面

@Slf4j
public class TrpcBootstrap {// 单例,每个应用程序只有一个private static final TrpcBootstrap trpcBootstrap = new TrpcBootstrap();// 配置private final RpcConfiguration configuration;// 获取实例public static TrpcBootstrap getInstance() {return trpcBootstrap;}// 设置应用名称 *****public TrpcBootstrap appName( String appName ) {configuration.setAppName(appName);return this;}// 配置注册中心 *****public TrpcBootstrap registry( RegistryConfig registryConfig ) {// 传递过来的参数registryConfig,还没有创建连接,在这里创建与注册中心的连接registryConfig.createRegistryConnection(); // 创建注册中心的连接configuration.setRegistryConfig(registryConfig); // 设置服务注册中心return this;}
}

RpcConfiguration是封装的配置信息

// 全局的配置类,代码配置 --> xml配置 --> 默认项
@Data
public class RpcConfiguration {// 配置信息-->端口号private int port = 8094;// 配置信息-->应用程序的名字private String appName = "default";// 分组信息private String group = "default";// 配置信息-->注册中心private RegistryConfig registryConfig = new RegistryConfig("zookeeper://127.0.0.1:2181"); // 默认的// 配置信息-->序列化协议private String serializeType = "jdk";// 配置信息-->压缩使用的协议private String compressType = "gzip";// 配置信息@Getterpublic IdGenerator idGenerator = new IdGenerator(1, 2);// 配置信息-->负载均衡策略private LoadBalancer loadBalancer = new RoundRobinLoadBalancer();// 为每一个ip配置一个限流器private final Map<SocketAddress, RateLimiter> everyIpRateLimiter = new ConcurrentHashMap<>(16);// 为每一个ip配置一个断路器,熔断private final Map<SocketAddress, CircuitBreaker> everyIpCircuitBreaker = new ConcurrentHashMap<>(16);// 读xml,dom4jpublic RpcConfiguration() {............}
}// 里面又持有注册中心的类
@Slf4j
public class RegistryConfig {// 定义连接的 url zookeeper://127.0.0.1:2181  redis://192.168.12.125:3306private final String connectString;// 持有一个 Registryprivate Registry registry;public RegistryConfig(String connectString) {this.connectString = connectString;}public Registry getRegistry() {if ( registry == null ) createRegistryConnection();return registry;}/*** 可以使用简单工厂来完成* @return 具体的注册中心实例*/public void createRegistryConnection() {if ( connectString == null ) throw new DiscoveryException("未配置注册中心!");// 1、获取注册中心的类型String registryType = getRegistryType(connectString,true).toLowerCase().trim();log.info("【创建与注册中心的连接~~~】 注册中心的类型: {}", registryType);// 2、通过类型获取具体注册中心if( registryType.equals("zookeeper") ){String host = getRegistryType(connectString, false);this.registry = new ZookeeperRegistry(host, Constant.ZK_TIME_OUT);} else if (registryType.equals("nacos")){String host = getRegistryType(connectString, false);this.registry = new NacosRegistry(host, Constant.ZK_TIME_OUT);} else {throw new DiscoveryException("未发现合适的注册中心。");}}private String getRegistryType(String connectString,boolean ifType){String[] typeAndHost = connectString.split("://");if(typeAndHost.length != 2){throw new RuntimeException("给定的注册中心连接url不合法");}if(ifType){return typeAndHost[0];} else {return typeAndHost[1];}}
}
- 扫描接口并发布scan
// 扫描项目指定包下面的接口,并且将他们发布到注册中心
public TrpcBootstrap scan(String packageName) {// 1. 获取指定包 path 下的所有类的全限定名List<String> classNames = getAllClassName(packageName);// 2.1 拿到所有标注了TrpcApi注解的类List<? extends Class<?>> classes = getTrpcClassesByList(classNames);//  2.2遍历这些构建实例for (Class<?> clazz : classes) {Class<?>[] interfaces = clazz.getInterfaces(); // 获取到他的接口Object instance;try {instance = clazz.getConstructor().newInstance(); // 通过无参构造器创建一个实例} catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {throw new RuntimeException(e);}// 获取注解的groupTrpcApi annotation = clazz.getAnnotation(TrpcApi.class);String group = annotation.group();// 3.将这些接口发布for (Class<?> anInterface : interfaces) {ServiceConfig<?> serviceConfig = new ServiceConfig<>();serviceConfig.setInterface(anInterface);serviceConfig.setRef(instance);serviceConfig.setGroup(group);if (log.isDebugEnabled()){log.debug("---->已经通过包扫描,将服务【{}】发布.",anInterface);}// 3、发布publish(serviceConfig);}}return this;
}//
private TrpcBootstrap publish( ServiceConfig<?> service ) {configuration.getRegistryConfig().getRegistry().register(service);// 维护一个映射关系SERVERS_LIST.put(service.getInterface().getName(), service);return this;
}

具体可以看源码里面的实现

- 启动netty
public void start() {// 1、创建eventLoop,老板只负责处理请求,之后会将请求分发至workerEventLoopGroup boss = new NioEventLoopGroup(2);EventLoopGroup worker = new NioEventLoopGroup(10);try {ServerBootstrap bootstrap = new ServerBootstrap();// 3.配置服务bootstrap = bootstrap.group(boss, worker).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {// 是核心,我们需要添加很多入站和出站的handlersocketChannel.pipeline().addLast(new LoggingHandler()) // 打印日志.addLast(new TrpcRequestDecoder())  // 请求过来,需要解码// 根据请求进行方法调用.addLast(new MethodCallHandler()).addLast(new TrpcResponseEncoder()) // 响应回去,需要编码;}});// 4.绑定端口ChannelFuture channelFuture = bootstrap.bind(configuration.getPort()).sync();// 5.关闭channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {throw new RuntimeException(e);} finally {boss.shutdownGracefully();worker.shutdownGracefully();}
}

② 服务消费方

形如 all-tRpc-demo / demo-simple-consumer /…/ConsumerApplication.java

public class ConsumerApplication {public static void main(String[] args) {ReferenceConfig<UserService> reference = new ReferenceConfig<>();reference.setReference(UserService.class);// 1、连接注册中心// 2、拉取服务列表// 3、选择一个服务并建立连接// 4、发送请求,携带一些信息(接口名,参数列表,方法的名字),获得结果TrpcBootstrap.getInstance().appName("user-consumer").registry(new RegistryConfig("zookeeper://127.0.0.1:2181")).reference(reference);UserService userService = reference.get();System.out.println("=======================================");for (int i = 0; i < 10; i++) {System.out.println("【rpc调用开始=============】");// 开始时间long start = System.currentTimeMillis();List<User> users = userService.getUserByName("田小锋q" + i);for (User user : users) {System.out.println(user);}// 结束时间long end = System.currentTimeMillis();System.out.println("rpc执行耗时:" + (end - start));System.out.println("【rpc调用=============结束-----】");}}
}@Slf4j
public class ReferenceConfig<T> {// 接口类型private Class<T> interfaceRef;// 注册中心private Registry registry;// 分组信息private String group;/*** 代理设计模式* @return 代理对象*/public T get() {// 此处一定是使用动态代理完成了一些工作ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); // 类加载器Class<T>[] classes = new Class[]{interfaceRef}; // 接口类型InvocationHandler handler = new ProxyConsumerInvocationHandler(registry,interfaceRef,group);// 使用动态代理生成代理对象Object helloProxy = Proxy.newProxyInstance(classLoader, classes, handler);return (T)helloProxy;}
}

主要是jdk动态代理 在invoke里面实现我们的远程调用

5. 序列化&&压缩

在 RPC(远程过程调用)框架中,序列化和压缩是两个重要的概念,它们在数据传输过程中起着关键作用

序列化

上图里面可以看出来,序列化是将对象转换为字节流的过程,以便在网络上传输或存储到文件中。在 RPC 中,客户端调用远程服务时,需要将调用的参数对象序列化为字节流,通过网络发送到服务端;服务端接收到字节流后,再将其反序列化为对象进行处理。处理完后,又将结果对象序列化为字节流返回给客户端,客户端再反序列化得到结果。

常见的序列化方式

那么我们就定义一下序列化的接口

public interface Serializer {/*** 抽象的用来做序列化的方法*/byte[] serialize(Object object);/*** 反序列化的方法*/<T> T deserialize(byte[] bytes, Class<T> clazz);
}
1. JDK序列化
@Slf4j // lombok里面的日志注解
public class JdkSerializer implements Serializer {@Overridepublic byte[] serialize(Object object) {if (object == null) return null;try (ByteArrayOutputStream baos = new ByteArrayOutputStream();ObjectOutputStream outputStream = new ObjectOutputStream(baos);) { // try - with写法outputStream.writeObject(object);byte[] result = baos.toByteArray();if(log.isInfoEnabled()){log.info("对象【{}】已经完成了序列化操作,序列化后的字节数为【{}】",object,result.length);}return result;} catch (IOException e) {log.error("序列化对象【{}】时放生异常.",object);throw new SerializeException(e);}}@Overridepublic <T> T deserialize(byte[] bytes, Class<T> clazz) {if(bytes == null || clazz == null) return null;try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);ObjectInputStream objectInputStream = new ObjectInputStream(bais);) {Object object = objectInputStream.readObject();if(log.isInfoEnabled()){log.info("类【{}】已经完成了反序列化操作.",clazz);}return (T)object;} catch (IOException | ClassNotFoundException e) {log.error("反序列化对象【{}】时放生异常.",clazz);throw new SerializeException(e);}}
}
2.JSON序列化

这里就用fastjson了

@Slf4j
public class JsonSerializer implements Serializer {@Overridepublic byte[] serialize(Object object) {if (object == null) return null;byte[] result = JSON.toJSONBytes(object);if (log.isInfoEnabled()) {log.info("对象【{}】已经完成了序列化操作,序列化后的字节数为【{}】", object, result.length);}return result;}@Overridepublic <T> T deserialize(byte[] bytes, Class<T> clazz) {if (bytes == null || clazz == null) return null;T t = JSON.parseObject(bytes, clazz);if (log.isInfoEnabled()) {log.info("类【{}】已经完成了反序列化操作.", clazz);}return t;}
}
3.Hessian序列化

Hessian是一个轻量级的、基于HTTP的RPC(远程过程调用)框架,由Resin开源提供。它使用一个简单的、基于二进制的协议来序列化对象,并通过HTTP进行传输。Hessian的设计目标是提供一种高效、可靠且易于使用的远程服务调用机制。

maven依赖

<dependency><groupId>com.caucho</groupId><artifactId>hessian</artifactId><version>版本号</version>  <!-- 4.0.66 -->
</dependency>
@Slf4j
public class HessianSerializer implements Serializer {@Overridepublic byte[] serialize(Object object) {if (object == null) return null;try (ByteArrayOutputStream baos = new ByteArrayOutputStream();) {Hessian2Output hessian2Output = new Hessian2Output(baos);hessian2Output.writeObject(object);hessian2Output.flush();byte[] result = baos.toByteArray();if(log.isInfoEnabled())log.info("对象【{}】已经完成了序列化操作,序列化后的字节数为【{}】",object,result.length);return result;} catch (IOException e) {log.error("使用hessian进行序列化对象【{}】时放生异常.",object);throw new SerializeException(e);}}@Overridepublic <T> T deserialize(byte[] bytes, Class<T> clazz) {if(bytes == null || clazz == null) return null;try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);) {Hessian2Input hessian2Input = new Hessian2Input(bais);T t = (T) hessian2Input.readObject();if(log.isInfoEnabled()log.info("类【{}】已经使用hessian完成了反序列化操作.",clazz);return t;} catch (IOException  e) {log.error("使用hessian进行反序列化对象【{}】时发生异常.",clazz);throw new SerializeException(e);}}
}
4.序列化工厂

转念一想嚯,我们这是实现一个框架额,肯定是要有对外扩展的能力的,那么,我们就将序列化的所有方式默认加载到内存中去,通过一个工厂类来获取指定的序列化方法就可以了,然后暴露添加其他序列化方法的接口。

/*** @version 1.0* @Author txf* @Date 2025/2/10 15:17* @注释 序列化工厂*/
@Slf4j
public class SerializerFactory {private final static ConcurrentHashMap<String, ObjectWrapper<Serializer>> SERIALIZER_CACHE = new ConcurrentHashMap<>(8);private final static ConcurrentHashMap<Byte, ObjectWrapper<Serializer>> SERIALIZER_CACHE_CODE = new ConcurrentHashMap<>(8);static {ObjectWrapper<Serializer> jdk = new  ObjectWrapper<>((byte) 1, "jdk", new JdkSerializer());ObjectWrapper<Serializer> json = new  ObjectWrapper<>((byte) 2, "json", new JsonSerializer());ObjectWrapper<Serializer> hessian = new  ObjectWrapper<>((byte) 3, "hessian", new HessianSerializer());SERIALIZER_CACHE.put("jdk",jdk);SERIALIZER_CACHE.put("json",json);SERIALIZER_CACHE.put("hessian",hessian);SERIALIZER_CACHE_CODE.put((byte) 1, jdk);SERIALIZER_CACHE_CODE.put((byte) 2, json);SERIALIZER_CACHE_CODE.put((byte) 3, hessian);}/*** 使用工厂方法获取一个SerializerWrapper* @param serializeType 序列化的类型* @return SerializerWrapper*/public static  ObjectWrapper<Serializer> getSerializer(String serializeType) {ObjectWrapper<Serializer> serializerWrapper = SERIALIZER_CACHE.get(serializeType);if(serializerWrapper == null){log.error("未找到您配置的【{}】序列化工具,默认选用jdk的序列化方式。",serializeType);return SERIALIZER_CACHE.get("jdk");}return SERIALIZER_CACHE.get(serializeType);}public static  ObjectWrapper<Serializer> getSerializer(Byte serializeCode) {ObjectWrapper<Serializer> serializerWrapper = SERIALIZER_CACHE_CODE.get(serializeCode);if(serializerWrapper == null){log.error("未找到您配置的【{}】序列化工具,默认选用jdk的序列化方式。",serializeCode);return SERIALIZER_CACHE.get("jdk");}return SERIALIZER_CACHE_CODE.get(serializeCode);}/*** 新增一个新的序列化器* @param serializerObjectWrapper 序列化器的包装*/public static void addSerializer(ObjectWrapper<Serializer> serializerObjectWrapper){SERIALIZER_CACHE.put(serializerObjectWrapper.getName(),serializerObjectWrapper);SERIALIZER_CACHE_CODE.put(serializerObjectWrapper.getCode(),serializerObjectWrapper);}
}

压缩

压缩是指通过特定的算法对数据进行处理,减少数据的存储空间或传输带宽。在 RPC 中,对序列化后的字节流进行压缩可以进一步减少数据传输量,提高传输效率。说白了,就是传的少了。RPC嘛,将性能追求到极致!!!!

设计方式同序列化一样的。

public interface Compressor {// 序列化后的字节数组压缩byte[] compress(byte[] bytes);// 解压缩byte[] decompress(byte[] bytes);
}

Gzip压缩

@Slf4j
public class GzipCompressor implements Compressor {@Overridepublic byte[] compress(byte[] bytes) {try (ByteArrayOutputStream baos = new ByteArrayOutputStream();GZIPOutputStream gzipOutputStream = new GZIPOutputStream(baos);) {gzipOutputStream.write(bytes);gzipOutputStream.finish();byte[] result = baos.toByteArray();if(log.isInfoEnabled())log.info("对字节数组进行了压缩长度由【{}】压缩至【{}】.",bytes.length,result.length);return result;} catch (IOException e){log.error("对字节数组进行压缩时发生异常",e);throw new CompressException(e);}}@Overridepublic byte[] decompress(byte[] bytes) {try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);GZIPInputStream gzipInputStream = new GZIPInputStream(bais);) {byte[] result = gzipInputStream.readAllBytes();if(log.isInfoEnabled())log.info("对字节数组进行了解压缩长度由【{}】变为【{}】.",bytes.length,result.length);return result;} catch (IOException e){log.error("对字节数组进行压缩时发生异常",e);throw new CompressException(e);}}
}

压缩工厂就不在这里写了。

导读部分结束了。。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com