您的位置:首页 > 娱乐 > 八卦 > 简单h5_重庆网络推广交流_新浪网今日乌鲁木齐新闻_重庆seo网站推广费用

简单h5_重庆网络推广交流_新浪网今日乌鲁木齐新闻_重庆seo网站推广费用

2024/12/27 2:01:28 来源:https://blog.csdn.net/weixin_43779268/article/details/144617987  浏览:    关键词:简单h5_重庆网络推广交流_新浪网今日乌鲁木齐新闻_重庆seo网站推广费用
简单h5_重庆网络推广交流_新浪网今日乌鲁木齐新闻_重庆seo网站推广费用

Eureka学习笔记

客户端

模块设计

在这里插入图片描述

  • Applications :保存了从 Eureka Server 获取到的服务信息,相当于 Eureka Client 端的服务列表缓存。
  • InstanceInfo :维护了自身服务实例的信息,注册和心跳时需要用到。
  • QueryClient :负责从 Eureka Server 获取已注册的服务,并且更新Applications
  • RegistrationClient :负责在服务启动时发送注册请求,然后定期发送心跳,最后在服务下线之前取消注册。
  • ClusterResolverQueryClientRegistrationClient 在发送请求前需要先知道 Eureka Server 的地址,ClusterResolver 可以根据不同的策略和实现返回 Eureka Server 地址列表以供选择。
  • JerseyApplicationClient :是真正发送网络请求的 Http client,QueryClientRegistrationClient 获取到 Eureka Server 地址后会创建一个 JerseyApplicationClient 和该 Eureka Server 通讯。
EurekaHttpClient

EurekaHttpClient的设计使用了装饰器模式,类图结构如下

在这里插入图片描述

层次关系如下图
在这里插入图片描述

  • JerseyApplicationClient: 使用 Jersey 实现的基础 HTTP 客户端,负责与 Eureka 服务端进行核心通信操作。
  • MetricsCollectingEurekaHttpClient: 统计和收集 HTTP 请求的性能指标(如延迟、成功率),用于监控和分析。
  • RedirectingEurekaHttpClient: 处理 Eureka 服务端返回的重定向响应,并将请求转发到正确的目标地址。
  • RetryableEurekaHttpClient: 为 HTTP 请求增加重试机制,确保在网络波动或短暂故障情况下提高请求的可靠性。
  • SessionedEurekaHttpClient: 维护与 Eureka 服务端的会话状态,优化频繁请求中的连接管理和资源利用。
RetryableEurekaHttpClient

见名知意,这个client的作用是使得请求具有自动重试的功能

protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {List<EurekaEndpoint> candidateHosts = null;int endpointIdx = 0;for (int retry = 0; retry < numberOfRetries; retry++) {EurekaHttpClient currentHttpClient = delegate.get();EurekaEndpoint currentEndpoint = null;if (currentHttpClient == null) {if (candidateHosts == null) {candidateHosts = getHostCandidates();if (candidateHosts.isEmpty()) {throw new TransportException("There is no known eureka server; cluster server list is empty");}}if (endpointIdx >= candidateHosts.size()) {throw new TransportException("Cannot execute request on any known server");}currentEndpoint = candidateHosts.get(endpointIdx++);currentHttpClient = clientFactory.newClient(currentEndpoint);}try {EurekaHttpResponse<R> response = requestExecutor.execute(currentHttpClient);// 判断请求是否成功if (serverStatusEvaluator.accept(response.getStatusCode(), requestExecutor.getRequestType())) {delegate.set(currentHttpClient);if (retry > 0) {logger.info("Request execution succeeded on retry #{}", retry);}return response;}logger.warn("Request execution failure with status code {}; retrying on another server if available", response.getStatusCode());} catch (Exception e) {logger.warn("Request execution failed with message: {}", e.getMessage());  // just log message as the underlying client should log the stacktrace}// Connection error or 5xx from the server that must be retried on another serverdelegate.compareAndSet(currentHttpClient, null);if (currentEndpoint != null) {quarantineSet.add(currentEndpoint);}}throw new TransportException("Retry limit reached; giving up on completing the request");
}

ServerStatusEvaluators#LEGACY_EVALUATOR

private static final ServerStatusEvaluator LEGACY_EVALUATOR = new ServerStatusEvaluator() {public boolean accept(int statusCode, EurekaHttpClientDecorator.RequestType requestType) {if ((statusCode < 200 || statusCode >= 300) && statusCode != 302) {if (requestType == RequestType.Register && statusCode == 404) {return true;} else if (requestType == RequestType.SendHeartBeat && statusCode == 404) {return true;} else if (requestType == RequestType.Cancel) {return true;} else {return requestType == RequestType.GetDelta && (statusCode == 403 || statusCode == 404);}} else {return true;}}
};

这里根据请求类型以及响应码来判断是否**“成功”**,但这里的在非2xx返回的成功实际上意味着真正的成功,只表明无需进行重试,在调用上游还是会根据响应码判断操作本身是否成功,如

// 注册判断是否成功
boolean register() throws Throwable {...return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}
// 心跳判断是否成功
boolean renew() {// 返回404重新发起注册if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {long timestamp = instanceInfo.setIsDirtyWithTime();}...return httpResponse.getStatusCode() == Status.OK.getStatusCode();
}
// 下线实际不太关心结果
void unregister() {...EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), 
}

发生失败时将会记录这次失败的Service到隔离集合,而重试的需要请求的Service将会重新选择,选择的过程将会优先排除掉之前失败的Servie,但在隔离结合的数量超过阈值后将会清空隔离集合

private List<EurekaEndpoint> getHostCandidates() {List<EurekaEndpoint> candidateHosts = clusterResolver.getClusterEndpoints();quarantineSet.retainAll(candidateHosts);...// 操作阈值清空隔离集合else if (quarantineSet.size() >= threshold) {logger.debug("Clearing quarantined list of size {}", quarantineSet.size());quarantineSet.clear();} else {List<EurekaEndpoint> remainingHosts = new ArrayList<>(candidateHosts.size());for (EurekaEndpoint endpoint : candidateHosts) {// 排除隔离集合中的Serviceif (!quarantineSet.contains(endpoint)) {remainingHosts.add(endpoint);}}candidateHosts = remainingHosts;}return candidateHosts;
}
SessionedEurekaHttpClient

它的主要作用是实现一定程度的EurekaService负载均衡,在一定时间间隔后强制重新获取一个新的client

protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {long now = System.currentTimeMillis();long delay = now - lastReconnectTimeStamp;// 超过时间关闭当前clientif (delay >= currentSessionDurationMs) {logger.debug("Ending a session and starting anew");lastReconnectTimeStamp = now;currentSessionDurationMs = randomizeSessionDuration(sessionDurationMs);TransportUtils.shutdown(eurekaHttpClientRef.getAndSet(null));}// 重新构建EurekaHttpClient eurekaHttpClient = eurekaHttpClientRef.get();if (eurekaHttpClient == null) {eurekaHttpClient = TransportUtils.getOrSetAnotherClient(eurekaHttpClientRef, clientFactory.newClient());}return requestExecutor.execute(eurekaHttpClient);
}
Applications的维护

应用/应用实例的上实例的变更并不是一个频繁的操作,所以应用列表会在DiscoverClient本地缓存,DiscoverClient会定时刷新(默认30s)本地缓存
而刷新通常都是增量更新,即从Service获取最近的增量数据,然后根据增量类型(注册、下线、修改)来更新本地缓存。由于增量数据本身是会过期的(默认3分钟),如果client这这个过程中出现网络问题,就有可能丢失部分实例数据。

DiscoverClient#fetchRegistry()

private boolean fetchRegistry(boolean forceFullRegistryFetch) {Stopwatch tracer = FETCH_REGISTRY_TIMER.start();try {...Applications applications = getApplications();// 触发全量更新的条件,第一次/强制更新/本地没有数据等if (clientConfig.shouldDisableDelta()|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))|| forceFullRegistryFetch|| (applications == null)|| (applications.getRegisteredApplications().size() == 0)|| (applications.getVersion() == -1)) //Client application does not have latest library supporting delta{getAndStoreFullRegistry();} else {// 触发增量更新getAndUpdateDelta(applications);}...
}

DiscoverClient#getAndUpdateDelta()

这里如果没有获取到增量数据还是进行全量更新,否则会进行一个原子更新,防止多线程并发更新

private void getAndUpdateDelta(Applications applications) throws Throwable {long currentUpdateGeneration = fetchRegistryGeneration.get();Applications delta = null;EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());...if (delta == null) {getAndStoreFullRegistry();} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {updateDelta(delta);...} else {logger.warn("Not updating application delta as another thread is updating it already");}
}

DiscoverClient#updateDelta()

这里会更新不同的增量类型更新本地缓存

  • 对于注册:如果应用不存在则增加应用,然后使用增量实例数据覆盖本地数据
  • 对于修改:如果应用不存在则增加应用,然后使用增量实例数据覆盖本地数据
  • 对于下线:首先移除实例,如果应用的实例列表为空则移除应用
private void updateDelta(Applications delta) {int deltaCount = 0;for (Application app : delta.getRegisteredApplications()) {for (InstanceInfo instance : app.getInstances()) {Applications applications = getApplications();String instanceRegion = instanceRegionChecker.getInstanceRegion(instance);++deltaCount;// 注册if (ActionType.ADDED.equals(instance.getActionType())) {Application existingApp = applications.getRegisteredApplications(instance.getAppName());if (existingApp == null) {applications.addApplication(app);}applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);} // 修改else if (ActionType.MODIFIED.equals(instance.getActionType())) {Application existingApp = applications.getRegisteredApplications(instance.getAppName());if (existingApp == null) {applications.addApplication(app);}applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);}// 删除else if (ActionType.DELETED.equals(instance.getActionType())) {Application existingApp = applications.getRegisteredApplications(instance.getAppName());if (existingApp != null) {existingApp.removeInstance(instance);if (existingApp.getInstancesAsIsFromEureka().isEmpty()) {applications.removeApplication(existingApp);}}}}}// 设置application版本、随机化应用实例getApplications().setVersion(delta.getVersion());getApplications().shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());...
}
ClusterResolver

ClusterResolver和EurekaHttpClient具有层层委托的特性,主要有如下最基础用到的有如下几个Resolver

在这里插入图片描述

  • ConfigClusterResolver :从配置文件中解析树说所有的EurekaService地址
  • ZoneAffinityClusterResolver:根据client所在zone对ServiceUrl分组排序,将处于相同Zone的ServiceUrl放在list的前面,并且做随机化处理,保证client尽可能的均匀的选择EurekaService。
  • AsyncResolver :动态拉取服务实例列表并在本地缓存,以提供更高效、非阻塞的服务解析能力。
ZoneAffinityClusterResolver

ZoneAffinityClusterResolver#getClusterEndpoints()

public List<AwsEndpoint> getClusterEndpoints() {// 根据client的所在zone将所有Endpoint划为相同区和保留区List<AwsEndpoint>[] parts = ResolverUtils.splitByZone(delegate.getClusterEndpoints(), myZone);List<AwsEndpoint> myZoneEndpoints = parts[0];List<AwsEndpoint> remainingEndpoints = parts[1];// 对相同区和保留区做随机化处理,并且把相同区放在list的前面List<AwsEndpoint> randomizedList = randomizeAndMerge(myZoneEndpoints, remainingEndpoints);if (!zoneAffinity) {Collections.reverse(randomizedList);}logger.debug("Local zone={}; resolved to: {}", myZone, randomizedList);return randomizedList;
}
...
private List<AwsEndpoint> randomizeAndMerge(List<AwsEndpoint> myZoneEndpoints, List<AwsEndpoint> remainingEndpoints) {...List<AwsEndpoint> mergedList = randomizer.randomize(myZoneEndpoints);mergedList.addAll(randomizer.randomize(remainingEndpoints));return mergedList;
}
AsyncResolver

AsyncResolver#getClusterEndpoints()

public List<T> getClusterEndpoints() {long delay = refreshIntervalMs;// 第一次获取做一个预热if (warmedUp.compareAndSet(false, true)) {if (!doWarmUp()) {delay = 0;}}// 定时刷新if (scheduled.compareAndSet(false, true)) {scheduleTask(delay);}return resultsRef.get();
}
private final Runnable updateTask = new Runnable() {@Overridepublic void run() {try {...// 从委托者中获取EndPointsList<T> newList = delegate.getClusterEndpoints();if (newList != null) {resultsRef.getAndSet(newList);}}
};

ps:这里的数据源头是静态的配置,AsyncResolver没有什么太实际的意义,但如果底层使用如:DNSClusterResolver这种动态的数据源就能发挥其作用了

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com