本文基于 Dubbo 2.6.1 版本,望知悉。
1. 概述
本文分享 dubbo://
协议的远程调用的第二部分:同步调用。
在 dubbo://
协议的调用,一共分成三种:
- sync 同步调用
- async 异步调用
- oneway 单向调用
前两种比较好理解,都是基于 Request Response 模型,差异点在异步调用,服务消费者不阻塞等待结果,而是通过回调的方式,处理服务提供者返回的结果。
最后一种,基于 Message 模型,发起调用,而不关注等待和关注执行结果。
因此,从性能上:oneway > async > sync 。
友情提示:本文会分享 sync 和 oneway 两种方式。
2. 顺序图
-
- 此图是在
injvm://
协议的顺序图的基础上修改:- InjvmInvoker 替换成 ExchangeServer 。例如在 Netty4 中,IO Worker 解析请求,转发给 ExchangeHandler 处理。
- InjvmProtocol 替换成 DubboProtocol 。在该类中,实现了自定义的 ExchangeHandler 处理请求。注意,在 Dubbo ThreadPool 中处理请求,参见 《Dubbo 用户指南 —— 线程模型》 文档。
- 此图是在
3. 消费者调用服务
调用 DubboInvoker#invoke(Invocation)
方法,调用服务。代码如下:
/** |
第 5 行:调用
RpcUtils#getMethodName()
方法,获得方法名。代码如下:public static String getMethodName(Invocation invocation) {
// 泛化调用,第一个参数为方法名
if (Constants.$INVOKE.equals(invocation.getMethodName())
&& invocation.getArguments() != null
&& invocation.getArguments().length > 0
&& invocation.getArguments()[0] instanceof String) {
return (String) invocation.getArguments()[0];
}
// 普通调用,直接获得
return invocation.getMethodName();
}第 6 至 8 行:获得
path
( 服务名 )、version
。- 第 10 至 16 行:顺序,获得 ExchangeClient 对象。
第 20 行:调用
RpcUtils#isAsync(url, invocation)
方法,判断是否异步调用。代码如下:public static boolean isAsync(URL url, Invocation inv) {
return Boolean.TRUE.toString().equals(inv.getAttachment(Constants.ASYNC_KEY)) // RpcContext#asyncCall(Callable) 方法,可以设置
|| url.getMethodParameter(getMethodName(inv), Constants.ASYNC_KEY, false);
}- 获得是否异步。服务引用或方法,任一配置
async = true
,即为异步。
- 获得是否异步。服务引用或方法,任一配置
第 22 行:调用
RpcUtils#isOneway(url, invocation)
方法,判断是否异步调用。代码如下:public static boolean isOneway(URL url, Invocation inv) {
return Boolean.FALSE.toString().equals(inv.getAttachment(Constants.RETURN_KEY)) // RpcContext#asyncCall(Runnable) 方法,可以设置
|| !url.getMethodParameter(getMethodName(inv), Constants.RETURN_KEY, true);
}- 获得是否单向。方法配置
return = true
,即为单向。
- 获得是否单向。方法配置
第 24 行:调用
URL#getMethodParameter(method, key, defaultValue)
方法,获得远程调用超时时间,单位:毫秒。- 第 25 至 30 行:oneway 单向调用。
- 第 28 行:注意,调用的是
ExchangeClient#send(invocation, sent)
方法,发送消息,而不是请求。 - 第 29 行:设置
RpcContext.future = null
,无需 FutureFilter ,异步回调。 - 第 30 行:创建 RpcResult 对象,空返回。
- 第 28 行:注意,调用的是
- 第 31 至 35 行:async 异步调用。
- 第 33 行:调用
ExchangeClient#request(invocation, timeout)
方法,发送请求。 - 第 34 行:调用
RpcContext#setFuture(future)
方法,在 FutureFitler 中,异步回调。 - 第 35 行:创建 RpcResult 对象,空返回。
- 第 33 行:调用
- 第 36 至 40 行:sync 同步调用。
- 第 38 行:设置
RpcContext.future = null
,无需 FutureFilter ,异步回调。 - 第 39 行:调用
ExchangeClient#request(invocation, timeout)
方法,发送请求。 - 第 39 行:调用
ResponseFuture#get()
方法,阻塞等待,返回结果。
- 第 38 行:设置
4. 提供者提供服务
在 DubboProtocol 类中,实现了自己的 ExchangeHandler 对象,处理请求、消息、连接、断开连接等事件。对于服务消费者的远程调用,通过 #reply(ExchangeChannel channel, Object message)
和 #reply(Channel channel, Object message)
方法来处理。如下图所示:ExchangeHandler
下面,我们来看看每个方法的实现代码。
4.1 reply
1: |
- 用于处理服务消费者的同步调用和异步调用的请求。
第 6 行:调用
#getInvoker(channel, invocation)
方法,获得请求对应的 Invoker 对象。代码如下:/**
* Exporter 集合
*
* key: 服务键 {@link #serviceKey(URL)} 或 {@link URL#getServiceKey()} 。
* 不同协议会不同
*/
protected final Map<String, Exporter<?>> exporterMap = new ConcurrentHashMap<String, Exporter<?>>(); // FROM 父类 AbstractProtocol.java
1: Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
2: boolean isCallBackServiceInvoke;
3: boolean isStubServiceInvoke;
4: int port = channel.getLocalAddress().getPort();
5: String path = inv.getAttachments().get(Constants.PATH_KEY);
6: // TODO 【8033 参数回调】
7: // if it's callback service on client side
8: isStubServiceInvoke = Boolean.TRUE.toString().equals(inv.getAttachments().get(Constants.STUB_EVENT_KEY));
9: if (isStubServiceInvoke) {
10: port = channel.getRemoteAddress().getPort();
11: }
12: // 如果是参数回调,获得真正的服务名 `path` 。
13: // callback
14: isCallBackServiceInvoke = isClientSide(channel) && !isStubServiceInvoke;
15: if (isCallBackServiceInvoke) {
16: path = inv.getAttachments().get(Constants.PATH_KEY) + "." + inv.getAttachments().get(Constants.CALLBACK_SERVICE_KEY);
17: inv.getAttachments().put(IS_CALLBACK_SERVICE_INVOKE, Boolean.TRUE.toString());
18: }
19: // 获得服务建
20: String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY));
21: // 获得 Exporter 对象
22: DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
23: // 获得 Invoker 对象
24: if (exporter == null) {
25: throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv);
26: }
27: return exporter.getInvoker();
28: }- 第 6 至 11 行:TODO 【8033 参数回调】
- 第 12 至 18 行:如果是参数回调,获得真正的服务名
path
。在参数回调一文中,我们详细解析。 第 20 行:调用
#serviceKey(port, path, version)
方法,获得服务键。代码如下:protected static String serviceKey(int port, String serviceName, String serviceVersion, String serviceGroup) {
return ProtocolUtils.serviceKey(port, serviceName, serviceVersion, serviceGroup);
}第 22 行:从
exporterMap
集合中,获得 Exporter 对象。- 第 23 至 27 行:获得 Invoker 对象。
第 8 至 27 行:如果是参数回调,校验服务消费者实际存在对应的回调方法,通过方法名判断。
- 第 29 行:设置调用方的地址。
- 第 31 行:调用
Invoker#invoke(invocation)
方法,执行调用,并返回结果。后续的逻辑,和injvm://
协议是一致的。
4.2 received
|
- 用于处理服务消费者的单次调用的消息,通过判断消息类型是不是 Invocation 。
4.3 connected && disconnected
本小节和 Dubbo RPC 无关系,只是为了完整分享 DubboProtocol ExchangeHandler 的完整代码实现。
在服务提供者上,有 "onconnect"
和 "ondisconnect"
配置项,在服务提供者连接或断开连接时,调用 Service 对应的方法。目前这个配置项,在 Dubbo 文档里,暂未提及。当然,这个在实际场景下,基本没用过。
|
调用
#invoke(channel, methodKey)
方法,执行对应的方法。代码如下:/**
* 调用方法
*
* @param channel 通道
* @param methodKey 方法名
*/
private void invoke(Channel channel, String methodKey) {
// 创建 Invocation 对象
Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey);
// 调用 received 方法,执行对应的方法
if (invocation != null) {
try {
this.received(channel, invocation);
} catch (Throwable t) {
logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t);
}
}
}
private Invocation createInvocation(Channel channel, URL url, String methodKey) {
String method = url.getParameter(methodKey);
if (method == null || method.length() == 0) {
return null;
}
RpcInvocation invocation = new RpcInvocation(method, new Class<?>[0], new Object[0]);
invocation.setAttachment(Constants.PATH_KEY, url.getPath());
invocation.setAttachment(Constants.GROUP_KEY, url.getParameter(Constants.GROUP_KEY));
invocation.setAttachment(Constants.INTERFACE_KEY, url.getParameter(Constants.INTERFACE_KEY));
invocation.setAttachment(Constants.VERSION_KEY, url.getParameter(Constants.VERSION_KEY));
if (url.getParameter(Constants.STUB_EVENT_KEY, false)) {
invocation.setAttachment(Constants.STUB_EVENT_KEY, Boolean.TRUE.toString());
}
return invocation;
}
666. 彩蛋
抽丝剥茧,像拨洋葱一样,一层一层一层,泪流满面。哈哈哈
清明节,扫代码第二波。