Etcd入门
什么是Etcd
GitHub:https://github.com/etcd-io/etcd
Etcd数据结构与特性
键值对格式,类似文件层次结构。
Etcd如何保证数据一致性?
表面来看,Etcd支持事务操作
,能够保证数据一致性。
底层来看,Etcd使用Raft一致性算法
保证数据一致性。
官方可视化地址:http://play.etcd.io/play
可以深度了解,raft算法运行机制。
现在是一主两从两stop。
停止主节点。
此时主节点挂了,并没有选择新的主节点上线,因为还剩两个节点,一人一票,都没有胜出无法选择出新的Leader,这种现象也成为“
脑裂
”。
启动node2,发现node3成为了Leader,此时不会有平票的情况。
Etcd基本操作
增删改查。
写数据
读数据
前缀搜索
Etcd安装
安装:https://github.com/etcd-io/etcd/releases
有不同系统安装启动脚本。
安装完成会有三个脚本:
- etcd: etcd服务本身
- etcdctl:客户端,用户操作etcd,如读写数据
- etcdutl:备份恢复工具
执行etcd脚本,会启动etcd服务,服务默认占用2379和2380两个端口
2379:提供HTTP API服务,和etcdctl交互
2380:集群中节点通讯
Etcd可视化工具
etcdkeeper:https://github.com/evildecay/etcdkeeper
下载安装启动完毕,访问http://127.0.0.1:8080/etcdkeeper
Etcd Java客户端
jtecd:https://github.com/etcd-io/jetcd
1)引入依赖
<dependency><groupId>io.etcd</groupId><artifactId>jetcd-core</artifactId><version>0.7.7</version>
</dependency>
2)demo
public class EtcdRegistry {public static void main(String[] args) throws ExecutionException, InterruptedException {// create client using endpointsClient client = Client.builder().endpoints("http://localhost:2379").build();KV kvClient = client.getKVClient();ByteSequence key = ByteSequence.from("likelong".getBytes());ByteSequence value = ByteSequence.from("666".getBytes());// put the key-valuekvClient.put(key, value).get();// get the CompletableFutureCompletableFuture<GetResponse> getFuture = kvClient.get(key);// get the value from CompletableFutureGetResponse response = getFuture.get();System.out.println("value = " + response);// delete the keykvClient.delete(key).get();}
}
上述代码使用KVClient操作Etcd读写数据,除了KVClient客户端外,Etcd还提供了其他客户端。
3)常用客户端
绝大多数情况,前三个就够用。
Java Etcd数据结构
除了有基本的KV,还有版本、创建版本、修改版本等字段。Etcd中每个键都有一个与之关联的版本号,用于跟踪键的修改历史。当键值发生变化,版本号也会随之增加。
存储结构设计
存储结构设计几个要点:
- key如何设计?
- value如何设计?
- key什么时候过期?
结合Etcd数据存储结构特点(支持层级查询),以及一个服务会有多个服务提供者实例(负载均衡),可以设计为层级结构。
层级结构:将服务理解为文件夹、将服务对应的一个节点理解为文件夹下的文件,可以通过服务名称,用前缀查询的方式查询到某个服务的所有节点。
如下:键名规则可以为:业务前缀/服务名/服务节点地址
如果是Redis作为注册中心,可以设计为列表结构(Redis本身支持列表数据结构)。
列表结构:将所有服务节点以列表的形式整体作为value。
设置key过期超时时间,如30s,当服务宕机时,超时自动移除。
etcd选择层级结构。
开发实现
1. 注册中心开发
1)注册信息定义
ServiceMetaInfo
类,封装服务注册信息,包括服务名称、服务版本号、服务地址(域名和端口)、服务分组等。
/*** 服务元信息(注册信息)*/
public class ServiceMetaInfo {/*** 服务名称*/private String serviceName;/*** 服务版本号*/private String serviceVersion = "1.0";/*** 服务域名*/private String serviceHost;/*** 服务端口号*/private Integer servicePort;/*** 服务分组(暂未实现)*/private String serviceGroup = "default";}
添加方法,获取服务键名、获取服务注册节点键名以及获取服务访问地址。
/*** 获取服务键名*/
public String getServiceKey() {// 后续可扩展服务分组// return String.format("%s:%s:%s", serviceName, serviceVersion, serviceGroup);return String.format("%s:%s", serviceName, serviceVersion);
}/*** 获取服务注册节点键名*/
public String getServiceNodeKey() {return String.format("%s/%s:%s", getServiceKey(), serviceHost, servicePor t);
}/*** 获取完整服务地址(服务调用会用到)*/
public String getServiceAddress() {if (!StrUtil.contains(serviceHost, "http")) {return String.format("http://%s:%s", serviceHost, servicePort);}return String.format("%s:%s", serviceHost, servicePort);
}
2)注册中心配置
/*** RPC 框架注册中心配置*/
public class RegistryConfig {/*** 注册中心类别*/private String registry = "etcd";/*** 注册中心地址*/private String address = "http://localhost:2380";/*** 用户名*/private String username;/*** 密码*/private String password;/*** 超时时间(单位毫秒)*/private Long timeout = 10000L;
}
3)注册中心接口
定义注册中心接口,后续可以实现多种不同的注册中心。可以使用SPI机制,动态加载。
提供注册中心初始化、注册服务、注销服务、服务发现(获取服务节点列表)、服务销毁
等方法。
/*** 注册中心接口*/
public interface Registry {/*** 初始化** @param registryConfig*/void init(RegistryConfig registryConfig);/*** 注册服务(服务端)** @param serviceMetaInfo*/void register(ServiceMetaInfo serviceMetaInfo) throws Exception;/*** 注销服务(服务端)** @param serviceMetaInfo*/void unRegister(ServiceMetaInfo serviceMetaInfo);/*** 服务发现(获取某服务的所有节点,消费端)** @param serviceKey 服务键名* @return*/List<ServiceMetaInfo> serviceDiscovery(String serviceKey);/*** 服务销毁*/void destroy();
}
4)Etcd注册中心实现
public class EtcdRegistry implements Registry {private static final Logger logger = LoggerFactory.getLogger(EtcdRegistry.class);private Client client;private KV kvClient;/*** 根节点*/private static final String ETCD_ROOT_PATH = "/rpc/";@Overridepublic void init(RegistryConfig registryConfig) {logger.info("etcd注册中心初始化...");client = Client.builder().endpoints(registryConfig.getAddress()).connectTimeout(Duration.ofMillis(registryConfig.getTimeout())).build();kvClient = client.getKVClient();}/*** 服务注册(默认30s自动剔除)** @param serviceMetaInfo 服务元信息*/@Overridepublic void register(ServiceMetaInfo serviceMetaInfo) throws Exception {// 创建 Lease 和 KV 客户端Lease leaseClient = client.getLeaseClient();// 创建一个 30 秒的租约long leaseId = leaseClient.grant(30).get().getID();// 设置要存储的键值对String registerKey = ETCD_ROOT_PATH + serviceMetaInfo.getServiceNodeKey();ByteSequence key = ByteSequence.from(registerKey, StandardCharsets.UTF_8);ByteSequence value = ByteSequence.from(JSONUtil.toJsonStr(serviceMetaInfo), StandardCharsets.UTF_8);// 将键值对与租约关联起来,并设置过期时间PutOption putOption = PutOption.builder().withLeaseId(leaseId).build();kvClient.put(key, value, putOption).get();}public void unRegister(ServiceMetaInfo serviceMetaInfo) {kvClient.delete(ByteSequence.from(ETCD_ROOT_PATH + serviceMetaInfo.getServiceNodeKey(), StandardCharsets.UTF_8)).get();}public List<ServiceMetaInfo> serviceDiscovery(String serviceKey) {// 前缀搜索,结尾一定要加 '/'String searchPrefix = ETCD_ROOT_PATH + serviceKey + "/";try {// 前缀查询GetOption getOption = GetOption.builder().isPrefix(true).build();List<KeyValue> keyValues = kvClient.get(ByteSequence.from(searchPrefix, StandardCharsets.UTF_8),getOption).get().getKvs();// 解析服务信息return keyValues.stream().map(keyValue -> {String value = keyValue.getValue().toString(StandardCharsets.UTF_8);return JSONUtil.toBean(value, ServiceMetaInfo.class);}).collect(Collectors.toList());} catch (Exception e) {throw new RuntimeException(String.format("serviceKey=%s, 获取服务列表失败", serviceKey), e);}}public void destroy() {logger.info("etcd注册中心下线...");// 释放资源if (kvClient != null) {kvClient.close();}if (client != null) {client.close();}}}
2.SPI机制配置和扩展注册中心
使用SPI机制,读取配置文件,初始化对应的注册中心。
对应类属性位置:RpcConfig#registryConfig#registry
可以自己实现接口,自行扩展注册中心,也可以使用默认etcd注册中心。
SPI机制:添加对应SPI配置文件。
注册中心工厂
类似序列化器,创建注册中心工厂,可以通过配置文件配置注册中心类型,指定对应注册中心。
/*** 注册中心工厂(用于获取注册中心对象)*/
public class RegistryFactory {static {SpiLoader.load(Registry.class);}/*** 默认注册中心*/private static final Registry DEFAULT_REGISTRY = new EtcdRegistry();/*** 获取实例** @param key 注册中心键值* @return 注册中心实例*/public static Registry getInstance(String key) {return SpiLoader.getInstance(Registry.class, key);}}
3.RPC调用
1)服务代理类
服务代理类,使用jdk动态代理(实现InvocationHandler类),用于生成代理对象实现远程调用
。
/*** 服务代理(JDK 动态代理)*/
public class ServiceProxy implements InvocationHandler {/*** 调用代理** @return* @throws Throwable*/@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {RpcConfig rpcConfig = RpcApplication.getRpcConfig();// 指定序列化器final Serializer serializer = SerializerFactory.getInstance(rpcConfig.getSerializer());// 构造请求String serviceName = method.getDeclaringClass().getName();RpcRequest rpcRequest = new RpcRequest();rpcRequest.setServiceName(serviceName);rpcRequest.setMethodName(method.getName());rpcRequest.setParameterTypes(method.getParameterTypes());rpcRequest.setArgs(args);try {// 序列化byte[] bodyBytes = serializer.serialize(rpcRequest);// 从注册中心获取服务提供者请求地址Registry registry = RegistryFactory.getInstance(rpcConfig.getRegistryConfig().getRegistry());ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo();serviceMetaInfo.setServiceName(serviceName);serviceMetaInfo.setServiceVersion(RpcConstants.DEFAULT_SERVICE_VERSION);List<ServiceMetaInfo> serviceMetaInfoList = registry.serviceDiscovery(serviceMetaInfo.getServiceKey());if (CollUtil.isEmpty(serviceMetaInfoList)) {throw new RuntimeException("暂无服务地址");}// 先默认取第一个,后续优化ServiceMetaInfo selectedServiceMetaInfo = serviceMetaInfoList.get(0);// 发送请求try (HttpResponse httpResponse = HttpRequest.post(selectedServiceMetaInfo.getServiceAddress()).body(bodyBytes).execute()) {byte[] result = httpResponse.bodyBytes();// 反序列化RpcResponse rpcResponse = serializer.deserialize(result, RpcResponse.class);return rpcResponse.getData();}} catch (IOException e) {e.printStackTrace();}return null;}
}
2)服务代理工厂
/*** 服务代理工厂(工厂模式,用于创建代理对象)*/
public class ServiceProxyFactory {/*** 根据服务类获取代理对象** @param serviceClass* @param <T>* @return*/public static <T> T getProxy(Class<T> serviceClass) {return (T) Proxy.newProxyInstance(serviceClass.getClassLoader(),new Class[]{serviceClass},new ServiceProxy());}}
基于服务代理类,生成代理对象,实现服务远程调用
3)本地注册中心
用于存放接口名与具体实现类映射,便于获取
/*** 本地注册中心*/
public class LocalRegistry {/*** 注册信息存储*/private static final Map<String, Class<?>> map = new ConcurrentHashMap<>();/*** 注册服务** @param serviceName 接口* @param implClass 实现类*/public static void register(String serviceName, Class<?> implClass) {map.put(serviceName, implClass);}/*** 获取服务** @param serviceName* @return*/public static Class<?> get(String serviceName) {return map.get(serviceName);}/*** 删除服务** @param serviceName*/public static void remove(String serviceName) {map.remove(serviceName);}
}
4)web服务器
Vertx官方文档:https://vertx.io/
使用Vertx实现web服务器
添加依赖
<dependency><groupId>io.vertx</groupId><artifactId>vertx-core</artifactId><version>4.5.1</version></dependency>
/*** HTTP 服务器接口*/
public interface HttpServer {/*** 启动服务器** @param port*/void doStart(int port);
}
/*** Vertx HTTP 服务器*/
public class VertxHttpServer implements HttpServer {private static final Logger logger = LoggerFactory.getLogger(VertxHttpServer.class);/*** 启动服务器** @param port 端口*/public void doStart(int port) {// 创建 Vert.x 实例Vertx vertx = Vertx.vertx();// 创建 HTTP 服务器io.vertx.core.http.HttpServer server = vertx.createHttpServer();// 处理请求server.requestHandler(new HttpServerHandler());// 启动 HTTP 服务器并监听指定端口server.listen(port, result -> {if (result.succeeded()) {logger.info("Server is now listening on port " + port);} else {logger.error("Failed to start server: " + result.cause());}});}
}
具体业务处理逻辑
/*** HTTP 请求处理器*/
public class HttpServerHandler implements Handler<HttpServerRequest> {private static final Logger logger = LoggerFactory.getLogger(HttpServerHandler.class);@Overridepublic void handle(HttpServerRequest request) {// 指定序列化器final Serializer serializer = SerializerFactory.getInstance(RpcApplication.getRpcConfig().getSerializer());// 记录日志logger.info("Received request: " + request.method() + " " + request.uri());// 异步处理 HTTP 请求request.bodyHandler(body -> {byte[] bytes = body.getBytes();RpcRequest rpcRequest = null;try {rpcRequest = serializer.deserialize(bytes, RpcRequest.class);} catch (Exception e) {e.printStackTrace();}// 构造响应结果对象RpcResponse rpcResponse = new RpcResponse();// 如果请求为 null,直接返回if (rpcRequest == null) {rpcResponse.setMessage("rpcRequest is null");doResponse(request, rpcResponse, serializer);return;}try {// 获取要调用的服务实现类,通过反射调用Class<?> implClass = LocalRegistry.get(rpcRequest.getServiceName());Method method = implClass.getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes());Object result = method.invoke(implClass.newInstance(), rpcRequest.getArgs());// 封装返回结果rpcResponse.setData(result);rpcResponse.setDataType(method.getReturnType());rpcResponse.setMessage("ok");} catch (Exception e) {e.printStackTrace();rpcResponse.setMessage(e.getMessage());rpcResponse.setException(e);}// 响应doResponse(request, rpcResponse, serializer);});}/*** 响应** @param request* @param rpcResponse* @param serializer*/void doResponse(HttpServerRequest request, RpcResponse rpcResponse, Serializer serializer) {HttpServerResponse httpServerResponse = request.response().putHeader("content-type", "application/json");try {// 序列化byte[] serialized = serializer.serialize(rpcResponse);httpServerResponse.end(Buffer.buffer(serialized));} catch (IOException e) {e.printStackTrace();httpServerResponse.end(Buffer.buffer());}}
}
5)服务提供者服务注册
创建服务提供者module,引入starlink-rpc-core
模块
RPC相关配置application.properties
rpc.name=starlink
rpc.version=1.0
rpc.serverPort=8081
rpc.serializer=hessian
服务接口及实现类
public class HelloServiceImpl implements HelloService {@Overridepublic String hello(String name) {return "hello, " + name;}
}
服务注册
public class ServiceStarter {public static void main(String[] args) {RpcApplication.init();RpcConfig rpcConfig = RpcApplication.getRpcConfig();Registry registry = RegistryFactory.getInstance(rpcConfig.getRegistryConfig().getRegistry());// 注册服务String serviceName = HelloService.class.getName();LocalRegistry.register(serviceName, HelloServiceImpl.class);ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo();serviceMetaInfo.setServiceHost(rpcConfig.getServerHost());serviceMetaInfo.setServicePort(rpcConfig.getServerPort());serviceMetaInfo.setServiceName(serviceName);serviceMetaInfo.setServiceVersion(rpcConfig.getVersion());try {registry.register(serviceMetaInfo);} catch (Exception e) {throw new RuntimeException(e);}// 启动Http服务HttpServer httpServer = new VertxHttpServer();httpServer.doStart(rpcConfig.getServerPort());}
}
客户端查看,注册成功,类似文件夹结构。
6)服务调用者远程调用
依旧创建module,引入starlink-rpc-core
及服务提供者example-consumer依赖
先启动服务提供者,再远程调用。
public class TestService {public static void main(String[] args) {// 创建代理对象HelloService helloService = ServiceProxyFactory.getProxy(HelloService.class);System.out.println(helloService.hello("Jack"));}
}
远程调用成功。
核心:
动态代理【ServiceProxy】 + 反射调用【HttpServerHandler】
基本可用,后续优化。