本文基于 Dubbo 2.6.1 版本,望知悉。
1. 概述
本文接 《精尽 Dubbo 源码分析 —— NIO 服务器(三)之 Telnet 层》 一文,分享 dubbo-remoting-api
模块, exchange
包,信息交换层。
exchange 信息交换层:封装请求响应模式,同步转异步,以 Request, Response 为中心,扩展接口为 Exchanger, ExchangeChannel, ExchangeClient, ExchangeServer。
在一次 RPC 调用,每个请求( Request ),是关注对应的响应( Response )。那么 transport 层 提供的网络传输 功能,是无法满足 RPC 的诉求的。因此,exchange 层,在其 Message 之上,构造了Request-Response 的模型。
实现上,也非常简单,将 Message 分成 Request 和 Response 两种类型,并增加编号属性,将 Request 和 Response 能够一一映射。
实际上,RPC 调用,会有更多特性的需求:1)异步处理返回结果;2)内置事件;3)等等。因此,Request 和 Response 上会有类似编号的系统字段。
一条消息,我们分成两段:
- 协议头( Header ) : 系统字段,例如编号等。
- 内容( Body ) :具体请求的参数和响应的结果等。
胖友在看下面这张图,是否就亲切多了 🙂 :
所以,exchange
包,很多的代码,是在 Header 的处理。OK ,下面我们来看下这个包的类图:
- 白色部分,为通用接口和
transport
包下的类。 - 蓝色部分,为
exchange
包下的类。
在 《精尽 Dubbo 源码分析 —— NIO 服务器(二)之 Transport 层》 中,我们提到,装饰器设计模式,是 dubbo-remoting
项目,最核心的实现方式,所以,exchange
其实是在 transport
上的装饰,提供给 dubbo-rpc
项目使用。
下面,我们来看具体代码实现。
2. ExchangeChannel
com.alibaba.dubbo.remoting.exchange.ExchangeChannel
,继承 Channel 接口,信息交换通道接口。方法如下:
// 发送请求 |
2.1 HeaderExchangeChannel
com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeChannel
,实现 ExchangeChannel 接口,基于消息头部( Header )的信息交换通道实现类。
2.1.1 构造方法
private static final String CHANNEL_KEY = HeaderExchangeChannel.class.getName() + ".CHANNEL"; |
channel
属性,通道。HeaderExchangeChannel 是传入channel
属性的装饰器,每个实现的方法,都会调用channel
。如下是该属性的一个例子:`channel`
#getOrAddChannel(Channel)
静态方法,创建 HeaderExchangeChannel 对象。代码如下:static HeaderExchangeChannel getOrAddChannel(Channel ch) {
if (ch == null) {
return null;
}
HeaderExchangeChannel ret = (HeaderExchangeChannel) ch.getAttribute(CHANNEL_KEY);
if (ret == null) {
ret = new HeaderExchangeChannel(ch);
if (ch.isConnected()) { // 已连接
ch.setAttribute(CHANNEL_KEY, ret);
}
}
return ret;
}- 传入的
ch
属性,实际就是HeaderExchangeChanel.channel
属性。 - 通过
ch.attribute
的CHANNEL_KEY
键值,保证有且仅有为ch
属性,创建唯一的 HeaderExchangeChannel 对象。 - 要求已连接。
- 传入的
#removeChannelIfDisconnected(ch)
静态方法,移除 HeaderExchangeChannel 对象。代码如下:static void removeChannelIfDisconnected(Channel ch) {
if (ch != null && !ch.isConnected()) { // 未连接
ch.removeAttribute(CHANNEL_KEY);
}
}
2.1.2 发送请求
1: |
- 第 3 至 5 行:若已经关闭,不再允许发起新的请求。
- 第 6 至 10 行:创建 Request 对象。其中,
twoWay = true
需要响应;data = request
具体数据。 - 第 12 行:创建 DefaultFuture 对象。
- 第 13 至 15 行:调用
Channel#send(req)
方法,发送请求。 - 第 16 至 19 行:发生 RemotingException 异常,调用
DefaultFuture#cancel()
方法,取消。 - 第 21 行:返回 DefaultFuture 对象。从代码的形式上来说,有点类似线程池提交任务,返回 Future 对象。🙂 看到 DefaultFuture 的具体代码,我们就会更加理解了。
2.1.3 优雅关闭
1: |
- 第 3 至 6 行:标记
closed = true
,避免发起新的请求。 - 第 7 至 17 行:调用
DefaultFuture#hasFuture(channel)
方法,判断已发起的已经是否已经都响应了。若否,等待完成或超时。 - 第 19 行:关闭通道。
其它方法
其它实现方法,主要是直接调用 channel
的方法,点击 传送门 查看代码。
3. ExchangeClient
com.alibaba.dubbo.remoting.exchange.ExchangeClient
,实现 Client ,ExchangeChannel 接口,信息交换客户端接口。
无自定义方法。
3.1 HeaderExchangeClient
com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeClient
,实现 ExchangeClient 接口,基于消息头部( Header )的信息交换客户端实现类。
构造方法
1: /** |
client
属性,客户端。如下是该属性的一个例子:`client`
- 第 34 行:使用传入的
client
属性,创建 HeaderExchangeChannel 对象。 第 35 至 41 行:读取心跳相关配置。默认,开启心跳功能。为什么需要有心跳功能呢?
FROM 《Dubbo 用户指南 —— dubbo:protocol》
心跳间隔,对于长连接,当物理层断开时,比如拔网线,TCP的FIN消息来不及发送,对方收不到断开事件,此时需要心跳来帮助检查连接是否已断开
第 42 至 45 行:调用
#startHeatbeatTimer()
方法,发起心跳定时器。
发起心跳定时器
1: private void startHeatbeatTimer() { |
- 第 3 行:调用
#stopHeartbeatTimer()
方法,停止原有定时任务。 - 第 5 至 13 行:发起新的定时任务。
- 第 7 至 11 行:创建定时任务 HeartBeatTask 对象。具体实现见下文。
其它方法
其它实现方法,主要是直接调用 channel
或 client
的方法,点击 传送门 查看代码。
4. ExchangeServer
com.alibaba.dubbo.remoting.exchange.ExchangeServer
,继承 Server 接口,信息交换服务器接口。方法如下:
// 获得通道数组 |
4.1 HeaderExchangeServer
com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeServer
,实现 ExchangeServer 接口,基于消息头部( Header )的信息交换服务器实现类。
代码实现上,和 HeaderExchangeChannel + HeaderExchangeClient 的综合。
4.1.1 构造方法
代码实现上,和 HeaderExchangeClient 的类似。
/** |
4.1.2 发起心跳定时器
代码实现上,和 HeaderExchangeClient 的类似。
private void startHeatbeatTimer() { |
- 差异,Server 持有多条 Client 连接的 Channel ,所以通过 ChannelProvider 返回的是多条。
4.1.3 重置属性
|
4.1.4 优雅关闭
代码实现上,和 HeaderExchangeChannel 的类似,且复杂一些。
1: |
- Server 关闭的过程,分成两个阶段:正在关闭和已经关闭。
第 4 行:调用
#startClose()
方法,标记正在关闭。代码如下:
public void startClose() {
server.startClose();
}
// AbstractPeer.java
public void startClose() {
if (isClosed()) {
return;
}
closing = true;
}第 8 至 11 行:发送 READONLY 事件给所有 Client ,表示 Server 不再接收新的消息,避免不断有新的消息接收到。杂实现的呢?以 DubboInvoker 举例子,
#isAvailable()
方法,代码如下:
public boolean isAvailable() {
if (!super.isAvailable())
return false;
for (ExchangeClient client : clients) {
if (client.isConnected() && !client.hasAttribute(Constants.CHANNEL_ATTRIBUTE_READONLY_KEY)) { // 只读判断
//cannot write == not Available ?
return true;
}
}
return false;
}- 即使
client
处于连接中,但是 Server 处于正在关闭中,也算不可用,不进行发送请求( 消息 )。
- 即使
#sendChannelReadOnlyEvent()
方法,广播客户端,READONLY_EVENT 事件。代码如下:private void sendChannelReadOnlyEvent() {
// 创建 READONLY_EVENT 请求
Request request = new Request();
request.setEvent(Request.READONLY_EVENT);
request.setTwoWay(false); // 无需响应
request.setVersion(Version.getVersion());
// 发送给所有 Client
Collection<Channel> channels = getChannels();
for (Channel channel : channels) {
try {
if (channel.isConnected())
channel.send(request, getUrl().getParameter(Constants.CHANNEL_READONLYEVENT_SENT_KEY, true));
} catch (RemotingException e) {
logger.warn("send connot write messge error.", e);
}
}
}第 22 行:调用
#oClose()
方法,关闭心跳定时器。代码如下:private void doClose() {
if (!closed.compareAndSet(false, true)) {
return;
}
stopHeartbeatTimer();
try {
scheduled.shutdown();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}第 24 行:真正关闭服务器。
4.2 ExchangeServerDelegate
com.alibaba.dubbo.remoting.exchange.support.ExchangeServerDelegate
,实现 ExchangeServer 接口,信息交换服务器装饰者。在每个实现的方法里,直接调用被装饰的 server
属性的方法。
目前 dubbo-remoting-p2p
模块中,ExchangeServerPeer 会继承该类,后续再看。
5. 请求/响应模型
5.1 Request
com.alibaba.dubbo.remoting.exchange.Request
,请求。代码如下:
/** |
- 内置两种事件:
HEARTBEAT_EVENT
:心跳。因为心跳比较常用,所以在事件上时候了null
。READONLY_EVENT
:只读。上文已经解释。
mId
属性:编号。使用INVOKE_ID
属性生成,JVM 进程内唯一。生成代码如下:private static long newId() {
// getAndIncrement() When it grows to MAX_VALUE, it will grow to MIN_VALUE, and the negative can be used as ID
return INVOKE_ID.getAndIncrement();
}version
属性,版本号。目前使用 Dubbo 大版本,"2.0.0"
。mTwoWay
属性,标记请求是否响应( Response ),默认需要。mBroken
属性,是否异常的请求。在消息解析的时候,会出现。mData
属性,请求具体数据。
5.2 Response
com.alibaba.dubbo.remoting.exchange.Response
,响应。代码如下:
/** |
mId
属性,响应编号,和请求编号一致。mStatus
属性,状态。有多种状态:[状态码] (https://github.com/apache/incubator-dubbo/blob/9deadadea3b1342345fed77c87a3d24ea026d7e6/dubbo-remoting/dubbo-remoting-api/src/main/java/com/alibaba/dubbo/remoting/exchange/Response.java)。mEvent
属性,是否事件。和 Request 内置了一样的事件,但是READONLY_EVENT
并未使用。因为目前,只读事件,无需响应。mErrorMsg
属性,错误消息。mResult
属性,结果。
5.3 ResponseFuture
com.alibaba.dubbo.remoting.exchange.ResponseFuture
,响应 Future 接口。方法如下:
// 获得值 |
和 java.util.concurrent.Future
很类似。
5.3.1 ResponseCallback
com.alibaba.dubbo.remoting.exchange.ResponseCallback
,响应回调接口。方法如下:
// 处理执行完成 |
ResponseCallback 在 com.alibaba.dubbo.rpc.protocol.dubbo.filter.FutureFilter
中有使用,后面我们会有文章来分享 FutureFilter 。
5.3.2 DefaultFuture
com.alibaba.dubbo.remoting.exchange.support.DefaultFuture
,实现 ResponseFuture 接口,默认响应 Future 实现类。同时,它也是所有 DefaultFuture 的管理容器。
构造方法
/** |
CHANNELS
静态属性,通道集合。通过#hasFuture(channel)
方法,判断通道是否有未结束的请求。代码如下:public static boolean hasFuture(Channel channel) {
return CHANNELS.containsValue(channel);
}FUTURES
静态属性,Future 集合。sent
属性,发送请求时间。因为在目前 Netty Mina 等通信框架中,发送请求一般是异步的,因此在ChannelHandler#sent(channel, message)
方法中,调用DefaultFuture#sent(channel, request)
静态方法,代码如下:public static void sent(Channel channel, Request request) {
DefaultFuture future = FUTURES.get(request.getId());
if (future != null) {
future.doSent();
}
}
private void doSent() {
sent = System.currentTimeMillis();
}callback
属性,回调,适用于异步请求。通过#setCallback(callback)
方法设置。
获得值
/** |
- 第 7 行:调用
#isDone()
方法,判断是否完成。若未完成,基于 Lock + Condition 的方式,实现等待。而等待的唤醒,通过ChannelHandler#received(channel, message)
方法,接收到请求时执行DefaultFuture#received(channel, response)
方法。🙂 下文详细解析。- 《 Java线程(九):Condition-线程通信更高效的方式》
- 《怎么理解Condition》
- 第 8 行:获得开始时间。注意,此处使用的不是
start
属性。后面我们会看到,#get(...)
方法中,使用的是重新获取开始时间;后台扫描调用超时任务,使用的是start
属性。也就是说,#get(timeout)
方法的timeout
参数,指的是从当前时刻开始的等待超时时间。当然,这不影响最终的结果,最终 Response 是什么,由是ChannelHandler#received(channel, message)
还是后台扫描调用超时任务,谁先调用DefaultFuture#received(channel, response)
方法决定。🙂 有点绕,胖友细看下。 - 第 9 行:获得锁。
- 第 11 至 17 行:等待完成或超时。
- 第 21 行:释放锁。
- 第 24 至 26 行:若未完成,抛出超时异常 TimeoutException 。
TimeoutException.phase
的阶段,由sent > 0
来决定,即 Client 是否发送给 Server 。#getTimeoutMessage(scan)
方法,获得超时异常提示信息。🙂 胖友自己看哈。
第 29 行:调用
#returnFromResponse()
方法,返回响应( Response )。代码如下:private Object returnFromResponse() throws RemotingException {
Response res = response;
if (res == null) {
throw new IllegalStateException("response cannot be null");
}
// 正常,返回结果
if (res.getStatus() == Response.OK) {
return res.getResult();
}
// 超时,抛出 TimeoutException 异常
if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
throw new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage());
}
// 其他,抛出 RemotingException 异常
throw new RemotingException(channel, res.getErrorMessage());
}
响应结果
1: public static void received(Channel channel, Response response) { |
- 该方法有两处被调用,如下图所示:
调用
- 第 4 行:移除
FUTURES
。 第 6 至 7 行:调用
DefaultFuture#doReceived(response)
方法,响应结果。代码如下:1: private void doReceived(Response res) {
2: // 锁定
3: lock.lock();
4: try {
5: // 设置结果
6: response = res;
7: // 通知,唤醒等待
8: if (done != null) {
9: done.signal();
10: }
11: } finally {
12: // 释放锁定
13: lock.unlock();
14: }
15: // 调用回调
16: if (callback != null) {
17: invokeCallback(callback);
18: }
19: }- 第 3 行:获得锁。
- 第 6 行:设置响应
response
。 - 第 8 至 10 行:调用
Condition#signal()
方法,通知,唤醒DefaultFuture#get(..)
方法的等待。 - 第 13 行:释放锁。
- 第 16 至 18 行:调用
#invokeCallback(callback)
方法,执行回调方法。
第 8 至 14 行:超时情况,打印告警日志。
- 第 15 至 18 行:移除
CHANNELS
。
设置回调
1: |
- 第 3 至 5 行:若已完成,调用
#invokeCallback(callback)
方法,执行回调方法。 - 第 9 行:获得锁。
- 第 12 至 13 行:若未完成,设置回调
callback
属性,等在#doReceived(response)
方法中再回调。 - 第 14 至 16 行:标记已完成。在【第 22 至 24 行】,调用
#invokeCallback(callback)
方法,执行回调方法。 - 第 18 至 20 行:释放锁。
调用回调
1: private void invokeCallback(ResponseCallback c) { |
- 和
#returnFromResponse()
方法,情况一致。 - 第 11 至 17 行:正常返回,调用
ResponseCallback#done(result)
方法,处理结果。 - 第 18 至 25 行:超时异常,调用
ResponseCallback#caught(e)
方法,处理 TimeoutException 异常。 - 第 26 至 34 行:其他异常,调用 ResponseCallback#caught(e)` 方法,处理 RuntimeException 异常。
后台扫描调用超时任务
static { |
- 🙂 代码比较简单,胖友自己看下代码和注释嘿。
代码略多,胖友自己在梳理梳理,也可以多多调试。
5.3.3 SimpleFuture
com.alibaba.dubbo.remoting.exchange.support.SimpleFuture
,实现 ResponseFuture 接口,简单的 Future 实现。
目前暂未使用。
5.4 MultiMessage
com.alibaba.dubbo.remoting.exchange.support.MultiMessage
,实现 Iterable 接口,多消息的封装。代码如下:
public final class MultiMessage implements Iterable { |
6. Handler
在文初的,我们在类图可以看到,有多种处理器,统一在本小节分享。
6.1 HeartbeatHandler
com.alibaba.dubbo.remoting.exchange.support.header.HeartbeatHandler
,实现 AbstractChannelHandlerDelegate 抽象类,心跳处理器,处理心跳事件。
旁白君,注意,它是一个 AbstractChannelHandlerDelegate !!!
代码比较简单,胖友自己查看。
6.1.1 HeartBeatTask
com.alibaba.dubbo.remoting.exchange.support.header.HeartBeatTask
,实现 Runnable 接口,心跳任务。
构造方法
private ChannelProvider channelProvider; |
channelProvider
属性,用于查询获得需要心跳的通道数组。ChannelProvider 接口,代码如下:interface ChannelProvider {
Collection<Channel> getChannels();
}
执行任务
1: |
- 【任务一】第 13 至 24 行:最后读或写的时间,任一超过心跳间隔
heartbeat
,发送心跳。 - 【任务二】第 25 至 40 行:最后读的时间,超过心跳超时时间
heartbeatTimeout
,分成两种情况:- 第 29 至 35 行:客户端侧,重连连接服务端。
- 第 36 至 39 行:服务端侧,关闭客户端连接。
6.2 HeaderExchangeHandler
com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler
,实现 ChannelHandlerDelegate 接口,基于消息头部( Header )的信息交换处理器实现类。
旁白君,注意,它是一个 ChannelHandlerDelegate !!!
代码比较简单,胖友自己查看,我们挑几个比较重要的来讲讲。
接收消息
1: |
- 第 4 行:设置最后的读时间。
- 第 6 行:创建 ExchangeChannel 对象。
- 第 8 至 24 行:处理请求( Request)
- 第 13 至 14 行:调用
#handlerEvent(channel, request)
方法,处理事件请求。 - 第 17 至 19 行:调用
#handleRequest(channel, request)
方法,处理普通请求(需要响应),并将响应写回请求方。 - 第 21 至 23 行:调用
ChannelHandler#received(channel, message)
方法,处理普通请求(无需响应)。
- 第 13 至 14 行:调用
- 第 25 至 27 行:调用
#handleResponse(channel, message)
方法,处理响应。 - 第 29 至 41 行:处理 String 的情况
- 第 30 至 33 行:客户端侧,不支持 String 的情况。
- 第 34 至 40 行:服务端侧,目前仅有 telnet 命令的情况,调用
TelnetHandler#telnet(channel, message)
方法,获得 telnet 命令的结果,并响应给 telnet 客户端。在 《精尽 Dubbo 源码分析 —— NIO 服务器(三)之 Telnet 层》 有详细分享。
- 第 42 至 44 行:剩余的情况,调用
ChannelHandler#received(channel, message)
方法,处理。 - 第 45 至 48 行:移除 ExchangeChannel 对象,若已断开。
#handlerEvent(channel, request)
方法,代码如下:void handlerEvent(Channel channel, Request req) {
if (req.getData() != null && req.getData().equals(Request.READONLY_EVENT)) {
channel.setAttribute(Constants.CHANNEL_ATTRIBUTE_READONLY_KEY, Boolean.TRUE);
}
}- 客户端接收到 READONLY_EVENT 事件请求,进行记录到通道。后续,不再向该服务器,发送新的请求。
#handleRequest(channel, request)
方法,代码如下:1: Response handleRequest(ExchangeChannel channel, Request req) {
2: Response res = new Response(req.getId(), req.getVersion());
3: // 请求无法解析,返回 BAD_REQUEST 响应
4: if (req.isBroken()) {
5: Object data = req.getData();
6: String msg; // 请求数据,转成 msg
7: if (data == null) {
8: msg = null;
9: } else if (data instanceof Throwable) {
10: msg = StringUtils.toString((Throwable) data);
11: } else {
12: msg = data.toString();
13: }
14: res.setErrorMessage("Fail to decode request due to: " + msg);
15: res.setStatus(Response.BAD_REQUEST);
16: return res;
17: }
18: // 使用 ExchangeHandler 处理,并返回响应
19: // find handler by message class.
20: Object msg = req.getData();
21: try {
22: // handle data.
23: Object result = handler.reply(channel, msg);
24: res.setStatus(Response.OK);
25: res.setResult(result);
26: } catch (Throwable e) {
27: res.setStatus(Response.SERVICE_ERROR);
28: res.setErrorMessage(StringUtils.toString(e));
29: }
30: return res;
31: }- 第 3 至 17 行:请求无法解析,返回 BAD_REQUEST 响应。下面 ExchangeCodec ,我们将看到具体发生的代码。
- 第 18 至 30 行:调用
ExchangeHandler#reply(channel, message)
方法,返回结果,并设置到响应( Response) 最终返回。
#handleResponse(channel, response)
方法,代码如下:static void handleResponse(Channel channel, Response response) {
if (response != null && !response.isHeartbeat()) {
DefaultFuture.received(channel, response);
}
}- 非心跳事件响应,调用
DefaultFuture#received(channel, response)
方法,唤醒等待请求结果的线程。
- 非心跳事件响应,调用
🙂 比较繁杂,胖友耐心的看一看哟。
发生异常
1: |
第 3 至 17 行:当发生 ExecutionException 异常,返回异常响应( Response )。目前会发生 ExecutionException 的情况,并且符合提交,如下图所示:
ExecutionException
第 18 至 26 行:见注释。
6.3 ExchangeHandler
com.alibaba.dubbo.remoting.exchange.ExchangeHandler
,继承 ChannelHandler 和 TelnetHandler 接口,信息交换处理器接口。方法如下:
// 回复请求结果 |
- 注意,返回的是请求结果。正如我们在上文看到的,将请求结果,设置到
Response.mResult
属性中。
ExchangeHandler 是一个非常关键的接口。为什么这么说呢,点击 DubboProtocol. requestHandler
!胖友,领悟到了么?如果没有,淡定,后面我们会有文章分享。
6.3.1 ExchangeHandlerAdapter
com.alibaba.dubbo.remoting.exchange.support.ExchangeHandlerAdapter
,实现 ExchangeHandler 接口,继承 TelnetHandlerAdapter 抽象类,信息交换处理器适配器抽象类。代码如下:
|
在 DubboProtocol 、ThirftProtocol 中,都会基于 ExchangeHandlerAdapter 实现自己的处理器,处理请求,返回结果。
6.4 Replier
友情提示:这个小节,胖友可以选择性来看,目前仅用于
dubbo-remoting-p2p
模块中。
在 ExchangeHandler 中,我们看到的是,Request 对应统一的 ExchangeHandler 实现的对象。但是在一些场景下,我们希望实现,不同的数据类型,对应不同的处理器。Replier 就是来处理这种情况的。一个数据类型,对应一个 Replier 对象。
com.alibaba.dubbo.remoting.exchange.support.Replier
,回复者接口。代码如下:
public interface Replier<T> { |
- 和 ExchangeHandler 最大的不同是,使用的是泛型 T,而不是固定的 Request 。
6.4.1 ReplierDispatcher
com.alibaba.dubbo.remoting.exchange.support.ReplierDispatcher
,实现 Replier 接口,回复者调度器实现类。
构造方法
/** |
repliers
属性,回复者集合。可通过#addReplier(Class<T> type, Replier<T> replier)
或#removeReplier(Class<T> type)
方法,添加或移除回复者。
回复请求结果
|
- 调用
#getReplier(Class<?> type)
方法,获得回复者对象。 - 调用
Repiler#reply(channel, request)
方法,回复请求结果。
6.4.2 ExchangeHandlerDispatcher
com.alibaba.dubbo.remoting.exchange.support.ExchangeHandlerDispatcher
,实现 ExchangeHandler 接口,信息交换处理器调度器实现类。代码如下:
/** |
通过 ExchangeHandlerDispatcher ,将 ReplierDispatcher + ChannelHandlerDispatcher + TelnetHandler 三者结合在一起,将对应的事件,调度到合适的处理器。以
#reply(...)
#received(...)
#telnet(...)
方法,举例子,代码如下:
"unchecked", "rawtypes"}) ({
public Object reply(ExchangeChannel channel, Object request) throws RemotingException {
return replierDispatcher.reply(channel, request);
}
public void received(Channel channel, Object message) {
handlerDispatcher.received(channel, message);
}
public String telnet(Channel channel, String message) throws RemotingException {
return telnetHandler.telnet(channel, message);
}
7. Exchanger
com.alibaba.dubbo.remoting.exchange.Exchanger
,数据交换者接口。方法如下:
Exchanger 和 Transporter 类似。
(HeaderExchanger.NAME) |
@SPI(HeaderExchanger.NAME)
注解,Dubbo SPI 拓展点,默认为"header"
,即 HeaderExchanger 。@Adaptive({Constants.EXCHANGER_KEY})
注解,基于 Dubbo SPI Adaptive 机制,加载对应的 Server 实现,使用URL.exchanger
属性。@Adaptive({Constants.EXCHANGER_KEY})
注解,基于 Dubbo SPI Adaptive 机制,加载对应的 Client 实现,使用URL.exchanger
属性。
7.1 HeaderExchanger
com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchanger
,实现 Exchanger 接口,基于消息头部( Header )的信息交换者实现类。代码如下:
public class HeaderExchanger implements Exchanger { |
- 以
#connect(...)
方法举例子。- 通过
Transporters#connect(url, handler)
方法,创建通信 Client ,内嵌到 HeaderExchangeClient 中。 - 传入的
handler
处理器,内嵌到 HeaderExchangeHandler ,再进一步内嵌到 DecodeHandler 中。所以,处理器的顺序是:DecodeHandler => HeaderExchangeHandler => ExchangeHandler(handler
) 。
- 通过
7.2 Exchangers
Exchangers 和 Transporters 类似。
com.alibaba.dubbo.remoting.Transporters
,数据交换者门面类,参见 Facade 设计模式。
代码比较简单,胖友自己查看列。
8. ExchangeCodec
胖友,打起精神,ExchangeCodec 非常重要。
com.alibaba.dubbo.remoting.exchange.codec.ExchangeCodec
,继承 TelnetCodec 类,信息交换编解码器。
- 基于消息长度的方式,做每条消息的粘包拆包处理。和我们在 《精尽 Dubbo 源码分析 —— NIO 服务器(二)之 Transport 层》 中,看到 Telnet 协议,基于特定字符的方式,做每条命令的粘包拆包处理不同。
- Header 部分,协议头,通过 Codec 编解码。Bits 位如下:
[0, 15]
:Magic Number[16, 20]
:Serialization 编号。[21]
:event
是否为事件。[22]
:twoWay
是否需要响应。[23]
:是请求还是响应。[24 - 31]
:status
状态。[32 - 95]
:id
编号,Long 型。[96 - 127]
:Body 的长度。通过该长度,读取 Body 。
- Body 部分,协议体,通过 Serialization 序列化/反序列化。
属性
// header length. |
HEADER_LENGTH
静态属性,Header 总长度,16 Bytes = 128 Bits 。- 其它静态属性,胖友对照上面的 Bits 位。
编码
1: |
- 第 3 至 4 行:调用
#encodeRequest(channel, buffer, request)
方法,编码请求。 - 第 5 至 6 行:调用
#encodeResponse(channel, buffer, response)
方法,编码响应。 - 第 7 至 9 行:调用
TelnetCodec#encode(channel, buffer, msg)
方法,编码 Telnet 命令的结果。 #encodeRequest(channel, buffer, request)
方法,代码如下:1: protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
2: Serialization serialization = getSerialization(channel);
3: // `[0, 15]`:Magic Number
4: // header.
5: byte[] header = new byte[HEADER_LENGTH];
6: // set magic number.
7: Bytes.short2bytes(MAGIC, header);
8:
9: // `[16, 20]`:Serialization 编号 && `[23]`:请求。
10: // set request and serialization flag.
11: header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());
12:
13: // `[21]`:`event` 是否为事件。
14: if (req.isTwoWay()) header[2] |= FLAG_TWOWAY;
15: // `[22]`:`twoWay` 是否需要响应。
16: if (req.isEvent()) header[2] |= FLAG_EVENT;
17:
18: // `[32 - 95]`:`id` 编号,Long 型。
19: // set request id.
20: Bytes.long2bytes(req.getId(), header, 4);
21:
22: // 编码 `Request.data` 到 Body ,并写入到 Buffer
23: // encode request data.
24: int savedWriteIndex = buffer.writerIndex();
25: buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
26: ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
27: ObjectOutput out = serialization.serialize(channel.getUrl(), bos); // 序列化 Output
28: if (req.isEvent()) {
29: encodeEventData(channel, out, req.getData());
30: } else {
31: encodeRequestData(channel, out, req.getData());
32: }
33: // 释放资源
34: out.flushBuffer();
35: if (out instanceof Cleanable) {
36: ((Cleanable) out).cleanup();
37: }
38: bos.flush();
39: bos.close();
40: // 检查 Body 长度,是否超过消息上限。
41: int len = bos.writtenBytes();
42: checkPayload(channel, len);
43: // `[96 - 127]`:Body 的**长度**。
44: Bytes.int2bytes(len, header, 12);
45:
46: // 写入 Header 到 Buffer
47: // write
48: buffer.writerIndex(savedWriteIndex);
49: buffer.writeBytes(header); // write header.
50: buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
51: }- Header 部分,先写入
header
数组,再写入 Buffer 中。 Body 部分,使用 Serialization 序列化
Request.data
,写入到 Buffer 中。#encodeEventData(Channel channel, ObjectOutput out, Object data)
方法,代码如下:private void encodeEventData(Channel channel, ObjectOutput out, Object data) throws IOException {
encodeEventData(out, data);
}
private void encodeEventData(ObjectOutput out, Object data) throws IOException {
out.writeObject(data);
}- x
#encodeRequestData(Channel channel, ObjectOutput out, Object data)
方法,代码如下:protected void encodeRequestData(Channel channel, ObjectOutput out, Object data) throws IOException {
encodeRequestData(out, data);
}
protected void encodeRequestData(ObjectOutput out, Object data) throws IOException {
out.writeObject(data);
}#encodeEventData(...)
和#encodeRequestData(...)
两个方法是一致的。
- 第 42 行:会调用
#checkPayload(channel, len)
方法,校验 Body 内容的长度。笔者在这块纠结了很久,如果过长而抛出 ExceedPayloadLimitException 异常,那么 ChannelBuffer 是否重置下写入位置。后来发现自己煞笔了,每次 ChannelBuffer 都是新创建的,所以无需重置。 - 为什么 Buffer 先写入了 Body ,再写入 Header 呢?因为 Header 中,里面
[96 - 127]
的 Body 长度,需要序列化后才得到。
- Header 部分,先写入
#encodeResponse(channel, buffer, response)
方法,和#encodeRequest(chanel, buffer, request)
方法,基本一致,胖友自己瞅瞅列。主要差异点如下:[24 - 31]
:status
状态。这是 Request 没有,而 Response 有的部分。- 当响应的内容过长而抛出 ExceedPayloadLimitException 异常,根据条件,发送一条 Response (
status = BAD_RESPONSE
) 给请求方。
解码
1: |
- 第 3 至 6 行:读取
header
数组。注意,这里的Math.min(readable, HEADER_LENGTH)
,优先考虑解析 Dubbo 协议。 - 第 8 行:调用
#decode(channel, buffer, readable, header)
方法,解码。 - ========== 分隔线 ==========
- 第 13 至 32 行:非 Dubbo 协议,目前是 Telnet 协议。
- 第 17 至 21 行:将 Buffer 完全复制到
header
数组中。因为,上面的#decode(channel, buffer)
方法,可能未读全。因为,【第 3 至 6 行】,是以 Dubbo 协议 为优先考虑解码的。 - 第 22 至 29 行:【TODO 8026 】header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW ?搞不懂?
- 第 31 行:调用
Telnet#decode(channel, buffer, readable, header)
方法,解码 Telnet 。在 《精尽 Dubbo 源码分析 —— NIO 服务器(三)之 Telnet 层》 有详细解析。
- 第 17 至 21 行:将 Buffer 完全复制到
- 第 33 至 48 行:基于消息长度的方式,拆包。
- 第 50 至 54 行:调用
#decodeBody(channel, is, header)
方法,解析 Header + Body ,根据情况,返回 Request 或 Reponse 。🙂 逻辑上,是#encodeRequest(...)
和#encodeResponse(...)
方法的反向,所以,胖友就自己看啦。 - 第 55 至 67 行:skip 未读完的流,并打印告警日志。
666. 彩蛋
🙂 啰嗦而又冗长。
希望对胖友有一些些帮助。
建议,自己尝试实现简单的 Request Response 模型。