本文基于 Dubbo 2.6.1 版本,望知悉。
1. 概述 本文接 《精尽 Dubbo 源码解析 —— 集群容错(二)之 Cluster 实现》 一文,分享 dubbo-cluster
模块, directory
包,各种 Directory 实现类 。
Directory ,中文直译为目录 ,代表了多个 Invoker ,可以把它看成 List<Invoker>
。但与 List 不同的是,它的值可能是动态变化的,比如注册中心推送变更。
Directory 子类如下图:
Directory 子类
我们看到有两个实现类:
StaticDirectory ,静态 Directory 实现类,从命名上看出它是静态 的 List<Invoker>
。
RegistryDirectory ,基于注册中心 的动态 Directory 实现类,从命名上看出它是动态 的,会根据注册中心的推送变更 List<Invoker>
。
2. Directory com.alibaba.dubbo.rpc.cluster.Directory
,继承 Node 接口,Directory 接口。代码如下:
public interface Directory <T > extends Node { Class<T> getInterface () ; List<Invoker<T>> list(Invocation invocation) throws RpcException; }
定义了两个 接口方法,分别返回服务的类型 和 Invoker 集合 。
一个 Directory 只对应 一个服务类型。
3. AbstractDirectory com.alibaba.dubbo.rpc.cluster.directory.AbstractDirectory
,实现 Directory 接口,Directory 抽象实现类,实现了公用的路由规则( Router ) 的逻辑。
3.1 构造方法 private volatile boolean destroyed = false ;private final URL url;private volatile URL consumerUrl;private volatile List<Router> routers;public AbstractDirectory (URL url) { this (url, null ); } public AbstractDirectory (URL url, List<Router> routers) { this (url, url, routers); } public AbstractDirectory (URL url, URL consumerUrl, List<Router> routers) { if (url == null ) { throw new IllegalArgumentException("url == null" ); } this .url = url; this .consumerUrl = consumerUrl; setRouters(routers); }
consumerUrl
字段,认真看下注释和构造方法。
调用 #setRouters(routers)
方法,初始化并设置 Router 数组。
3.2 setRouters #setRouters(routers)
方法,初始化并设置 Router 数组。详细解析,见 《精尽 Dubbo 源码解析 —— 集群容错(七)之 Router 实现》 中。
3.3 list #list(Invocation)
实现 方法,获得所有服务 Invoker 集合。代码如下:
1 : @Override 2 : public List<Invoker<T>> list(Invocation invocation) throws RpcException { 3 : if (destroyed) { 4 : throw new RpcException("Directory already destroyed .url: " + getUrl()); 5 : } 6 : 7 : List<Invoker<T>> invokers = doList(invocation); 8 : 9 : List<Router> localRouters = this .routers; 10 : if (localRouters != null && !localRouters.isEmpty()) {11 : for (Router router : localRouters) {12 : try {13 : if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false )) {14 : invokers = router.route(invokers, getConsumerUrl(), invocation);15 : }16 : } catch (Throwable t) {17 : logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);18 : }19 : }20 : }21 : return invokers;22 : }
4. RegistryDirectory com.alibaba.dubbo.registry.integration.RegistryDirectory
,实现 NotifyListener 接口,实现 AbstractDirectory 抽象类,基于注册中心 的 Directory 实现类。
RegistryDirectory 在 dubbo-registry
模块,integration
包下,是 Dubbo 注册中心模块集成 Directory 的实现类。
RegistryDirectory 作为一个 NotifyListener ,订阅 注册中心( Registry ) 的数据,实现对变更的监听 。
4.1 构造方法
RegistryDirectory 的字段有 17 个,比较多,所以胖友请耐心。
private static final Cluster cluster = ExtensionLoader.getExtensionLoader(Cluster.class).getAdaptiveExtension();private static final RouterFactory routerFactory = ExtensionLoader.getExtensionLoader(RouterFactory.class).getAdaptiveExtension();private static final ConfiguratorFactory configuratorFactory = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).getAdaptiveExtension();private final Class<T> serviceType; private final Map<String, String> queryMap; private final String[] serviceMethods;private final boolean multiGroup;private Protocol protocol; private Registry registry; private final String serviceKey; private volatile boolean forbidden = false ;private final URL directoryUrl; private volatile URL overrideDirectoryUrl; private volatile List<Configurator> configurators; private volatile Map<String, Invoker<T>> urlInvokerMap; private volatile Map<String, List<Invoker<T>>> methodInvokerMap; private volatile Set<URL> cachedInvokerUrls; public RegistryDirectory (Class<T> serviceType, URL url) { super (url); if (serviceType == null ) { throw new IllegalArgumentException("service type is null." ); } if (url.getServiceKey() == null || url.getServiceKey().length() == 0 ) { throw new IllegalArgumentException("registry serviceKey is null." ); } this .serviceType = serviceType; this .serviceKey = url.getServiceKey(); this .queryMap = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY)); this .overrideDirectoryUrl = this .directoryUrl = url.setPath(url.getServiceInterface()).clearParameters().addParameters(queryMap).removeParameter(Constants.MONITOR_KEY); String group = directoryUrl.getParameter(Constants.GROUP_KEY, "" ); this .multiGroup = group != null && ("*" .equals(group) || group.contains("," )); String methods = queryMap.get(Constants.METHODS_KEY); this .serviceMethods = methods == null ? null : Constants.COMMA_SPLIT_PATTERN.split(methods); }
分成五类 变量。胖友自己看注释。
如果不理解,可以结合下面的具体方法的使用。🙂 当然也可以给我留言,因为确实变量有点多和复杂。
4.2 subscribe #subscribe(URL)
方法,向注册中心 发起订阅。代码如下:
public void subscribe (URL url) { setConsumerUrl(url); registry.subscribe(url, this ); }
调用父 #setConsumerUrl(url)
方法,设置 consumerUrl
消费者 URL 。
调用 Registry#subscribe(url, NotifyListener)
方法,向注册中心,发起订阅。
服务消费者,再引用服务时,会创建 RegistryDirectory 对象,并发起1)服务提供者 + 2)路由规则 + 3)配置规则 的数据订阅。如下图:
doRefer
对应为 RegistryProtocol#doRefer(Cluster, Registry, Class<T> type, URL url)
方法。
4.3 notify 在注册中心( Registry )发现数据发生变化时,会通知对应的 NotifyListener 们。如下图:
notify
对应为 AbstractRegistry#notify(URL url, NotifyListener, List<URL> urls)
方法。
因为 RegistryDirectory 作为一个 NotifyListener ,向注册中心( Registry )发起了订阅,所以此时会被通知。注意,是按照分类循环通知的 ,也就是说,一次只有一类 URL 。
#notify(List<URL> urls)
实现 方法,代码如下:
1 : @Override 2 : public synchronized void notify (List<URL> urls) { 3 : 4 : List<URL> invokerUrls = new ArrayList<URL>(); 5 : List<URL> routerUrls = new ArrayList<URL>(); 6 : List<URL> configuratorUrls = new ArrayList<URL>(); 7 : for (URL url : urls) { 8 : String protocol = url.getProtocol(); 9 : String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY); 10 : if (Constants.ROUTERS_CATEGORY.equals(category) || Constants.ROUTE_PROTOCOL.equals(protocol)) {11 : routerUrls.add(url);12 : } else if (Constants.CONFIGURATORS_CATEGORY.equals(category) || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {13 : configuratorUrls.add(url);14 : } else if (Constants.PROVIDERS_CATEGORY.equals(category)) {15 : invokerUrls.add(url);16 : } else {17 : logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());18 : }19 : }20 : 21 : 22 : if (!configuratorUrls.isEmpty()) {23 : this .configurators = toConfigurators(configuratorUrls);24 : }25 : 26 : 27 : if (!routerUrls.isEmpty()) {28 : List<Router> routers = toRouters(routerUrls);29 : if (routers != null ) { 30 : setRouters(routers);31 : }32 : }33 : 34 : List<Configurator> localConfigurators = this .configurators; 35 : 36 : this .overrideDirectoryUrl = directoryUrl;37 : if (localConfigurators != null && !localConfigurators.isEmpty()) {38 : for (Configurator configurator : localConfigurators) {39 : this .overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);40 : }41 : }42 : 43 : refreshInvoker(invokerUrls);44 : }
4.3.1 toConfigurators 详细解析,见 《精尽 Dubbo 源码解析 —— 集群容错(六)之 Configurator 实现》 的 「4.1.1 toConfigurators」 。
4.3.2 toRouters 详细解析,见 《精尽 Dubbo 源码解析 —— 集群容错(七)之 Router 实现》 。
4.7 内部类 4.7.1 InvokerDelegate InvokerDelegate ,实现 com.alibaba.dubbo.rpc.protocol.InvokerWrapper
类,Invoker 代理类,主要用于存储注册中心下发的 url 地址 ( providerUrl
),用于重新重新 refer 时能够根据 providerURL
queryMap overrideMap 重新组装。 代码如下:
老艿艿:目前貌似没看到这块逻辑噢 😯😯😯
private static class InvokerDelegate <T > extends InvokerWrapper <T > { private URL providerUrl; public InvokerDelegate (Invoker<T> invoker, URL url, URL providerUrl) { super (invoker, url); this .providerUrl = providerUrl; } public URL getProviderUrl () { return providerUrl; } }
4.7.2 InvokerComparator InvokerComparator ,实现 Comparator 接口,Invoker 排序器实现类,根据 URL 升序 。代码如下:
private static class InvokerComparator implements Comparator <Invoker <?>> { private static final InvokerComparator comparator = new InvokerComparator(); private InvokerComparator () { } public static InvokerComparator getComparator () { return comparator; } @Override public int compare (Invoker<?> o1, Invoker<?> o2) { return o1.getUrl().toString().compareTo(o2.getUrl().toString()); } }
4.3.3 refreshInvoker #refreshInvoker(List<URL> invokerUrls)
方法,官方注释其如下:
根据 invokerURL 列表转换为 invoker 列表。转换规则如下:
如果 url 已经被转换为 invoker ,则不在重新引用,直接从缓存中获取,注意如果 url 中任何一个参数变更也会重新引用
如果传入的 invoker 列表不为空,则表示最新的 invoker 列表
如果传入的 invokerUrl 列表是空,则表示只是下发的 override 规则或 route 规则,需要重新交叉对比,决定是否需要重新引用。
1 : private void refreshInvoker (List<URL> invokerUrls) { 2 : if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0 ) != null 3 : && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0 ).getProtocol())) { 4 : 5 : this .forbidden = true ; 6 : 7 : this .methodInvokerMap = null ; 8 : 9 : destroyAllInvokers(); 10 : } else {11 : 12 : this .forbidden = false ; 13 : 14 : Map<String, Invoker<T>> oldUrlInvokerMap = this .urlInvokerMap; 15 : 16 : if (invokerUrls.isEmpty() && this .cachedInvokerUrls != null ) {17 : invokerUrls.addAll(this .cachedInvokerUrls);18 : 19 : } else {20 : this .cachedInvokerUrls = new HashSet<URL>();21 : this .cachedInvokerUrls.addAll(invokerUrls); 22 : }23 : 24 : if (invokerUrls.isEmpty()) {25 : return ;26 : }27 : 28 : Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);29 : 30 : Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); 31 : 32 : 33 : if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0 ) {34 : logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));35 : return ;36 : }37 : 38 : this .methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;39 : this .urlInvokerMap = newUrlInvokerMap;40 : 41 : try {42 : destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); 43 : } catch (Exception e) {44 : logger.warn("destroyUnusedInvokers error. " , e);45 : }46 : }47 : }
========== 第一部分 ==========
第 2 至 3 行:当 invokerUrls
集合大小为 1 ,并且协议为 empty://
,说明所有服务提供者都已经下线 。若注册中心为 Zookeeper ,可参见 ZookeeperRegistry#toUrlsWithEmpty(URL consumer, String path, List<String> providers)
方法。
第 5 行:设置禁止 访问,因为没有服务提供者了。
第 7 行:methodInvokerMap
置空。
第 9 行:调用 #destroyAllInvokers()
方法,销毁所有服务提供者 Invoker 集合。详细解析,见 「4.3.3.5 destroyAllInvokers」 。
========== 第二部分 ==========
第 12 行:设置允许 访问,因为有服务提供者了。
第 15 至 17 行:传入的 invokerUrls
为空,说明是路由规则或配置规则发生改变 ,此时 invokerUrls
是空的,直接使用 cachedInvokerUrls
。对应官方注释【第 3 点】(部分,不包括“需要重新交叉对比,决定是否需要重新引用”)。
第 18 至 22 行:传入的 invokerUrls
非空,更新 cachedInvokerUrls
。考虑到并发的问题,更新的方式为创建新的 HashSet 。对应官方注释【第 2 点】。
为什么【第 15 至 17 行】不需要更新 呢?因为 invokerUrls
为空,直接使用 cachedInvokerUrls
,相当于进行了“更新”。
第 23 至 26 行:忽略,若无 invokerUrls
。出现情况为,初始是按照 configurators => routers => providers
,所以前两个会出现这个情况。关于这一点,胖友可以调试感受下。
第 28 行:调用 #toInvokers(List<URL> urls)
方法,将传入的 invokerUrls
,转换成新的 urlInvokerMap
。详细解析,见 「4.3.3.1 toInvokers」 。
第 30 行:调用 #toMethodInvokers(newUrlInvokerMap)
方法,将 urlInvokerMap
转成与方法的映射关系,即新的 methodInvokerMap
。详细解析,见 「4.3.3.2 toMethodInvokers」 。
第 31 至 36 行:如果计算错误,则不进行处理。一般来说,是防御性编程。
第 38 行:若服务引用多 group ,则调用 #toMergeMethodInvokerMap(newMethodInvokerMap)
方法,按照 method + group 聚合 Invoker 集合。详细解析,见 「4.3.3.3 toMethodInvokers」 。
第 39 行:赋值 urlInvokerMap
属性。
第 40 至 45 行:调用 #destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap)
方法,销毁 不再使用的 Invoker 集合。详细解析,见 「4.3.3.4 toMethodInvokers」 。
4.3.3.1 toInvokers #toInvokers(List<URL> urls)
方法,
1 : private Map<String, Invoker<T>> toInvokers(List<URL> urls) { 2 : 3 : Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>(); 4 : 5 : if (urls == null || urls.isEmpty()) { 6 : return newUrlInvokerMap; 7 : } 8 : 9 : Set<String> keys = new HashSet<String>(); 10 : 11 : String queryProtocols = this .queryMap.get(Constants.PROTOCOL_KEY);12 : 13 : for (URL providerUrl : urls) {14 : 15 : 16 : if (queryProtocols != null && queryProtocols.length() > 0 ) {17 : boolean accept = false ;18 : String[] acceptProtocols = queryProtocols.split("," ); 19 : for (String acceptProtocol : acceptProtocols) {20 : if (providerUrl.getProtocol().equals(acceptProtocol)) {21 : accept = true ;22 : break ;23 : }24 : }25 : if (!accept) {26 : continue ;27 : }28 : }29 : 30 : if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {31 : continue ;32 : }33 : 34 : if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {35 : logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() + " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost()36 : + ", supported protocol: " + ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));37 : continue ;38 : }39 : 40 : URL url = mergeUrl(providerUrl);41 : 42 : String key = url.toFullString(); 43 : if (keys.contains(key)) { 44 : continue ;45 : }46 : 47 : keys.add(key);48 : 49 : 50 : Map<String, Invoker<T>> localUrlInvokerMap = this .urlInvokerMap; 51 : Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);52 : if (invoker == null ) { 53 : try {54 : 55 : boolean enabled;56 : if (url.hasParameter(Constants.DISABLED_KEY)) {57 : enabled = !url.getParameter(Constants.DISABLED_KEY, false );58 : } else {59 : enabled = url.getParameter(Constants.ENABLED_KEY, true );60 : }61 : 62 : if (enabled) {63 : 64 : invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);65 : }66 : } catch (Throwable t) {67 : logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);68 : }69 : 70 : if (invoker != null ) { 71 : newUrlInvokerMap.put(key, invoker);72 : }73 : } else { 74 : newUrlInvokerMap.put(key, invoker);75 : }76 : }77 : 78 : keys.clear();79 : return newUrlInvokerMap;80 : }
第 3 行:newUrlInvokerMap
变量,新的 urlInvokerMap
字段,后面会赋值给它。
第 4 至 7 行:若 urls
为空,直接返回,防御性编程。
第 9 行:keys
变量,已初始化 的服务器提供 URL 集合,即服务提供者 URL 已经处理。
第 11 行:获得引用服务的协议。一般情况下,我们不会设置 <dubbo:reference protocol=""/>
配置项。
第 13 行:循环 urls
集合,转成 Invoker 集合。
协议处理相关
第 14 至 28 行:如果 reference 端配置了 protocol ,则只选择匹配 的 protocol 。
第 29 至 32 行:忽略 ,若为 empty://
协议。
第 33 至 38 行:忽略 ,若应用程序不支持该协议。
第 40 行:调用 #mergeUrl(providerUrl)
方法,合并 URL 参数。详细解析,见 「4.3.3.1 mergeUrl」 。
第 41 至 47 行:忽略 ,通过 keys
判断已经初始化。
第 48 至 75 行:“创建”服务 Invoker 对象。
第 50 至 51 行:获得 url
对应在 localUrlInvokerMap
缓存的 Invoker 对象。
第 52 至 72 行:不在缓存中,需要重新 refer 引用,创建 Invoker 对象。
第 54 至 60 行:通过配置项 enable
和 disable
判断,服务是否开启。
第 61 至 65 行: 若开启,创建 Invoker 对象。
第 73 至 75 行:在缓存中,直接使用缓存的 Invoker 对象,添加到 newUrlInvokerMap
中。
第 78 行:清空 keys
。
第 79 行:返回结果 newUrlInvokerMap
。
4.3.3.1.1 mergeUrl #mergeUrl(providerUrl)
方法,合并 URL 参数,优先级 为配置规则 > 服务消费者配置 > 服务提供者配置。代码如下:
1 : private URL mergeUrl (URL providerUrl) { 2 : 3 : providerUrl = ClusterUtils.mergeUrl(providerUrl, queryMap); 4 : 5 : 6 : List<Configurator> localConfigurators = this .configurators; 7 : if (localConfigurators != null && !localConfigurators.isEmpty()) { 8 : for (Configurator configurator : localConfigurators) { 9 : providerUrl = configurator.configure(providerUrl); 10 : }11 : }12 : 13 : 14 : providerUrl = providerUrl.addParameter(Constants.CHECK_KEY, String.valueOf(false )); 15 : 16 : 17 : 18 : this .overrideDirectoryUrl = this .overrideDirectoryUrl.addParametersIfAbsent(providerUrl.getParameters()); 19 : 20 : 21 : if ((providerUrl.getPath() == null || providerUrl.getPath().length() == 0 )22 : && "dubbo" .equals(providerUrl.getProtocol())) { 23 : 24 : String path = directoryUrl.getParameter(Constants.INTERFACE_KEY);25 : if (path != null ) {26 : int i = path.indexOf('/' );27 : if (i >= 0 ) {28 : path = path.substring(i + 1 );29 : }30 : i = path.lastIndexOf(':' );31 : if (i >= 0 ) {32 : path = path.substring(0 , i);33 : }34 : providerUrl = providerUrl.setPath(path);35 : }36 : }37 : 38 : 39 : return providerUrl;40 : }
4.3.3.2 toMethodInvokers #toMethodInvokers(Map<String, Invoker<T>> invokersMap)
方法,将 invokersMap
转成与方法 的映射关系。代码如下:
1 : private Map<String, List<Invoker<T>>> toMethodInvokers(Map<String, Invoker<T>> invokersMap) { 2 : 3 : Map<String, List<Invoker<T>>> newMethodInvokerMap = new HashMap<String, List<Invoker<T>>>(); 4 : 5 : List<Invoker<T>> invokersList = new ArrayList<Invoker<T>>(); 6 : 7 : 8 : if (invokersMap != null && invokersMap.size() > 0 ) { 9 : 10 : for (Invoker<T> invoker : invokersMap.values()) {11 : String parameter = invoker.getUrl().getParameter(Constants.METHODS_KEY); 12 : if (parameter != null && parameter.length() > 0 ) {13 : String[] methods = Constants.COMMA_SPLIT_PATTERN.split(parameter);14 : if (methods != null && methods.length > 0 ) {15 : 16 : for (String method : methods) {17 : if (method != null && method.length() > 0 && !Constants.ANY_VALUE.equals(method)) { 18 : List<Invoker<T>> methodInvokers = newMethodInvokerMap.get(method);19 : if (methodInvokers == null ) {20 : methodInvokers = new ArrayList<Invoker<T>>();21 : newMethodInvokerMap.put(method, methodInvokers);22 : }23 : methodInvokers.add(invoker);24 : }25 : }26 : }27 : }28 : 29 : invokersList.add(invoker);30 : }31 : }32 : 33 : List<Invoker<T>> newInvokersList = route(invokersList, null );34 : 35 : newMethodInvokerMap.put(Constants.ANY_VALUE, newInvokersList);36 : 37 : if (serviceMethods != null && serviceMethods.length > 0 ) {38 : for (String method : serviceMethods) {39 : List<Invoker<T>> methodInvokers = newMethodInvokerMap.get(method);40 : if (methodInvokers == null || methodInvokers.isEmpty()) {41 : methodInvokers = newInvokersList;42 : }43 : newMethodInvokerMap.put(method, route(methodInvokers, method));44 : }45 : }46 : 47 : 48 : for (String method : new HashSet<String>(newMethodInvokerMap.keySet())) {49 : List<Invoker<T>> methodInvokers = newMethodInvokerMap.get(method);50 : Collections.sort(methodInvokers, InvokerComparator.getComparator());51 : newMethodInvokerMap.put(method, Collections.unmodifiableList(methodInvokers));52 : }53 : return Collections.unmodifiableMap(newMethodInvokerMap);54 : }
第 3 行:newMethodInvokerMap
变量,新的 methodInvokerMap
字段,后面会赋值给它。
第 5 行:创建 Invoker 集合。在【第 29 行】,我们可以看到,实际就是 invokersMap
的值的集合。
第 8 至 31 行:按照方法名为维度 ( KEY ) ,聚合对应的 Invoker 集合 到 newMethodInvokerMap
中。
第 33 行:路由全 invokersList
,匹配合适的 Invoker 集合。详细解析,见 《精尽 Dubbo 源码解析 —— 集群容错(七)之 Router 实现》 。
第 35 行:添加 newInvokersList
到 newMethodInvokerMap
中,表示该服务提供者的全量 Invoker 集合。
第 36 至 45 行:循环 ,基于每个方法路由,匹配合适的 Invoker 集合。详细解析,见 《精尽 Dubbo 源码解析 —— 集群容错(七)之 Router 实现》 。
第 46 至 53 行:循环排序 每个方法的 Invoker 集合,并设置为不可变 。
4.3.3.3 toMergeMethodInvokerMap #toMergeMethodInvokerMap(Map<String, List<Invoker<T>>> methodMap)
,按照 method + group 聚合 Invoker 集合。代码如下:
1 : private Map<String, List<Invoker<T>>> toMergeMethodInvokerMap(Map<String, List<Invoker<T>>> methodMap) { 2 : Map<String, List<Invoker<T>>> result = new HashMap<String, List<Invoker<T>>>(); 3 : 4 : for (Map.Entry<String, List<Invoker<T>>> entry : methodMap.entrySet()) { 5 : String method = entry.getKey(); 6 : List<Invoker<T>> invokers = entry.getValue(); 7 : 8 : Map<String, List<Invoker<T>>> groupMap = new HashMap<String, List<Invoker<T>>>(); 9 : 10 : for (Invoker<T> invoker : invokers) {11 : String group = invoker.getUrl().getParameter(Constants.GROUP_KEY, "" );12 : List<Invoker<T>> groupInvokers = groupMap.get(group);13 : if (groupInvokers == null ) {14 : groupInvokers = new ArrayList<Invoker<T>>();15 : groupMap.put(group, groupInvokers);16 : }17 : groupInvokers.add(invoker);18 : }19 : 20 : if (groupMap.size() == 1 ) {21 : result.put(method, groupMap.values().iterator().next());22 : 23 : } else if (groupMap.size() > 1 ) {24 : List<Invoker<T>> groupInvokers = new ArrayList<Invoker<T>>();25 : for (List<Invoker<T>> groupList : groupMap.values()) {26 : groupInvokers.add(cluster.join(new StaticDirectory<T>(groupList)));27 : }28 : result.put(method, groupInvokers);29 : 30 : } else {31 : result.put(method, invokers);32 : }33 : }34 : return result;35 : }
第 2 行:result
属性,新的 methodInvokerMap
字段,后面会赋值给它。
第 3 终 33 行:循环 ,按照 method + group 聚合 Invoker 集合。
第 8 行: 按照 Group 聚合 Invoker 集合的结果。其中,KEY :group ,VALUE :Invoker 集合。
第 9 至 18 行:循环 Invoker 集合,按照 group 聚合 Invoker 集合。
========== 结果 groupMap
处理 ==========
第 19 至 21 行:若数量为 1 ,使用第一个。
第 29 至 32 行:若数量为 0 ,使用原有值 invokers
。实际上,和【第 19 至 21 行】等价 。
第 22 至 28 行:若数量大于 1 ,循环每个 Group 的 Invoker 集合,调用 Cluster$Adaptive#join(Directory)
方法,创建对应的 Cluster Invoker 对象。
那么,引用多个服务分组有什么用呢?为什么要按照 group 进行聚合,直接调用不可以么?让我们来打开 ProtocolRegistry#refer(Class<T> type, URL url)
方法,如下图所示:
refer
当引用多个服务分组时,会自动 使用到分组聚合 的特性。那么之后 MergeableCluster 会怎么做呢?详细解析,见后文 😈。
4.3.3.4 destroyUnusedInvokers #destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap)
方法,销毁 不再使用的 Invoker 集合。代码如下:
private void destroyUnusedInvokers (Map<String, Invoker<T>> oldUrlInvokerMap, Map<String, Invoker<T>> newUrlInvokerMap) { if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0 ) { destroyAllInvokers(); return ; } List<String> deleted = null ; if (oldUrlInvokerMap != null ) { Collection<Invoker<T>> newInvokers = newUrlInvokerMap.values(); for (Map.Entry<String, Invoker<T>> entry : oldUrlInvokerMap.entrySet()) { if (!newInvokers.contains(entry.getValue())) { if (deleted == null ) { deleted = new ArrayList<String>(); } deleted.add(entry.getKey()); } } } if (deleted != null ) { for (String url : deleted) { if (url != null ) { Invoker<T> invoker = oldUrlInvokerMap.remove(url); if (invoker != null ) { try { invoker.destroy(); if (logger.isDebugEnabled()) { logger.debug("destroy invoker[" + invoker.getUrl() + "] success. " ); } } catch (Exception e) { logger.warn("destroy invoker[" + invoker.getUrl() + "] failed. " + e.getMessage(), e); } } } } } }
4.3.3.5 destroyAllInvokers #destroyAllInvokers()
方法,销毁所有服务提供者 Invoker 。代码如下:
private void destroyAllInvokers () { Map<String, Invoker<T>> localUrlInvokerMap = this .urlInvokerMap; if (localUrlInvokerMap != null ) { for (Invoker<T> invoker : new ArrayList<Invoker<T>>(localUrlInvokerMap.values())) { try { invoker.destroy(); } catch (Throwable t) { logger.warn("Failed to destroy service " + serviceKey + " to provider " + invoker.getUrl(), t); } } localUrlInvokerMap.clear(); } methodInvokerMap = null ; }
4.4 doList #doList(Invocation)
实现 方法,获得对应的 Invoker 集合。代码如下:
1 : @Override 2 : public List<Invoker<T>> doList(Invocation invocation) { 3 : if (forbidden) { 4 : 5 : throw new RpcException(RpcException.FORBIDDEN_EXCEPTION, 6 : "No provider available from registry " + getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " + NetUtils.getLocalHost() 7 : + " use dubbo version " + Version.getVersion() + ", please check status of providers(disabled, not registered or in blacklist)." ); 8 : } 9 : List<Invoker<T>> invokers = null ; 10 : Map<String, List<Invoker<T>>> localMethodInvokerMap = this .methodInvokerMap; 11 : 12 : if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0 ) {13 : 14 : String methodName = RpcUtils.getMethodName(invocation);15 : Object[] args = RpcUtils.getArguments(invocation);16 : 17 : if (args != null && args.length > 0 && args[0 ] != null 18 : && (args[0 ] instanceof String || args[0 ].getClass().isEnum())) {19 : 20 : invokers = localMethodInvokerMap.get(methodName + args[0 ]); 21 : }22 : 23 : if (invokers == null ) {24 : invokers = localMethodInvokerMap.get(methodName);25 : }26 : 27 : if (invokers == null ) {28 : invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);29 : }30 : 31 : if (invokers == null ) {32 : Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();33 : if (iterator.hasNext()) {34 : invokers = iterator.next();35 : }36 : }37 : }38 : return invokers == null ? new ArrayList<Invoker<T>>(0 ) : invokers;39 : }
4.5 isAvailable @Override public boolean isAvailable () { if (isDestroyed()) { return false ; } Map<String, Invoker<T>> localUrlInvokerMap = urlInvokerMap; if (localUrlInvokerMap != null && localUrlInvokerMap.size() > 0 ) { for (Invoker<T> invoker : new ArrayList<Invoker<T>>(localUrlInvokerMap.values())) { if (invoker.isAvailable()) { return true ; } } } return false ; }
4.6 destroy @Override public void destroy () { if (isDestroyed()) { return ; } try { if (getConsumerUrl() != null && registry != null && registry.isAvailable()) { registry.unsubscribe(getConsumerUrl(), this ); } } catch (Throwable t) { logger.warn("unexpeced error when unsubscribe service " + serviceKey + "from registry" + registry.getUrl(), t); } super .destroy(); try { destroyAllInvokers(); } catch (Throwable t) { logger.warn("Failed to destroy service " + serviceKey, t); } }
5. StaticDirectory com.alibaba.dubbo.rpc.cluster.directory.StaticDirectory
,实现 AbstractDirectory 抽象类,静态 Directory 实现类。逻辑比较简单,将传入的 invokers
集合,封装成静态的 Directory 对象。代码如下:
public class StaticDirectory <T > extends AbstractDirectory <T > { private final List<Invoker<T>> invokers; public StaticDirectory (List<Invoker<T>> invokers) { this (null , invokers, null ); } public StaticDirectory (List<Invoker<T>> invokers, List<Router> routers) { this (null , invokers, routers); } public StaticDirectory (URL url, List<Invoker<T>> invokers) { this (url, invokers, null ); } public StaticDirectory (URL url, List<Invoker<T>> invokers, List<Router> routers) { super (url == null && invokers != null && !invokers.isEmpty() ? invokers.get(0 ).getUrl() : url, routers); if (invokers == null || invokers.isEmpty()) { throw new IllegalArgumentException("invokers == null" ); } this .invokers = invokers; } @Override public Class<T> getInterface () { return invokers.get(0 ).getInterface(); } @Override public boolean isAvailable () { if (isDestroyed()) { return false ; } for (Invoker<T> invoker : invokers) { if (invoker.isAvailable()) { return true ; } } return false ; } @Override public void destroy () { if (isDestroyed()) { return ; } super .destroy(); for (Invoker<T> invoker : invokers) { invoker.destroy(); } invokers.clear(); } @Override protected List<Invoker<T>> doList(Invocation invocation) throws RpcException { return invokers; } }
除了在 「4.3.3.3 toMergeMethodInvokerMap」 方法中,使用到了 StaticDirectory 对象。我们来看看 ReferenceConfig#createProxy(Map<String, String> map)
的使用,代码如下图:
createProxy
6. ClusterUtils com.alibaba.dubbo.rpc.cluster.support.ClusterUtils
,Cluster 工具类。代码如下:
1 : public class ClusterUtils { 2 : 3 : private ClusterUtils () { 4 : } 5 : 6 : public static URL mergeUrl (URL remoteUrl, Map<String, String> localMap) { 7 : 8 : Map<String, String> map = new HashMap<String, String>(); 9 : 10 : Map<String, String> remoteMap = remoteUrl.getParameters();11 :12 : 13 : if (remoteMap != null && remoteMap.size() > 0 ) {14 : map.putAll(remoteMap);15 :16 : 17 : map.remove(Constants.THREAD_NAME_KEY);18 : map.remove(Constants.DEFAULT_KEY_PREFIX + Constants.THREAD_NAME_KEY);19 :20 : map.remove(Constants.THREADPOOL_KEY);21 : map.remove(Constants.DEFAULT_KEY_PREFIX + Constants.THREADPOOL_KEY);22 :23 : map.remove(Constants.CORE_THREADS_KEY);24 : map.remove(Constants.DEFAULT_KEY_PREFIX + Constants.CORE_THREADS_KEY);25 :26 : map.remove(Constants.THREADS_KEY);27 : map.remove(Constants.DEFAULT_KEY_PREFIX + Constants.THREADS_KEY);28 :29 : map.remove(Constants.QUEUES_KEY);30 : map.remove(Constants.DEFAULT_KEY_PREFIX + Constants.QUEUES_KEY);31 :32 : map.remove(Constants.ALIVE_KEY);33 : map.remove(Constants.DEFAULT_KEY_PREFIX + Constants.ALIVE_KEY);34 :35 : map.remove(Constants.TRANSPORTER_KEY);36 : map.remove(Constants.DEFAULT_KEY_PREFIX + Constants.TRANSPORTER_KEY);37 : }38 : 39 : if (localMap != null && localMap.size() > 0 ) {40 : map.putAll(localMap);41 : }42 :43 : 44 : if (remoteMap != null && remoteMap.size() > 0 ) {45 : 46 : String dubbo = remoteMap.get(Constants.DUBBO_VERSION_KEY);47 : if (dubbo != null && dubbo.length() > 0 ) {48 : map.put(Constants.DUBBO_VERSION_KEY, dubbo);49 : }50 : String version = remoteMap.get(Constants.VERSION_KEY);51 : if (version != null && version.length() > 0 ) {52 : map.put(Constants.VERSION_KEY, version);53 : }54 : String group = remoteMap.get(Constants.GROUP_KEY);55 : if (group != null && group.length() > 0 ) {56 : map.put(Constants.GROUP_KEY, group);57 : }58 : String methods = remoteMap.get(Constants.METHODS_KEY);59 : if (methods != null && methods.length() > 0 ) {60 : map.put(Constants.METHODS_KEY, methods);61 : }62 : 63 : String remoteTimestamp = remoteMap.get(Constants.TIMESTAMP_KEY);64 : if (remoteTimestamp != null && remoteTimestamp.length() > 0 ) {65 : map.put(Constants.REMOTE_TIMESTAMP_KEY, remoteMap.get(Constants.TIMESTAMP_KEY));66 : }67 : 68 : String remoteFilter = remoteMap.get(Constants.REFERENCE_FILTER_KEY);69 : String localFilter = localMap.get(Constants.REFERENCE_FILTER_KEY);70 : if (remoteFilter != null && remoteFilter.length() > 0 71 : && localFilter != null && localFilter.length() > 0 ) {72 : localMap.put(Constants.REFERENCE_FILTER_KEY, remoteFilter + "," + localFilter);73 : }74 : String remoteListener = remoteMap.get(Constants.INVOKER_LISTENER_KEY);75 : String localListener = localMap.get(Constants.INVOKER_LISTENER_KEY);76 : if (remoteListener != null && remoteListener.length() > 0 77 : && localListener != null && localListener.length() > 0 ) {78 : localMap.put(Constants.INVOKER_LISTENER_KEY, remoteListener + "," + localListener);79 : }80 : }81 :82 : 83 : return remoteUrl.clearParameters().addParameters(map);84 : }85 :86 : }
将 localMap
和 remoteUrl.parameters
合并 成 map
,大多数以前者 为主【第 12 至 41 行】,部分指定 以后者为主【第 43 至 80 行】。
将合并的 map
的结果,覆盖 设置到 remoteUrl
中。
666. 彩蛋 知识星球
比想象中,长好多的一篇博客,原本预期会短蛮多的。
顺便吐槽下,中间碰到一些困惑,网络上搜了一圈,都没解释到很多细节的点的源码解析文章,真的是。哎~~~