您的位置:首页 > 财经 > 产业 > 俄罗斯最新军事动态_ppt成品免费下载的网站_营销策划公司取名大全_推广哪个app最挣钱

俄罗斯最新军事动态_ppt成品免费下载的网站_营销策划公司取名大全_推广哪个app最挣钱

2024/12/26 23:27:26 来源:https://blog.csdn.net/weixin_41645232/article/details/144637423  浏览:    关键词:俄罗斯最新军事动态_ppt成品免费下载的网站_营销策划公司取名大全_推广哪个app最挣钱
俄罗斯最新军事动态_ppt成品免费下载的网站_营销策划公司取名大全_推广哪个app最挣钱

spring-cloud-starter-netflix-eureka-client 版本是3.0.3
核心装备类:
EurekaClientAutoConfiguration
EurekaDiscoveryClientConfiguration
核心类,以及引用的关系如下

EurekaRegistration - EurekaInstanceConfigBean 实例配置bean- ApplicationInfoManager 应用信息管理器- CloudEurekaClient 客户端- EurekaHealthCheckHandler 健康检查处理器EurekaAutoServiceRegistration- EurekaServiceRegistry 服务注册- EurekaRegistration 

EurekaAutoServiceRegistration 实现了SmartLifecycle,在容器启动会调用它的start方法
start方法里面调用了 EurekaServiceRegistry.register(EurekaRegistration reg)
stop方法里面调用了 EurekaServiceRegistry.deregister(EurekaRegistration reg)

public void start() {// only set the port if the nonSecurePort or securePort is 0 and this.port != 0if (this.port.get() != 0) {if (this.registration.getNonSecurePort() == 0) {this.registration.setNonSecurePort(this.port.get());}if (this.registration.getSecurePort() == 0 && this.registration.isSecure()) {this.registration.setSecurePort(this.port.get());}}// only initialize if nonSecurePort is greater than 0 and it isn't already running// because of containerPortInitializer belowif (!this.running.get() && this.registration.getNonSecurePort() > 0) {// 服务注册this.serviceRegistry.register(this.registration);this.context.publishEvent(new InstanceRegisteredEvent<>(this, this.registration.getInstanceConfig()));this.running.set(true);}
}public void stop() {// 下线 this.serviceRegistry.deregister(this.registration);this.running.set(false);
}

EurekaServiceRegistry.register(EurekaRegistration reg)注册方法里面有两块逻辑

// 修改状态,会发送StatusChangeEvent事件,通知相关listener
reg.getApplicationInfoManager().setInstanceStatus(reg.getInstanceConfig().getInitialStatus());
// 注册健康检查处理器
reg.getHealthCheckHandler().ifAvailable(healthCheckHandler -> reg.getEurekaClient().registerHealthCheck(healthCheckHandler));

这里setInstanceStatus方法,假如状态变更时,会通知监听器,执行notify方法

public synchronized void setInstanceStatus(InstanceStatus status) {InstanceStatus next = instanceStatusMapper.map(status);if (next == null) {return;}InstanceStatus prev = instanceInfo.setStatus(next);if (prev != null) {for (StatusChangeListener listener : listeners.values()) {try {listener.notify(new StatusChangeEvent(prev, next));} catch (Exception e) {logger.warn("failed to notify listener: {}", listener.getId(), e);}}}
}

继续追踪listener是如何放入ApplicationInfoManager的
1.ApplicationInfoManager.registerStatusChangeListener方法注册监听器
2.registerStatusChangeListener方法又是在initScheduledTasks方法里面执行的
3.initScheduledTasks方法在DiscoveryClient构造方法中执行的
4.CloudEurekaClient又是DiscoveryClient的子类,所以这块是在自动装配DiscoveryClient执行的
从DiscoveryClient构造器开始分析,由于代码太多,只保留核心逻辑

DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {......try {// 调度线程池scheduler = Executors.newScheduledThreadPool(2,new ThreadFactoryBuilder().setNameFormat("DiscoveryClient-%d").setDaemon(true).build());// 心跳线程池heartbeatExecutor = new ThreadPoolExecutor(1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(),new ThreadFactoryBuilder().setNameFormat("DiscoveryClient-HeartbeatExecutor-%d").setDaemon(true).build());  // use direct handoff// 缓存刷新线程池cacheRefreshExecutor = new ThreadPoolExecutor(1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(),new ThreadFactoryBuilder().setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d").setDaemon(true).build());  // use direct handoff// eureka跟服务器交互的客户端eurekaTransport = new EurekaTransport();scheduleServerEndpointTask(eurekaTransport, args);AzToRegionMapper azToRegionMapper;if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);} else {azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);}if (null != remoteRegionsToFetch.get()) {azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));}instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());} catch (Throwable e) {throw new RuntimeException("Failed to initialize DiscoveryClient!", e);}if (clientConfig.shouldFetchRegistry()) {try {// 拉取服务信息boolean primaryFetchRegistryResult = fetchRegistry(false);if (!primaryFetchRegistryResult) {logger.info("Initial registry fetch from primary servers failed");}boolean backupFetchRegistryResult = true;if (!primaryFetchRegistryResult && !fetchRegistryFromBackup()) {backupFetchRegistryResult = false;logger.info("Initial registry fetch from backup servers failed");}if (!primaryFetchRegistryResult && !backupFetchRegistryResult && clientConfig.shouldEnforceFetchRegistryAtInit()) {throw new IllegalStateException("Fetch registry error at startup. Initial fetch failed.");}} catch (Throwable th) {logger.error("Fetch registry error at startup: {}", th.getMessage());throw new IllegalStateException(th);}}// call and execute the pre registration handler before all background tasks (inc registration) is startedif (this.preRegistrationHandler != null) {this.preRegistrationHandler.beforeRegistration();}...// finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetchinitScheduledTasks();...
}

着重看下initScheduledTasks方法

  1. 开启缓存刷新定时调度
  2. 开启心跳续约定时调度
  3. 创建instanceInfoReplicator实例信息复制任务
  4. 创建匿名内部实现类ApplicationInfoManager$StatusChangeListener,notify方法调用了instanceInfoReplicator.onDemandUpdate();
  5. 将statusChangeListener实例注册到applicationInfoManager
  6. 调用instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());开启instanceInfoReplicator定时调度
private void initScheduledTasks() {if (clientConfig.shouldFetchRegistry()) {// registry cache refresh timerint registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();cacheRefreshTask = new TimedSupervisorTask("cacheRefresh",scheduler,cacheRefreshExecutor,registryFetchIntervalSeconds,TimeUnit.SECONDS,expBackOffBound,new CacheRefreshThread());scheduler.schedule(cacheRefreshTask,registryFetchIntervalSeconds, TimeUnit.SECONDS);}if (clientConfig.shouldRegisterWithEureka()) {int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);// Heartbeat timerheartbeatTask = new TimedSupervisorTask("heartbeat",scheduler,heartbeatExecutor,renewalIntervalInSecs,TimeUnit.SECONDS,expBackOffBound,new HeartbeatThread());scheduler.schedule(heartbeatTask,renewalIntervalInSecs, TimeUnit.SECONDS);// InstanceInfo replicatorinstanceInfoReplicator = new InstanceInfoReplicator(this,instanceInfo,clientConfig.getInstanceInfoReplicationIntervalSeconds(),2); // burstSizestatusChangeListener = new ApplicationInfoManager.StatusChangeListener() {@Overridepublic String getId() {return "statusChangeListener";}@Overridepublic void notify(StatusChangeEvent statusChangeEvent) {logger.info("Saw local status change event {}", statusChangeEvent);// 状态变更,如果满足条件则执行注册服务实例信息instanceInfoReplicator.onDemandUpdate();}};if (clientConfig.shouldOnDemandUpdateStatusChange()) {applicationInfoManager.registerStatusChangeListener(statusChangeListener);}// 开启定时调度推送最新服务实例信息instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());} else {logger.info("Not registering with Eureka server per configuration");}
}

所以当状态变更时,applicationInfoManager会执行listener的notify方法,也就是执行了instanceInfoReplicator.onDemandUpdate()方法
通过调度器提交了一个任务,任务里面执行了当前InstanceInfoReplicator任务的run方法,最终完成服务注册。

public void run() {try {// 刷新实例信息 discoveryClient.refreshInstanceInfo();Long dirtyTimestamp = instanceInfo.isDirtyWithTime();if (dirtyTimestamp != null) {// 服务注册discoveryClient.register();instanceInfo.unsetIsDirty(dirtyTimestamp);}} catch (Throwable t) {logger.warn("There was a problem with the instance info replicator", t);} finally {// 固定间隔时间继续执行当前任务Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);scheduledPeriodicRef.set(next);}
}

所以eureka是基于ScheduledExecutorService来定时刷新服务缓存,心跳续约,以及服务重新注册(如果服务实例信息和eureka服务器不一致的时候)

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com