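1 Overview
In Dubbo, a routing strategy lets the service consumer filter and select from the list of service providers, so that only the providers matching the routing rules remain.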
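From Dubbo 3.0 onward, routing is done mainly by StateRouter, a specialised kind of Router. As the source in section 2 shows, state routers are chained from a head node (headStateRouter), operate on a BitList of invokers, and run before any ordinary Router.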
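To make the idea of "filtering the provider list by rules" concrete, here is a minimal sketch. It is not Dubbo code: the types `Provider` and `RoutingSketch`, the `tag` attribute, and the two rules are all hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical, simplified model; none of these types are Dubbo classes.
final class RoutingSketch {

    /** A provider reduced to the two attributes this sketch filters on. */
    static final class Provider {
        final String address;
        final String tag;

        Provider(String address, String tag) {
            this.address = address;
            this.tag = tag;
        }
    }

    /** Applies each rule in order; a provider survives only if every rule accepts it. */
    static List<Provider> route(List<Provider> providers, List<Predicate<Provider>> rules) {
        List<Provider> result = new ArrayList<>(providers);
        for (Predicate<Provider> rule : rules) {
            result.removeIf(rule.negate());
        }
        return result;
    }

    /** Demo: keeps providers tagged "gray" whose address starts with "10.0.0.". */
    static List<String> demo() {
        List<Provider> providers = Arrays.asList(
                new Provider("10.0.0.1:20880", "gray"),
                new Provider("10.0.0.2:20880", "stable"),
                new Provider("10.0.1.3:20880", "gray"));
        List<Predicate<Provider>> rules = Arrays.asList(
                p -> "gray".equals(p.tag),
                p -> p.address.startsWith("10.0.0."));
        List<String> addresses = new ArrayList<>();
        for (Provider p : route(providers, rules)) {
            addresses.add(p.address);
        }
        return addresses;
    }
}
```

Each rule only ever narrows the list, which is the property the real router chain relies on as well.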
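2 Using routing strategies
When the consumer-side cluster fault-tolerance strategy fetches the list of available service providers, it first calls the route() method of RouterChain, which uses the routing rules and the invoker list to return the providers that match the rules. The call path is traced below.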
org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker#invoke
public Result invoke(final Invocation invocation) throws RpcException {
    checkWhetherDestroyed();

    // binding attachments into invocation.
    // Map<String, Object> contextAttachments = RpcContext.getClientAttachment().getObjectAttachments();
    // if (contextAttachments != null && contextAttachments.size() != 0) {
    //     ((RpcInvocation) invocation).addObjectAttachmentsIfAbsent(contextAttachments);
    // }

    InvocationProfilerUtils.enterDetailProfiler(invocation, () -> "Router route.");
    // 1. Get the list of service providers (invokers)
    List<Invoker<T>> invokers = list(invocation);
    InvocationProfilerUtils.releaseDetailProfiler(invocation);

    checkInvokers(invokers, invocation);

    // 2. Get the load-balancing strategy: look up the LoadBalance extension
    //    from the URL parameters; the default is RandomLoadBalance
    LoadBalance loadbalance = initLoadBalance(invokers, invocation);
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);

    InvocationProfilerUtils.enterDetailProfiler(invocation, () -> "Cluster " + this.getClass().getName() + " invoke.");
    try {
        // 3. Perform the remote call. Implemented by subclasses, each with a
        //    different cluster fault-tolerance strategy
        return doInvoke(invocation, invokers, loadbalance);
    } finally {
        InvocationProfilerUtils.releaseDetailProfiler(invocation);
    }
}

protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
    return getDirectory().list(invocation);
}
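The three numbered steps in invoke() (list the invokers, pick a load balancer, delegate the call) can be sketched in isolation. This is a simplified model under stated assumptions, not the Dubbo implementation: `ClusterInvokeSketch`, its nested `Directory` and `LoadBalance` interfaces, and the string-based "invokers" are hypothetical, and the selection that Dubbo performs inside doInvoke is folded into a single method here.

```java
import java.util.List;

// Hypothetical stand-ins for the roles seen in AbstractClusterInvoker#invoke.
final class ClusterInvokeSketch {

    /** Returns the routed provider addresses for a method name. */
    interface Directory {
        List<String> list(String method);
    }

    /** Picks one provider address out of the routed candidates. */
    interface LoadBalance {
        String select(List<String> invokers);
    }

    static String invoke(Directory directory, LoadBalance loadBalance, String method) {
        // 1. Ask the directory for the routed provider list.
        List<String> invokers = directory.list(method);
        if (invokers.isEmpty()) {
            // Plays the role of checkInvokers(): no provider, no call.
            throw new IllegalStateException("No provider available for " + method);
        }
        // 2. Let the load balancer choose one provider.
        String chosen = loadBalance.select(invokers);
        // 3. "Call" the provider; a real implementation would do the RPC here.
        return method + "@" + chosen;
    }
}
```

Because routing happens inside the directory, the load balancer only ever sees providers that already passed the routing rules.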
org.apache.dubbo.rpc.cluster.directory.AbstractDirectory#list
public List<Invoker<T>> list(Invocation invocation) throws RpcException {
    if (destroyed) {
        throw new RpcException("Directory of type " + this.getClass().getSimpleName() + " already destroyed for service " + getConsumerUrl().getServiceKey() + " from registry " + getUrl());
    }

    BitList<Invoker<T>> availableInvokers;
    SingleRouterChain<T> singleChain = null;
    try {
        try {
            if (routerChain != null) {
                routerChain.getLock().readLock().lock();
            }
            // use clone to avoid being modified at doList().
            if (invokersInitialized) {
                availableInvokers = validInvokers.clone();
            } else {
                availableInvokers = invokers.clone();
            }

            // Get the router chain
            if (routerChain != null) {
                singleChain = routerChain.getSingleChain(getConsumerUrl(), availableInvokers, invocation);
                singleChain.getLock().readLock().lock();
            }
        } finally {
            if (routerChain != null) {
                routerChain.getLock().readLock().unlock();
            }
        }

        // Use the routing rules and the invoker list to get the providers
        // that remain after router filtering
        List<Invoker<T>> routedResult = doList(singleChain, availableInvokers, invocation);
        if (routedResult.isEmpty()) {
            // 2-2 - No provider available.
            logger.warn(CLUSTER_NO_VALID_PROVIDER, "provider server or registry center crashed", "",
                "No provider available after connectivity filter for the service " + getConsumerUrl().getServiceKey()
                    + " All routed invokers' size: " + routedResult.size()
                    + " from registry " + this
                    + " on the consumer " + NetUtils.getLocalHost()
                    + " using the dubbo version " + Version.getVersion() + ".");
        }
        return Collections.unmodifiableList(routedResult);
    } finally {
        if (singleChain != null) {
            singleChain.getLock().readLock().unlock();
        }
    }
}
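The method above copies the invoker list while holding a read lock and hands out an unmodifiable result. A minimal sketch of that snapshot pattern follows. `SnapshotDirectorySketch` and its `refresh` method are hypothetical names, the real code additionally locks the selected chain while routing, and a plain `ArrayList` stands in for `BitList`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical illustration of "clone under a read lock, return read-only".
final class SnapshotDirectorySketch {

    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private List<String> invokers = new ArrayList<>();

    /** Replaces the provider list, e.g. after a registry notification. */
    void refresh(List<String> latest) {
        lock.writeLock().lock();
        try {
            invokers = new ArrayList<>(latest);
        } finally {
            lock.writeLock().unlock();
        }
    }

    /** Returns a private, read-only copy so later refreshes cannot affect the caller. */
    List<String> list() {
        List<String> snapshot;
        lock.readLock().lock();
        try {
            snapshot = new ArrayList<>(invokers);
        } finally {
            lock.readLock().unlock();
        }
        return Collections.unmodifiableList(snapshot);
    }
}
```

Copying inside the lock keeps the critical section short: filtering can then run on the snapshot without blocking writers.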
org.apache.dubbo.registry.integration.DynamicDirectory#doList
public List<Invoker<T>> doList(SingleRouterChain<T> singleRouterChain,
                               BitList<Invoker<T>> invokers, Invocation invocation) {
    if (forbidden && shouldFailFast) {
        // 1. No service provider 2. Service providers are disabled
        throw new RpcException(RpcException.FORBIDDEN_EXCEPTION, "No provider available from registry " +
            this + " for service " + getConsumerUrl().getServiceKey() + " on consumer " +
            NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() +
            ", please check status of providers(disabled, not registered or in blacklist).");
    }

    if (multiGroup) {
        return this.getInvokers();
    }

    try {
        // Run the routing strategy to get the list of service providers
        // Get invokers from cache, only runtime routers will be executed.
        List<Invoker<T>> result = singleRouterChain.route(getConsumerUrl(), invokers, invocation);
        return result == null ? BitList.emptyList() : result;
    } catch (Throwable t) {
        // 2-1 - Failed to execute routing.
        logger.error(CLUSTER_FAILED_SITE_SELECTION, "", "",
            "Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
        return BitList.emptyList();
    }
}
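doList() has two distinct failure behaviours: it throws when providers are forbidden and fail-fast is on, but degrades to an empty list when routing returns null or throws. The sketch below isolates that control flow. `DoListSketch` and `Chain` are hypothetical, and it catches `RuntimeException` where the original catches `Throwable`.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical reduction of the control flow in DynamicDirectory#doList.
final class DoListSketch {

    /** Stand-in for the router chain: filters the given candidates. */
    interface Chain {
        List<String> route(List<String> invokers);
    }

    static List<String> doList(boolean forbidden, boolean shouldFailFast,
                               Chain chain, List<String> invokers) {
        if (forbidden && shouldFailFast) {
            // Hard failure: the caller sees an exception immediately.
            throw new IllegalStateException("No provider available");
        }
        try {
            List<String> result = chain.route(invokers);
            // A null result is normalised to an empty list.
            return result == null ? Collections.<String>emptyList() : result;
        } catch (RuntimeException e) {
            // Soft failure: a broken router yields "no providers" instead of propagating.
            return Collections.<String>emptyList();
        }
    }
}
```

With the soft path, the caller reports the problem as "no provider available" rather than surfacing the router's own exception.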
org.apache.dubbo.rpc.cluster.SingleRouterChain#route
public List<Invoker<T>> route(URL url, BitList<Invoker<T>> availableInvokers, Invocation invocation) {
    if (invokers.getOriginList() != availableInvokers.getOriginList()) {
        logger.error(INTERNAL_ERROR, "", "Router's invoker size: " + invokers.getOriginList().size() +
                " Invocation's invoker size: " + availableInvokers.getOriginList().size(),
            "Reject to route, because the invokers has changed.");
        throw new IllegalStateException("reject to route, because the invokers has changed.");
    }
    if (RpcContext.getServiceContext().isNeedPrintRouterSnapshot()) {
        return routeAndPrint(url, availableInvokers, invocation);
    } else {
        return simpleRoute(url, availableInvokers, invocation);
    }
}

public List<Invoker<T>> simpleRoute(URL url, BitList<Invoker<T>> availableInvokers, Invocation invocation) {
    BitList<Invoker<T>> resultInvokers = availableInvokers.clone();

    // 1. route state router
    resultInvokers = headStateRouter.route(resultInvokers, url, invocation, false, null);
    if (resultInvokers.isEmpty() && (shouldFailFast || routers.isEmpty())) {
        printRouterSnapshot(url, availableInvokers, invocation);
        return BitList.emptyList();
    }

    if (routers.isEmpty()) {
        return resultInvokers;
    }
    List<Invoker<T>> commonRouterResult = resultInvokers.cloneToArrayList();
    // 2. route common router
    for (Router router : routers) {
        // Copy resultInvokers to a arrayList. BitList not support
        RouterResult<Invoker<T>> routeResult = router.route(commonRouterResult, url, invocation, false);
        commonRouterResult = routeResult.getResult();
        if (CollectionUtils.isEmpty(commonRouterResult) && shouldFailFast) {
            printRouterSnapshot(url, availableInvokers, invocation);
            return BitList.emptyList();
        }

        // stop continue routing
        if (!routeResult.isNeedContinueRoute()) {
            return commonRouterResult;
        }
    }

    if (commonRouterResult.isEmpty()) {
        printRouterSnapshot(url, availableInvokers, invocation);
        return BitList.emptyList();
    }
    return commonRouterResult;
}
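simpleRoute() runs in two stages: state routers narrow a bitmap-backed list first, then ordinary routers run on a plain list and may ask the chain to stop. The sketch below models that shape with `java.util.BitSet`. Every type in it (`TwoStageRouteSketch`, `StateRouterLike`, `RouterLike`, `Result`) is hypothetical, and a simple loop replaces the linked chain that starts at headStateRouter.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.List;

// Hypothetical two-stage routing model; not the Dubbo implementation.
final class TwoStageRouteSketch {

    /** Stage 1 router: narrows a bitmap in which bit i stands for origin.get(i). */
    interface StateRouterLike {
        BitSet route(List<String> origin, BitSet candidates);
    }

    /** Stage 2 result: the filtered list plus a "keep going" flag. */
    static final class Result {
        final List<String> invokers;
        final boolean needContinue;

        Result(List<String> invokers, boolean needContinue) {
            this.invokers = invokers;
            this.needContinue = needContinue;
        }
    }

    /** Stage 2 router: works on a plain list. */
    interface RouterLike {
        Result route(List<String> invokers);
    }

    static List<String> route(List<String> origin,
                              List<StateRouterLike> stateRouters,
                              List<RouterLike> routers) {
        // Stage 1: start with every bit set, let each state router clear bits.
        BitSet bits = new BitSet(origin.size());
        bits.set(0, origin.size());
        for (StateRouterLike stateRouter : stateRouters) {
            bits = stateRouter.route(origin, bits);
        }
        // Materialise the surviving bits into a plain list for stage 2.
        List<String> result = new ArrayList<>();
        for (int i = bits.nextSetBit(0); i >= 0; i = bits.nextSetBit(i + 1)) {
            result.add(origin.get(i));
        }
        // Stage 2: ordinary routers, each of which may end the chain early.
        for (RouterLike router : routers) {
            Result routed = router.route(result);
            result = routed.invokers;
            if (!routed.needContinue) {
                return result;
            }
        }
        return result;
    }

    /** Demo: "d" is dropped in stage 1; stage 2 keeps two invokers and stops. */
    static List<String> demo() {
        List<String> origin = Arrays.asList("a", "b", "c", "d");
        StateRouterLike dropD = (all, bits) -> {
            BitSet out = (BitSet) bits.clone();
            out.clear(all.indexOf("d"));
            return out;
        };
        RouterLike firstTwoThenStop =
                list -> new Result(new ArrayList<>(list.subList(0, 2)), false);
        // Never reached, because the previous router asked to stop.
        RouterLike rejectAll = list -> new Result(new ArrayList<String>(), true);
        return route(origin, Arrays.asList(dropD), Arrays.asList(firstTwoThenStop, rejectAll));
    }
}
```

Keeping stage 1 on a bitmap means state routers can combine precomputed candidate sets cheaply, and the list is only copied once, when ordinary routers are actually present.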