您的位置:首页 > 汽车 > 新车 > 南昌建站系统外包_电子商务网站提供的主要功能有_常州谷歌推广_网络营销这个专业怎么样

南昌建站系统外包_电子商务网站提供的主要功能有_常州谷歌推广_网络营销这个专业怎么样

2024/11/17 2:46:10 来源:https://blog.csdn.net/qq_29166327/article/details/142537213  浏览:    关键词:南昌建站系统外包_电子商务网站提供的主要功能有_常州谷歌推广_网络营销这个专业怎么样
南昌建站系统外包_电子商务网站提供的主要功能有_常州谷歌推广_网络营销这个专业怎么样

6f3ceaf83f7d086cdaa12f3d37927247.jpeg


背景

我们继续分析EurekaClient的两个自动化配置类:

自动化配置类
功能职责
EurekaClientAutoConfiguration配置EurekaClient确保了Eureka客户端能够正确地:
- 注册到Eureka服务端
- 周期性地发送心跳信息来更新服务租约
- 下线时通知Eureka服务端
- 获取服务实例列表;

更侧重于Eureka客户端的基本配置和功能实现
EurekaDiscoveryClientConfiguration配置EurekaDiscoveryClient创建RefreshScopeRefreshedEvent事件的监听类,用于重启注册;
更多地涉及到服务的自动注册、健康检查以及事件处理等方面

CloudEurekaClient分析

原理

客户端本质就是4个动作:

  1. 获取服务列表

  2. 注册服务实例

  3. 租约续约

  4. 取消租约

源码

让我们继续关注 第一个自动装配类 EurekaClientAutoConfiguration 对CloudEurekaClient 的构造封装,即如下代码块:

@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean(value = EurekaClient.class,search = SearchStrategy.CURRENT)
public EurekaClient eurekaClient(ApplicationInfoManager manager,EurekaClientConfig config) {return new CloudEurekaClient(manager, config, this.optionalArgs,this.context);
}

分析代码:

  • CloudEurekaClient对象,并交给容器管理bean

CloudEurekaClient    

public class CloudEurekaClient extends DiscoveryClient {public CloudEurekaClient(ApplicationInfoManager applicationInfoManager,EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs<?> args,ApplicationEventPublisher publisher) {super(applicationInfoManager, config, args);this.applicationInfoManager = applicationInfoManager;this.publisher = publisher;this.eurekaTransportField = ReflectionUtils.findField(DiscoveryClient.class,"eurekaTransport");ReflectionUtils.makeAccessible(this.eurekaTransportField);}
}

分析代码:

  • 实际上CloudEurekaClient调用了父类DiscoveryClient的构造器

DiscoveryClient

经历了多个重载构造器的嵌套,我们进入了最终的构造器:

private final ScheduledExecutorService scheduler;
// additional executors for supervised subtasks
private final ThreadPoolExecutor heartbeatExecutor;
private final ThreadPoolExecutor cacheRefreshExecutor;@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {// .... 一些初始化工作logger.info("Initializing Eureka in region {}", clientConfig.getRegion());try {// default size of 2 - 1 each for heartbeat and cacheRefreshscheduler = Executors.newScheduledThreadPool(2,new ThreadFactoryBuilder().setNameFormat("DiscoveryClient-%d").setDaemon(true).build());heartbeatExecutor = new ThreadPoolExecutor(1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(),new ThreadFactoryBuilder().setNameFormat("DiscoveryClient-HeartbeatExecutor-%d").setDaemon(true).build());  // use direct handoffcacheRefreshExecutor = new ThreadPoolExecutor(1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(),new ThreadFactoryBuilder().setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d").setDaemon(true).build());  // use direct handoffeurekaTransport = new EurekaTransport();scheduleServerEndpointTask(eurekaTransport, args);AzToRegionMapper azToRegionMapper;if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);} else {azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);}if (null != remoteRegionsToFetch.get()) {azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));}instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());} catch (Throwable e) {throw new RuntimeException("Failed to initialize DiscoveryClient!", e);}// .......if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {try {if (!register() ) {throw new IllegalStateException("Registration error at startup. Invalid server response.");}} catch (Throwable th) {logger.error("Registration error at startup: {}", th.getMessage());throw new IllegalStateException(th);}}// finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetchinitScheduledTasks();// ...其他初始化工作
}

代码分析:

  • 这里初始化了3个异步线程池:scheduler、heartbeatExecutor、cacheRefreshExecutor

    • scheduler:coreSize=2的周期任务线程池,线程名命名是DiscoveryClient-%s

    • heartbeatExecutor:coreSize=1的异步线程池,线程名命名是DiscoveryClient-HeartbeatExecutor-%d

    • cacheRefreshExecutor:coreSize=1的异步线程池,线程名命名是DiscoveryClient-CacheRefreshExecutor-%d

  • 这三个线程池,是怎么配合工作的呢?不着急,慢慢往下看

initScheduledTasks()的代码如下:
private void initScheduledTasks() {if (clientConfig.shouldFetchRegistry()) {// registry cache refresh timerint registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();cacheRefreshTask = new TimedSupervisorTask("cacheRefresh",scheduler,cacheRefreshExecutor,registryFetchIntervalSeconds,TimeUnit.SECONDS,expBackOffBound,new CacheRefreshThread());// 【1】scheduler.schedule(cacheRefreshTask,registryFetchIntervalSeconds, TimeUnit.SECONDS);}if (clientConfig.shouldRegisterWithEureka()) {int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);// Heartbeat timerheartbeatTask = new TimedSupervisorTask("heartbeat",scheduler,heartbeatExecutor,renewalIntervalInSecs,TimeUnit.SECONDS,expBackOffBound,new HeartbeatThread());// 【2】scheduler.schedule(heartbeatTask,renewalIntervalInSecs, TimeUnit.SECONDS);} else {logger.info("Not registering with Eureka server per configuration");}
}

分析代码:

  • 【1】检查是否需要获取注册表信息(配置项eureka.client.fetchRegistry默认=true)

    • 用注入的异步线程池cacheRefreshExecutor,按指定时间间隔registryFetchIntervalSeconds,去执行CacheRefreshThread,即缓存刷新refreshRegistry()任务

    • 缓存刷新任务cacheRefreshTask

    • 使用调度器 scheduler 安排任务

  • 【2】检查是否需要注册入Eureka(配置项eureka.client.registerWithEureka默认=true)

    • 用注入的异步线程池heartbeatExecutor,按指定时间间隔renewalIntervalInSecs,去执行HeartbeatThread,即执行续租renew()任务

    • 心跳续租任务heartbeatTask

    • 使用调度器 scheduler 安排任务

CacheRefreshThread - 缓存刷新
class CacheRefreshThread implements Runnable {public void run() {refreshRegistry();}
}@VisibleForTesting
void refreshRegistry() {try {//.....//【1】刷新本地注册服务列表boolean success = fetchRegistry(remoteRegionsModified);//.....} catch (Throwable e) {logger.error("Cannot fetch registry from server", e);}
}private boolean fetchRegistry(boolean forceFullRegistryFetch) {try {// 【2】获取本地localRegionApps的服务列表Applications applications = getApplications();// 【3】获取远程数据并更新服务列表getAndUpdateDelta(applications);}// registry was fetched successfully, so return truereturn true;
}private void getAndUpdateDelta(Applications applications) throws Throwable {// .....//【4】检查缓存delta的服务注册列表Applications delta = null;if (delta == null) {// 【4.1】如果缓存为空,就再去拉取一次EurekaServer的数据getAndStoreFullRegistry();} else {if (fetchRegistryUpdateLock.tryLock()) {try {//【5】获取EurekaServer最新的服务注册表,并执行delta更新getAndStoreFullRegistry()updateDelta(delta);} finally {fetchRegistryUpdateLock.unlock();}}    }
} private void getAndStoreFullRegistry() throws Throwable {long currentUpdateGeneration = fetchRegistryGeneration.get();Applications apps = null;EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()): eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {apps = httpResponse.getEntity();}
}private void updateDelta(Applications delta) {int deltaCount = 0;//【6】遍历服务注册列表的每个appfor (Application app : delta.getRegisteredApplications()) {//【7】遍历每个服务的所有实例instancefor (InstanceInfo instance : app.getInstances()) {//【8】获取本地cache的服务注册信息Applications applications = getApplications();String instanceRegion = instanceRegionChecker.getInstanceRegion(instance);if (!instanceRegionChecker.isLocalRegion(instanceRegion)) {Applications remoteApps = remoteRegionVsApps.get(instanceRegion);if (null == remoteApps) {remoteApps = new Applications();remoteRegionVsApps.put(instanceRegion, remoteApps);}applications = remoteApps;}++deltaCount;//【9】如果实例是新增的类型if (ActionType.ADDED.equals(instance.getActionType())) {Application existingApp = applications.getRegisteredApplications(instance.getAppName());if (existingApp == null) {//【10】执行实例添加applications.addApplication(app);}applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);} //【11】如果实例是修改的类型else if (ActionType.MODIFIED.equals(instance.getActionType())) {Application existingApp = applications.getRegisteredApplications(instance.getAppName());if (existingApp == null) {//【12】没有已有实例,执行添加操作applications.addApplication(app);}//【13】存在已有实例,则注册新的实例信息applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);} //【14】如果实例是删除的类型else if (ActionType.DELETED.equals(instance.getActionType())) {Application existingApp = applications.getRegisteredApplications(instance.getAppName());if (existingApp != null) {//【15】删除这个服务的实例existingApp.removeInstance(instance);//【16】如果这个服务的实例数量=0,则直接删除服务信息appif (existingApp.getInstancesAsIsFromEureka().isEmpty()) {applications.removeApplication(existingApp);}}}}}
}

代码分析:见下面流程图

34a3a1ce9a8b23a83da835f99ab9195e.png

HeartbeatThread - 心跳续租
private final Counter REREGISTER_COUNTER = Monitors.newCounter(PREFIX+ "Reregister");private class HeartbeatThread implements Runnable {public void run() {//【1】更新操作if (renew()) {lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();}}
}boolean renew() {EurekaHttpResponse<InstanceInfo> httpResponse;try {//【2】客户端发送心跳包,获取响应httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);//【3】响应码=404,说明服务在EurekaServer不存在if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {REREGISTER_COUNTER.increment();long timestamp = instanceInfo.setIsDirtyWithTime();//【4】客户端重新发起一次register操作,给EurekaServerboolean success = register();if (success) {instanceInfo.unsetIsDirty(timestamp);}//【5】EurekaServer注册成功,则续约成功return success;}//【6】响应码=200,则在EurekaServer侧续约成功了return httpResponse.getStatusCode() == Status.OK.getStatusCode();} catch (Throwable e) {logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);return false;}
}

代码分析:见下面流程图

345ce2121267fe7585d492bf0921565c.png

取消租约
@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean(value = EurekaClient.class,search = SearchStrategy.CURRENT)
public EurekaClient eurekaClient(ApplicationInfoManager manager,EurekaClientConfig config) {return new CloudEurekaClient(manager, config, this.optionalArgs,this.context);
}@PreDestroy
@Override
public synchronized void shutdown() {if (isShutdown.compareAndSet(false, true)) {logger.info("Shutting down DiscoveryClient ...");if (statusChangeListener != null && applicationInfoManager != null) {applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());}cancelScheduledTasks();// If APPINFO was registeredif (applicationInfoManager != null&& clientConfig.shouldRegisterWithEureka()&& clientConfig.shouldUnregisterOnShutdown()) {applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);unregister();}if (eurekaTransport != null) {eurekaTransport.shutdown();}heartbeatStalenessMonitor.shutdown();registryStalenessMonitor.shutdown();Monitors.unregisterObject(this);logger.info("Completed shut down of DiscoveryClient");}
}private void cancelScheduledTasks() {if (instanceInfoReplicator != null) {instanceInfoReplicator.stop();}if (heartbeatExecutor != null) {heartbeatExecutor.shutdownNow();}if (cacheRefreshExecutor != null) {cacheRefreshExecutor.shutdownNow();}if (scheduler != null) {scheduler.shutdownNow();}if (cacheRefreshTask != null) {cacheRefreshTask.cancel();}if (heartbeatTask != null) {heartbeatTask.cancel();}
}

代码分析:见下面流程图

2f689c9147234a3979b9b72c3722c439.png

小结

我们回到开头的原理,知道EurekaClient客户端本质就是4个动作:

  1. 获取服务列表:在CacheRefreshThread里有实现,即CacheRefreshThread的【4.1】步骤的eurekaTransport.queryClient.getApplications

  2. 注册服务实例:在HeartbeatThread里有实现,即HeartbeatThread的【4】步骤的eurekaTransport.registrationClient.register

  3. 租约续约:在HeartbeatThread里有实现,即HeartbeatThread的【2】步骤的eurekaTransport.registrationClient.sendHeartBeat

  4. 取消租约:在定义CloudEurekaClient的@Bean(destroyMethod = "shutdown")注解有生命

但我们还想知道,CacheRefreshThread 和 HeartbeatThread的背后通信,以及在EurekaServer的原理细节。可以,我们放到下一个章节再讲。

其他文章

Kafka消息堆积问题排查

基于SpringMVC的API灰度方案

理解到位:灾备和只读数据库

SQL治理经验谈:索引覆盖

Mybatis链路分析:JDK动态代理和责任链模式的应用

大模型安装部署、测试、接入SpringCloud应用体系

Mybatis插件-租户ID的注入&拦截应用

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com