本文基于 Dubbo 2.6.1 版本,望知悉。
1. 概述
从本文开始,我们来分享 Dubbo 的集群容错功能的实现。
在 《精尽 Dubbo 源码分析 —— 项目结构一览》 的 「3.4 dubbo-cluster」 中,我们对 Dubbo 的 dubbo-cluster
项目,做了整体的代码结构做了介绍。如果已经没什么印象的胖友,请先回过头找回失散的记忆。
Dubbo 对集群容错功能,实现了很好的 package
拆分,因此我们按照如下顺序:
- 抽象 API
- Cluster 实现
- Directory 实现
- LoadBalance 实现
- Merger 实现
- Router 实现
- Configurator 实现
一个主题,对应一篇文章。那么,本文当然是分享抽象 API。考虑到干巴巴的看抽象 API 会很容易一脸懵逼,所以我们会使用 FailoverCluster 贯穿本文。
2. 整体流程
- 🙂 只看红线。
- 左边 invoke :通过 Cluster 暴露 Invoker 对象,从而实现统一、透明的调用过程。
- 无法理解?详细解析,见 「3. Cluster」 。
- 右边 list :通过 Directory 中,获取可调用的 Invoker 集合。
- 右边 route :通过 Router ,过滤符合路由规则的 Invoker 集合。
- 右边 select :通过 LoadBalance ,根据负载均衡机制,选择一个符合的 Invoker 对象。
- 右边 invoke :调用该 Invoker 对象。
3. Cluster
com.alibaba.dubbo.rpc.cluster.Cluster
,集群接口。代码如下:
(FailoverCluster.NAME) |
@SPI(FailoverCluster.NAME)
注解,Dubbo SPI 拓展点,默认为"failover"
,即失败重试,也就是会贯穿本文的 FailoverCluster 类。@Adaptive
注解,基于 Dubbo SPI Adaptive 机制,加载对应的 Cluster 实现,使用URL.cluster
属性。#join(Directory<T>)
接口方法,基于 Directory ,创建 Invoker 对象,实现统一、透明的 Invoker 调用过程。
3.1 join 方法
在 RegistryProtocol 的 #doRefer(Cluster, Registry, type, url)
方法中,会调用 Cluster#join(directory)
方法,创建 Invoker 对象。代码如下:
private Cluster cluster; // <1> |
<1>
:cluster
属性,Cluster$Adaptive
对象<2>
:创建 RegistryDirectory 对象。通过它,可以注册到一个注册中心的所有服务提供者,即上文提到的【右边 list】。<3>
:调用Cluster#join(directory)
方法,创建 Invoker 对象。因为cluster
是 Dubbo SPI Adaptive 类,所以可以自动获取到对应的 Cluster 实现类。
3.2 子类
我们可以看到,每个 Cluster 实现类,对应一个专属于其的 Invoker 实现类。本文分享的 FailoverCluster 的对应的 Invoker 为 FailoverClusterInvoker 。在看具体的代码之前,先一起来看看集群容错的调用( invoke )过程。
4. 调用顺序图
如下是服务消费者的调用顺序图:
- 在 InvokerInvocationHandler 的 【4】
#invoke(invocation)
处插入:先调用集群容错 Invoker 的#invoke(invocation)
,再调用ProtocolFilterWrapper$Invoker
的#invoke(invocation)
。 - 调用栈如下图:
调用栈
- MockClusterInvoker ,胖友先无视,后续有详细文章,进行分享。
5. FailoverCluster
com.alibaba.dubbo.rpc.cluster.support.FailoverCluster
,实现 Cluster 接口,失败自动切换,当出现失败,重试其它服务器。通常用于读操作,但重试会带来更长延迟。可通过 retries="2"
来设置重试次数(不含第一次)。代码如下:
public class FailoverCluster implements Cluster { |
- 对应 Invoker 为 FailoverClusterInvoker 。
6. AbstractClusterInvoker
因为,FailoverClusterInvoker 继承 AbstractClusterInvoker ,所以我们来分享它。
com.alibaba.dubbo.rpc.cluster.support.AbstractClusterInvoker
,实现 Invoker 接口,Cluster Invoker 抽象类:
- 实现例如选择一个符合 Invoker 对象等等公用方法
定义
#doInvoke(Invocation, List<Invoker<T>>, LoadBalance)
抽象方法,实现子 Cluster 的 Invoker 实现类的服务调用的差异逻辑,代码如下:protected abstract Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException;
6.1 构造方法
/** |
directory
字段,Directory 对象。通过它,可以获得所有服务提供者的 Invoker 对象。availablecheck
字段,集群时是否排除非可用( available )的 Invoker ,默认为"true"
,通过"cluster.availablecheck"
配置项设置。destroyed
字段,是否已经销毁。若已经销毁,则不允许在调用。stickyInvoker
字段,粘滞连接 Invoker ,参见 《Dubbo 用户指南 —— 粘滞连接
》 文档。粘滞连接用于有状态服务,尽可能让客户端总是向同一提供者发起调用,除非该提供者挂了,再连另一台。
6.2 list
#list(Invocation)
方法,获得所有服务提供者 Invoker 集合。代码如下:
protected List<Invoker<T>> list(Invocation invocation) throws RpcException { |
6.3 select
#select(LoadBalance, Invocation, invokers, selected)
方法,从候选的 Invoker 集合,选择一个最终调用的 Invoker 对象。代码如下:
/** |
- 该方法主要处理粘滞连接的特性,具体使用 Loadbalance 选择 Invoker 对象的逻辑,在
#doselect(loadbalance, invocation, invokers, selected)
方法中。 - 第 5 至 22 行:获得粘滞连接
stickyInvoker
对象。- 第 6 至 7 行:获得方法级的
sticky
配置项。 - 第 9 至 13 行:若
stickyInvoker
不存在于invokers
中,说明不在候选中,需要置空,重新选择。 - 第 14 至 21 行:获得粘滞连接
stickyInvoker
对象。如要满足如下条件:- 第 16 行:1)开启粘滞连接的特性;2)
stickyInvoker
不存在于selected
中。 - 第 18 行:若开启排除非可用的 Invoker 的特性,则校验
stickyInvoker
是否可用。
- 第 16 行:1)开启粘滞连接的特性;2)
- 第 6 至 7 行:获得方法级的
- 第 25 行:调用
#doselect(loadbalance, invocation, invokers, selected)
方法,执行选择一个 Invoker 对象。 - 第 27 至 30 行:若开启粘滞连接的特性,记录最终选择的 Invoker 对象,到
stickyInvoker
中。
6.3.1 doselect
#doselect(loadbalance, invocation, invokers, selected)
方法,从候选的 Invoker 集合,选择一个最终调用的 Invoker 对象。代码如下:
1: private Invoker<T> doselect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException { |
- 有五种选择最终调用的 Invoker 对象的方式。
- 【第一种】第 5 至 8 行:如果只有一个候选的 Invoker 对象,直接选择返回。😈 因为没的选择了。
【第二种】第 9 至 13 行:如果只有两个候选的 Invoker 集合,退化为轮询。此处存在一个 BUG :
转载自我飞哥,《dubbo 源码 - 负载均衡》
这里退化成轮询的实现有问题,对应源码
return selected.get(0) == invokers.get(0) ? invokers.get(1) : invokers.get(0);
如果retries=4,即最多调用5次,且两个可选invoke分别为:10.0.0.1:20884,10.0.0.1:20886;
那么5次选择的invoke为:
- 10.0.0.1:20884
- 10.0.0.1:20886
- 10.0.0.1:20886
- 10.0.0.1:20886
- 10.0.0.1:20886,
即除了第1次外后面的选择都是选择第二个invoker;
因次需要把selected.get(0)修改为:selected.get(selected.size()-1);
即每次拿前一次选择的invoker与 invokers.get(0)比较,如果相同,则选则另一个invoker;否则就选 invokers.get(0);
【第三种】第 16 至 21 行:调用
Loadbalance#select(invokers, url, invocation)
方法,使用 Loadbalance ,选择一个 Invoker 对象。具体的代码实现,见 Loadbalance 的文章。- 这种方式的返回,选择的 Invoker 对象,需要满足两个条件:1)不存在于
selected
中。2)Invoker 是可用的,若开启排除非可用的 Invoker 的特性。
- 这种方式的返回,选择的 Invoker 对象,需要满足两个条件:1)不存在于
- 【第四种】调用
#reselect(loadbalance, invocation, invokers, selected, availablecheck)
方法,重新选择一个 Invoker 对象。😈 因为此时invokers
中,无法找到一个满足条件的 Invoker 对象。详细解析,见 「6.3.2 reselect」 。 - 【第五种】顺序从候选的
invokers
集合中,选择一个 Invoker 对象,不考虑是否可用,又或者已经选择过,类似【第一种】【第二种】的方式。😈总之,保证能获取到一个 Invoker 对象。
6.3.2 reselect
#reselect(loadbalance, invocation, invokers, selected, availablecheck)
方法,重新选择一个 Invoker 对象。代码如下:
1: private Invoker<T> reselect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected, boolean availablecheck) throws RpcException { |
- 第 4 行:预先创建一个重选 Invoker 集合,我们会发现很奇怪的一段
invokers.size() - 1
代码。这是为什么呢?笔者的理解是,出现重选#reselect(...)
的原因,说明#doselect(...)
的【第三种】选择的 Invoker 对象,在selected
中,因此需要去掉一个。 - 一共有两类三种的选择方式:
- 【第一种】第 10 至 16 行:获得非选择过(
invokers
), 并且必须可用的 Invoker 集合。 - 【第二种】第 22 至 27 行:获得非选择过(
invokers
), 并且不考虑可用的 Invoker 集合。 - 【第三种】第 36 至 44 行:获得选择过(
selected
),并且必须可用的 Invoker 集合。
- 【第一种】第 10 至 16 行:获得非选择过(
- 第 19 行 || 第 30 行 || 第 47 行:调用
Loadbalance#select(invokers, url, invocation)
方法,使用 Loadbalance ,选择一个 Invoker 对象。
6.4 invoke
#invoke(invocation)
方法,调用服务提供者的逻辑。代码如下:
1: |
第 4 行:调用
#checkWhetherDestroyed()
方法,校验是否已经销毁。代码如下:protected void checkWhetherDestroyed() {
if (destroyed.get()) {
throw new RpcException("Rpc cluster invoker for " + getInterface() + " on consumer " + NetUtils.getLocalHost()
+ " use dubbo version " + Version.getVersion()
+ " is now destroyed! Can not invoke any more.");
}
}第 7 行:调用
#list(invocation)
方法,基于 Directory ,获得所有服务提供者 Invoker 集合。- 第 9 至 16 行:获得 Loadbalance 对象。
- 第 19 行:调用
RpcUtils#attachInvocationIdIfAsync(url, invocation)
方法,设置调用编号,若是异步调用。 - 第 22 行:调用
#doInvoke(invocation, invokers, loadbalance)
抽象方法,执行调用。🙂 子 Cluster 的 Invoker 实现类的服务调用的差异逻辑。
6.5 其它实现方法
6.5.1 getInterface
|
6.5.2 getUrl
|
6.5.3 isAvailable
|
6.5.4 checkInvokers
protected void checkInvokers(List<Invoker<T>> invokers, Invocation invocation) { |
6.5.5 destroy
|
7. FailoverClusterInvoker
com.alibaba.dubbo.rpc.cluster.support.FailoverClusterInvoker
,实现 AbstractClusterInvoker 抽象类,FailoverCluster Invoker 实现类。
失败自动切换,当出现失败,重试其它服务器。通常用于读操作,但重试会带来更长延迟。可通过 retries="2"
来设置重试次数(不含第一次)。
在看具体的 #doInvoke(Invocation, List<Invoker<T>>, LoadBalance)
的实现代码之前,我们先来瞅瞅调用顺序图:
- 实际逻辑很简单:循环,查找一个 Invoker 对象,进行调用,直到成功。
#doInvoke(Invocation, List<Invoker<T>>, LoadBalance)
方法,代码如下:
1: |
- 第 3 行:
copyinvokers
变量,候选的 Invoker 集合。 - 第 5 行:调用父
#checkInvokers(copyinvokers, invocation)
方法,校验候选的 Invoker 集合非空。如果为空,抛出 RpcException 异常。 - 第 6 至 10 行:获得最大可调用次数:最大可重试次数 +1 。默认最大可重试次数
Constants.DEFAULT_RETRIES = 2
。 - 第 12 行:
le
变量,保存最后一次调用的异常。 - 第 14 行:
invoked
变量,保存已经调用的 Invoker 集合。 - 第 15 行:
providers
变量,保存已经调用的网络地址集合。 - 第 16 至 62 行:failover 机制核心实现:如果出现调用失败,那么重试其他服务器。
- 第 20 至 27 行:重试时(
i > 0
), 进行重新选择,避免重试时,候选 Invoker 集合,已发生变化。 - 【重要】第 29 行:调用父
#select(loadbalance, invocation, copyinvokers, invoked)
方法,根据 Loadbalance 负载均衡机制,从copyinvokers
中,选择一个被调用的 Invoker 对象。 - 第 31 行:保存每次调用的 Invoker 对象,到
invoked
中。 - 第 33 行:保存已经调用的 Invoker 集合,到 Context 中。
- 【重要】第 36 行:调用
Invoker#invoke(invocation)
方法,发起 RPC 调用。 - 第 37 至 48 行:若
le
非空,说明此时是重试调用成功,将最后一次调用的异常信息以 warn 级别日志输出,方便未来追溯。 - ========== 异常相关 ===========
- 第 55 至 54 行:如果是业务性质的异常,不再重试,直接抛出。
- 第 56 行:保存异常到
le
。 - 第 58 行:非 RpcException 异常,封装成 RpcException 异常。
- 第 59 至 61 行:保存每次调用的网络地址,到
providers
中。
- 第 20 至 27 行:重试时(
- 第 63 至 71 行:超过最大调用次数,抛出 RpcException 异常。该异常中,带有最后一次调用异常的信息。
666. 彩蛋
刚开始看集群容错,真的是一脸懵逼。
感谢我飞哥。