1. 概述
本文,我们来分享 Bootstrap 分享 Netty 客户端。因为我们日常使用 Netty 主要使用 NIO 部分,所以本文也只分享 Netty NIO 客户端。
2. Bootstrap 示例
下面,我们先来看一个 ServerBootstrap 的使用示例,就是我们在 《精尽 Netty 源码分析 —— 调试环境搭建》 搭建的 EchoClient 示例。代码如下:
public final class EchoClient { |
- 示例比较简单,已经添加中文注释,胖友自己查看。
3. Bootstrap
io.netty.bootstrap.Bootstrap
,实现 AbstractBootstrap 抽象类,用于 Client 的启动器实现类。
3.1 构造方法
/** |
config
属性,BootstrapConfig 对象,启动类配置对象。resolver
属性,地址解析器对象。绝大多数情况下,使用DEFAULT_RESOLVER
即可。remoteAddress
属性,连接地址。
3.2 resolver
#resolver(AddressResolverGroup<?> resolver)
方法,设置 resolver
属性。代码如下:
public Bootstrap resolver(AddressResolverGroup<?> resolver) { |
3.3 remoteAddress
#remoteAddress(...)
方法,设置 remoteAddress
属性。代码如下:
public Bootstrap resolver(AddressResolverGroup<?> resolver) { |
3.4 validate
#validate()
方法,校验配置是否正确。代码如下:
|
- 在
#connect(...)
方法中,连接服务端时,会调用该方法进行校验。
3.5 clone
#clone(...)
方法,克隆 Bootstrap 对象。代码如下:
|
- 两个克隆方法,都是调用参数为
bootstrap
为 Bootstrap 构造方法,克隆一个 Bootstrap 对象。差别在于,下面的方法,多了对group
属性的赋值。
3.6 connect
#connect(...)
方法,连接服务端,即启动客户端。代码如下:
public ChannelFuture connect() { |
- 该方法返回的是 ChannelFuture 对象,也就是异步的连接服务端,启动客户端。如果需要同步,则需要调用
ChannelFuture#sync()
方法。
#connect(...)
方法,核心流程如下图:
#bind(...)
方法,核心流程如下图:
- 主要有 5 个步骤,下面我们来拆解代码,看看和我们在 《精尽 Netty 源码分析 —— NIO 基础(五)之示例》 的 NioClient 的代码,是怎么对应的。
- 相比
#bind(...)
方法的流程,主要是绿色的 2 个步骤。
3.6.1 doResolveAndConnect
#doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress)
方法,代码如下:
1: private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) { |
- 第 3 行:调用
#initAndRegister()
方法,初始化并注册一个 Channel 对象。因为注册是异步的过程,所以返回一个 ChannelFuture 对象。详细解析,见 「3.7 initAndRegister」 。- 第 6 至 10 行:若执行失败,直接进行返回
regFuture
对象。
- 第 6 至 10 行:若执行失败,直接进行返回
- 第 9 至 37 行:因为注册是异步的过程,有可能已完成,有可能未完成。所以实现代码分成了【第 12 行】和【第 13 至 37 行】分别处理已完成和未完成的情况。
- 核心在【第 12 行】或者【第 33 行】的代码,调用
#doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise)
方法,解析远程地址,并进行连接。 - 如果异步注册对应的 ChanelFuture 未完成,则调用
ChannelFuture#addListener(ChannelFutureListener)
方法,添加监听器,在注册完成后,进行回调执行#doResolveAndConnect0(...)
方法的逻辑。详细解析,见 「3.6.2 doResolveAndConnect0」 。 - 所以总结来说,resolve 和 connect 的逻辑,执行在 register 的逻辑之后。
- 核心在【第 12 行】或者【第 33 行】的代码,调用
3.6.2 doResolveAndConnect0
老艿艿:此小节的内容,胖友先看完 「3.7 initAndRegister」 的内容在回过头来看。因为
#doResolveAndConnect0(...)
方法的执行,在#initAndRegister()
方法之后。
#doResolveAndConnect0(...)
方法,解析远程地址,并进行连接。代码如下:
1: private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress, |
- 第 3 至 14 行:使用
resolver
解析远程地址。因为解析是异步的过程,所以返回一个 Future 对象。- 详细的解析远程地址的代码,考虑到暂时不是本文的重点,所以暂时省略。😈 老艿艿猜测胖友应该也暂时不感兴趣,哈哈哈。
- 第 16 至 44 行:因为注册是异步的过程,有可能已完成,有可能未完成。所以实现代码分成了【第 16 至 29 行】和【第 31 至 44 行】分别处理已完成和未完成的情况。
- 核心在【第 26 行】或者【第 41 行】的代码,调用
#doConnect(...)
方法,连接远程地址。 - 如果异步解析对应的 Future 未完成,则调用
Future#addListener(FutureListener)
方法,添加监听器,在解析完成后,进行回调执行#doConnect(...)
方法的逻辑。详细解析,见 见 「3.13.3 doConnect」 。 - 所以总结来说,connect 的逻辑,执行在 resolve 的逻辑之后。
- 老艿艿目前使用 「2. Bootstrap 示例」 测试下来,符合【第 16 至 30 行】的条件,即无需走异步的流程。
- 核心在【第 26 行】或者【第 41 行】的代码,调用
3.6.3 doConnect
#doConnect(...)
方法,执行 Channel 连接远程地址的逻辑。代码如下:
1: private static void doConnect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) { |
- 第 6 行:调用 EventLoop 执行 Channel 连接远程地址的逻辑。但是,实际上当前线程已经是 EventLoop 所在的线程了,为何还要这样操作呢?答案在【第 3 至 4 行】的英语注释。感叹句,Netty 虽然代码量非常庞大且复杂,但是英文注释真的是非常齐全,包括 Github 的 issue 对代码提交的描述,也非常健全。
- 第 10 至 14 行:调用
Channel#connect(...)
方法,执行 Channel 连接远程地址的逻辑。后续的方法栈调用如下图:Channel connect 流程
- 还是老样子,我们先省略掉 pipeline 的内部实现代码,从
AbstractNioUnsafe#connect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise)
方法,继续向下分享。
- 还是老样子,我们先省略掉 pipeline 的内部实现代码,从
AbstractNioUnsafe#connect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise)
方法,执行 Channel 连接远程地址的逻辑。代码如下:
1: |
第 8 至 12 行:目前有正在连接远程地址的 ChannelPromise ,则直接抛出异常,禁止同时发起多个连接。
connectPromise
变量,定义在 AbstractNioChannel 类中,代码如下:/**
* 目前正在连接远程地址的 ChannelPromise 对象。
*
* The future of the current connection attempt. If not null, subsequent
* connection attempts will fail.
*/
private ChannelPromise connectPromise;第 15 行:调用
#isActive()
方法,获得 Channel 是否激活。NioSocketChannel 对该方法的实现代码如下:
public boolean isActive() {
SocketChannel ch = javaChannel();
return ch.isOpen() && ch.isConnected();
}- 判断 SocketChannel 是否处于打开,并且连接的状态。此时,一般返回的是
false
。
- 判断 SocketChannel 是否处于打开,并且连接的状态。此时,一般返回的是
第 18 行:调用
#doConnect(SocketAddress remoteAddress, SocketAddress localAddress)
方法,执行连接远程地址。代码如下:// NioSocketChannel.java
1:
2: protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
3: // 绑定本地地址
4: if (localAddress != null) {
5: doBind0(localAddress);
6: }
7:
8: boolean success = false; // 执行是否成功
9: try {
10: // 连接远程地址
11: boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
12: // 若未连接完成,则关注连接( OP_CONNECT )事件。
13: if (!connected) {
14: selectionKey().interestOps(SelectionKey.OP_CONNECT);
15: }
16: // 标记执行是否成功
17: success = true;
18: // 返回是否连接完成
19: return connected;
20: } finally {
21: // 执行失败,则关闭 Channel
22: if (!success) {
23: doClose();
24: }
25: }
26: }- 第 3 至 6 行:若
localAddress
非空,则调用#doBind0(SocketAddress)
方法,绑定本地地址。一般情况下,NIO Client 是不需要绑定本地地址的。默认情况下,系统会随机分配一个可用的本地地址,进行绑定。 第 11 行:调用
SocketUtils#connect(SocketChannel socketChannel, SocketAddress remoteAddress)
方法,Java 原生 NIO SocketChannel 连接 远程地址,并返回是否连接完成( 成功 )。代码如下:public static boolean connect(final SocketChannel socketChannel, final SocketAddress remoteAddress) throws IOException {
try {
return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {
public Boolean run() throws IOException {
return socketChannel.connect(remoteAddress);
}
});
} catch (PrivilegedActionException e) {
throw (IOException) e.getCause();
}
}可能有胖友有和我一样的疑问,为什么将 connect 操作包在 AccessController 中呢?我们来看下 SocketUtils 类的注释:
/**
* Provides socket operations with privileges enabled. This is necessary for applications that use the
* {@link SecurityManager} to restrict {@link SocketPermission} to their application. By asserting that these
* operations are privileged, the operations can proceed even if some code in the calling chain lacks the appropriate
* {@link SocketPermission}.
*/- 一般情况下,我们用不到,所以也可以暂时不用理解。
- 感兴趣的胖友,可以 Google “AccessController” 关键字,或者阅读 《Java 安全模型介绍》 。
【重要】第 12 至 15 行:若连接未完成(
connected == false
)时,我们可以看到,调用SelectionKey#interestOps(ops)
方法,添加连接事件(SelectionKey.OP_CONNECT
)为感兴趣的事件。也就说,也就是说,当连接远程地址成功时,Channel 对应的 Selector 将会轮询到该事件,可以进一步处理。- 第 20 至 25 行:若执行失败(
success == false
)时,调用#doClose()
方法,关闭 Channel 。
- 第 3 至 6 行:若
- 第 18 至 19 行:笔者测试下来,
#doConnect(SocketChannel socketChannel, SocketAddress remoteAddress)
方法的结果为false
,所以不会执行【第 19 行】代码的#fulfillConnectPromise(ChannelPromise promise, boolean wasActive)
方法,而是执行【第 20 至 57 行】的代码逻辑。 - 第 22 行:记录
connectPromise
。 第 24 行:记录
requestedRemoteAddress
。requestedRemoteAddress
变量,在 AbstractNioChannel 类中定义,代码如下:/**
* 正在连接的远程地址
*/
private SocketAddress requestedRemoteAddress;第 26 至 40 行:调用
EventLoop#schedule(Runnable command, long delay, TimeUnit unit)
方法,发起定时任务connectTimeoutFuture
,监听连接远程地址是否超时。若连接超时,则回调通知connectPromise
超时异常。connectPromise
变量,在 AbstractNioChannel 类中定义,代码如下:/**
* 连接超时监听 ScheduledFuture 对象。
*/
private ScheduledFuture<?> connectTimeoutFuture;第 42 至 57 行:调用
ChannelPromise#addListener(ChannelFutureListener)
方法,添加监听器,监听连接远程地址是否取消。若取消,则取消connectTimeoutFuture
任务,并置空connectPromise
。这样,客户端 Channel 可以发起下一次连接。
3.6.4 finishConnect
看到此处,可能胖友会有疑问,客户端的连接在哪里完成呢?答案在 AbstractNioUnsafe#finishConnect()
方法中。而该方法通过 Selector 轮询到 SelectionKey.OP_CONNECT
事件时,进行触发。调用栈如下图:finishConnect 调用栈
* 哈哈哈,还是老样子,我们先省略掉 EventLoop 的内部实现代码,从 `AbstractNioUnsafe#finishConnect()` 方法,继续向下分享。
AbstractNioUnsafe#finishConnect()
方法,完成客户端的连接。代码如下:
1: |
- 第 6 行:判断是否在 EventLoop 的线程中。
- 第 10 行:调用
#isActive()
方法,获得 Channel 是否激活。笔者调试时,此时返回false
,因为连接还没完成。 - 第 12 行:调用
#doFinishConnect()
方法,执行完成连接的逻辑。详细解析,见 「3.6.4.1 doFinishConnect」 。 - 第 14 行:执行完成连接成功,调用
#fulfillConnectPromise(ChannelPromise promise, boolean wasActive)
方法,通知connectPromise
连接完成。详细解析,见 「3.6.4.2 fulfillConnectPromise 成功」 。 - 第 15 至 17 行:执行完成连接异常,调用
#fulfillConnectPromise(ChannelPromise promise, Throwable cause)
方法,通知connectPromise
连接异常。详细解析,见 「3.6.4.3 fulfillConnectPromise 异常」 。 - 第 18 至 27 行:执行完成连接结束,取消
connectTimeoutFuture
任务,并置空connectPromise
。
3.6.4.1 doFinishConnect
NioSocketChannel#doFinishConnect()
方法,执行完成连接的逻辑。代码如下:
|
- 【重要】是不是非常熟悉的,调用
SocketChannel#finishConnect()
方法,完成连接。😈 美滋滋。
3.6.4.2 fulfillConnectPromise 成功
AbstractNioUnsafe#fulfillConnectPromise(ChannelPromise promise, Throwable cause)
方法,通知 connectPromise
连接完成。代码如下:
1: private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) { |
- 第 10 行:调用
#isActive()
方法,获得 Channel 是否激活。笔者调试时,此时返回true
,因为连接已经完成。 第 14 行:回调通知
promise
执行成功。此处的通知,对应回调的是我们添加到#connect(...)
方法返回的 ChannelFuture 的 ChannelFutureListener 的监听器。示例代码如下:ChannelFuture f = b.connect(HOST, PORT).addListener(new ChannelFutureListener() { // 回调的就是我!!!
public void operationComplete(ChannelFuture future) throws Exception {
System.out.println("连接完成");
}
}).sync();
- 第 19 行:因为
wasActive == false
并且active == true
,因此,Channel 可以认为是新激活的,满足【第 20 行】代码的执行条件。- 第 40 行:调用
DefaultChannelPipeline#fireChannelActive()
方法,触发 Channel 激活的事件。【重要】后续的流程,和 NioServerSocketChannel 一样,也就说,会调用到AbstractUnsafe#beginRead()
方法。这意味着什么呢?将我们创建 NioSocketChannel 时,设置的readInterestOp = SelectionKey.OP_READ
添加为感兴趣的事件。也就说,客户端可以读取服务端发送来的数据。 - 关于
AbstractUnsafe#beginRead()
方法的解析,见 《精尽 Netty 源码分析 —— 启动(一)之服务端》的 「3.13.3 beginRead」 部分。
- 第 40 行:调用
- 第 23 至 27 行:TODO 芋艿 1004 fulfillConnectPromise promiseSet
3.6.4.3 fulfillConnectPromise 异常
#fulfillConnectPromise(ChannelPromise promise, Throwable cause)
方法,通知 connectPromise
连接异常。代码如下:
private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) { |
- 比较简单,已经添加中文注释,胖友自己查看。
3.7 initAndRegister
Bootstrap 继承 AbstractBootstrap 抽象类,所以 #initAndRegister()
方法的流程上是一致的。所以和 ServerBootstrap 的差别在于:
- 创建的 Channel 对象不同。
- 初始化 Channel 配置的代码实现不同。
3.7.1 创建 Channel 对象
考虑到本文的内容,我们以 NioSocketChannel 的创建过程作为示例。创建 NioSocketChannel 对象的流程,和 NioServerSocketChannel 基本是一致的,所以流程图我们就不提供了,直接开始撸源码。
3.7.1.1 NioSocketChannel
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider(); |
DEFAULT_SELECTOR_PROVIDER
静态属性,默认的 SelectorProvider 实现类。config
属性,Channel 对应的配置对象。每种 Channel 实现类,也会对应一个 ChannelConfig 实现类。例如,NioSocketChannel 类,对应 SocketChannelConfig 配置类。在构造方法中,调用
#newSocket(SelectorProvider provider)
方法,创建 NIO 的 ServerSocketChannel 对象。代码如下:private static SocketChannel newSocket(SelectorProvider provider) {
try {
/**
* Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
* {@link SelectorProvider#provider()} which is called by each SocketChannel.open() otherwise.
*
* See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
*/
return provider.openSocketChannel();
} catch (IOException e) {
throw new ChannelException("Failed to open a socket.", e);
}
}- 😈 是不是很熟悉这样的代码,效果和
SocketChannel#open()
方法创建 SocketChannel 对象是一致。
- 😈 是不是很熟悉这样的代码,效果和
#NioSocketChannel(SocketChannel channel)
构造方法,代码如下:public NioSocketChannel(SocketChannel socket) {
this(null, socket);
}
public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);
config = new NioSocketChannelConfig(this, socket.socket());
}- 调用父 AbstractNioByteChannel 的构造方法。详细解析,见 「3.7.1.2 AbstractNioByteChannel」 。
- 初始化
config
属性,创建 NioSocketChannelConfig 对象。
3.7.1.2 AbstractNioByteChannel
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) { |
- 调用父 AbstractNioChannel 的构造方法。后续的构造方法,和 NioServerSocketChannel 是一致的。
- 注意传入的 SelectionKey 的值为
OP_READ
。
- 注意传入的 SelectionKey 的值为
3.7.2 初始化 Channel 配置
#init(Channel channel)
方法,初始化 Channel 配置。代码如下:
|
- 比较简单,已经添加中文注释,胖友自己查看。
666. 彩蛋
撸完 Netty 服务端启动之后,再撸 Netty 客户端启动之后,出奇的顺手。美滋滋。
另外,也推荐如下和 Netty 客户端启动相关的文章,以加深理解: