您的位置:首页 > 新闻 > 热点要闻 > Nacos1.X中对NacosNamingService的实现

Nacos1.X中对NacosNamingService的实现

2025/1/6 14:35:32 来源:https://blog.csdn.net/ystyaoshengting/article/details/142207770  浏览:    关键词:Nacos1.X中对NacosNamingService的实现

NacosNamingService

Nacos Client包中的NamingService实现类为NacosNamingService,通过封装好的SDK供用户使用,来调用nacos对外暴露的OpenAPI

SDK方式只是提供了一种访问的封装,在底层仍然是基于HTTP协议完成请求的。

NamingService提供了以下方法:

  • registerInstance:注册实例

  • deregisterInstance:注销实例

  • getAllInstances:获取某一服务的所有实例

  • selectInstances:获取某一服务健康或不健康的实例

  • selectOneHealthyInstance:根据权重选择一个健康的实例

  • getServerStatus:检测服务端健康状态

  • subscribe:注册对某个服务的监听

  • unsubscribe:注销对某个服务的监听

  • getSubscribeServices:获取被监听的服务

  • getServicesOfServer:获取命名空间(namespace)下的所有服务名

NacosNamingService在初始化时还会创建其他核心类,对外提供的方法都是委托给这些核心类处理的。初始化顺序依次为NamingProxy、BeatReactor、HostReactor:

  • NamingProxy:用于与Nacos服务端通信,注册服务、注销服务、发送心跳等都经由NamingProxy来请求服务端
  • BeatReactor:本地实例心跳,用于向Nacos服务端发送本地服务的心跳
  • HostReactor:用于从注册中心获取、保存、更新各服务实例信息
public class NacosNamingService implements NamingService {private String namespace;private String endpoint;private String serverList;private String cacheDir;private String logName;private HostReactor hostReactor;//心跳包响应private BeatReactor beatReactor;//进行服务注册的带来,通过NamingProxy与Nacos Server进行最终的通信private NamingProxy serverProxy;public NacosNamingService(String serverList) throws NacosException {Properties properties = new Properties();properties.setProperty("serverAddr", serverList);this.init(properties);}public NacosNamingService(Properties properties) throws NacosException {this.init(properties);}private void init(Properties properties) throws NacosException {ValidatorUtils.checkInitParam(properties);this.namespace = InitUtils.initNamespaceForNaming(properties);InitUtils.initSerialization();this.initServerAddr(properties);InitUtils.initWebRootContext(properties);this.initCacheDir();this.initLogName(properties);//NamingService网络层代理this.serverProxy = new NamingProxy(this.namespace, this.endpoint, this.serverList, properties);//心跳包检测线程池this.beatReactor = new BeatReactor(this.serverProxy, this.initClientBeatThreadCount(properties));this.hostReactor = new HostReactor(this.serverProxy, this.beatReactor, this.cacheDir, this.isLoadCacheAtStart(properties), this.isPushEmptyProtect(properties), this.initPollingThreadCount(properties));}......省略......//注册服务,委托NamingProxy处理public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {NamingUtils.checkInstanceIsLegal(instance);String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);//如果是临时节点if (instance.isEphemeral()) {//构造心跳包BeatInfo beatInfo = this.beatReactor.buildBeatInfo(groupedServiceName, instance);//将心跳包加到定时线程池中定时执行this.beatReactor.addBeatInfo(groupedServiceName, beatInfo);}//服务注册this.serverProxy.registerService(groupedServiceName, groupName, instance);}//注销服务,委托NamingProxy处理public void deregisterInstance(String serviceName, String groupName, Instance 
instance) throws NacosException {//如果是临时节点,则移除心跳包if (instance.isEphemeral()) {this.beatReactor.removeBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), instance.getIp(), instance.getPort());}//调用NamingProxy进行服务注销this.serverProxy.deregisterService(NamingUtils.getGroupedName(serviceName, groupName), instance);}//获取所有服务实例的方法,委托HostReactor处理public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters, boolean subscribe) throws NacosException {ServiceInfo serviceInfo;// 如果该消费者订阅了这个服务,那么会先从本地维护的服务列表中获取,本地为空再从服务注册中心获取服务if (subscribe) {serviceInfo = this.hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));} else {// 否则实例会从服务中心进行获取serviceInfo = this.hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));}List list;return (List)(serviceInfo != null && !CollectionUtils.isEmpty(list = serviceInfo.getHosts()) ? list : new ArrayList());}//获取健康(不健康)服务实例方法,委托HostReactor处理public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy, boolean subscribe) throws NacosException {ServiceInfo serviceInfo;// 如果该消费者订阅了这个服务,那么会在本地维护一个服务列表,服务从本地获取if (subscribe) {serviceInfo = this.hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));} else {// 否则实例会从服务中心进行获取serviceInfo = this.hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));}return this.selectInstances(serviceInfo, healthy);}private List<Instance> selectInstances(ServiceInfo serviceInfo, boolean healthy) {List list;if (serviceInfo != null && !CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {Iterator iterator = list.iterator();while(true) {Instance instance;do {if (!iterator.hasNext()) {return list;}instance = (Instance)iterator.next();} while(healthy == 
instance.isHealthy() && instance.isEnabled() && instance.getWeight() > 0.0D);iterator.remove();}} else {return new ArrayList();}}//获取一个健康的实例,委托HostReactor处理public Instance selectOneHealthyInstance(String serviceName, String groupName, List<String> clusters, boolean subscribe) throws NacosException {return subscribe ? RandomByWeight.selectHost(this.hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","))) : RandomByWeight.selectHost(this.hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ",")));}//监听服务实例,委托HostReactor处理public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener) throws NacosException {this.hostReactor.subscribe(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","), listener);}//取消监听服务public void unsubscribe(String serviceName, String groupName, List<String> clusters, EventListener listener) throws NacosException {this.hostReactor.unSubscribe(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","), listener);}//查询服务列表public ListView<String> getServicesOfServer(int pageNo, int pageSize, String groupName, AbstractSelector selector) throws NacosException {return this.serverProxy.getServiceList(pageNo, pageSize, groupName, selector);}public List<ServiceInfo> getSubscribeServices() {return this.hostReactor.getSubscribeServices();}public String getServerStatus() {return this.serverProxy.serverHealthy() ? "UP" : "DOWN";}public BeatReactor getBeatReactor() {return this.beatReactor;}public void shutDown() throws NacosException {this.beatReactor.shutdown();this.hostReactor.shutdown();this.serverProxy.shutdown();}
}

NamingProxy

NamingProxy用于与Nacos服务端通信,注册服务、注销服务、发送心跳等都经由NamingProxy来请求服务端。

NamingProxy会创建一个核心线程数为2的定时线程池(线程名为com.alibaba.nacos.client.naming.updater),用于定期调用refreshSrvIfNeed()方法更新Nacos服务端地址,默认间隔为30秒;同一个线程池还会每隔5秒调用一次securityProxy.login()来刷新安全认证信息。

public class NamingProxy implements Closeable {//Nacos自定义的RestTemplateprivate final NacosRestTemplate nacosRestTemplate = NamingHttpClientManager.getInstance().getNacosRestTemplate();//默认服务端口private static final int DEFAULT_SERVER_PORT = 8848;private int serverPort = 8848;//命名空间private final String namespaceId;private final String endpoint;private String nacosDomain;private List<String> serverList;private List<String> serversFromEndpoint = new ArrayList();private final SecurityProxy securityProxy;private long lastSrvRefTime = 0L;private final long vipSrvRefInterMillis;private final long securityInfoRefreshIntervalMills;private Properties properties;//刷新定时任务private ScheduledExecutorService executorService;//最大重试次数,默认值是3private int maxRetry;public NamingProxy(String namespaceId, String endpoint, String serverList, Properties properties) {this.vipSrvRefInterMillis = TimeUnit.SECONDS.toMillis(30L);this.securityInfoRefreshIntervalMills = TimeUnit.SECONDS.toMillis(5L);this.securityProxy = new SecurityProxy(properties, this.nacosRestTemplate);this.properties = properties;this.setServerPort(8848);this.namespaceId = namespaceId;this.endpoint = endpoint;this.maxRetry = ConvertUtils.toInt(properties.getProperty("namingRequestDomainMaxRetryCount", String.valueOf(3)));if (StringUtils.isNotEmpty(serverList)) {this.serverList = Arrays.asList(serverList.split(","));if (this.serverList.size() == 1) {this.nacosDomain = serverList;}}this.initRefreshTask();}//初始化刷新定时任务private void initRefreshTask() {//初始化线程池this.executorService = new ScheduledThreadPoolExecutor(2, new ThreadFactory() {public Thread newThread(Runnable r) {Thread t = new Thread(r);t.setName("com.alibaba.nacos.client.naming.updater");t.setDaemon(true);return t;}});this.refreshSrvIfNeed();this.securityProxy.login(this.getServerList());//设置定时刷新任务this.executorService.scheduleWithFixedDelay(new Runnable() {public void run() {NamingProxy.this.refreshSrvIfNeed();}}, 0L, this.vipSrvRefInterMillis, 
TimeUnit.MILLISECONDS);//this.executorService.scheduleWithFixedDelay(new Runnable() {public void run() {NamingProxy.this.securityProxy.login(NamingProxy.this.getServerList());}}, 0L, this.securityInfoRefreshIntervalMills, TimeUnit.MILLISECONDS);}//public List<String> getServerListFromEndpoint() {try {String urlString = "http://" + this.endpoint + "/nacos/serverlist";Header header = this.builderHeader();HttpRestResult<String> restResult = this.nacosRestTemplate.get(urlString, header, Query.EMPTY, String.class);if (!restResult.ok()) {throw new IOException("Error while requesting: " + urlString + "'. Server returned: " + restResult.getCode());} else {String content = (String)restResult.getData();List<String> list = new ArrayList();Iterator var6 = IoUtils.readLines(new StringReader(content)).iterator();while(var6.hasNext()) {String line = (String)var6.next();if (!line.trim().isEmpty()) {list.add(line.trim());}}return list;}} catch (Exception var8) {var8.printStackTrace();return null;}}//进行刷新private void refreshSrvIfNeed() {try {if (!CollectionUtils.isEmpty(this.serverList)) {return;}if (System.currentTimeMillis() - this.lastSrvRefTime < this.vipSrvRefInterMillis) {return;}List<String> list = this.getServerListFromEndpoint();if (CollectionUtils.isEmpty(list)) {throw new Exception("Can not acquire Nacos list");}if (!CollectionUtils.isEqualCollection(list, this.serversFromEndpoint)) {}this.serversFromEndpoint = list;this.lastSrvRefTime = System.currentTimeMillis();} catch (Throwable var2) {LogUtils.NAMING_LOGGER.warn("failed to update server list", var2);}}//注册服务public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {Map<String, String> params = new HashMap(16);params.put("namespaceId", this.namespaceId);params.put("serviceName", serviceName);params.put("groupName", groupName);params.put("clusterName", instance.getClusterName());params.put("ip", instance.getIp());params.put("port", 
String.valueOf(instance.getPort()));params.put("weight", String.valueOf(instance.getWeight()));params.put("enable", String.valueOf(instance.isEnabled()));params.put("healthy", String.valueOf(instance.isHealthy()));params.put("ephemeral", String.valueOf(instance.isEphemeral()));params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));// 把上述服务实例的一些必要参数保存到一个Map中,通过OpenAPI的方式发送注册请求this.reqApi(UtilAndComs.nacosUrlInstance, params, "POST");}//注销服务public void deregisterService(String serviceName, Instance instance) throws NacosException {Map<String, String> params = new HashMap(8);params.put("namespaceId", this.namespaceId);params.put("serviceName", serviceName);params.put("clusterName", instance.getClusterName());params.put("ip", instance.getIp());params.put("port", String.valueOf(instance.getPort()));params.put("ephemeral", String.valueOf(instance.isEphemeral()));this.reqApi(UtilAndComs.nacosUrlInstance, params, "DELETE");}public void updateInstance(String serviceName, String groupName, Instance instance) throws NacosException {Map<String, String> params = new HashMap(8);params.put("namespaceId", this.namespaceId);params.put("serviceName", serviceName);params.put("groupName", groupName);params.put("clusterName", instance.getClusterName());params.put("ip", instance.getIp());params.put("port", String.valueOf(instance.getPort()));params.put("weight", String.valueOf(instance.getWeight()));params.put("enabled", String.valueOf(instance.isEnabled()));params.put("ephemeral", String.valueOf(instance.isEphemeral()));params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));this.reqApi(UtilAndComs.nacosUrlInstance, params, "PUT");}public Service queryService(String serviceName, String groupName) throws NacosException {Map<String, String> params = new HashMap(3);params.put("namespaceId", this.namespaceId);params.put("serviceName", serviceName);params.put("groupName", groupName);String result = this.reqApi(UtilAndComs.nacosUrlService, params, "GET");return 
(Service)JacksonUtils.toObj(result, Service.class);}public void createService(Service service, AbstractSelector selector) throws NacosException {Map<String, String> params = new HashMap(6);params.put("namespaceId", this.namespaceId);params.put("serviceName", service.getName());params.put("groupName", service.getGroupName());params.put("protectThreshold", String.valueOf(service.getProtectThreshold()));params.put("metadata", JacksonUtils.toJson(service.getMetadata()));params.put("selector", JacksonUtils.toJson(selector));this.reqApi(UtilAndComs.nacosUrlService, params, "POST");}public boolean deleteService(String serviceName, String groupName) throws NacosException {Map<String, String> params = new HashMap(6);params.put("namespaceId", this.namespaceId);params.put("serviceName", serviceName);params.put("groupName", groupName);String result = this.reqApi(UtilAndComs.nacosUrlService, params, "DELETE");return "ok".equals(result);}public void updateService(Service service, AbstractSelector selector) throws NacosException {Map<String, String> params = new HashMap(6);params.put("namespaceId", this.namespaceId);params.put("serviceName", service.getName());params.put("groupName", service.getGroupName());params.put("protectThreshold", String.valueOf(service.getProtectThreshold()));params.put("metadata", JacksonUtils.toJson(service.getMetadata()));params.put("selector", JacksonUtils.toJson(selector));this.reqApi(UtilAndComs.nacosUrlService, params, "PUT");}public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly) throws NacosException {Map<String, String> params = new HashMap(8);params.put("namespaceId", this.namespaceId);params.put("serviceName", serviceName);params.put("clusters", clusters);params.put("udpPort", String.valueOf(udpPort));params.put("clientIP", NetUtils.localIP());params.put("healthyOnly", String.valueOf(healthyOnly));return this.reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, "GET");}public JsonNode 
sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {if (LogUtils.NAMING_LOGGER.isDebugEnabled()) {LogUtils.NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", this.namespaceId, beatInfo.toString());}Map<String, String> params = new HashMap(8);Map<String, String> bodyMap = new HashMap(2);if (!lightBeatEnabled) {bodyMap.put("beat", JacksonUtils.toJson(beatInfo));}params.put("namespaceId", this.namespaceId);params.put("serviceName", beatInfo.getServiceName());params.put("clusterName", beatInfo.getCluster());params.put("ip", beatInfo.getIp());params.put("port", String.valueOf(beatInfo.getPort()));String result = this.reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, "PUT");return JacksonUtils.toObj(result);}public boolean serverHealthy() {try {String result = this.reqApi(UtilAndComs.nacosUrlBase + "/operator/metrics", new HashMap(2), "GET");JsonNode json = JacksonUtils.toObj(result);String serverStatus = json.get("status").asText();return "UP".equals(serverStatus);} catch (Exception var4) {return false;}}//查询服务列表public ListView<String> getServiceList(int pageNo, int pageSize, String groupName) throws NacosException {return this.getServiceList(pageNo, pageSize, groupName, (AbstractSelector)null);}public ListView<String> getServiceList(int pageNo, int pageSize, String groupName, AbstractSelector selector) throws NacosException {Map<String, String> params = new HashMap(4);params.put("pageNo", String.valueOf(pageNo));params.put("pageSize", String.valueOf(pageSize));params.put("namespaceId", this.namespaceId);params.put("groupName", groupName);if (selector != null) {switch(SelectorType.valueOf(selector.getType())) {case none:default:break;case label:ExpressionSelector expressionSelector = (ExpressionSelector)selector;params.put("selector", JacksonUtils.toJson(expressionSelector));}}String result = this.reqApi(UtilAndComs.nacosUrlBase + "/service/list", params, "GET");JsonNode json = 
JacksonUtils.toObj(result);ListView<String> listView = new ListView();listView.setCount(json.get("count").asInt());listView.setData((List)JacksonUtils.toObj(json.get("doms").toString(), new TypeReference<List<String>>() {}));return listView;}public String reqApi(String api, Map<String, String> params, String method) throws NacosException {return this.reqApi(api, params, Collections.EMPTY_MAP, method);}public String reqApi(String api, Map<String, String> params, Map<String, String> body, String method) throws NacosException {return this.reqApi(api, params, body, this.getServerList(), method);}public String reqApi(String api, Map<String, String> params, Map<String, String> body, List<String> servers, String method) throws NacosException {params.put("namespaceId", this.getNamespaceId());if (CollectionUtils.isEmpty(servers) && StringUtils.isBlank(this.nacosDomain)) {throw new NacosException(400, "no server available");} else {NacosException exception = new NacosException();if (StringUtils.isNotBlank(this.nacosDomain)) {int i = 0;while(i < this.maxRetry) {try {return this.callServer(api, params, body, this.nacosDomain, method);} catch (NacosException var12) {exception = var12;if (LogUtils.NAMING_LOGGER.isDebugEnabled()) {LogUtils.NAMING_LOGGER.debug("request {} failed.", this.nacosDomain, var12);}++i;}}} else {Random random = new Random(System.currentTimeMillis());int index = random.nextInt(servers.size());int i = 0;while(i < servers.size()) {String server = (String)servers.get(index);try {return this.callServer(api, params, body, server, method);} catch (NacosException var13) {exception = var13;if (LogUtils.NAMING_LOGGER.isDebugEnabled()) {LogUtils.NAMING_LOGGER.debug("request {} failed.", server, var13);}index = (index + 1) % servers.size();++i;}}}throw new NacosException(exception.getErrCode(), "failed to req API:" + api + " after all servers(" + servers + ") tried: " + exception.getMessage());}}private List<String> getServerList() {List<String> snapshot = 
this.serversFromEndpoint;if (!CollectionUtils.isEmpty(this.serverList)) {snapshot = this.serverList;}return snapshot;}public String callServer(String api, Map<String, String> params, Map<String, String> body, String curServer) throws NacosException {return this.callServer(api, params, body, curServer, "GET");}public String callServer(String api, Map<String, String> params, Map<String, String> body, String curServer, String method) throws NacosException {long start = System.currentTimeMillis();long end = 0L;this.injectSecurityInfo(params);Header header = this.builderHeader();String url;if (!curServer.startsWith("https://") && !curServer.startsWith("http://")) {if (!IPUtil.containsPort(curServer)) {curServer = curServer + ":" + this.serverPort;}url = NamingHttpClientManager.getInstance().getPrefix() + curServer + api;} else {url = curServer + api;}try {HttpRestResult<String> restResult = this.nacosRestTemplate.exchangeForm(url, header, Query.newInstance().initParams(params), body, method, String.class);end = System.currentTimeMillis();MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(restResult.getCode())).observe((double)(end - start));if (restResult.ok()) {return (String)restResult.getData();} else if (304 == restResult.getCode()) {return "";} else {throw new NacosException(restResult.getCode(), restResult.getMessage());}} catch (Exception var13) {LogUtils.NAMING_LOGGER.error("[NA] failed to request", var13);throw new NacosException(500, var13);}}private void injectSecurityInfo(Map<String, String> params) {if (StringUtils.isNotBlank(this.securityProxy.getAccessToken())) {params.put("accessToken", this.securityProxy.getAccessToken());}String ak = this.getAccessKey();String sk = this.getSecretKey();params.put("app", AppNameUtils.getAppName());if (StringUtils.isNotBlank(ak) && StringUtils.isNotBlank(sk)) {try {String signData = getSignData((String)params.get("serviceName"));String signature = SignUtil.sign(signData, sk);params.put("signature", 
signature);params.put("data", signData);params.put("ak", ak);} catch (Exception var6) {LogUtils.NAMING_LOGGER.error("inject ak/sk failed.", var6);}}}public Header builderHeader() {Header header = Header.newInstance();header.addParam("Client-Version", VersionUtils.version);header.addParam("User-Agent", UtilAndComs.VERSION);header.addParam("Accept-Encoding", "gzip,deflate,sdch");header.addParam("Connection", "Keep-Alive");header.addParam("RequestId", UuidUtils.generateUuid());header.addParam("Request-Module", "Naming");return header;}private static String getSignData(String serviceName) {return StringUtils.isNotEmpty(serviceName) ? System.currentTimeMillis() + "@@" + serviceName : String.valueOf(System.currentTimeMillis());}public String getAccessKey() {return this.properties == null ? SpasAdapter.getAk() : TemplateUtils.stringEmptyAndThenExecute(this.properties.getProperty("accessKey"), new Callable<String>() {public String call() {return SpasAdapter.getAk();}});}public String getSecretKey() {return this.properties == null ? SpasAdapter.getSk() : TemplateUtils.stringEmptyAndThenExecute(this.properties.getProperty("secretKey"), new Callable<String>() {public String call() throws Exception {return SpasAdapter.getSk();}});}public void setProperties(Properties properties) {this.properties = properties;this.setServerPort(8848);}public String getNamespaceId() {return this.namespaceId;}public void setServerPort(int serverPort) {this.serverPort = serverPort;String sp = System.getProperty("nacos.naming.exposed.port");if (StringUtils.isNotBlank(sp)) {this.serverPort = Integer.parseInt(sp);}}public void shutdown() throws NacosException {String className = this.getClass().getName();LogUtils.NAMING_LOGGER.info("{} do shutdown begin", className);ThreadUtils.shutdownThreadPool(this.executorService, LogUtils.NAMING_LOGGER);NamingHttpClientManager.getInstance().shutdown();SpasAdapter.freeCredentialInstance();LogUtils.NAMING_LOGGER.info("{} do shutdown stop", className);}
}

BeatReactor

在这里插入图片描述

BeatReactor用于向Nacos服务端发送已注册服务的心跳。

成员变量Map<String, BeatInfo> dom2Beat中保存了需要发送的BeatInfo,key为{serviceName}#{ip}#{port},value为对应的BeatInfo。

BeatReactor会启动名为com.alibaba.nacos.naming.beat.sender的线程来发送心跳,默认线程数为1~CPU核心数的一半,可由namingClientBeatThreadCount参数指定。

默认情况下每5秒发送一次心跳,可根据Nacos服务端返回的clientBeatInterval的值调整心跳间隔。

public class BeatReactor implements Closeable {//定时任务线程池private final ScheduledExecutorService executorService;//private final NamingProxy serverProxy;private boolean lightBeatEnabled;//心跳包Mappublic final Map<String, BeatInfo> dom2Beat;public BeatReactor(NamingProxy serverProxy) {this(serverProxy, UtilAndComs.DEFAULT_CLIENT_BEAT_THREAD_COUNT);}public BeatReactor(NamingProxy serverProxy, int threadCount) {this.lightBeatEnabled = false;this.dom2Beat = new ConcurrentHashMap();this.serverProxy = serverProxy;this.executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() {public Thread newThread(Runnable r) {Thread thread = new Thread(r);thread.setDaemon(true);thread.setName("com.alibaba.nacos.naming.beat.sender");return thread;}});}//添加心跳包到定时线程池中public void addBeatInfo(String serviceName, BeatInfo beatInfo) {//构造心跳包keyString key = this.buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());BeatInfo existBeat = null;//从dom2Beat移除该key,如果key对应的value不为空的话,表明该beatInfo已经存在了,则停止心跳检测if ((existBeat = (BeatInfo)this.dom2Beat.remove(key)) != null) {//停止心跳检测existBeat.setStopped(true);}//如果dom2Beat中不存在该key,则将key放到map中,并进行定时心跳检测this.dom2Beat.put(key, beatInfo);//定时任务线程池,定时执行里面的线程this.executorService.schedule(new BeatReactor.BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);MetricsMonitor.getDom2BeatSizeMonitor().set((double)this.dom2Beat.size());}public void removeBeatInfo(String serviceName, String ip, int port) {//从心跳包map中删除对应的心跳包信息BeatInfo beatInfo = (BeatInfo)this.dom2Beat.remove(this.buildKey(serviceName, ip, port));//将心跳包状态设置为stopif (beatInfo != null) {beatInfo.setStopped(true);MetricsMonitor.getDom2BeatSizeMonitor().set((double)this.dom2Beat.size());}}//根据服务实例,构造心跳包public BeatInfo buildBeatInfo(Instance instance) {return this.buildBeatInfo(instance.getServiceName(), instance);}public BeatInfo buildBeatInfo(String groupedServiceName, Instance instance) {BeatInfo beatInfo = new 
BeatInfo();beatInfo.setServiceName(groupedServiceName);beatInfo.setIp(instance.getIp());beatInfo.setPort(instance.getPort());beatInfo.setCluster(instance.getClusterName());beatInfo.setWeight(instance.getWeight());beatInfo.setMetadata(instance.getMetadata());beatInfo.setScheduled(false);beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());return beatInfo;}public String buildKey(String serviceName, String ip, int port) {return serviceName + "#" + ip + "#" + port;}public void shutdown() throws NacosException {String className = this.getClass().getName();ThreadUtils.shutdownThreadPool(this.executorService, LogUtils.NAMING_LOGGER);}//心跳包发送线程class BeatTask implements Runnable {BeatInfo beatInfo;public BeatTask(BeatInfo beatInfo) {this.beatInfo = beatInfo;}public void run() {//如果心跳包检查没有停止,则发送心跳包if (!this.beatInfo.isStopped()) {long nextTime = this.beatInfo.getPeriod();try {//发送心跳包JsonNode result = BeatReactor.this.serverProxy.sendBeat(this.beatInfo, BeatReactor.this.lightBeatEnabled);long interval = result.get("clientBeatInterval").asLong();boolean lightBeatEnabled = false;if (result.has("lightBeatEnabled")) {lightBeatEnabled = result.get("lightBeatEnabled").asBoolean();}BeatReactor.this.lightBeatEnabled = lightBeatEnabled;if (interval > 0L) {nextTime = interval;}int code = 10200;if (result.has("code")) {code = result.get("code").asInt();}//如果该实例没有在注册中心注册,则进行注册if (code == 20404) {Instance instance = new Instance();instance.setPort(this.beatInfo.getPort());instance.setIp(this.beatInfo.getIp());instance.setWeight(this.beatInfo.getWeight());instance.setMetadata(this.beatInfo.getMetadata());instance.setClusterName(this.beatInfo.getCluster());instance.setServiceName(this.beatInfo.getServiceName());instance.setInstanceId(instance.getInstanceId());instance.setEphemeral(true);try {//注册服务BeatReactor.this.serverProxy.registerService(this.beatInfo.getServiceName(), NamingUtils.getGroupName(this.beatInfo.getServiceName()), instance);} catch (Exception var15) {//log日志}}} catch 
(NacosException var16) {//log日志} catch (Exception var17) {//log日志} finally {//将线程再次放到定时任务线程池中执行下次的心跳包发送BeatReactor.this.executorService.schedule(BeatReactor.this.new BeatTask(this.beatInfo), nextTime, TimeUnit.MILLISECONDS);}}}}
}

HostReactor

在这里插入图片描述

HostReactor用于获取、保存、更新各Service实例信息。

成员变量Map<String, ServiceInfo> serviceInfoMap中保存了已获取到的服务的信息,key为{服务名}@@{集群名}。

HostReactor会启动名为com.alibaba.nacos.client.naming.updater的线程来更新服务信息,默认线程数为1~CPU核心数的一半,可由namingPollingThreadCount参数指定。

定时任务UpdateTask会根据服务的cacheMillis值定时更新服务信息,默认值为10秒。该定时任务会在获取某一服务信息时创建,保存在成员变量Map<String, ScheduledFuture<?>> futureMap中。

public class HostReactor implements Closeable {private static final long DEFAULT_DELAY = 1000L;private static final long UPDATE_HOLD_INTERVAL = 5000L;private final Map<String, ScheduledFuture<?>> futureMap;// 本地已存在的服务列表,key是服务名称,value是ServiceInfoprivate final Map<String, ServiceInfo> serviceInfoMap;// 待更新的实例列表private final Map<String, Object> updatingMap;private final PushReceiver pushReceiver;private final BeatReactor beatReactor;private final NamingProxy serverProxy;private final FailoverReactor failoverReactor;private final String cacheDir;private final boolean pushEmptyProtection;// 定时任务(负责服务列表的实时更新)private final ScheduledExecutorService executor;//实例变化通知者,负责管理服务的订阅信息,并进行回调private final InstancesChangeNotifier notifier;public HostReactor(NamingProxy serverProxy, BeatReactor beatReactor, String cacheDir) {this(serverProxy, beatReactor, cacheDir, false, false, UtilAndComs.DEFAULT_POLLING_THREAD_COUNT);}public HostReactor(NamingProxy serverProxy, BeatReactor beatReactor, String cacheDir, boolean loadCacheAtStart, boolean pushEmptyProtection, int pollingThreadCount) {this.futureMap = new HashMap();this.executor = new ScheduledThreadPoolExecutor(pollingThreadCount, new ThreadFactory() {public Thread newThread(Runnable r) {Thread thread = new Thread(r);thread.setDaemon(true);thread.setName("com.alibaba.nacos.client.naming.updater");return thread;}});this.beatReactor = beatReactor;this.serverProxy = serverProxy;this.cacheDir = cacheDir;if (loadCacheAtStart) {this.serviceInfoMap = new ConcurrentHashMap(DiskCache.read(this.cacheDir));} else {this.serviceInfoMap = new ConcurrentHashMap(16);}this.pushEmptyProtection = pushEmptyProtection;this.updatingMap = new ConcurrentHashMap();this.failoverReactor = new FailoverReactor(this, cacheDir);this.pushReceiver = new PushReceiver(this);this.notifier = new InstancesChangeNotifier();NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384);NotifyCenter.registerSubscriber(this.notifier);}public Map<String, ServiceInfo> 
getServiceInfoMap() {return this.serviceInfoMap;}//增加定时刷新服务任务public synchronized ScheduledFuture<?> addTask(HostReactor.UpdateTask task) {return this.executor.schedule(task, 1000L, TimeUnit.MILLISECONDS);}//订阅服务,serviceName服务名称,clusters集群列表,EventLintener回调Listenerpublic void subscribe(String serviceName, String clusters, EventListener eventListener) {//给该服务增加监听器,服务发生变化后进行回调this.notifier.registerListener(serviceName, clusters, eventListener);//将该服务添加到HostReactor的定时任务中,定时刷新this.getServiceInfo(serviceName, clusters);}//取消订阅服务,serviceName服务名称,clusters集群列表,EventLintener回调Listenerpublic void unSubscribe(String serviceName, String clusters, EventListener eventListener) {this.notifier.deregisterListener(serviceName, clusters, eventListener);}public List<ServiceInfo> getSubscribeServices() {return this.notifier.getSubscribeServices();}//处理从注册中心获取到的JSON格式的服务实例,并更新到本地serviceInfoMap中public ServiceInfo processServiceJson(String json) {ServiceInfo serviceInfo = (ServiceInfo)JacksonUtils.toObj(json, ServiceInfo.class);String serviceKey = serviceInfo.getKey();if (serviceKey == null) {return null;} else {ServiceInfo oldService = (ServiceInfo)this.serviceInfoMap.get(serviceKey);if (this.pushEmptyProtection && !serviceInfo.validate()) {return oldService;} else {boolean changed = false;if (oldService != null) {if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) {LogUtils.NAMING_LOGGER.warn("out of date data received, old-t: " + oldService.getLastRefTime() + ", new-t: " + serviceInfo.getLastRefTime());}this.serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);Map<String, Instance> oldHostMap = new HashMap(oldService.getHosts().size());Iterator var7 = oldService.getHosts().iterator();while(var7.hasNext()) {Instance host = (Instance)var7.next();oldHostMap.put(host.toInetAddr(), host);}Map<String, Instance> newHostMap = new HashMap(serviceInfo.getHosts().size());Iterator var17 = serviceInfo.getHosts().iterator();while(var17.hasNext()) {Instance host = 
(Instance)var17.next();newHostMap.put(host.toInetAddr(), host);}Set<Instance> modHosts = new HashSet();Set<Instance> newHosts = new HashSet();Set<Instance> remvHosts = new HashSet();List<Entry<String, Instance>> newServiceHosts = new ArrayList(newHostMap.entrySet());Iterator var12 = newServiceHosts.iterator();while(true) {Entry entry;Instance host;String key;while(var12.hasNext()) {entry = (Entry)var12.next();host = (Instance)entry.getValue();key = (String)entry.getKey();if (oldHostMap.containsKey(key) && !StringUtils.equals(host.toString(), ((Instance)oldHostMap.get(key)).toString())) {modHosts.add(host);} else if (!oldHostMap.containsKey(key)) {newHosts.add(host);}}var12 = oldHostMap.entrySet().iterator();while(var12.hasNext()) {entry = (Entry)var12.next();host = (Instance)entry.getValue();key = (String)entry.getKey();if (!newHostMap.containsKey(key) && !newHostMap.containsKey(key)) {remvHosts.add(host);}}if (newHosts.size() > 0) {changed = true;}if (remvHosts.size() > 0) {changed = true;}if (modHosts.size() > 0) {changed = true;this.updateBeatInfo(modHosts);}serviceInfo.setJsonFromServer(json);if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) {NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), serviceInfo.getHosts()));DiskCache.write(serviceInfo, this.cacheDir);}break;}} else {changed = true;this.serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), serviceInfo.getHosts()));serviceInfo.setJsonFromServer(json);DiskCache.write(serviceInfo, this.cacheDir);}MetricsMonitor.getServiceInfoMapSizeMonitor().set((double)this.serviceInfoMap.size());if (changed) {//记录日志}return serviceInfo;}}}private void updateBeatInfo(Set<Instance> modHosts) {Iterator var2 = modHosts.iterator();while(var2.hasNext()) {Instance instance = (Instance)var2.next();String 
key = this.beatReactor.buildKey(instance.getServiceName(), instance.getIp(), instance.getPort());if (this.beatReactor.dom2Beat.containsKey(key) && instance.isEphemeral()) {BeatInfo beatInfo = this.beatReactor.buildBeatInfo(instance);this.beatReactor.addBeatInfo(instance.getServiceName(), beatInfo);}}}//从本地缓存中获取服务实例信息private ServiceInfo getServiceInfo0(String serviceName, String clusters) {String key = ServiceInfo.getKey(serviceName, clusters);return (ServiceInfo)this.serviceInfoMap.get(key);}//直接从服务注册中心获取服务public ServiceInfo getServiceInfoDirectlyFromServer(String serviceName, String clusters) throws NacosException {String result = this.serverProxy.queryList(serviceName, clusters, 0, false);return StringUtils.isNotEmpty(result) ? (ServiceInfo)JacksonUtils.toObj(result, ServiceInfo.class) : null;}//获取服务,先从本地获取,本地没有,则进行维护,并从注册中心更新最新服务信息public ServiceInfo getServiceInfo(String serviceName, String clusters) {String key = ServiceInfo.getKey(serviceName, clusters);if (this.failoverReactor.isFailoverSwitch()) {return this.failoverReactor.getService(key);} else {// 1.先通过serverName即服务名获得一个serviceInfoServiceInfo serviceObj = this.getServiceInfo0(serviceName, clusters);if (null == serviceObj) {//如果没有serviceInfo,则通过传进来的参数new出一个新的serviceInfo对象,并且同时维护到本地Map和更新MapserviceObj = new ServiceInfo(serviceName, clusters);this.serviceInfoMap.put(serviceObj.getKey(), serviceObj);this.updatingMap.put(serviceName, new Object());// 2.updateServiceNow(),立刻去Nacos服务端拉取该服务最新实例列表,更新serviceInfoMapthis.updateServiceNow(serviceName, clusters);this.updatingMap.remove(serviceName);} else if (this.updatingMap.containsKey(serviceName)) {synchronized(serviceObj) {try {serviceObj.wait(5000L);} catch (InterruptedException var8) {}}}// 3.定时更新实例信息this.scheduleUpdateIfAbsent(serviceName, clusters);// 4.最后返回服务实例数据(前面已经进行了更新)return (ServiceInfo)this.serviceInfoMap.get(serviceObj.getKey());}}//立即从注册中心拉取该服务最新实例列表,并更新到本地private void updateServiceNow(String serviceName, String clusters) {try 
{this.updateService(serviceName, clusters);} catch (NacosException var4) {  }}//通过定时任务,每10秒去更新一次数据public void scheduleUpdateIfAbsent(String serviceName, String clusters) {if (this.futureMap.get(ServiceInfo.getKey(serviceName, clusters)) == null) {synchronized(this.futureMap) {if (this.futureMap.get(ServiceInfo.getKey(serviceName, clusters)) == null) {//创建一个UpdateTask的更新线程任务,每10秒去异步更新集合数据ScheduledFuture<?> future = this.addTask(new HostReactor.UpdateTask(serviceName, clusters));this.futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);}}}}//从注册中心拉取该服务最新实例列表,并更新到本地public void updateService(String serviceName, String clusters) throws NacosException {ServiceInfo oldService = this.getServiceInfo0(serviceName, clusters);boolean var12 = false;try {var12 = true;//从注册中心查询服务下的实例列表String result = this.serverProxy.queryList(serviceName, clusters, this.pushReceiver.getUdpPort(), false);if (StringUtils.isNotEmpty(result)) {//处理从注册中心获取到的服务实例JSON数据,更新本地服务列表this.processServiceJson(result);var12 = false;} else {var12 = false;}} finally {if (var12) {if (oldService != null) {synchronized(oldService) {oldService.notifyAll();}}}}if (oldService != null) {synchronized(oldService) {oldService.notifyAll();}}}//仅仅执行刷新,从Nacos注册中心获取服务,但不刷新本地列表public void refreshOnly(String serviceName, String clusters) {try {this.serverProxy.queryList(serviceName, clusters, this.pushReceiver.getUdpPort(), false);} catch (Exception var4) {    }}public void shutdown() throws NacosException {String className = this.getClass().getName();LogUtils.NAMING_LOGGER.info("{} do shutdown begin", className);ThreadUtils.shutdownThreadPool(this.executor, LogUtils.NAMING_LOGGER);this.pushReceiver.shutdown();this.failoverReactor.shutdown();NotifyCenter.deregisterSubscriber(this.notifier);LogUtils.NAMING_LOGGER.info("{} do shutdown stop", className);}//更新任务线程public class UpdateTask implements Runnable {long lastRefTime = 9223372036854775807L;private final String clusters;private final String serviceName;private int 
failCount = 0;public UpdateTask(String serviceName, String clusters) {this.serviceName = serviceName;this.clusters = clusters;}private void incFailCount() {int limit = 6;if (this.failCount != limit) {++this.failCount;}}private void resetFailCount() {this.failCount = 0;}public void run() {long delayTime = 1000L;try {//从本地缓存中获取服务实例列表ServiceInfo serviceObj = (ServiceInfo)HostReactor.this.serviceInfoMap.get(ServiceInfo.getKey(this.serviceName, this.clusters));//如果服务为空,则更新本地服务列表if (serviceObj == null) {HostReactor.this.updateService(this.serviceName, this.clusters);return;}// 过期服务(服务的最新更新时间小于等于缓存刷新时间),从注册中心重新查询if (serviceObj.getLastRefTime() <= this.lastRefTime) {HostReactor.this.updateService(this.serviceName, this.clusters);serviceObj = (ServiceInfo)HostReactor.this.serviceInfoMap.get(ServiceInfo.getKey(this.serviceName, this.clusters));} else {HostReactor.this.refreshOnly(this.serviceName, this.clusters);}// 刷新更新时间 this.lastRefTime = serviceObj.getLastRefTime();// 判断该注册的Service是否被订阅,如果没有订阅则不再执行if (!HostReactor.this.notifier.isSubscribed(this.serviceName, this.clusters) && !HostReactor.this.futureMap.containsKey(ServiceInfo.getKey(this.serviceName, this.clusters))) {return;}if (CollectionUtils.isEmpty(serviceObj.getHosts())) {this.incFailCount();return;}// 下次更新缓存时间设置,默认为10秒delayTime = serviceObj.getCacheMillis();//任务运行成功,重置失败次数为0this.resetFailCount();} catch (Throwable var7) {//任务执行失败,失败次数+1this.incFailCount();} finally {//取delayTime<<failCount的值与60000L之间的小值,作为下次运行的时间间隔//下次调度刷新时间,下次执行的时间与failCount有关 HostReactor.this.executor.schedule(this, Math.min(delayTime << this.failCount, 60000L), TimeUnit.MILLISECONDS);}}}
}

获取服务实例流程

在这里插入图片描述

更新服务实例流程

在这里插入图片描述

PushReceiver

PushReceiver用于接收Nacos服务端的推送,初始化时会创建DatagramSocket使用UDP的方式接收推送。

PushReceiver初始化时会启动1个名为com.alibaba.nacos.naming.push.receiver的线程,用于循环接收服务端推送的数据。

public class PushReceiver implements Runnable, Closeable {private static final Charset UTF_8 = Charset.forName("UTF-8");private static final int UDP_MSS = 65536;private ScheduledExecutorService executorService;private DatagramSocket udpSocket;private HostReactor hostReactor;private volatile boolean closed = false;public static String getPushReceiverUdpPort() {return System.getenv("push.receiver.udp.port");}public PushReceiver(HostReactor hostReactor) {try {this.hostReactor = hostReactor;String udpPort = getPushReceiverUdpPort();if (StringUtils.isEmpty(udpPort)) {this.udpSocket = new DatagramSocket();} else {this.udpSocket = new DatagramSocket(new InetSocketAddress(Integer.parseInt(udpPort)));}this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {public Thread newThread(Runnable r) {Thread thread = new Thread(r);thread.setDaemon(true);thread.setName("com.alibaba.nacos.naming.push.receiver");return thread;}});this.executorService.execute(this);} catch (Exception var3) {LogUtils.NAMING_LOGGER.error("[NA] init udp socket failed", var3);}}public void run() {while(!this.closed) {try {byte[] buffer = new byte[65536];DatagramPacket packet = new DatagramPacket(buffer, buffer.length);this.udpSocket.receive(packet);String json = (new String(IoUtils.tryDecompress(packet.getData()), UTF_8)).trim();LogUtils.NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());PushReceiver.PushPacket pushPacket = (PushReceiver.PushPacket)JacksonUtils.toObj(json, PushReceiver.PushPacket.class);String ack;if (!"dom".equals(pushPacket.type) && !"service".equals(pushPacket.type)) {if ("dump".equals(pushPacket.type)) {ack = "{\"type\": \"dump-ack\", \"lastRefTime\": \"" + pushPacket.lastRefTime + "\", \"data\":\"" + StringUtils.escapeJavaScript(JacksonUtils.toJson(this.hostReactor.getServiceInfoMap())) + "\"}";} else {ack = "{\"type\": \"unknown-ack\", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":\"\"}";}} else 
{//调用HostReactor处理收到的json数据this.hostReactor.processServiceJson(pushPacket.data);ack = "{\"type\": \"push-ack\", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":\"\"}";}this.udpSocket.send(new DatagramPacket(ack.getBytes(UTF_8), ack.getBytes(UTF_8).length, packet.getSocketAddress()));} catch (Exception var6) {if (this.closed) {return;}LogUtils.NAMING_LOGGER.error("[NA] error while receiving push data", var6);}}}public void shutdown() throws NacosException {String className = this.getClass().getName();LogUtils.NAMING_LOGGER.info("{} do shutdown begin", className);ThreadUtils.shutdownThreadPool(this.executorService, LogUtils.NAMING_LOGGER);this.closed = true;this.udpSocket.close();LogUtils.NAMING_LOGGER.info("{} do shutdown stop", className);}public int getUdpPort() {return this.udpSocket.getLocalPort();}public static class PushPacket {public String type;public long lastRefTime;public String data;public PushPacket() {}}
}

InstancesChangeNotifier

该对象负责保存各服务实例的监听器;当服务实例发生变化时,由它通知订阅者并回调对应监听器的onEvent方法。

public class InstancesChangeNotifier extends Subscriber<InstancesChangeEvent> {//监听器mapprivate final Map<String, ConcurrentHashSet<EventListener>> listenerMap = new ConcurrentHashMap();private final Object lock = new Object();public InstancesChangeNotifier() {}//注册服务监听器public void registerListener(String serviceName, String clusters, EventListener listener) {String key = ServiceInfo.getKey(serviceName, clusters);ConcurrentHashSet<EventListener> eventListeners = (ConcurrentHashSet)this.listenerMap.get(key);if (eventListeners == null) {synchronized(this.lock) {eventListeners = (ConcurrentHashSet)this.listenerMap.get(key);if (eventListeners == null) {eventListeners = new ConcurrentHashSet();this.listenerMap.put(key, eventListeners);}}}eventListeners.add(listener);}//取消注册服务监听器public void deregisterListener(String serviceName, String clusters, EventListener listener) {String key = ServiceInfo.getKey(serviceName, clusters);ConcurrentHashSet<EventListener> eventListeners = (ConcurrentHashSet)this.listenerMap.get(key);if (eventListeners != null) {eventListeners.remove(listener);if (CollectionUtils.isEmpty(eventListeners)) {this.listenerMap.remove(key);}}}//判断是否被订阅public boolean isSubscribed(String serviceName, String clusters) {String key = ServiceInfo.getKey(serviceName, clusters);ConcurrentHashSet<EventListener> eventListeners = (ConcurrentHashSet)this.listenerMap.get(key);return CollectionUtils.isNotEmpty(eventListeners);}public List<ServiceInfo> getSubscribeServices() {List<ServiceInfo> serviceInfos = new ArrayList();Iterator var2 = this.listenerMap.keySet().iterator();while(var2.hasNext()) {String key = (String)var2.next();serviceInfos.add(ServiceInfo.fromKey(key));}return serviceInfos;}//服务实例变化事件public void onEvent(InstancesChangeEvent event) {String key = ServiceInfo.getKey(event.getServiceName(), event.getClusters());//获取该服务的订阅者ConcurrentHashSet<EventListener> eventListeners = (ConcurrentHashSet)this.listenerMap.get(key);if 
(!CollectionUtils.isEmpty(eventListeners)) {Iterator var4 = eventListeners.iterator();while(true) {//循环该服务的订阅者,调用订阅者的回调方法while(var4.hasNext()) {final EventListener listener = (EventListener)var4.next();final Event namingEvent = this.transferToNamingEvent(event);//如果该listener继承了Nacos定义的AbstractEventListener,并且executor不为空,则通过executor以新的线程的方式回调监听onEvent方法if (listener instanceof AbstractEventListener && ((AbstractEventListener)listener).getExecutor() != null) {((AbstractEventListener)listener).getExecutor().execute(new Runnable() {public void run() {listener.onEvent(namingEvent);}});} //否则直接调用该监听器的onEvent方法else {listener.onEvent(namingEvent);}}//回调结束,返回return;}}}//转换事件private Event transferToNamingEvent(InstancesChangeEvent instancesChangeEvent) {return new NamingEvent(instancesChangeEvent.getServiceName(), instancesChangeEvent.getGroupName(), instancesChangeEvent.getClusters(), instancesChangeEvent.getHosts());}public Class<? extends com.alibaba.nacos.common.notify.Event> subscribeType() {return InstancesChangeEvent.class;}
}

参考http://dreamphp.cn/blog/detail?blog_id=25851

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com