1. 概述
本文分享 Netty NIO 服务端 NioServerSocketChannel 接受( accept )客户端连接的过程。简单来说:
- 服务端 NioServerSocketChannel 的 boss EventLoop 线程轮询是否有新的客户端连接接入。
- 当轮询到有新的连接接入,封装连入的客户端的 SocketChannel 为 Netty NioSocketChannel 对象。
- 选择一个服务端 NioServerSocketChannel 的 worker EventLoop ,将客户端的 NioSocketChannel 注册到其上。并且,注册客户端的 NioSocketChannel 的读事件,开始轮询该客户端是否有数据写入。
下面,让我们来看看具体的代码实现。
2. NioMessageUnsafe#read
老艿艿:有点不知道怎么取标题好,直接用方法名吧。
在 NioEventLoop 的 #processSelectedKey(SelectionKey k, AbstractNioChannel ch)
方法中,我们会看到这样一段代码:
// SelectionKey.OP_READ 或 SelectionKey.OP_ACCEPT 就绪 |
- 当
(readyOps & SelectionKey.OP_ACCEPT) != 0
时,这就是服务端 NioServerSocketChannel 的 boss EventLoop 线程轮询到有新的客户端连接接入。 - 然后,调用
NioMessageUnsafe#read()
方法,“读取”( 😈 这个抽象很灵性 )新的客户端连接连入。
NioMessageUnsafe#read()
方法,代码如下:
1: private final class NioMessageUnsafe extends AbstractNioUnsafe { |
- 😈 NioMessageUnsafe 只有一个
#read()
方法,而该方法,“读取”新的客户端连接连入。 第 15 行:调用
Unsafe#recvBufAllocHandle()
方法,获得 获得 RecvByteBufAllocator.Handle 对象。默认情况下,返回的是 AdaptiveRecvByteBufAllocator.HandleImpl 对象。关于它的内容,我们放在 ByteBuf 相关的文章,详细解析。第 17 行:调用
DefaultMaxMessagesRecvByteBufAllocator.MaxMessageHandle#reset(ChannelConfig)
方法,重置 RecvByteBufAllocator.Handle 对象。代码如下:
public void reset(ChannelConfig config) {
this.config = config; // 重置 ChannelConfig 对象
maxMessagePerRead = maxMessagesPerRead(); // 重置 maxMessagePerRead 属性
totalMessages = totalBytesRead = 0; // 重置 totalMessages 和 totalBytesRead 属性
}- 注意,AdaptiveRecvByteBufAllocator.HandleImpl 继承 DefaultMaxMessagesRecvByteBufAllocator.MaxMessageHandle 抽象类。
第 22 至 42 行:while 循环 “读取”新的客户端连接连入。
- 第 25 行: 调用
NioServerSocketChannel#doReadMessages(List<Object> buf)
方法,读取客户端的连接到readBuf
中。详细解析,胖友先跳到 「3. AbstractNioMessageChannel#doReadMessages」 中,看完记得回到此处。 - 第 25 至 29 行:无可读取的客户端的连接,结束循环。
- 第 30 至 34 行:读取出错,标记关闭服务端,并结束循环。目前我们看到
NioServerSocketChannel#doReadMessages(List<Object> buf)
方法的实现,返回的结果只会存在 0 和 1 ,也就是说不会出现这种情况。笔者又去翻了别的实现类,例如NioDatagramChannel#doReadMessages(List<Object> buf)
方法,在发生异常时,会返回 -1 。 第 37 行:调用
AdaptiveRecvByteBufAllocator.HandleImpl#incMessagesRead(int amt)
方法,读取消息( 客户端 )数量 +localRead
。代码如下:
public final void incMessagesRead(int amt) {
totalMessages += amt;
}- 对于 AdaptiveRecvByteBufAllocator.HandleImpl 来说,考虑到抽象的需要,所以统一使用“消息”的说法。
第 38 行:调用
AdaptiveRecvByteBufAllocator.HandleImpl#incMessagesRead(int amt)#continueReading()
方法,判断是否循环是否继续,读取( 接受 )新的客户端连接。代码如下:// AdaptiveRecvByteBufAllocator.HandleImpl.java
public boolean continueReading() {
return continueReading(defaultMaybeMoreSupplier);
}
// DefaultMaxMessagesRecvByteBufAllocator.MaxMessageHandle.java
public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
return config.isAutoRead() &&
(!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
totalMessages < maxMessagePerRead &&
totalBytesRead > 0; // <1>
}- 因为
<1>
处,此时totalBytesRead
等于 0 ,所以会返回 false 。因此,循环会结束。也因此,对于 NioServerSocketChannel 来说,每次只接受一个新的客户端连接。😈 当然,因为服务端 NioServerSocketChannel 对Selectionkey.OP_ACCEPT
事件感兴趣,所以后续的新的客户端连接还是会被接受的。
- 因为
- 第 39 至 42 行:读取过程中发生异常,记录该异常到
exception
中,同时结束循环。
- 第 25 行: 调用
- 第 44 至 51 行:循环
readBuf
数组,触发 Channel read 事件到 pipeline 中。- 第 48 行:TODO 芋艿 细节
- 第 50 行:调用
ChannelPipeline#fireChannelRead(Object msg)
方法,触发 Channel read 事件到 pipeline 中。- 注意,传入的方法参数是新接受的客户端 NioSocketChannel 连接。
- 在内部,会通过 ServerBootstrapAcceptor ,将客户端的 Netty NioSocketChannel 注册到 EventLoop 上。详细解析,胖友先跳到 「4. ServerBootstrapAcceptor」 中,看完记得回到此处。
- 第 53 行:清空
readBuf
数组。 - 第 55 行:调用
RecvByteBufAllocator.Handle#readComplete()
方法,读取完成。暂无重要的逻辑,不详细解析。 第 57 行:调用
ChannelPipeline#fireChannelReadComplete()
方法,触发 Channel readComplete 事件到 pipeline 中。- 如果有需要,胖友可以自定义处理器,处理该事件。一般情况下,不需要。
如果没有自定义 ChannelHandler 进行处理,最终会被 pipeline 中的尾节点 TailContext 所处理。代码如下:
// TailContext.java
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
onUnhandledInboundChannelReadComplete();
}
// DefaultChannelPipeline.java
protected void onUnhandledInboundChannelReadComplete() {
}- 具体的调用是空方法。
- 第 60 至 66 行:
exception
非空,说明在接受连接过程中发生异常。- 第 62 行:TODO 芋艿 细节
- 第 65 行: 调用
ChannelPipeline#fireExceptionCaught(Throwable)
方法,触发 exceptionCaught 事件到 pipeline 中。- 默认情况下,会使用 ServerBootstrapAcceptor 处理该事件。详细解析,见 「4.3 exceptionCaught」 。
- 如果有需要,胖友可以自定义处理器,处理该事件。一般情况下,不需要。
- 第 68 至 75 行:TODO 芋艿 细节
- 第 76 至 87 行:TODO 芋艿 细节
3. AbstractNioMessageChannel#doReadMessages
doReadMessages(List<Object> buf)
抽象方法,读取客户端的连接到方法参数 buf
中。它是一个抽象方法,定义在 AbstractNioMessageChannel 抽象类中。代码如下:
/** |
- 返回值为读取到的数量。
NioServerSocketChannel 对该方法的实现代码如下:
1: |
第 4 行:调用
SocketUtils#accept(ServerSocketChannel serverSocketChannel)
方法,接受客户端连接。代码如下:public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException {
try {
return AccessController.doPrivileged(new PrivilegedExceptionAction<SocketChannel>() {
public SocketChannel run() throws IOException {
return serverSocketChannel.accept(); // <1>
}
});
} catch (PrivilegedActionException e) {
throw (IOException) e.getCause();
}
}- 重点是看
<1>
处,调用ServerSocketChannel#accept()
方法,接受客户端连接。
- 重点是看
- 第 9 行:基于客户端的 NIO ServerSocket ,创建 Netty NioSocketChannel 对象。整个过程,就是 《精尽 Netty 源码分析 —— 启动(二)之客户端》 的 「3.7.1 创建 Channel 对象」 小节。
- 第 10 行:返回 1 ,表示成功接受了 1 个新的客户端连接。
- 第 12 至 20 行:发生异常,关闭客户端的 SocketChannel 连接,并打印告警日志。
- 第 22 行:返回 0 ,表示成功接受 0 个新的客户端连接。
4. ServerBootstrapAcceptor
ServerBootstrapAcceptor ,继承 ChannelInboundHandlerAdapter 类,服务器接收器( acceptor ),负责将接受的客户端的 NioSocketChannel 注册到 EventLoop 中。
另外,从继承的是 ChannelInboundHandlerAdapter 类,可以看出它是 Inbound 事件处理器。
4.1 构造方法
在服务端的启动过程中,我们看到 ServerBootstrapAcceptor 注册到服务端的 NioServerSocketChannel 的 pipeline 的尾部,代码如下:
// 记录当前的属性 |
即
<1>
处。也是在此处,创建了 ServerBootstrapAcceptor 对象。代码如下:private final EventLoopGroup childGroup;
private final ChannelHandler childHandler;
private final Entry<ChannelOption<?>, Object>[] childOptions;
private final Entry<AttributeKey<?>, Object>[] childAttrs;
/**
* 自动恢复接受客户端连接的任务
*/
private final Runnable enableAutoReadTask;
ServerBootstrapAcceptor(
final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,
Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
this.childGroup = childGroup;
this.childHandler = childHandler;
this.childOptions = childOptions;
this.childAttrs = childAttrs;
// Task which is scheduled to re-enable auto-read.
// It's important to create this Runnable before we try to submit it as otherwise the URLClassLoader may
// not be able to load the class because of the file limit it already reached.
//
// See https://github.com/netty/netty/issues/1328
enableAutoReadTask = new Runnable() { // <2>
public void run() {
channel.config().setAutoRead(true);
}
};
}enableAutoReadTask
属性,自动恢复接受客户端连接的任务,在<2>
处初始化。具体的使用,我们在 「4.3 exceptionCaught」 中,详细解析。
4.2 channelRead
#channelRead(ChannelHandlerContext ctx, Object msg)
方法,将接受的客户端的 NioSocketChannel 注册到 EventLoop 中。代码如下:
1: |
- 为了方便描述,我们统一认为接受的客户端连接为 NioSocketChannel 对象。
- 第 6 行:接受的客户端的 NioSocketChannel 对象。
- 第 8 行:调用
ChannelPipeline#addLast(childHandler)
方法,将配置的子 Channel 的处理器,添加到 NioSocketChannel 中。 - 第 10 至 14 行:设置 NioSocketChannel 的配置项、属性。
- 第 8 行:调用
第 17 至 28 行:调用
EventLoopGroup#register(Channel channel)
方法,将客户端的 NioSocketChannel 对象,从 worker EventLoopGroup 中选择一个 EventLoop ,注册到其上。- 后续的逻辑,就和 《精尽 Netty 源码分析 —— 启动(一)之服务端》 的注册逻辑基本一致( 虽然说,文章写的是 NioServerSocketChannel 的注册逻辑 )。
- 在注册完成之后,该 worker EventLoop 就会开始轮询该客户端是否有数据写入。
第 18 至 28 行:添加监听器,如果注册失败,则调用
#forceClose(Channel child, Throwable t)
方法,强制关闭客户端的 NioSocketChannel 连接。代码如下:private static void forceClose(Channel child, Throwable t) {
child.unsafe().closeForcibly();
logger.warn("Failed to register an accepted channel: {}", child, t);
}- 在该方法内部,会调用
Unsafe#closeForcibly()
方法,强制关闭客户端的 NioSocketChannel 。
- 在该方法内部,会调用
- 第 29 至 32 行:发生异常,则调用
#forceClose(Channel child, Throwable t)
方法,强制关闭客户端的 NioSocketChannel 连接。
4.3 exceptionCaught
#exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
方法,当捕获到异常时,暂停 1 秒,不再接受新的客户端连接;而后,再恢复接受新的客户端连接。代码如下:
1: |
第 8 行:调用
ChannelConfig#setAutoRead(false)
方法,关闭接受新的客户端连接。代码如下:// DefaultChannelConfig.java
/**
* {@link #autoRead} 的原子更新器
*/
private static final AtomicIntegerFieldUpdater<DefaultChannelConfig> AUTOREAD_UPDATER = AtomicIntegerFieldUpdater.newUpdater(DefaultChannelConfig.class, "autoRead");
/**
* 是否开启自动读取的开关
*
* 1 - 开启
* 0 - 关闭
*/
"FieldMayBeFinal") (
private volatile int autoRead = 1;
public ChannelConfig setAutoRead(boolean autoRead) {
// 原子更新,并且获得更新前的值 <1>
boolean oldAutoRead = AUTOREAD_UPDATER.getAndSet(this, autoRead ? 1 : 0) == 1;
// 发起读取 <2.1>
if (autoRead && !oldAutoRead) {
channel.read();
// 关闭读取 <2.2>
} else if (!autoRead && oldAutoRead) {
autoReadCleared();
}
return this;
}autoRead
字段,是否开启自动读取的开关。😈 笔者原本以为是个boolean
类型,是不是胖友也是。其中,1 表示开启,0 表示关闭。AUTOREAD_UPDATER
静态变量,对autoRead
字段的原子更新器。
<1>
处,使用AUTOREAD_UPDATER
更新autoRead
字段,并获得更新前的值。为什么需要获取更新前的值呢?在后续的<2.1>
和<2.2>
中,当autoRead
有变化时候,才进行后续的逻辑。- 😈 下面的逻辑,我们按照
channel
的类型为 NioServerSocketChannel 来分享。 <2.1>
处,autoRead && !oldAutoRead
返回true
,意味着恢复重启开启接受新的客户端连接。所以调用NioServerSocketChannel#read()
方法,后续的逻辑,就是 《精尽 Netty 源码分析 —— 启动(一)之服务端》 的 「3.13.3 beginRead」 的逻辑。<2.2>
处,!autoRead && oldAutoRead
返回false
,意味着关闭接受新的客户端连接。所以调用#autoReadCleared()
方法,移除对SelectionKey.OP_ACCEPT
事件的感兴趣。// NioServerSocketChannel.java
protected void autoReadCleared() {
clearReadPending();
}在方法内部,会调用
#clearReadPending()
方法,代码如下:protected final void clearReadPending() {
if (isRegistered()) {
EventLoop eventLoop = eventLoop();
if (eventLoop.inEventLoop()) {
clearReadPending0();
} else {
eventLoop.execute(clearReadPendingRunnable);
}
} else {
// Best effort if we are not registered yet clear readPending. This happens during channel initialization.
// NB: We only set the boolean field instead of calling clearReadPending0(), because the SelectionKey is
// not set yet so it would produce an assertion failure.
readPending = false;
}
}
private final Runnable clearReadPendingRunnable = new Runnable() {
public void run() {
clearReadPending0();
}
};
private void clearReadPending0() {
// TODO 芋艿
readPending = false;
// 移除对“读”事件的感兴趣。
((AbstractNioUnsafe) unsafe()).removeReadOp();
}最终的结果,是在 EventLoop 的线程中,调用
AbstractNioUnsafe#clearReadPending0()
方法,移除对“读”事件的感兴趣( 对于 NioServerSocketChannel 的 “读“事件就是SelectionKey.OP_ACCEPT
)。代码如下:// AbstractNioUnsafe.java
protected final void removeReadOp() {
SelectionKey key = selectionKey();
// 忽略,如果 SelectionKey 不合法,例如已经取消
// Check first if the key is still valid as it may be canceled as part of the deregistration
// from the EventLoop
// See https://github.com/netty/netty/issues/2104
if (!key.isValid()) {
return;
}
// 移除对“读”事件的感兴趣。
int interestOps = key.interestOps();
if ((interestOps & readInterestOp) != 0) {
// only remove readInterestOp if needed
key.interestOps(interestOps & ~readInterestOp);
}
}- 通过取反求并,后调用
SelectionKey#interestOps(interestOps)
方法,仅移除对“读”事件的感兴趣。 - 😈 整个过程的调用链,有丢丢长,胖友可以回看,或者多多调试。
- 通过取反求并,后调用
- 第 10 行:调用
EventLoop#schedule(Runnable command, long delay, TimeUnit unit)
方法,发起 1 秒的延迟任务,恢复重启开启接受新的客户端连接。该定时任务会调用ChannelConfig#setAutoRead(true)
方法,即对应<2.1>
情况。 - 第 16 行:调用
ChannelHandlerContext#fireExceptionCaught(cause)
方法,继续传播 exceptionCaught 给下一个节点。具体的原因,可看英文注释。
666. 彩蛋
推荐阅读文章: