1. 概述
本文分享 Netty NIO Channel 关闭( close )操作的过程,分成客户端和服务端 Channel 两种关闭:
- 客户端 NioSocketChannel
- 客户端关闭 NioSocketChannel ,断开和服务端的连接。
- 服务端关闭 NioSocketChannel ,断开和客户端的连接。
- 服务端 NioServerSocketChannel
- 服务端关闭 NioServerSocketChannel ,取消端口绑定,关闭服务。
上面的关闭,可能是客户端/服务端主动关闭,也可能是异常关闭。
- 关于 NioSocketChannel 的关闭,在 「2. NioSocketChannel」 详细解析。
- 关于 NioServerSocketChannel 的关闭,在 「3. NioSocketChannel」 详细解析。
2. NioSocketChannel
通过 NioSocketChannel#close()
方法,应用程序里可以主动关闭 NioSocketChannel 通道。代码如下:
// AbstractChannel.java |
- NioSocketChannel 继承 AbstractChannel 抽象类,所以
#close()
方法实际是 AbstractChannel 实现的。 在方法内部,会调用对应的
ChannelPipeline#close()
方法,将 close 事件在 pipeline 上传播。而 close 事件属于 Outbound 事件,所以会从tail
节点开始,最终传播到head
节点,使用 Unsafe 进行关闭。代码如下:// DefaultChannelPipeline.java
public final ChannelFuture close() {
return tail.close();
}
// TailContext.java
// FROM AbstractChannelHandlerContext.java 。因为 TailContext 继承 AbstractChannelHandlerContext 抽象类,该方法是它实现的。
public ChannelFuture close() {
return close(newPromise());
}
// HeadContext.java
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
unsafe.close(promise);
}
2.1 AbstractUnsafe#close
AbstractUnsafe#close()
方法,关闭 Channel 。代码如下:
|
方法参数
cause
、closeCause
,关闭的“原因”。对于 close 操作来说,无论是正常关闭,还是异常关闭,通过使用 Exception 来表示来源。在 AbstractChannel 类中,枚举了所有来源:// AbstractChannel.java
private static final ClosedChannelException FLUSH0_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
new ClosedChannelException(), AbstractUnsafe.class, "flush0()");
private static final ClosedChannelException ENSURE_OPEN_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
new ClosedChannelException(), AbstractUnsafe.class, "ensureOpen(...)");
private static final ClosedChannelException CLOSE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
new ClosedChannelException(), AbstractUnsafe.class, "close(...)");
private static final ClosedChannelException WRITE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
new ClosedChannelException(), AbstractUnsafe.class, "write(...)");
private static final NotYetConnectedException FLUSH0_NOT_YET_CONNECTED_EXCEPTION = ThrowableUtil.unknownStackTrace(
new NotYetConnectedException(), AbstractUnsafe.class, "flush0()");第 2 至 5 行:调用
ChannelPromise#setUncancellable()
方法,设置 Promise 不可取消。- 第 8 行:若
AbstractChannel.closeInitiated
为true
时,表示关闭已经标记初始化,此时可能已经关闭完成。- 第 10 至 12 行:关闭已经完成,直接通知 Promise 对象。
- 第 13 至 22 行:关闭并未完成,通过监听器回调通知 Promise 对象。
- 第 23 行:
return
结束。 - 第 27 行:标记关闭已经初始化。
- 第 30 行:调用
#isActive()
方法, 获得 Channel 是否激活。 - 第 31 至 33 行:标记内存队列
outboundBuffer
为空。 - 第 35 行:调用
#prepareToClose()
方法,执行准备关闭。详细解析,胖友先跳到 「2.2 NioSocketChannelUnsafe#prepareToClose」 中。 - 第 37 行:若
closeExecutor
非空,在 「2.2 NioSocketChannelUnsafe#prepareToClose」 中,我们已经看到如果开启SO_LINGER
功能,会返回GlobalEventExecutor.INSTANCE
对象。- 第 38 至 44 行:提交任务到
closeExecutor
中,在它的线程中,执行#doClose0(promise)
方法,执行关闭。为什么要在“在它的线程中”中?回答不出来的胖友,再好好重新看下 「2.2 NioSocketChannelUnsafe#prepareToClose」 小节。 - 第 46 至 61 行:提交任务到 Channel 所在的 EventLoop 中,执行后续的任务。
- 整体的逻辑和代码,和【第 66 至 91 行】的代码是基本一致。
- 第 38 至 44 行:提交任务到
- 第 66 行:若
closeExecutor
为空。- 第 70 行:调用
#doClose0(promise)
方法,执行真正的关闭。详细解析,胖友先跳到 「2.4 doClose0」 中。 - 第 75 行:调用
ChannelOutboundBuffer#failFlushed(Throwable cause, boolean notify)
方法,写入数据( 消息 )到对端失败,通知相应数据对应的 Promise 失败。详细解析,见 《精尽 Netty 源码解析 —— Channel(五)之 flush 操作》 。 - 第 77 行:调用
ChannelOutboundBuffer#close(Throwable cause)
方法,关闭内存队列。详细解析,见 《精尽 Netty 源码解析 —— Channel(五)之 flush 操作》 。 - 第 81 行:若
inFlush0
为true
,正在 flush 中,在 EventLoop 中的线程中,调用#fireChannelInactiveAndDeregister(boolean wasActive)
方法,执行取消注册,并触发 Channel Inactive 事件到 pipeline 中。详细解析,见 「2.5 AbstractUnsafe#fireChannelInactiveAndDeregister」 中。- 第 90 行:若
inFlush0
为false
,不在 flush 中,直接调用#fireChannelInactiveAndDeregister(boolean wasActive)
方法,执行取消注册,并触发 Channel Inactive 事件到 pipeline 中。
- 第 90 行:若
- 第 70 行:调用
2.2 NioSocketChannelUnsafe#prepareToClose
NioSocketChannelUnsafe#prepareToClose()
方法,执行准备关闭。代码如下:
1: |
第 4 行:如果配置
StandardSocketOptions.SO_LINGER
大于 0 。让我们先来看下它的定义:Socket 参数,关闭 Socket 的延迟时间,Netty 默认值为 -1 ,表示禁用该功能。
* -1 表示 socket.close() 方法立即返回,但 OS 底层会将发送缓冲区全部发送到对端。
* 0 表示 socket.close() 方法立即返回,OS 放弃发送缓冲区的数据直接向对端发送RST包,对端收到复位错误。
* 非 0 整数值表示调用 socket.close() 方法的线程被阻塞直到延迟时间到或发送缓冲区中的数据发送完毕,若超时,则对端会收到复位错误。- 按照这个定义,如果大于 0,如果在真正关闭 Channel ,需要阻塞直到延迟时间到或发送缓冲区中的数据发送完毕。
- 如果在 EventLoop 中执行真正关闭 Channel 的操作,那么势必会阻塞 EventLoop 的线程。所以,在【第 11 行】的代码,返回
GlobalEventExecutor.INSTANCE
对象,作为执行真正关闭 Channel 的操作的执行器( 它也有一个自己的线程哟 )。
- 第 9 行:调用
#doDeregister()
方法,执行取消注册。详细解析,胖友先跳到 「2.2 AbstractUnsafe#doDeregister」 中。 - 【来自我表弟普架的牛逼解答,我表示点赞支持】第 9 行的:为什么要调用
#doDeregister()
方法呢?因为SO_LINGER
大于 0 时,真正关闭 Channel ,需要阻塞直到延迟时间到或发送缓冲区中的数据发送完毕。如果不取消该 Channel 的SelectionKey.OP_READ
事件的感兴趣,就会不断触发读事件,导致 CPU 空轮询。为什么呢?在 Channel 关闭时,会自动触发SelectionKey.OP_READ
事件。而且,会不断不断不断的触发,如果不进行取消SelectionKey.OP_READ
事件的感兴趣。- 😈 感叹一句,细思极恐啊,厉害了,Netty 。
- 第 11 行:如果开启
SO_LINGER
功能,返回GlobalEventExecutor.INSTANCE
对象。 - 第 18 行:若果关闭
SO_LINGER
功能,返回null
对象。 - 😈 胖友,调回 「2.1 AbstractUnsafe#close」 继续把。
2.3 AbstractUnsafe#doDeregister
AbstractUnsafe#doDeregister()
方法,执行取消注册。代码如下:
|
- 调用
EventLoop#cancel(SelectionKey key)
方法,取消 SelectionKey ,即相当于调用SelectionKey#cancel()
方法。如此,对通道的读写等等 IO 就绪事件不再感兴趣,也不会做出相应的处理。
2.4 AbstractUnsafe#doClose0
AbstractUnsafe#doClose0(ChannelPromise promise)
方法,执行真正的关闭。代码如下:
1: private void doClose0(ChannelPromise promise) { |
- 第 4 行:调用
#doClose()
方法,执行关闭。这是一个抽象方法,NioSocketChannel 对它的实现,胖友先跳到 「2.4.1 NioSocketChannel#doClose」 中。 第 6 行:调用
CloseFuture#setClosed()
方法,通知closeFuture
关闭完成。此处就会结束我们在 EchoClient 的阻塞监听客户端关闭。例如:// Wait until the connection is closed.
// 监听客户端关闭,并阻塞等待
f.channel().closeFuture().sync();- 哟哟哟,就要结束阻塞等待了。
第 8 行:调用
#safeSetSuccess(promise)
方法,通知 通知 Promise 关闭成功。此处就会回调我们对Channel#close()
方法的返回的 ChannelFuture 的监听。示例如下:ctx.channel().close().addListener(new ChannelFutureListener() { // 我是一个萌萌哒监听器
public void operationComplete(ChannelFuture future) throws Exception {
System.out.println(Thread.currentThread() + "我会被唤醒");
}
});- 哟哟哟,要被回调了。
- 若发生异常:
- 第 11 行:调用
CloseFuture#setClosed()
方法,通知closeFuture
关闭完成。 - 第 13 行: 调用
#safeSetFailure(promise, t)
方法,通知 通知 Promise 关闭异常。
- 第 11 行:调用
2.4.1 NioSocketChannel#doClose
NioSocketChannel#doClose()
方法,执行 Java 原生 NIO SocketChannel 关闭。代码如下:
1: |
第 4 行:调用
AbstractNioChannel#doClose()
方法,执行父类关闭方法。代码如下:
protected void doClose() throws Exception {
// 通知 connectPromise 异常失败
ChannelPromise promise = connectPromise;
if (promise != null) {
// Use tryFailure() instead of setFailure() to avoid the race against cancel().
promise.tryFailure(DO_CLOSE_CLOSED_CHANNEL_EXCEPTION);
connectPromise = null;
}
// 取消 connectTimeoutFuture 等待
ScheduledFuture<?> future = connectTimeoutFuture;
if (future != null) {
future.cancel(false);
connectTimeoutFuture = null;
}
}- 适用于客户端正在发起对服务端的连接的阶段。
- 【重要】第 6 行:调用
SocketChannel#close()
方法,执行 Java 原生 NIO SocketChannel 关闭。
2.5 AbstractUnsafe#fireChannelInactiveAndDeregister
AbstractUnsafe#fireChannelInactiveAndDeregister(boolean wasActive)
方法,执行取消注册,并触发 Channel Inactive 事件到 pipeline 中。代码如下:
private void fireChannelInactiveAndDeregister(final boolean wasActive) { |
<1>
处,传入#deregister(...)
方法的第一个参数为unsafeVoidPromise
,类型为 VoidChannelPromise 类,表示需要通知 Promise 。为什么这么说呢?在#safeSetSuccess(promise)
方法中,可以看到:protected final void safeSetSuccess(ChannelPromise promise) {
if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
}
}!(promise instanceof VoidChannelPromise)
代码块,表示排除 VoidChannelPromise 类型的promise
。
<2>
处,通过对比新老的active
的值,判断是否 Channel 的状态是否从 Active 变成 Inactive 。- 第 2 至 5 行:调用
ChannelPromise#setUncancellable()
方法,设置 Promise 不可取消。 - 第 7 至 11 行:不处于已经注册状态,直接通知 Promise 取消注册成功,并
return
返回。- 😈 在当前情况下,
registered = true
,所以不符合条件。
- 😈 在当前情况下,
- 第 22 行:调用
#invokeLater(Runnable)
方法,提交任务到 EventLoop 的线程中执行,以避免一个 Channel 的 ChannelHandler 在不同的 EventLoop 或者线程中执行。详细的说明,可以看下【第 13 至 21 行】的英文说明。- 😈 实际从目前该方法的调用看下来,有可能不是从 EventLoop 的线程中调用。
- 第 27 行:调用
AbstractUnsafe#doDeregister()
方法,执行取消注册。在 「2.3 AbstractUnsafe#doDeregister」 中,已经详细解析。 - 第 31 至 34 行:如果
fireChannelInactive = true
,调用ChannelPipeline#fireChannelInactive()
方法,触发 Channel Inactive 事件到 pipeline 中。而 Channel Inactive 事件属于 Inbound 事件,所以会从head
节点开始,最终传播到tail
节点,目前并未执行什么逻辑,感兴趣的胖友,可以自己去看看。如果胖友业务上有需要,可以自己添加 ChannelHandler 进行处理。 - 第 40 至 42 行:标记为未注册。
- 第 44 行:调用
ChannelPipeline#fireChannelUnregistered()
方法,触发 Channel Unregistered 事件到 pipeline 中。而 Channel Unregistered 事件属于 Inbound 事件,所以会从head
节点开始,最终传播到tail
节点,目前并未执行什么逻辑,感兴趣的胖友,可以自己去看看。如果胖友业务上有需要,可以自己添加 ChannelHandler 进行处理。- 😈 又啰嗦了一遍,【第 31 至 34 行】的代码的逻辑。
- 第 48 行:调用
#safeSetSuccess(promise)
方法,通知 Promise 取消注册成功。
3. NioServerSocketChannel
通过 NioServerSocketChannel#close()
方法,应用程序里可以主动关闭 NioServerSocketChannel 通道。在具体的代码实现上,唯一的差别就是对 AbstractNioChannel#doClose()
方法的实现不同( 对应 「2.4.1 NioSocketChannel#doClose」 )。代码如下:
NioSocketChannel#doClose()
方法,执行 Java 原生 NIO SocketServerChannel 关闭。代码如下:
|
- 调用
SocketServerChannel#close()
方法,执行 Java 原生 NIO SocketServerChannel 关闭。
那么可能会有胖友有疑惑了,#close()
方法的实现,99.99% 都相似,那么 NioSocketChannel 和 NioServerSocketChannel 差异的关闭逻辑怎么实现呢?答案其实很简单,通过给它们配置不同的 ChannelHandler 实现类即可。
4. Unsafe#closeForcibly
实际上,在 Unsafe 接口上定义了 #closeForcibly()
方法,英文注释如下:
/** |
- 立即关闭 Channel ,并且不触发 pipeline 上的任何事件。
- 仅仅用于 Channel 注册到 EventLoop 上失败的情况下。😈 这也就是为什么
without firing any events
的原因啦。
AbstractUnsafe 对该接口方法,实现代码如下:
|
- 在方法内部,调用
AbstractNioChannel#doClose()
方法,执行 Java 原生 NIO SocketServerChannel 或 SocketChannel 关闭。 - 并且,从代码实现上,我们可以看到,确实并未触发任何 pipeline 上的事件。
5. 服务端处理客户端主动关闭连接
在客户端主动关闭时,服务端会收到一个 SelectionKey.OP_READ
事件的就绪,在调用客户端对应在服务端的 SocketChannel 的 #read()
方法会返回 -1 ,从而实现在服务端关闭客户端的逻辑。在 Netty 的实现,在 NioByteUnsafe#read()
方法中,简化代码如下:
// <1> |
<1>
处,读取客户端的 SocketChannel 返回 -1 ,说明客户端已经关闭。<2>
处,调用#closeOnRead(ChannelPipeline pipeline)
方法,关闭客户端的连接。代码如下:1: private void closeOnRead(ChannelPipeline pipeline) {
2: if (!isInputShutdown0()) {
3: // 开启连接半关闭
4: if (isAllowHalfClosure(config())) {
5: // 关闭 Channel 数据的读取
6: shutdownInput();
7: // 触发 ChannelInputShutdownEvent.INSTANCE 事件到 pipeline 中
8: pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
9: } else {
10: close(voidPromise());
11: }
12: } else {
13: // 标记 inputClosedSeenErrorOnRead 为 true
14: inputClosedSeenErrorOnRead = true;
15: // 触发 ChannelInputShutdownEvent.INSTANCE 事件到 pipeline 中
16: pipeline.fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
17: }
18: }第 2 行:调用
NioSocketChannel#isInputShutdown0()
方法,判断是否关闭 Channel 数据的读取。代码如下:// NioSocketChannel.java
protected boolean isInputShutdown0() {
return isInputShutdown();
}
public boolean isInputShutdown() {
return javaChannel().socket().isInputShutdown() || !isActive();
}
// java.net.Socket.java
private boolean shutIn = false;
/**
* Returns whether the read-half of the socket connection is closed.
*
* @return true if the input of the socket has been shutdown
* @since 1.4
* @see #shutdownInput
*/
public boolean isInputShutdown() {
return shutIn;
}- 😈 注意看下英文注释。
<1>
第 4 行:调用AbstractNioByteChannel#isAllowHalfClosure()
方法,判断是否开启连接半关闭的功能。代码如下:// AbstractNioByteChannel.java
private static boolean isAllowHalfClosure(ChannelConfig config) {
return config instanceof SocketChannelConfig &&
((SocketChannelConfig) config).isAllowHalfClosure();
}- 可通过
ALLOW_HALF_CLOSURE
配置项开启。- Netty 参数,一个连接的远端关闭时本地端是否关闭,默认值为
false
。 - 值为
false
时,连接自动关闭。 - 值为
true
时,触发 ChannelInboundHandler 的#userEventTriggered()
方法,事件 ChannelInputShutdownEvent 。
- Netty 参数,一个连接的远端关闭时本地端是否关闭,默认值为
<1.1>
第 6 行:调用NioSocketChannel#shutdownInput()
方法,关闭 Channel 数据的读取。代码如下:
public ChannelFuture shutdownInput() {
return shutdownInput(newPromise());
}
public ChannelFuture shutdownInput(final ChannelPromise promise) {
EventLoop loop = eventLoop();
if (loop.inEventLoop()) {
shutdownInput0(promise);
} else {
loop.execute(new Runnable() {
public void run() {
shutdownInput0(promise);
}
});
}
return promise;
}
private void shutdownInput0(final ChannelPromise promise) {
try {
// 关闭 Channel 数据的读取
shutdownInput0();
// 通知 Promise 成功
promise.setSuccess();
} catch (Throwable t) {
// 通知 Promise 失败
promise.setFailure(t);
}
}
private void shutdownInput0() throws Exception {
// 调用 Java NIO Channel 的 shutdownInput 方法
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().shutdownInput();
} else {
javaChannel().socket().shutdownInput();
}
}- 核心是,调用 Java NIO Channel 的 shutdownInput 方法。
<1.1>
第 8 行:调用ChannelPipeline#fireUserEventTriggered(Object event)
方法,触发ChannelInputShutdownEvent.INSTANCE
事件到 pipeline 中。关于这个事件,胖友可以看看 《netty 处理远程主机强制关闭一个连接》 。<1.2>
第 9 至 11 行:调用#close(Promise)
方法,关闭客户端的 Channel 。后续的,就是 「2. NioSocketChannel」 中。
- 可通过
第 12 至 17 行:
第 14 行:标记
inputClosedSeenErrorOnRead
为true
。原因如下:/**
* 通道关闭读取,又错误读取的错误的标识
*
* 详细见 https://github.com/netty/netty/commit/ed0668384b393c3502c2136e3cc412a5c8c9056e 提交
*/
private boolean inputClosedSeenErrorOnRead;如下是提交的说明:
AbstractNioByteChannel will detect that the remote end of the socket has
been closed and propagate a user event through the pipeline. However if
the user has auto read on, or calls read again, we may propagate the
same user events again. If the underlying transport continuously
notifies us that there is read activity this will happen in a spin loop
which consumes unnecessary CPU.- 胖友认真看下英文注释。结合 《NIO read spin event loop spin when half closed #7801》 提供的示例。
在标记
inputClosedSeenErrorOnRead = true
后,在NioByteUnsafe#read()
方法中,会主动对SelectionKey.OP_READ
的感兴趣,避免空轮询。代码如下:// AbstractNioByteUnsafe.java
public final void read() {
final ChannelConfig config = config();
// 若 inputClosedSeenErrorOnRead = true ,移除对 SelectionKey.OP_READ 事件的感兴趣。
if (shouldBreakReadReady(config)) {
clearReadPending(); // 移除对 SelectionKey.OP_READ 事件的感兴趣
return;
}
// ... 省略其他代码。
}
// AbstractNioByteChannel.java
final boolean shouldBreakReadReady(ChannelConfig config) {
return isInputShutdown0() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
}- x
- 第 16 行:调用
ChannelPipeline#fireUserEventTriggered(Object event)
方法,触发ChannelInputShutdownEvent.INSTANCE
事件到 pipeline 中。
666. 彩蛋
比想象中简单的文章。但是,卡了比较久的时间。主要是针对 《High CPU usage with SO_LINGER and sudden connection close (4.0.26.Final+) #4449》 的讨论,中间请教了基友闪电侠和表弟普架。
痛并快乐的过程。如果英文好一点,相信解决的过程,可能更加愉快一些把。