本文基于 Dubbo 2.6.1 版本,望知悉。
1. 概述
本文接 《精尽 Dubbo 源码分析 —— NIO 服务器(一)之抽象 API》 一文,分享 dubbo-remoting-api
模块, transport
包,网络传输层。
transport 网络传输层:抽象 mina 和 netty 为统一接口,以 Message 为中心,扩展接口为 Channel, Transporter, Client, Server, Codec
涉及的类图如下:
- 白色部分,为通用接口。
- 蓝色部分,为
transport
包下的类。 - 整个类图,我们分成六个部分:
- Client
- Server
- Channel
- ChannelHandler
- Codec
- Dispacher
- 从流程上来说,我们分成:
- Server
- 启动
- 关闭
- Client
- 启动
- 关闭
- ChannelHandler
- 处理连接
- 处理断开
- 发送消息
- 接收消息
- 处理异常
- Server
艿艿的旁白:涉及较多类和流程,内容不是很线性,可能分享的比较凌乱,还望胖友谅解。建议,读 2-3 遍,并且做一些调试。
2. AbstractPeer
com.alibaba.dubbo.remoting.transport.AbstractPeer
,实现 Endpoint、ChannelHandler 接口,Peer 抽象类。
构造方法
1: /** |
handler
属性,通道处理器,通过构造方法传入。实现的 ChannelHandler 的接口方法,直接调用handler
的方法,进行执行逻辑处理。url
属性,URL ,通过构造方法传入。通过该属性,传递 Dubbo 服务引用和服务暴露的配置项。closing
属性,正在关闭,调用#startClose()
方法,变更。close
属性,关闭完成,调用#close()
方法,变更。
发送消息
|
sent
配置项:true
等待消息发出,消息发送失败将抛出异常。false
不等待消息发出,将消息放入 IO 队列,即刻返回。- 详细参见:《Dubbo 用户指南 —— 异步调用》
其他方法
胖友点击 AbstractPeer ,再看看所有的方法。
2.1 AbstractEndpint
com.alibaba.dubbo.remoting.transport.AbstractPeer.AbstractEndpint
,实现 Resetable 接口,继承 AbstractPeer 抽象类,端点抽象类。
构造方法
1: /** |
codec
属性,编解码器。在构造方法中,可以看到调用#getChannelCodec(url)
方法,基于url
参数,加载对应的 Codec 实现对象。代码如下:1: protected static Codec2 getChannelCodec(URL url) {
2: String codecName = url.getParameter(Constants.CODEC_KEY, "telnet");
3: if (ExtensionLoader.getExtensionLoader(Codec2.class).hasExtension(codecName)) { // 例如,在 DubboProtocol 中,会获得 DubboCodec
4: return ExtensionLoader.getExtensionLoader(Codec2.class).getExtension(codecName);
5: } else {
6: return new CodecAdapter(ExtensionLoader.getExtensionLoader(Codec.class).getExtension(codecName));
7: }
8: }- 第 3 行:基于 Dubbo SPI 机制,加载对应的 Codec 实现对象。例如,在 DubboProtocol 中,会获得 DubboCodec 对象。
- 第 6 行:Codec 接口,已经废弃了,目前 Dubbo 项目里,也没有它的拓展实现。
重置属性
#reset(url)
实现方法,使用新的 url
属性,可重置 codec
timeout
connectTimeout
属性。🙂 已经添加了谅解,胖友点击可看。
3. Client
3.1 AbstractClient
com.alibaba.dubbo.remoting.transport.AbstractClient
,实现 Client 接口,继承 AbstractEndpoint 抽象类,客户端抽象类,重点实现了公用的重连逻辑,同时抽象了连接等模板方法,供子类实现。抽象方法如下:
protected abstract void doOpen() throws Throwable; |
构造方法
1: /** |
reconnectExecutorService
属性,重连定时任务执行器。在客户端连接服务端时,会创建后台任务,定时检查连接,若断开,会进行重连。- 第 27 至 31 行:从 URL 中,获得重连相关配置项。
- 第 33 至 41 行:调用
#doOpen()
抽象方法,初始化客户端。若异常,调用#close()
方法,进行关闭。 - 第 43 至 63 行:调用
#connect()
实现方法,连接服务器。若异常,调用#close()
方法,进行关闭。- 第 51 至 57 行:若是连接失败 RemotingException ,若开启了 启动时检查 ,则调用
#close()
方法,进行关闭。
- 第 51 至 57 行:若是连接失败 RemotingException ,若开启了 启动时检查 ,则调用
- 第 66 至 69 行:从 DataStore 中,获得线程池。
- DataStore 在
dubbo-common
模块,store
包下实现。目前的实现比较简单,可以认为是ConcurrentMap<String, ConcurrentMap<String, Object>>
的集合。胖友可以自己看相关实现。 - 此处的线程池,实际就是 《Dubbo 用户指南 —— 线程模型》 中说的线程池。在 「8. Dispacher」 中,详细解析。
- DataStore 在
连接服务器
/** |
- 第 3 行:获得锁。在连接和断开连接时,通过锁,避免并发冲突。
第 5 至 8 行:调用
#isConnected()
方法,判断连接状态。若已经连接,就不重复连接。代码如下:
public boolean isConnected() {
Channel channel = getChannel();
return channel != null && channel.isConnected();
}- 该方法,是因为实现 Channel 接口( Client 实现 Channel 接口 ),所以需要实现的。我们可以看到,实际方法内部,调用的是
channel
对象,进行判断。其它实现 Channel 的方法,也是这么处理的,例如#getAttribute(key)
等方法。
- 该方法,是因为实现 Channel 接口( Client 实现 Channel 接口 ),所以需要实现的。我们可以看到,实际方法内部,调用的是
第 10 行:调用
#initConnectStatusCheckCommand()
方法,初始化重连线程。- 🙂 方法会复杂一些,不杂糅在这里讲。
- 第 14 至 17 行:连接失败,抛出异常 RemotingException 。
- 第 18 至 25 行:连接成功,打印日志。
- 第 26 至 29 行:设置重连次数归零,打印过错误日志状态为否。下面,我们会看到这些状态字段的变更。
- 第 38 行:释放锁。
初始化重连线程
/** |
- 第 4 行:调用
#getReconnectParam(url)
方法,获得重连频率。默认开启,2000 毫秒。- 🙂 代码比较简单,胖友自己点击方法查看。
- 第 6 至 38 行:若开启重连功能, 创建重连线程。
- 第 8 至 35 行:创建 Runnable 对象。
- 第 11 至 13 行:未连接时,调用
#connect()
方法,进行重连。 - 第 14 至 17 行:已连接时,记录最后连接时间。
- 第 18 至 33 行:符合条件时,打印错误或告警日志。为什么要符合条件才打印呢?之前也和朋友聊起来过,线上因为中间件组件,打印了太多的日志,结果整个 JVM 崩了。特别在网络场景 + 大量“无限”重试的场景,特别容易打出满屏的日志。这块,我们可以学习下。另外,Eureka 在集群同步,也有类似处理。
- 第 11 至 13 行:未连接时,调用
- 第 36 行:发起任务,定时检查,是否需要重连。
- 第 8 至 35 行:创建 Runnable 对象。
reconnect=false to close reconnect
,从目前代码上来看,未实现#reset(url)
方法,在 URL 的reconnect=false
配置项时,关闭重连线程。
发送消息
|
包装通道处理器
// Constants.java |
第 10 行:调用
ExecutorUtil#setThreadName(url, CLIENT_THREAD_POOL_NAME)
方法,设置线程名,即URL.threadname=xxx
。代码如下:public static URL setThreadName(URL url, String defaultName) {
String name = url.getParameter(Constants.THREAD_NAME_KEY, defaultName);
name = new StringBuilder(32).append(name).append("-").append(url.getAddress()).toString();
url = url.addParameter(Constants.THREAD_NAME_KEY, name);
return url;
}- 注意,线程名中,包含 URL 的地址信息。
第 12 行:设置线程类型,即
URL.threadpool=xxx
。默认情况下,使用"cached"
类型,这个和 Server 是不同的,下面我们会看到。- 第 14 行:调用
ChannelHandlers#wrap(handler, url)
方法,包装通道处理器。这里我们不细说,在 「8. Dispacher」 中,结合解析。 - 🙂 这是一个非常关键的方法,在例如 NettyClient 等里,都会调用该方法。
其他方法
如下方法比较简单,艿艿就不重复啰嗦了。
#disconnect()
方法,断开连接。#reconnect()
方法,主动重连。#close()
方法,强制关闭。#close(timeout)
方法,优雅关闭。
子类类图
3.2 ClientDelegate
com.alibaba.dubbo.remoting.transport.ClientDelegate
,实现 Client 接口,客户端装饰者实现类。在每个实现的方法里,直接调用被装饰的 client
属性的方法。
目前 dubbo-rpc-default
模块中,ChannelWrapper 继承了 ClientDelegate 类。但实际上,ChannelWrapper 重新实现了所有的方法,并且,并未复用任何方法。所以,ClientDelegate 目前用途不大。
4. Server
4.1 AbstractServer
com.alibaba.dubbo.remoting.transport.AbstractServer
,实现 Server 接口,继承 AbstractEndpoint 抽象类,服务器抽象类,重点实现了公用的逻辑,同时抽象了开启、关闭等模板方法,供子类实现。抽象方法如下:
protected abstract void doOpen() throws Throwable; |
构造方法
1: /** |
- 第 24 至 36 行:从 URL 中,加载
localAddress
bindAddress
accepts
idleTimeout
配置项。比较难理解的,可能是两个地址属性,如下是比例提供的一个例子:例子
- 配置项可在
#reset(url)
方法中,重置属性。
- 配置项可在
- 第 38 至 47 行:调用
#doOpen()
方法,开启服务器。 - 第 49 至 52 行:从 DataStore 中,获得线程池。
fixme replace this with better method
,说明官方在这块实现上,也不是很满意,后面会优化掉。
被客户端连接
|
发送消息
|
其他方法
如下方法比较简单,艿艿就不重复啰嗦了。
#disconnect()
方法,断开连接。#close()
方法,强制关闭。#close(timeout)
方法,优雅关闭。
子类类图
4.2 ServerDelegate
com.alibaba.dubbo.remoting.transport.ServerDelegate
,实现 Client 接口,客户端装饰者实现类。在每个实现的方法里,直接调用被装饰的 server
属性的方法。
目前 dubbo-remoting-p2p
模块中,PeerServer 会继承该类,后续再看。
5. Channel
5.1 AbstractChannel
com.alibaba.dubbo.remoting.transport.AbstractChannel
,实现 Channel 接口,实现 AbstractPeer 抽象类,通道抽象类。
发送消息
|
- 具体的发送方法,子类实现。在 AbstractChannel 中,目前只做状态检查。
子类类图
5.2 ChannelDelegate
com.alibaba.dubbo.remoting.transport.ChannelDelegate
,实现 Channel 接口,通道装饰者实现类。在每个实现的方法里,直接调用被装饰的 channel
属性的方法。
目前 Dubbo 中,暂未用到。
7. ChannelHandler
7.1 ChannelHandlerAdapter
com.alibaba.dubbo.remoting.transport.ChannelHandlerAdapter
,实现 ChannelHandler 接口,通道处理器适配器,每个方法为空实现。代码如下:
public void connected(Channel channel) throws RemotingException { } |
子类,可继承它,仅实现想要的方法。
7.2 ChannelHandlerDispatcher
com.alibaba.dubbo.remoting.transport.ChannelHandlerDispatcher
,实现 ChannelHandler 接口,通道处理器调度器。在它内部,有一个通道处理器数组 channelHandlers
属性。
每个实现的方法,都会循环调用 channelHandlers
的方法,例如:
public void received(Channel channel, Object message) { |
搜索了下 ChannelHandlerDispatcher 的使用情况,主要用在 dubbo-remoting-p2p
的 AbstractGroup 中。
7.3 ChannelHandlerDelegate
com.alibaba.dubbo.remoting.transport.ChannelHandlerDelegate
,实现 ChannelHandler 接口,通道处理器装饰者接口。方法如下:
ChannelHandler getHandler(); |
正如,我们在上文中说道,装饰器模式,在 dubbo-remoting-api
扮演了非常重要的角色,那么最佳演员就是 ChannelHandlerDelegate 们。下面,开始他们的表演。
7.3.1 AbstractChannelHandlerDelegate
com.alibaba.dubbo.remoting.transport.AbstractChannelHandlerDelegate
,实现 ChannelHandlerDelegate 接口,通道处理器装饰者抽象实现类。在每个实现的方法里,直接调用被装饰的 handler
属性的方法。
7.3.2 DecodeHandler
com.alibaba.dubbo.remoting.transport.DecodeHandler
,实现 AbstractChannelHandlerDelegate 抽象类,解码处理器,处理接收到的消息,实现了 Decodeable 接口的情况。
覆写了 #received(channel, message)
方法
1: |
- 第 3 至 5 行:当消息是 Decodeable 类型时,调用
#decode(message)
方法,解析消息。 - 第 7 至 9 行:当消息是 Request 类型时,调用
#decode(message)
方法,解析data
属性。 - 第 11 至 13 行:当消息是 Response 类型时,调用
#decode(message)
方法,解析result
属性。 - 第 15 行:调用
ChannelHandler#received(channel, message)
方法,将消息交给委托的handler
,继续处理。🙂 胖友是否感受到,装饰器模式的好处:通过组合的方式,实现功能的叠加。
解析消息
1: private void decode(Object message) { |
- 第 2 至 4 行:当类型是 Decodeable 时,调用
Decodeable#decode()
方法,进一步解析。 - 在
dubbo-rpc-default
项目中,DecodeableRpcInvocation 和 DecodeableRpcResult 实现 Decodeable 接口,后面我们来分享。
7.3.3 MultiMessageHandler
`com.alibaba.dubbo.remoting.transport.MultiMessageHandler ,实现 AbstractChannelHandlerDelegate 抽象类,多消息处理器,处理一次性接收到多条消息的情况。
覆写了 #received(channel, message)
方法
1: |
- 第 3 至 7 行:当消息是 MultiMessage 类型,即多消息,循环提交给
handler
处理。 - 第 8 至 10 行:当单消息时,直接提交给
handler
处理。
🙂 在下面的文章,我们可以看到 ChannelHandlerDelegate 的组合使用的例子。
8. Dispacher
本小节内容,对应 《Dubbo 用户指南 —— 线程模型》 。
简单概括这节,以接收消息举例子,代码如下:
executor.execute(new Runnable() { |
将 ChannelHandler 的具体操作,调度到线程池中,这也是为什么这个模块叫 dispacher
的原因。
8.1 ChannelHandlers
com.alibaba.dubbo.remoting.transport.dispatcher.ChannelHandlers
,通道处理器工厂。在上文 「3.1 AbstractClient」 ,我们看到 AbstractClient#wrapChannelHandler(url, handler)
方法中,会调用 ChannelHandlers#wrap(url, handler)
方法。实际上,Server 部分也会有这样类似的逻辑,只是代码实现上暂未统一。以 dubbo-remoting-netty4
来举例子:
NettyClient :
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
super(url, wrapChannelHandler(url, handler));
}NettyServer :
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME) /* 设置线程名到 URL 上 */));
}
无论 Client 还是 Server ,都是类似的,将传入的 handler
,最终使用 ChannelHandlers 进行一次包装。OK ,我们来看看包装通道处理器的具体代码:
1: /** |
- 第 11 至 15 行:在这里,我们就看到了多个 ChannelHandlerDelegate 的组合。包括,第 15 行的,
Dispatcher#dispatch(handler, url)
方法,实际上也是返回一个 ChannelHandlerDelegate 对象。
8.2 Dispatcher 实现类
在 Dubbo 中,有多种 Dispatcher 的实现,如下:
FROM 《Dubbo 用户指南 —— 线程模型》
all
所有消息都派发到线程池,包括请求,响应,连接事件,断开事件,心跳等。direct
所有消息都不派发到线程池,全部在 IO 线程上直接执行。message
只有请求响应消息派发到线程池,其它连接断开事件,心跳等消息,直接在 IO 线程上执行。execution
只请求消息派发到线程池,不含响应,响应和其它连接断开事件,心跳等消息,直接在 IO 线程上执行。connection
在 IO 线程上,将连接断开事件放入队列,有序逐个执行,其它消息派发到线程池。
子类类图
8.2.1 AllDispatcher
我们以 all
对应的 AllDispatcher 举例子,代码如下:
public class AllDispatcher implements Dispatcher { |
在该类的 #dispatch(...)
的方法中,我们可以看到创建 AllChannelHandler 对象,并传入 handler
属性。🙂 聪慧如你,已经猜到 AllChannelHandler 也是 ChannelHandlerDelegate 类型。也就是说“线程模型”,也是通过装饰器模式,组合而成。
每个 Dispatcher 实现类,都对应一个 ChannelHandler 实现类。默认未配置的情况下,使用 AllDispatcher 调度。
8.2.2 AllChannelHandler
com.alibaba.dubbo.remoting.transport.dispatcher.all.AllChannelHandler
,实现 WrappedChannelHandler 抽象类。覆写 #connected(channel)
方法如下:
WrappedChannelHandler 是实现 ChannelHandlerDelegate 的抽象类,下文再看。
|
- 创建 ChannelEventRunnable 对象,提交给线程池执行。
- 注意,传入的状态为
ChannelState.CONNECTED
。不同的实现方法,对应不同的状态。
8.3 ChannelEventRunnable
com.alibaba.dubbo.remoting.transport.dispatcher.ChannelEventRunnable
,实现 Runnable 接口。代码比较简单,胖友自己看噢。主要分成三部分:
- 构造方法
- ChannelState
#run()
方法,简化代码如下:
public void run() {
switch (state) {
case CONNECTED: handler.connected(channel); break;
case DISCONNECTED:handler.disconnected(channel); break;
case SENT:handler.sent(channel, message);break;
case RECEIVED:handler.received(channel, message);break;
case CAUGHT:handler.caught(channel, exception);break;
default: logger.warn("unknown state: " + state + ", message is " + message);
}
}
8.4 WrappedChannelHandler
com.alibaba.dubbo.remoting.transport.dispatcher.WrappedChannelHandler
,实现 ChannelHandlerDelegate 接口,包装的 WrappedChannelHandler 实现类。
从目前的实现来看,WrappedChannelHandler 继承 AbstractChannelHandlerDelegate 更合适,因为
#connected(channel)
等,实现的方法都是相同的。
构造方法
1: /** |
- 第 19 行:基于 Dubbo SPI Adaptive 机制,创建线程池。
- 第 21 至 27 行:添加线程池到 DataStore 中。🙂 这就是上文 AbstractClient 或 AbstractServer 从 DataStore 获得线程池的方式。当然,官方也说了,这种方式不是很优雅,有点奇淫技巧,未来会优化掉。
共享线程池
在 WrappedChannelHandler 中,有一个内置的共享线程池,如下:
protected static final ExecutorService SHARED_EXECUTOR = Executors.newCachedThreadPool(new NamedThreadFactory("DubboSharedHandler", true)); |
【TODO 8024】搞不懂,这个设计的意图,先mark留着。
子类类图
9. Codec
9.1 CodecSupport
com.alibaba.dubbo.remoting.transport.CodecSupport
,编解码工具类,提供查询 Serialization 的功能。
初始化
/** |
Dubbo 提供了多种序列化方式,此处初始化结果,如下图:SERIALIZATION 集合
查找 Serialization 对象
public static Serialization getSerialization(URL url, Byte id) throws IOException { |
🙂 在最新的 Dubbo 版本中,已经将 serialization
模块,从 dubbo-common
中,独立成 dubbo-serialization
。So ,我们后面开一个系列来分享。
9.2 AbstractCodec
com.alibaba.dubbo.remoting.transport.AbstractCodec
,实现 Codec2 接口,提供如下公用方法:
#checkPayload(channel, size)
静态方法,校验消息长度。#getSerialization(channel)
方法,获得 Serialization 对象。#isClientSide(channel)
方法,是否为客户端侧的通道。#isServerSide(channel)
方法,是否为服务端侧的通道。
子类类图
编解码器的实现,通过继承的方式,获得更多的功能。每一个 Codec2 类实现对不同消息的编解码。通过协议头来判断,具体使用哪个编解码逻辑。听起来有点绕,我们来看一段简化 ExchangeCodec 的 #decode(...)
例子:
1: protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException { |
- 第 2 至 7 行:通过 magic number 判断到,并非 Dubbo Exchange 信息交易的协议头,转交给父类 TelnetCodec 处理,一般此时是 Telnet 消息。
- 第 8 至 11 行:通过 magic number 判断到,符合 Dubbo Exchange 信息交易的协议头,ExchangeCodec 自己处理。
9.2.1 TransportCodec
com.alibaba.dubbo.remoting.transport.codec.TransportCodec
,传输编解码器,使用 Serialization 进行序列化/反序列化,直接编解码。
编码消息
1: |
- 第 3 至 5 行:获得对应的 Serialization 对象,并创建用于反序列化的 ObjectOutput 对象。不同的 Serialization 实现,对应不同的 ObjectOutput 实现类。🙂 这里,我们只要读懂大体流程,详细的,我们后面文章见。
第 7 行:调用
#encodeData(channel, objectOutput, message)
方法,写入 ObjectOutput。代码如下:protected void encodeData(Channel channel, ObjectOutput output, Object message) throws IOException {
encodeData(output, message);
}
protected void encodeData(ObjectOutput output, Object message) throws IOException {
output.writeObject(message);
}第 9 至 12 行:释放资源。目前,仅有
kryo
的 KryoObjectInput 、KryoObjectOutput 实现了 Cleanable 接口,需要释放资源。
解码消息
#decode(channel, buffer)
实现方法,和解码消息基本一致,胖友自己查看。
9.3 CodecAdapter
com.alibaba.dubbo.remoting.transport.codec.CodecAdapter
,实现 Code2 接口,Codec 适配器,将 Codec 适配成 Codec2 。
🙂 代码比较简单,胖友自己查看。
666. 彩蛋
代码比较多,如果不熟悉 Netty 等框架的胖友,可能会一脸懵逼的看到文末。建议胖友结合如下的代码:
// Netty.java |
在脑补 Debug + IDE Debug ,多考虑下。
推荐阅读: