本文基于 Dubbo 2.6.1 版本,望知悉。
1. 概述
本文接 《精尽 Dubbo 源码解析 —— 集群容错(一)之抽象 API》 一文,分享 dubbo-cluster
模块, support
包,各种 Cluster 实现类。
Cluster 子类如下图:
Cluster 子类
我们可以看到,每个 Cluster 实现类,对应一个专属于其的 Invoker 实现类。下面,我们一个一个子类往下看。
老艿艿:本文对应 《Dubbo 用户指南 —— 集群容错》 文档。
2. AvailableCluster
com.alibaba.dubbo.rpc.cluster.support.AvailableCluster
,实现 Cluster 接口,调用首个可用服务器,目前用于多注册中心引用。代码如下:
public class AvailableCluster implements Cluster {
public static final String NAME = "available";
public <T> Invoker<T> join(Directory<T> directory) throws RpcException { return new AvailableClusterInvoker<T>(directory); }
}
|
- 对应 Invoker 实现类为 AvailableClusterInvoker 。
2.1 AvailableClusterInvoker
com.alibaba.dubbo.rpc.cluster.support.AvailableClusterInvoker
,实现 AbstractClusterInvoker 抽象类,AvailableCluster Invoker 实现类。代码如下:
public class AvailableClusterInvoker<T> extends AbstractClusterInvoker<T> {
public AvailableClusterInvoker(Directory<T> directory) { super(directory); }
@Override public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { for (Invoker<T> invoker : invokers) { if (invoker.isAvailable()) { return invoker.invoke(invocation); } } throw new RpcException("No provider available in " + invokers); }
}
|
3. BroadcastCluster
com.alibaba.dubbo.rpc.cluster.support.BroadcastCluster
,实现 Cluster 接口,广播调用所有提供者,逐个调用,任意一台报错则报错。通常用于通知所有提供者更新缓存或日志等本地资源信息。代码如下:
public class BroadcastCluster implements Cluster {
@Override public <T> Invoker<T> join(Directory<T> directory) throws RpcException { return new BroadcastClusterInvoker<T>(directory); }
}
|
- 对应 Invoker 实现类为 BroadcastClusterInvoker 。
3.1 BroadcastClusterInvoker
com.alibaba.dubbo.rpc.cluster.support.BroadcastClusterInvoker
,实现 AbstractClusterInvoker 抽象类,BroadcastCluster Invoker 实现类。代码如下:
public class BroadcastClusterInvoker<T> extends AbstractClusterInvoker<T> {
private static final Logger logger = LoggerFactory.getLogger(BroadcastClusterInvoker.class);
public BroadcastClusterInvoker(Directory<T> directory) { super(directory); }
@Override @SuppressWarnings({"unchecked", "rawtypes"}) public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { checkInvokers(invokers, invocation); RpcContext.getContext().setInvokers((List) invokers); RpcException exception = null; Result result = null; for (Invoker<T> invoker : invokers) { try { result = invoker.invoke(invocation); } catch (RpcException e) { exception = e; logger.warn(e.getMessage(), e); } catch (Throwable e) { exception = new RpcException(e.getMessage(), e); logger.warn(e.getMessage(), e); } } if (exception != null) { throw exception; } return result; }
}
|
4. FailbackCluster
com.alibaba.dubbo.rpc.cluster.support.FailbackCluster
,实现 Cluster 接口,失败自动恢复,后台记录失败请求,定时重发。通常用于消息通知操作。代码如下:
public class FailbackCluster implements Cluster {
public final static String NAME = "failback";
@Override public <T> Invoker<T> join(Directory<T> directory) throws RpcException { return new FailbackClusterInvoker<T>(directory); }
}
|
- 对应 Invoker 实现类为 FailbackClusterInvoker 。
4.1 FailbackClusterInvoker
com.alibaba.dubbo.rpc.cluster.support.FailbackClusterInvoker
,实现 AbstractClusterInvoker 抽象类,FailbackCluster Invoker 实现类。
4.1.1 构造方法
private static final long RETRY_FAILED_PERIOD = 5 * 1000;
private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2, new NamedThreadFactory("failback-cluster-timer", true));
private final ConcurrentMap<Invocation, AbstractClusterInvoker<?>> failed = new ConcurrentHashMap<Invocation, AbstractClusterInvoker<?>>();
private volatile ScheduledFuture<?> retryFuture;
public FailbackClusterInvoker(Directory<T> directory) { super(directory); }
|
4.1.2 doInvoke
@Override protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { try { checkInvokers(invokers, invocation); Invoker<T> invoker = select(loadbalance, invocation, invokers, null); return invoker.invoke(invocation); } catch (Throwable e) { logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: " + e.getMessage() + ", ", e); addFailed(invocation, this); return new RpcResult(); } }
|
- 若 RPC 调用失败,则调用
#addFailed(invocation, this)
方法,添加到 failed
中,后台定时重试。
4.1.3 addFailed
private void addFailed(Invocation invocation, AbstractClusterInvoker<?> router) { if (retryFuture == null) { synchronized (this) { if (retryFuture == null) { retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
public void run() { try { retryFailed(); } catch (Throwable t) { logger.error("Unexpected error occur at collect statistic", t); } } }, RETRY_FAILED_PERIOD, RETRY_FAILED_PERIOD, TimeUnit.MILLISECONDS); } } } failed.put(invocation, router); }
|
- 创建的定时任务,会调用
#retryFailed()
方法,重试任务,发起 RCP 调用。
4.1.4 retryFailed
void retryFailed() { if (failed.size() == 0) { return; } for (Map.Entry<Invocation, AbstractClusterInvoker<?>> entry : new HashMap<Invocation, AbstractClusterInvoker<?>>(failed).entrySet()) { Invocation invocation = entry.getKey(); Invoker<?> invoker = entry.getValue(); try { invoker.invoke(invocation); failed.remove(invocation); } catch (Throwable e) { logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", e); } } }
|
- 循环重试任务,逐个发起 RPC 调用。若调用成功,移除该失败任务出
failed
集合。
在极端情况下,存在一个 BUG ,复现步骤如下:
- 假设目前有两个服务提供者 A、B 。
- 首先调用 A 服务,假设超时,添加到
failed
中。
- 重试调用 B 服务(A 服务亦可),假设再次超时,添加到
failed
中。
- 因为
#doInvoker(...)
方法,调用失败,不会抛出异常(当然也不能),导致 #retryFailed(...)
方法,误以为调用成功,错误的移除该失败任务出 failed
集合。
那么能不能在 #retryFailed(...)
方法中,先移除该失败任务出 failed
集合呢,再发起 PRC 调用呢?答案是不可以,因为在调用 #doInvoke(...)
方法之前,可能发生异常,导致失败任务的丢失。
那么该怎么办?有两种方式:
- 上述方案的基础上,在
#retryFailed(...)
方法的移除处理中,增加调用 #addFailed(...)
方法。
- 枚举一个
FAILED_RESULT
对象,让 #doInvoke(...)
方法发生异常时,返回该对象。这样 #retryFailed(...)
方法,在移除出 failed
集合时,增加下是否执行成功的判断。
笔者倾向第二种,逻辑更加线性和易懂。
5. FailfastCluster
com.alibaba.dubbo.rpc.cluster.support.FailfastCluster
,实现 Cluster 接口,快速失败,只发起一次调用,失败立即报错。通常用于非幂等性的写操作,比如新增记录。代码如下:
public class FailfastCluster implements Cluster {
public final static String NAME = "failfast";
@Override public <T> Invoker<T> join(Directory<T> directory) throws RpcException { return new FailfastClusterInvoker<T>(directory); }
}
|
- 对应 Invoker 实现类为 FailfastClusterInvoker 。
5.1 FailfastInvoker
com.alibaba.dubbo.rpc.cluster.support.FailbackClusterInvoker
,实现 AbstractClusterInvoker 抽象类,Failfast Invoker 实现类。代码如下:
public class FailfastClusterInvoker<T> extends AbstractClusterInvoker<T> {
public FailfastClusterInvoker(Directory<T> directory) { super(directory); }
@Override public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { checkInvokers(invokers, invocation); Invoker<T> invoker = select(loadbalance, invocation, invokers, null); try { return invoker.invoke(invocation); } catch (Throwable e) { if (e instanceof RpcException && ((RpcException) e).isBiz()) { throw (RpcException) e; } throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName() + " select from all providers " + invokers + " for service " + getInterface().getName() + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e); } }
}
|
- 和 FailbackClusterInvoker 差异点,在于对异常的处理。
6. FailsafeCluster
com.alibaba.dubbo.rpc.cluster.support.FailsafeCluster
,实现 Cluster 接口,失败安全,出现异常时,直接忽略。通常用于写入审计日志等操作。
代码如下:
public class FailfastCluster implements Cluster {
public final static String NAME = "failfast";
@Override public <T> Invoker<T> join(Directory<T> directory) throws RpcException { return new FailfastClusterInvoker<T>(directory); }
}
|
- 对应 Invoker 实现类为 FailsafeClusterInvoker 。
6.1 FailsafeClusterInvoker
com.alibaba.dubbo.rpc.cluster.support.FailsafeClusterInvoker
,实现 AbstractClusterInvoker 抽象类,Failsafe Invoker 实现类。代码如下:
public class FailsafeClusterInvoker<T> extends AbstractClusterInvoker<T> {
private static final Logger logger = LoggerFactory.getLogger(FailsafeClusterInvoker.class);
public FailsafeClusterInvoker(Directory<T> directory) { super(directory); }
@Override public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { try { checkInvokers(invokers, invocation); Invoker<T> invoker = select(loadbalance, invocation, invokers, null); return invoker.invoke(invocation); } catch (Throwable e) { logger.error("Failsafe ignore exception: " + e.getMessage(), e); return new RpcResult(); } }
}
|
- 和 FailfastInvoker 差异点,在于对异常的处理。
7. ForkingCluster
com.alibaba.dubbo.rpc.cluster.support.ForkingCluster
,实现 Cluster 接口,并行调用多个服务器,只要一个成功即返回。通常用于实时性要求较高的读操作,但需要浪费更多服务资源。可通过 forks="2"
来设置最大并行数。
代码如下:
public class ForkingCluster implements Cluster {
public final static String NAME = "forking";
@Override public <T> Invoker<T> join(Directory<T> directory) throws RpcException { return new ForkingClusterInvoker<T>(directory); }
}
|
7.1 ForkingClusterInvoker
老艿艿:BlockQueue 的使用,非常精髓!
com.alibaba.dubbo.rpc.cluster.support.ForkingClusterInvoker
,实现 AbstractClusterInvoker 抽象类,ForkingCluster Invoker 实现类。代码如下:
1: public class ForkingClusterInvoker<T> extends AbstractClusterInvoker<T> { 2: 3:
6: private final ExecutorService executor = Executors.newCachedThreadPool(new NamedThreadFactory("forking-cluster-timer", true)); 7: 8: public ForkingClusterInvoker(Directory<T> directory) { 9: super(directory); 10: } 11: 12: @Override 13: @SuppressWarnings({"unchecked", "rawtypes"}) 14: public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { 15: 16: checkInvokers(invokers, invocation); 17: 18: final List<Invoker<T>> selected; 19: 20: final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS); 21: 22: final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); 23: 24: if (forks <= 0 || forks >= invokers.size()) { 25: selected = invokers; 26: } else { 27: 28: 29: selected = new ArrayList<Invoker<T>>(); 30: for (int i = 0; i < forks; i++) { 31: 32: Invoker<T> invoker = select(loadbalance, invocation, invokers, selected); 33: if (!selected.contains(invoker)) { 34: selected.add(invoker); 35: } 36: } 37: } 38: 39: RpcContext.getContext().setInvokers((List) selected); 40: 41: final AtomicInteger count = new AtomicInteger(); 42: 43: final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>(); 44: 45: for (final Invoker<T> invoker : selected) { 46: executor.execute(new Runnable() { 47: public void run() { 48: try { 49: 50: Result result = invoker.invoke(invocation); 51: 52: ref.offer(result); 53: } catch (Throwable e) { 54: 55: int value = count.incrementAndGet(); 56: 57: if (value >= selected.size()) { 58: ref.offer(e); 59: } 60: } 61: } 62: }); 63: } 64: try { 65: 66: Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS); 67: 68: if (ret instanceof Throwable) { 69: Throwable e = (Throwable) ret; 70: throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e); 71: } 72: 73: return (Result) ret; 74: } catch (InterruptedException e) { 75: throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e); 76: } 77: } 78: 79: }
|
- 第 15 至 39 行:胖友自己看代码注释,比较易懂。
- 第 41 行:
count
变量,异常计数器。
- 第 43 行:
ref
变量,阻塞队列。通过它,实现线程池异步执行任务的结果通知,非常亮眼。
- 第 44 至 63 行:循环
selected
集合,提交线程池,发起 RPC 调用。
- 第 49 至 52 行:调用
Invoker#invoke(invocation)
方法,RPC 调用,成功获得 Result 结果,并将 Result 添加到 ref
阻塞队列中。
- 第 53 至 59 行:若调用失败,异常计数器
count
加一。当所有的 RPC 调用都完成,并且都是异常时,则添加最后一个异常到 ref
阻塞队列。🙂 细节处理很到位。
- 第 66 行:从
ref
队列中,阻塞等待,直到获得到结果或者超时。至此,ForkingClusterInvoker 实现了并行调用,且只要一个成功即返回。当然,还有一个隐性的,所有都失败才返回。
- 第 67 至 76 行:处理等待的“结果”。
8. FailoverCluster
FailoverCluster ,在 《精尽 Dubbo 源码解析 —— 集群容错(一)之抽象 API》 一文中,我们已经详细解析。
9. MergeableCluster
MergeableCluster ,对应 《Dubbo 用户指南 —— 分组聚合》 文档,我们后续单独写文章分享。
10. MockClusterWrapper
MockClusterWrapper ,对应 《Dubbo 用户指南 ——本地伪装 》 文档,我们后续单独写文章分享。
666. 彩蛋
知识星球
比想象中的简单一些,花的比较多的时间在 FailbackClusterInvoker ,考虑是否存在 BUG 。
比较亮眼的是 ForkingClusterInvoker ,萌萌的 BlockQueue 。