您的位置:首页 > 娱乐 > 明星 > 一文搞懂EureKa原理

一文搞懂EureKa原理

2024/12/23 10:56:02 来源:https://blog.csdn.net/Tan_xiong/article/details/142139187  浏览:    关键词:一文搞懂EureKa原理

关注公众号 不爱总结的麦穗 将不定期推送技术好文

Eureka Server 服务注册中心,提供服务发现并实现负载均衡和故障转移。它由两个组件组成:Eureka服务器和Eureka客户端。

Eureka Server

  • 以 REST API 的形式为服务实例提供了注册、管理和查询等操作。同时,Eureka Server 也为我们提供了可视化的监控页面,可以直观地看到各个 Eureka Server 当前的运行状态和所有已注册服务的情况。

Eureka Client

  • 客户端将自身服务注册到Eureka,从而使服务消费方能够找到
  • 从Eureka服务器获取注册服务列表,从而能够消费服务

Eureka架构

在这里插入图片描述

从图上标注出来的地方,我们可以看到Eureka Server 和 Eureka Client通信过程:

  • Register :服务注册

Eureka客户端向Eureka Server注册时,它提供自身的元数据,比如IP地址、端口等

  • Renew:服务续约

Eureka客户端会每隔30秒发送一次心跳来续约。通过续约来告知Eureka Server该客户端仍然存在。

  • Get Registries:获取注册列表信息

Eureka客户端从服务器获取注册表信息,将其缓存到本地。客户端会使用该信息查找其他服务,从而进行远程调用。该注册列表信息定期(每30秒)更新一次。

  • Cancel:服务下线

Eureka客户端在程序关闭时向Eureka服务器发送取消请求。

  • Make Remote Call:

从eureka client到eureka client,远程调用

  • 服务剔除:

Eureka Server在启动的时候会创建一个定时任务,每隔一段时间(默认60秒),从当前服务清单中把超时没有续约(默认90秒)的服务剔除。

源码解析

接下来,从源码层面来分析上述过程。

服务注册

在这里插入图片描述

当Eureka Client 启动时EurekaClientAutoConfiguration配置类生效,会注入Bean CloudEurekaClient,然后调用父类DiscoveryClient的构造方法。最终通过RestTemplateEurekaHttpClient#register方法发出REST请求。

Eureka Server ApplicationResource#addInstance在收到请求后,最终通过AbstractInstanceRegistry#register方法完成了客户端注册的主要逻辑。

  • AbstractInstanceRegistry#register
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {read.lock();try {// 根据微服务名称从注册表 `registry` 中获取注册的 `Map<String, Lease<InstanceInfo>>`Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());REGISTER.increment(isReplication);if (gMap == null) {final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);if (gMap == null) {gMap = gNewMap;}}// 通过实例id,从map中获取服务实例对应的租约Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());// Retain the last dirty timestamp without overwriting it, if there is already a leaseif (existingLease != null && (existingLease.getHolder() != null)) {Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);// this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted// InstanceInfo instead of the server local copy.if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +" than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");registrant = existingLease.getHolder();}} else {// 新注册的服务实例synchronized (lock) {if (this.expectedNumberOfClientsSendingRenews > 0) {// 预期续约客户端数量+1this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;// 自我保护阀值计算 2*(60/30)*0.85updateRenewsPerMinThreshold();}}logger.debug("No previous lease information found; it is new registration");}// 通过服务实例信息和默认的90s租期构建Lease 租约信息Lease<InstanceInfo> lease = new Lease<>(registrant, leaseDuration);if (existingLease != null) {// 更新服务启动时间lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());}// 将封装了服务实例信息的Lease对象,放到gMap里面去gMap.put(registrant.getId(), lease);recentRegisteredQueue.add(new Pair<Long, String>(System.currentTimeMillis(),registrant.getAppName() + "(" + registrant.getId() + ")"));// This is where the initial state transfer of overridden status happensif (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "+ "overrides", registrant.getOverriddenStatus(), registrant.getId());if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {logger.info("Not found overridden id {} and hence adding it", registrant.getId());overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());}}InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());if (overriddenStatusFromMap != null) {logger.info("Storing overridden status {} from map", overriddenStatusFromMap);registrant.setOverriddenStatus(overriddenStatusFromMap);}// Set the status based on the overridden status rulesInstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);registrant.setStatusWithoutDirty(overriddenInstanceStatus);// 启动状态 设置服务启动时间if (InstanceStatus.UP.equals(registrant.getStatus())) {lease.serviceUp();}registrant.setActionType(ActionType.ADDED);recentlyChangedQueue.add(new RecentlyChangedItem(lease));registrant.setLastUpdatedTimestamp();// 删除缓存invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());logger.info("Registered instance {}/{} with status {} (replication={})",registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);} finally {read.unlock();}
}

Eureka Server会把客户端的实列信息保存在自己的注册表中(双层Map结构)。

服务续约

在这里插入图片描述

当Eureka Client 启动时,会开启一个心跳任务,每隔30s向服务端发送一次心跳请求

  • DiscoveryClient#initScheduledTasks
private void initScheduledTasks() {// 部分代码省略// 构建一个心跳续约的定时任务,每30s(默认)执行一次heartbeatTask = new TimedSupervisorTask("heartbeat",scheduler,heartbeatExecutor,renewalIntervalInSecs,TimeUnit.SECONDS,expBackOffBound,// 心跳续约逻辑new HeartbeatThread());scheduler.schedule(heartbeatTask,renewalIntervalInSecs, TimeUnit.SECONDS);// 部分代码省略

Eureka Server InstanceResource#renewLease方法接收到请求进行续约

  • InstanceResource#renewLease
@PUT
public Response renewLease(@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,@QueryParam("overriddenstatus") String overriddenStatus,@QueryParam("status") String status,@QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {boolean isFromReplicaNode = "true".equals(isReplication);// 调用renew进行续约boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);// 部分代码省略
}
  • AbstractInstanceRegistry#renew
public boolean renew(String appName, String id, boolean isReplication) {RENEW.increment(isReplication);// 根据服务名字获取实例信息Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);Lease<InstanceInfo> leaseToRenew = null;if (gMap != null) {leaseToRenew = gMap.get(id);}// 服务实例不存在,直接返回续约失败if (leaseToRenew == null) {RENEW_NOT_FOUND.increment(isReplication);logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);return false;} else {// 获取服务实例的基本信息InstanceInfo instanceInfo = leaseToRenew.getHolder();if (instanceInfo != null) {// touchASGCache(instanceInfo.getASGName());// 获取服务实例的运行状态InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(instanceInfo, leaseToRenew, isReplication);// 如果运行状态未知,也返回续约失败if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"+ "; re-register required", instanceInfo.getId());RENEW_NOT_FOUND.increment(isReplication);return false;}if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {logger.info("The instance status {} is different from overridden instance status {} for instance {}. "+ "Hence setting the status to overridden status", instanceInfo.getStatus().name(),overriddenInstanceStatus.name(),instanceInfo.getId());instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);}}renewsLastMin.increment();// 更新服务端最后一次收到心跳请求的时间leaseToRenew.renew();return true;}
}

服务续约,就是更新服务端最后一次收到心跳请求的时间。

获取注册列表信息

当Eureka Client 启动时,会开启一个缓存刷新任务,每隔30s向服务端发送一次请求

  • DiscoveryClient#initScheduledTasks
private void initScheduledTasks() {if (clientConfig.shouldFetchRegistry()) {// 构建一个缓存刷新的定时任务,每30s(默认)执行一次int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();cacheRefreshTask = new TimedSupervisorTask("cacheRefresh",scheduler,cacheRefreshExecutor,registryFetchIntervalSeconds,TimeUnit.SECONDS,expBackOffBound,// 缓存刷新逻辑new CacheRefreshThread());scheduler.schedule(cacheRefreshTask,registryFetchIntervalSeconds, TimeUnit.SECONDS);}// 部分代码省略
}
  • DiscoveryClient#fetchRegistry
private boolean fetchRegistry(boolean forceFullRegistryFetch) {Stopwatch tracer = FETCH_REGISTRY_TIMER.start();try {// If the delta is disabled or if it is the first time, get all// applicationsApplications applications = getApplications();if (clientConfig.shouldDisableDelta()|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))|| forceFullRegistryFetch|| (applications == null)|| (applications.getRegisteredApplications().size() == 0)|| (applications.getVersion() == -1)) //Client application does not have latest library supporting delta{logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());logger.info("Force full registry fetch : {}", forceFullRegistryFetch);logger.info("Application is null : {}", (applications == null));logger.info("Registered Applications size is zero : {}",(applications.getRegisteredApplications().size() == 0));logger.info("Application version is -1: {}", (applications.getVersion() == -1));// 全量获取服务列表getAndStoreFullRegistry();} else {// 增量获取服务列表getAndUpdateDelta(applications);}applications.setAppsHashCode(applications.getReconcileHashCode());logTotalInstances();} catch (Throwable e) {logger.info(PREFIX + "{} - was unable to refresh its cache! This periodic background refresh will be retried in {} seconds. status = {} stacktrace = {}",appPathIdentifier, clientConfig.getRegistryFetchIntervalSeconds(), e.getMessage(), ExceptionUtils.getStackTrace(e));return false;} finally {if (tracer != null) {tracer.stop();}}

Eureka Server ApplicationsResource接收到请求后,从缓存中返回服务列表

  • ApplicationsResource#getContainers 全量获取服务列表
Key cacheKey = new Key(Key.EntityType.Application,// 全量ResponseCacheImpl.ALL_APPS,keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
);
  • ApplicationsResource#getContainerDifferential 增量获取服务列表
Key cacheKey = new Key(Key.EntityType.Application,// 增量ResponseCacheImpl.ALL_APPS_DELTA,keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
);

通过源码可以看到,这两个方法都是从缓存中获取服务列表。区别在于获取缓存的key

那Eureka Server是怎样把客户端的注册信息存到缓存的呢?

Eureka缓存机制

Eureka Server 为了提供响应效率,提供了三层的缓存结构,将 Eureka Client 所需要的注册信息,直接存储在缓存结构中。
在这里插入图片描述

  • 第一层缓存:readOnlyCacheMap,定时从 readWriteCacheMap 同步数据(默认时间为 30 秒)。定时器通过和 readWriteCacheMap 的值做对比,如果数据不一致,则以 readWriteCacheMap 的数据为准。
  • 第二层缓存:readWriteCacheMap,主要同步于存储层。当获取缓存时判断缓存中是否没有数据,如果不存在此数据,则通过 CacheLoader 的 load 方法去加载,加载成功之后将数据放入缓存,同时返回数据。缓存过期时间,默认为 180 秒。
  • 第三层缓存:registry注册信息表,当服务下线、过期、注册、状态变更,都会来清除readWriteCacheMap缓存中的数据。
服务下线

当Eureka Client 服务关闭的时候会取消本机的各种定时任务,给服务端发送请求告知自己下线

  • DiscoveryClient#shutdown
@PreDestroy
@Override
public synchronized void shutdown() {// 部分代码省略// 取消定时任务cancelScheduledTasks();// If APPINFO was registeredif (applicationInfoManager != null&& clientConfig.shouldRegisterWithEureka()&& clientConfig.shouldUnregisterOnShutdown()) {applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);// 向服务端发送请求告知自己下线unregister();}
// 部分代码省略}
}

Eureka Server在收到请求后,就会把该服务状态置为下线(DOWN),并把该下线事件传播出去。

服务剔除

Eureka Server在启动的时候会创建一个定时任务,每隔一段时间(默认60秒),从当前服务清单中把超时没有续约(默认90秒)的服务剔除。

  • AbstractInstanceRegistry#postInit
protected void postInit() {renewsLastMin.start();if (evictionTaskRef.get() != null) {evictionTaskRef.get().cancel();}//剔除逻辑evictionTaskRef.set(new EvictionTask());创建服务剔除定时任务,60s(默认)evictionTimer.schedule(evictionTaskRef.get(),serverConfig.getEvictionIntervalTimerInMs(),serverConfig.getEvictionIntervalTimerInMs());
}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com