1. 概述
本文分享 Netty NioSocketChannel 写入对端数据的过程。和写入相关的,在 Netty Channel 有三种 API 方法:
ChannelFuture write(Object msg) |
原生的 Java NIO SocketChannel 只有一种 write 方法,将数据写到对端。而 Netty Channel 竟然有三种方法,我们来一个个看看:
- write 方法:将数据写到内存队列中。
- 也就是说,此时数据并没有写入到对端。
- flush 方法:刷新内存队列,将其中的数据写入到对端。
- 也就是说,此时数据才真正写到对端。
- writeAndFlush 方法:write + flush 的组合,将数据写到内存队列后,立即刷新内存队列,又将其中的数据写入到对端。
- 也就是说,此时数据已经写到对端。
严格来说,上述的描述不是完全准确。因为 Netty Channel 的 #write(Object msg, ...)
和 #writeAndFlush(Object msg, ...)
方法,是异步写入的过程,需要通过监听返回的 ChannelFuture 来确实是真正写入。例如:
// 方式一:异步监听 |
- 所以,胖友实际在使用时,一定要注意。😈 如果感兴趣,可以看看 Dubbo 和 Motan 等等 RPC 框架是怎么使用这个 API 方法的。
- 😈 有一点一定非常肯定要注意,
#write(Object msg, ...)
方法返回的 Promise 对象,只有在数据真正被#flush()
方法调用执行完成后,才会被回调通知。如果胖友不理解,请自己测试一下。
考虑到 Netty NioSocketChannel 写入对端数据的代码太多,所以笔者拆成 write 和 flush 相关的两篇文章。所以,本文当然是 write 相关的文章。当然,这两个操作相关性很高,所以本文也会包括 flush 部分的内容。
2. AbstractChannel
AbstractChannel 对 #write(Object msg, ...)
方法的实现,代码如下:
|
- 在方法内部,会调用对应的
ChannelPipeline#write(Object msg, ...)
方法,将 write 事件在 pipeline 上传播。详细解析,见 「3. DefaultChannelPipeline」 。- 最终会传播 write 事件到
head
节点,将数据写入到内存队列中。详细解析,见 「5. HeadContext」 。
- 最终会传播 write 事件到
3. DefaultChannelPipeline
DefaultChannelPipeline#write(Object msg, ...)
方法,代码如下:
|
- 在方法内部,会调用
TailContext#write(Object msg, ...)
方法,将 write 事件在 pipeline 中,从尾节点向头节点传播。详细解析,见 「4. TailContext」 。
4. TailContext
TailContext 对 TailContext#write(Object msg, ...)
方法的实现,是从 AbstractChannelHandlerContext 抽象类继承,代码如下:
1: |
在【第 2 行】的代码,我们可以看到,
#write(Object msg)
方法,会调用#write(Object msg, ChannelPromise promise)
方法。缺少的
promise
方法参数,通过调用#newPromise()
方法,进行创建 Promise 对象,代码如下:
public ChannelPromise newPromise() {
return new DefaultChannelPromise(channel(), executor());
}- 返回 DefaultChannelPromise 对象。
- 在【第 29 行】的代码,返回的结果就是传入的
promise
对象。
- 第 8 至 11 行:若消息( 消息 )为空,抛出异常。
- 第 15 行:调用
#isNotValidPromise(promise, true)
方法,判断是否为不合法的 Promise 对象。该方法,在 《精尽 Netty 源码解析 —— ChannelPipeline(四)之 Outbound 事件的传播》 中已经详细解析。- 第 17 行:调用
ReferenceCountUtil#release(msg)
方法,释放释放消息( 数据 )相关的资源。 - 第 19 行:返回
promise
对象。一般情况下,出现这种情况是promise
已经被取消,所以不再有必要写入数据。或者说,写入数据的操作被取消。 - 第 21 至 25 行:若发生异常, 调用
ReferenceCountUtil#release(msg)
方法,释放释放消息( 数据 )相关的资源。最终,会抛出该异常。
- 第 17 行:调用
第 28 行:调用
#write(Object msg, boolean flush, ChannelPromise promise)
方法,写入消息( 数据 )到内存队列。代码如下:1: private void write(Object msg, boolean flush, ChannelPromise promise) {
2: // 获得下一个 Outbound 节点
3: AbstractChannelHandlerContext next = findContextOutbound();
4: // 记录 Record 记录
5: final Object m = pipeline.touch(msg, next);
6: EventExecutor executor = next.executor();
7: // 在 EventLoop 的线程中
8: if (executor.inEventLoop()) {
9: // 执行 writeAndFlush 事件到下一个节点
10: if (flush) {
11: next.invokeWriteAndFlush(m, promise);
12: // 执行 write 事件到下一个节点
13: } else {
14: next.invokeWrite(m, promise);
15: }
16: } else {
17: AbstractWriteTask task;
18: // 创建 writeAndFlush 任务
19: if (flush) {
20: task = WriteAndFlushTask.newInstance(next, m, promise);
21: // 创建 write 任务
22: } else {
23: task = WriteTask.newInstance(next, m, promise);
24: }
25: // 提交到 EventLoop 的线程中,执行该任务
26: safeExecute(executor, task, promise, m);
27: }
28: }- 方法参数
flush
为true
时,该方法执行的是 write + flush 的组合操作,即将数据写到内存队列后,立即刷新内存队列,又将其中的数据写入到对端。 - 第 3 行:调用
#findContextOutbound()
方法,获得下一个 Outbound 节点。 第 5 行:调用
DefaultChannelPipeline#touch(Object msg, AbstractChannelHandlerContext next)
方法,记录 Record 记录。代码如下:// DefaultChannelPipeline.java
final Object touch(Object msg, AbstractChannelHandlerContext next) {
return touch ? ReferenceCountUtil.touch(msg, next) : msg;
}
// ReferenceCountUtil.java
/**
* Tries to call {@link ReferenceCounted#touch(Object)} if the specified message implements
* {@link ReferenceCounted}. If the specified message doesn't implement {@link ReferenceCounted},
* this method does nothing.
*/
"unchecked") (
public static <T> T touch(T msg, Object hint) {
if (msg instanceof ReferenceCounted) {
return (T) ((ReferenceCounted) msg).touch(hint);
}
return msg;
}第 7 行:在 EventLoop 的线程中。
- 第 10 至 11 行:如果
flush = true
时,调用AbstractChannelHandlerContext#invokeWriteAndFlush()
方法,执行 writeAndFlush 事件到下一个节点。 - 第 12 至 15 行:如果
flush = false
时,调用AbstractChannelHandlerContext#invokeWrite()
方法,执行 write 事件到下一个节点。 - 后续的逻辑,和 《精尽 Netty 源码解析 —— ChannelPipeline(四)之 Outbound 事件的传播》 分享的 bind 事件在 pipeline 中的传播是基本一致的。
- 随着 write 或 writeAndFlush 事件不断的向下一个节点传播,最终会到达 HeadContext 节点。详细解析,见 「5. HeadContext」 。
- 第 10 至 11 行:如果
- 第 16 行:不在 EventLoop 的线程中。
- 第 19 至 20 行:如果
flush = true
时,创建 WriteAndFlushTask 任务。 - 第 21 至 24 行:如果
flush = false
时,创建 WriteTask 任务。 - 第 26 行:调用
#safeExecute(executor, task, promise, m)
方法,提交到 EventLoop 的线程中,执行该任务。从而实现,在 EventLoop 的线程中,执行 writeAndFlush 或 write 事件到下一个节点。详细解析,见 「7. AbstractWriteTask」 中。
- 第 19 至 20 行:如果
- 方法参数
第 29 行:返回
promise
对象。
5. HeadContext
在 pipeline 中,write 事件最终会到达 HeadContext 节点。而 HeadContext 的 #write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
方法,会处理该事件,代码如下:
|
- 在方法内部,会调用
AbstractUnsafe#write(Object msg, ChannelPromise promise)
方法,将数据写到内存队列中。详细解析,见 「6. AbstractUnsafe」 。
6. AbstractUnsafe
AbstractUnsafe#write(Object msg, ChannelPromise promise)
方法,将数据写到内存队列中。代码如下:
/** |
outboundBuffer
属性,内存队列,用于缓存写入的数据( 消息 )。- 第 7 行:内存队列为空,一般是 Channel 已经关闭。
- 调用
#safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION)
方法,通知 Promise 异常结果。 - 第 16 行:调用
ReferenceCountUtil#release(msg)
方法,释放释放消息( 数据 )相关的资源。 - 第 17 行:
return
,结束执行。
- 调用
第 23 行:调用
AbstractNioByteChannel#filterOutboundMessage(msg)
方法,过滤写入的消息( 数据 )。代码如下:// AbstractNioByteChannel.java
protected final Object filterOutboundMessage(Object msg) {
// <1> ByteBuf 的情况
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
// 已经是内存 ByteBuf
if (buf.isDirect()) {
return msg;
}
// 非内存 ByteBuf ,需要进行创建封装
return newDirectBuffer(buf);
}
// <2> FileRegion 的情况
if (msg instanceof FileRegion) {
return msg;
}
// <3> 不支持其他类型
throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}<1>
处,消息( 数据 )是 ByteBuf 类型,如果是非 Direct ByteBuf 对象,需要调用#newDirectBuffer(ByteBuf)
方法,复制封装成 Direct ByteBuf 对象。原因是:在使用 Socket 传递数据时性能很好,由于数据直接在内存中,不存在从 JVM 拷贝数据到直接缓冲区的过程,性能好。( 来自 《[netty核心类]–缓冲区ByteBuf》 )<2>
处,消息( 数据 )是 FileRegion 类型,直接返回。<3>
处,不支持其他数据类型。
- 第 24 至 28 行:计算消息的长度。
- 第 29 行:若发生异常时:
- 第 31 行:调用
#safeSetFailure(promise, Throwable t)
方法,通知 Promise 异常结果。 - 第 33 行:调用
ReferenceCountUtil#release(msg)
方法,释放释放消息( 数据 )相关的资源。 - 第 34 行:
return
,结束执行。
- 第 31 行:调用
- 第 38 行:调用
ChannelOutboundBuffer#addMessage(msg, size, promise)
方法,写入消息( 数据 )到内存队列。关于 ChannelOutboundBuffer ,我们在 《精尽 Netty 源码解析 —— Channel(五)之 flush 操作》 中,详细解析。
😈 至此,write 操作,将数据写到内存队列中的过程。
🙂 当然,想要写入数据到对端的过程,还是需要看完 《精尽 Netty 源码解析 —— Channel(五)之 flush 操作》 一文。
7. AbstractWriteTask
AbstractWriteTask ,实现 Runnable 接口,写入任务抽象类。它有两个子类实现:
- WriteTask ,write 任务实现类。
- WriteAndFlushTask ,write + flush 任务实现类。
它们都是 AbstractChannelHandlerContext 的内部静态类。那么让我们先来 AbstractWriteTask 的代码。
7.1 构造方法
/** |
- 每个字段,看代码注释。
ESTIMATE_TASK_SIZE_ON_SUBMIT
静态字段,提交任务时,是否计算 AbstractWriteTask 对象的自身占用内存大小。WRITE_TASK_OVERHEAD
静态字段,每个 AbstractWriteTask 对象自身占用内存的大小。为什么占用的 48 字节呢?- 16 bytes object header
,对象头,16 字节。- 3 reference fields
,3 个对象引用字段,3 * 8 = 24 字节。- 1 int fields
,1 个int
字段,4 字节。padding
,补齐 8 字节的整数倍,因此 4 字节。- 因此,合计 48 字节( 64 位的 JVM 虚拟机,并且不考虑压缩 )。
- 如果不理解的胖友,可以看看 《JVM中 对象的内存布局 以及 实例分析》 。
handle
字段,Recycler 处理器。而 Recycler 是 Netty 用来实现对象池的工具类。在网络通信中,写入是非常频繁的操作,因此通过 Recycler 重用 AbstractWriteTask 对象,减少对象的频繁创建,降低 GC 压力,提升性能。
7.2 init
#init(AbstractWriteTask task, AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise)
方法,初始化 AbstractWriteTask 对象。代码如下:
protected static void init(AbstractWriteTask task, AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) { |
- 在下文中,我们会看到 AbstractWriteTask 对象是从 Recycler 中获取,所以获取完成后,需要通过该方法,初始化该对象的属性。
<1>
处,计算 AbstractWriteTask 对象大小。并且在<2>
处,调用ChannelPipeline#incrementPendingOutboundBytes(long size)
方法,增加 ChannelOutboundBuffer 的totalPendingSize
属性。代码如下:// DefaultChannelPipeline.java
protected void incrementPendingOutboundBytes(long size) {
ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
if (buffer != null) {
buffer.incrementPendingOutboundBytes(size);
}
}- 内部,会调用
ChannelOutboundBuffer#incrementPendingOutboundBytes(long size)
方法,增加 ChannelOutboundBuffer 的totalPendingSize
属性。详细解析,见 《精尽 Netty 源码解析 —— Channel(五)之 flush 操作》 的 「10.1 incrementPendingOutboundBytes」 小节。
- 内部,会调用
7.3 run
#run()
实现方法,
1: |
在
<1>
处, 调用ChannelPipeline#decrementPendingOutboundBytes(long size)
方法,减少 ChannelOutboundBuffer 的totalPendingSize
属性。代码如下:
protected void decrementPendingOutboundBytes(long size) {
ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
if (buffer != null) {
buffer.decrementPendingOutboundBytes(size);
}
}- 内部,会调用
ChannelOutboundBuffer#decrementPendingOutboundBytes(long size)
方法,减少 ChannelOutboundBuffer 的totalPendingSize
属性。详细解析,见 《精尽 Netty 源码解析 —— Channel(五)之 flush 操作》 的 「10.2 decrementPendingOutboundBytes」 小节。
- 内部,会调用
第 10 行:调用
#write(ctx, msg, promise)
方法,执行 write 事件到下一个节点。代码如下:protected void write(AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
ctx.invokeWrite(msg, promise);
}第 11 至 19 行:置空 AbstractWriteTask 对象,并调用
Recycler.Handle#recycle(this)
方法,回收该对象。
7.4 WriteTask
WriteTask ,实现 SingleThreadEventLoop.NonWakeupRunnable 接口,继承 AbstractWriteTask 抽象类,write 任务实现类。
为什么会实现 SingleThreadEventLoop.NonWakeupRunnable 接口呢?write 操作,仅仅将数据写到内存队列中,无需唤醒 EventLoop ,从而提升性能。关于 SingleThreadEventLoop.NonWakeupRunnable 接口,在 《精尽 Netty 源码解析 —— EventLoop(三)之 EventLoop 初始化》 有详细解析。
7.4.1 newInstance
#newInstance(AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise)
方法,创建 WriteTask 对象。代码如下:
private static final Recycler<WriteTask> RECYCLER = new Recycler<WriteTask>() { |
7.4.2 构造方法
private WriteTask(Recycler.Handle<WriteTask> handle) { |
7.4.3 write
WriteTask 无需实现 #write(AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise)
方法,直接重用父类该方法即可。
7.5 WriteAndFlushTask
WriteAndFlushTask ,继承 WriteAndFlushTask 抽象类,write + flush 任务实现类。
7.5.1 newInstance
#newInstance(AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise)
方法,创建 WriteAndFlushTask 对象。代码如下:
private static final Recycler<WriteAndFlushTask> RECYCLER = new Recycler<WriteAndFlushTask>() { |
7.5.2 构造方法
private WriteAndFlushTask(Recycler.Handle<WriteAndFlushTask> handle) { |
7.5.3 write
#write(AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise)
方法,在父类的该方法的基础上,增加执行 flush 事件到下一个节点。代码如下:
|
666. 彩蛋
最后,我们来看一个真的彩蛋,嘿嘿嘿。
在一些 ChannelHandler 里,我们想要写入数据到对端,可以有两种写法,代码如下:
|
这两者有什么异同呢?
<2>
种,实际就是本文所描述的,将 write 事件,从 pipeline 的tail
节点到head
节点的过程。<1>
种,和<2>
种不同,将 write 事件,从当前的ctx
节点的下一个节点到head
节点的过程。- 为什么呢?胖友自己调试理解下。😁😁😁
推荐阅读文章: