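Nacos service discovery mainly combines server-side push with periodic client-side pull. For keep-alive, the client sends a heartbeat to the server every 5 s; if the server receives no heartbeat for more than 15 s it marks the instance as unhealthy, and after more than 30 s it deletes the instance.
1. Service discovery
Nacos mainly relies on server-side push plus periodic client-side pull to keep its AP architecture highly available.
NacosNamingService: the service discovery interface (client side!)
Each Nacos client instance keeps a local cache Map that holds all service instances
Querying the service instance list calls
/nacos/v1/ns/instance/list
NacosNamingService.getAllInstances() -> getServiceInfo()
The core method is getServiceInfo. The code is as follows: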
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
    NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
    String key = ServiceInfo.getKey(serviceName, clusters);
    if (failoverReactor.isFailoverSwitch()) {
        return failoverReactor.getService(key);
    }
    ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters); // read the instances from the local cache
    if (null == serviceObj) { // nothing cached yet: refresh the service list
        serviceObj = new ServiceInfo(serviceName, clusters);
        serviceInfoMap.put(serviceObj.getKey(), serviceObj);
        updatingMap.put(serviceName, new Object());
        updateServiceNow(serviceName, clusters);
        updatingMap.remove(serviceName);
    } else if (updatingMap.containsKey(serviceName)) {
        if (UPDATE_HOLD_INTERVAL > 0) {
            // hold a moment waiting for update finish
            synchronized (serviceObj) {
                try {
                    serviceObj.wait(UPDATE_HOLD_INTERVAL);
                } catch (InterruptedException e) {
                    NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                }
            }
        }
    }
    scheduleUpdateIfAbsent(serviceName, clusters); // periodic pull
    return serviceInfoMap.get(serviceObj.getKey()); // read from the map
}
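To make the "local cache first, server only on a miss" idea in getServiceInfo easier to see, here is a minimal sketch. It is not the Nacos client code: the class LocalServiceCache, the remoteFetcher function and the remoteCalls counter are hypothetical names introduced only for illustration, and the failover switch and the wait on UPDATE_HOLD_INTERVAL are left out.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical, simplified stand-in for the client-side cache; not the Nacos API. */
final class LocalServiceCache {
    private final Map<String, List<String>> cache = new ConcurrentHashMap<>();
    private final Function<String, List<String>> remoteFetcher; // stands in for the HTTP call to the server
    private int remoteCalls = 0;

    LocalServiceCache(Function<String, List<String>> remoteFetcher) {
        this.remoteFetcher = remoteFetcher;
    }

    /** Returns the cached instances and asks the "server" only on a cache miss. */
    synchronized List<String> getInstances(String serviceName) {
        List<String> cached = cache.get(serviceName);
        if (cached == null) {
            remoteCalls++;
            cached = remoteFetcher.apply(serviceName);
            cache.put(serviceName, cached);
        }
        return cached;
    }

    /** Number of times the remote fetcher was actually invoked. */
    synchronized int remoteCalls() {
        return remoteCalls;
    }

    public static void main(String[] args) {
        LocalServiceCache cache = new LocalServiceCache(name -> Arrays.asList("10.0.0.1:8080"));
        System.out.println(cache.getInstances("order-service")); // miss: goes to the fetcher
        System.out.println(cache.getInstances("order-service")); // hit: served from the map
        System.out.println("remote calls: " + cache.remoteCalls());
    }
}
```

In the real client the cache is kept fresh by the scheduled pull and by server push, which this sketch does not model.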
The client pulls the service instances from the server and puts them into the local cache
public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)
        throws NacosException {
    final Map<String, String> params = new HashMap<String, String>(8);
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, serviceName);
    params.put("clusters", clusters);
    params.put("udpPort", String.valueOf(udpPort));
    params.put("clientIP", NetUtils.localIP());
    params.put("healthyOnly", String.valueOf(healthyOnly));
    return reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET); // call the endpoint
}
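The client uses a scheduled task to pull the instances from the server periodically and refresh the local cache (the scheduled task is only started when a service call is actually made!)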
public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
    if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
        return;
    }
    synchronized (futureMap) {
        if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
            return;
        }
        ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
        futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
    }
}
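The check-lock-check pattern in scheduleUpdateIfAbsent guarantees at most one refresh task per service key. Below is a rough, self-contained sketch of that pattern. UpdateScheduler, scheduleIfAbsent and taskCount are hypothetical names, and scheduleWithFixedDelay is swapped in as a simplification; the excerpt above instead submits an UpdateTask through addTask, whose details are not shown here.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of "schedule a refresh task once per key"; not the Nacos class. */
final class UpdateScheduler {
    private final Map<String, ScheduledFuture<?>> futureMap = new ConcurrentHashMap<>();
    private final ScheduledExecutorService executor;

    UpdateScheduler(ScheduledExecutorService executor) {
        this.executor = executor;
    }

    /** Returns true only for the call that actually created the task for this key. */
    boolean scheduleIfAbsent(String key, Runnable refresh, long periodMillis) {
        if (futureMap.get(key) != null) {
            return false; // fast path without locking
        }
        synchronized (futureMap) {
            if (futureMap.get(key) != null) {
                return false; // another thread scheduled it while we waited for the lock
            }
            ScheduledFuture<?> future =
                    executor.scheduleWithFixedDelay(refresh, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
            futureMap.put(key, future);
            return true;
        }
    }

    /** Number of keys that currently have a refresh task. */
    int taskCount() {
        return futureMap.size();
    }

    public static void main(String[] args) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        UpdateScheduler scheduler = new UpdateScheduler(executor);
        System.out.println(scheduler.scheduleIfAbsent("demo-service", () -> { }, 1000)); // first call creates the task
        System.out.println(scheduler.scheduleIfAbsent("demo-service", () -> { }, 1000)); // second call is a no-op
        executor.shutdownNow();
    }
}
```

The unlocked first check keeps the common case (task already exists) cheap, and the second check under the lock prevents two threads from both scheduling a task for the same key.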
2. Heartbeat mechanism
NamingService->register
When an instance is registered, the client starts a heartbeat task that sends a heartbeat to the server every 5 s!
executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
BeatTask is the heartbeat task
It calls the instance/beat endpoint to send the instance heartbeat
The code of BeatTask#run is as follows:
@Override
public void run() {
    if (beatInfo.isStopped()) {
        return;
    }
    long nextTime = beatInfo.getPeriod();
    try {
        JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
        long interval = result.get("clientBeatInterval").asLong();
        boolean lightBeatEnabled = false;
        if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
            lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
        }
        BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
        if (interval > 0) {
            nextTime = interval;
        }
        int code = NamingResponseCode.OK;
        if (result.has(CommonParams.CODE)) {
            code = result.get(CommonParams.CODE).asInt();
        }
        if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
            // the server no longer knows this instance: register it again
            Instance instance = new Instance();
            instance.setPort(beatInfo.getPort());
            instance.setIp(beatInfo.getIp());
            instance.setWeight(beatInfo.getWeight());
            instance.setMetadata(beatInfo.getMetadata());
            instance.setClusterName(beatInfo.getCluster());
            instance.setServiceName(beatInfo.getServiceName());
            instance.setInstanceId(instance.getInstanceId());
            instance.setEphemeral(true);
            try {
                serverProxy.registerService(beatInfo.getServiceName(),
                        NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
            } catch (Exception ignore) {
            }
        }
    } catch (NacosException ex) {
        NAMING_LOGGER.warn("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
                JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());
    } catch (Exception unknownEx) {
        NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, unknown exception msg: {}",
                JacksonUtils.toJson(beatInfo), unknownEx.getMessage(), unknownEx);
    } finally {
        executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS); // schedule the next beat
    }
}
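Two details of BeatTask#run are easy to miss: the next delay comes from the server's reply when that value is positive, and the task reschedules itself in finally, so a failed beat does not stop the loop. The sketch below isolates those two ideas. HeartbeatSketch, nextDelay and scheduleBeat are hypothetical names, the LongSupplier stands in for the real sendBeat call, and the CountDownLatch exists only so that the demo terminates.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/** Hypothetical sketch of a self-rescheduling heartbeat; not the Nacos BeatReactor. */
final class HeartbeatSketch {

    /** Uses the server-suggested interval when it is positive, otherwise the default period. */
    static long nextDelay(long defaultPeriodMillis, long serverIntervalMillis) {
        return serverIntervalMillis > 0 ? serverIntervalMillis : defaultPeriodMillis;
    }

    /**
     * Schedules one beat. The task reschedules itself in finally, so an exception
     * thrown by sendBeat does not end the heartbeat loop.
     */
    static void scheduleBeat(ScheduledExecutorService executor, long periodMillis,
                             LongSupplier sendBeat, CountDownLatch remaining) {
        executor.schedule(() -> {
            long next = periodMillis;
            try {
                next = nextDelay(periodMillis, sendBeat.getAsLong());
            } catch (RuntimeException e) {
                // beat failed: keep the default period and try again later
            } finally {
                remaining.countDown();
                if (remaining.getCount() > 0) {
                    scheduleBeat(executor, next, sendBeat, remaining);
                }
            }
        }, periodMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch threeBeats = new CountDownLatch(3);
        // The fake server always answers 0, so the default 50 ms period is kept.
        scheduleBeat(executor, 50, () -> 0L, threeBeats);
        boolean finished = threeBeats.await(5, TimeUnit.SECONDS);
        System.out.println("three beats sent: " + finished);
        executor.shutdown();
    }
}
```

Using one-shot schedule calls instead of a fixed-rate task is what allows the interval to change from one beat to the next.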
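The server updates the instance's last heartbeat time! It checks the health of the instances every 5 seconds
In a cluster, it is enough for one server node to run the scheduled task for a given service; it does not have to run on every node
After an instance goes offline, the server notices it through its health check task (reached via InstanceController). Code entry point:
createEmptyService->createServiceIfAbsent->putServiceAndInit->init->scheduleCheck (scheduled task)
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) { // heartbeat timeout check
If more than 15 seconds have passed, the instance is marked as unhealthy (health status updated)
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) { // once 30 s have passed, remove this machine outright (delete the instance)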
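One common way to make sure only one node runs the task for a given service is to hash the service name onto the server list. The sketch below illustrates that idea only. It assumes a plain hash-modulo mapping, and ResponsibleNodeSketch, ownerOf and the server addresses are hypothetical; the actual responsible(...) check in Nacos may differ in its details.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of "one responsible node per service"; assumes hash-modulo mapping. */
final class ResponsibleNodeSketch {

    /** Maps a service name to exactly one server of the (identically ordered) server list. */
    static String ownerOf(String serviceName, List<String> servers) {
        // The modulo keeps the value above Integer.MIN_VALUE, so Math.abs cannot overflow.
        int hash = Math.abs(serviceName.hashCode() % Integer.MAX_VALUE);
        return servers.get(hash % servers.size());
    }

    /** True if the local server is the one that should run the health check for the service. */
    static boolean responsible(String localServer, String serviceName, List<String> servers) {
        return localServer.equals(ownerOf(serviceName, servers));
    }

    public static void main(String[] args) {
        List<String> servers = Arrays.asList("10.0.0.1:8848", "10.0.0.2:8848", "10.0.0.3:8848");
        for (String server : servers) {
            System.out.println(server + " responsible for order-service: "
                    + responsible(server, "order-service", servers));
        }
    }
}
```

The mapping only works if every node sees the same server list in the same order; otherwise two nodes, or none, could consider themselves responsible.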
asyncHttpRequest(url, headers, paramValues, callback, HttpMethod.DELETE);
The core code is as follows:
@Override
public void run() {
    try {
        if (!getDistroMapper().responsible(service.getName())) {
            return;
        }
        if (!getSwitchDomain().isHealthCheckEnabled()) {
            return;
        }
        List<Instance> instances = service.allIPs(true); // get all instances of this service held by the server
        // first set health status of instances:
        for (Instance instance : instances) {
            if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) { // heartbeat timeout check
                if (!instance.isMarked()) {
                    if (instance.isHealthy()) {
                        instance.setHealthy(false); // more than 15 s without a beat: set healthy to false
                        Loggers.EVT_LOG.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                                instance.getIp(), instance.getPort(), instance.getClusterName(),
                                service.getName(), UtilsAndCommons.LOCALHOST_SITE,
                                instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
                        getPushService().serviceChanged(service);
                        ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                    }
                }
            }
        }
        if (!getGlobalConfig().isExpireInstance()) {
            return;
        }
        // then remove obsolete instances:
        for (Instance instance : instances) {
            if (instance.isMarked()) {
                continue;
            }
            if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                // delete instance
                Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
                        JacksonUtils.toJson(instance));
                deleteIp(instance);
            }
        }
    } catch (Exception e) {
        Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
    }
}
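The two comparisons in the check task boil down to a pure function of "time since the last beat". Here is a small sketch of that decision, using the 15 s and 30 s values quoted in this article as constants. BeatTimeoutSketch, State and classify are hypothetical names; in the excerpt above the thresholds come from the instance itself (getInstanceHeartBeatTimeOut and getIpDeleteTimeout).

```java
/** Hypothetical sketch of the heartbeat timeout decision; thresholds taken from the article. */
final class BeatTimeoutSketch {
    enum State { HEALTHY, UNHEALTHY, EXPIRED }

    static final long HEARTBEAT_TIMEOUT_MS = 15_000; // after this the instance is marked unhealthy
    static final long DELETE_TIMEOUT_MS = 30_000;    // after this the instance is deleted

    /** Classifies an instance by how long the server has gone without a beat from it. */
    static State classify(long nowMillis, long lastBeatMillis) {
        long silence = nowMillis - lastBeatMillis;
        if (silence > DELETE_TIMEOUT_MS) {
            return State.EXPIRED;
        }
        if (silence > HEARTBEAT_TIMEOUT_MS) {
            return State.UNHEALTHY;
        }
        return State.HEALTHY;
    }

    public static void main(String[] args) {
        long lastBeat = 0L;
        System.out.println(classify(10_000, lastBeat)); // HEALTHY
        System.out.println(classify(20_000, lastBeat)); // UNHEALTHY
        System.out.println(classify(31_000, lastBeat)); // EXPIRED
    }
}
```

Both comparisons are strict, matching the `>` in the source, so an instance whose last beat is exactly 15 s old still counts as healthy.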
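3. Service change events
The client only pulls from the server once every 5 seconds, so isn't there a delay?
For an AP architecture a small delay does not matter, and Ribbon will retry
When the service list on the server changes, the server actively pushes the change to the client
Core code entry point: notifier#run->handle->onChange
onChange contains the code that updates the registry!
getPushService().serviceChanged(this); // publish the service-changed event to notify the clients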
public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
    Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
    for (String clusterName : clusterMap.keySet()) {
        ipMap.put(clusterName, new ArrayList<>());
    }
    for (Instance instance : instances) {
        try {
            if (instance == null) {
                Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");
                continue;
            }
            if (StringUtils.isEmpty(instance.getClusterName())) {
                instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
            }
            if (!clusterMap.containsKey(instance.getClusterName())) {
                Loggers.SRV_LOG.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                        instance.getClusterName(), instance.toJson());
                Cluster cluster = new Cluster(instance.getClusterName(), this);
                cluster.init();
                getClusterMap().put(instance.getClusterName(), cluster);
            }
            List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
            if (clusterIPs == null) {
                clusterIPs = new LinkedList<>();
                ipMap.put(instance.getClusterName(), clusterIPs);
            }
            clusterIPs.add(instance);
        } catch (Exception e) {
            Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
        }
    }
    for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
        //make every ip mine
        List<Instance> entryIPs = entry.getValue();
        clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral); // core registration logic
    }
    setLastModifiedMillis(System.currentTimeMillis());
    getPushService().serviceChanged(this); // publish the service-changed event
    StringBuilder stringBuilder = new StringBuilder();
    for (Instance instance : allIPs()) {
        stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");
    }
    Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(),
            stringBuilder.toString());
}
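(I found this method by doing a full-text search for the event.) PushService#onApplicationEvent handles the event. The code is as follows:
udpPush(ackEntry); // the server sends a UDP message to the client to announce the instance change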
@Override
public void onApplicationEvent(ServiceChangeEvent event) {
    Service service = event.getService();
    String serviceName = service.getName();
    String namespaceId = service.getNamespaceId();
    Future future = GlobalExecutor.scheduleUdpSender(() -> {
        try {
            Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");
            ConcurrentMap<String, PushClient> clients = clientMap
                    .get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
            if (MapUtils.isEmpty(clients)) {
                return;
            }
            Map<String, Object> cache = new HashMap<>(16);
            long lastRefTime = System.nanoTime();
            for (PushClient client : clients.values()) {
                if (client.zombie()) {
                    Loggers.PUSH.debug("client is zombie: " + client.toString());
                    clients.remove(client.toString());
                    Loggers.PUSH.debug("client is zombie: " + client.toString());
                    continue;
                }
                Receiver.AckEntry ackEntry;
                Loggers.PUSH.debug("push serviceName: {} to client: {}", serviceName, client.toString());
                String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());
                byte[] compressData = null;
                Map<String, Object> data = null;
                if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {
                    org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);
                    compressData = (byte[]) (pair.getValue0());
                    data = (Map<String, Object>) pair.getValue1();
                    Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", serviceName, client.getAddrStr());
                }
                if (compressData != null) {
                    ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
                } else {
                    ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
                    if (ackEntry != null) {
                        cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));
                    }
                }
                Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}",
                        client.getServiceName(), client.getAddrStr(), client.getAgent(),
                        (ackEntry == null ? null : ackEntry.key));
                udpPush(ackEntry); // send the UDP message to the client
            }
        } catch (Exception e) {
            Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);
        } finally {
            futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
        }
    }, 1000, TimeUnit.MILLISECONDS);
    futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);
}
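Stripped of caching, compression and acknowledgements, the push itself is just a UDP datagram sent to the port the client reported (the udpPort parameter seen in queryList). The sketch below shows that bare mechanism over the loopback interface. UdpPushSketch, push and receiveOnce are hypothetical names and the JSON payload is made up; the real PushService does considerably more than this.

```java
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;

/** Hypothetical sketch of a one-way UDP notification; no compression, ack or retry. */
final class UdpPushSketch {

    /** "Server" side: sends one datagram to the UDP port the client reported. */
    static void push(String payload, InetAddress clientAddress, int clientUdpPort) throws IOException {
        byte[] data = payload.getBytes(StandardCharsets.UTF_8);
        try (DatagramSocket socket = new DatagramSocket()) {
            socket.send(new DatagramPacket(data, data.length, clientAddress, clientUdpPort));
        }
    }

    /** "Client" side: blocks until one datagram arrives or the timeout elapses. */
    static String receiveOnce(DatagramSocket socket, int timeoutMillis) throws IOException {
        socket.setSoTimeout(timeoutMillis);
        byte[] buffer = new byte[64 * 1024];
        DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
        socket.receive(packet);
        return new String(packet.getData(), packet.getOffset(), packet.getLength(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        // Port 0 lets the OS pick a free port, which the client would then report to the server.
        try (DatagramSocket client = new DatagramSocket(0, loopback)) {
            push("{\"name\":\"demo-service\",\"hosts\":[]}", loopback, client.getLocalPort());
            System.out.println(receiveOnce(client, 2000));
        }
    }
}
```

Because UDP gives no delivery guarantee, a push like this can be lost, which is one reason the periodic pull is still needed as a fallback.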
The client-side code for receiving the UDP messages is as follows:
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
    NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
    String key = ServiceInfo.getKey(serviceName, clusters);
    if (failoverReactor.isFailoverSwitch()) {
        return failoverReactor.getService(key);
    }
    ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters); // read the instances from the local cache
    if (null == serviceObj) { // nothing cached yet: refresh the service list
        serviceObj = new ServiceInfo(serviceName, clusters);
        serviceInfoMap.put(serviceObj.getKey(), serviceObj);
        updatingMap.put(serviceName, new Object());
        updateServiceNow(serviceName, clusters); // the UDP port