1. 概述
本文接 《精尽 Netty 源码解析 —— Channel(四)之 write 操作》 ,分享 Netty Channel 的 #flush()
方法,刷新内存队列,将其中的数据写入到对端。
在本文中,我们会发现,#flush()
方法和 #write(Object msg, ...)
正常情况下,经历的流程是差不多的,例如在 pipeline 中对事件的传播,从 tail
节点传播到 head
节点,最终交由 Unsafe 处理,而差异点就是 Unsafe 的处理方式不同:
- write 方法:将数据写到内存队列中。
- flush 方法:刷新内存队列,将其中的数据写入到对端。
当然,上述描述仅仅指的是正常情况下,在异常情况下会有所不同。我们知道,Channel 大多数情况下是可写的,所以不需要专门去注册 SelectionKey.OP_WRITE
事件。所以在 Netty 的实现中,默认 Channel 是可写的,当写入失败的时候,再去注册 SelectionKey.OP_WRITE
事件。这意味着什么呢?在 #flush()
方法中,如果写入数据到 Channel 失败,会通过注册 SelectionKey.OP_WRITE
事件,然后在轮询到 Channel 可写 时,再“回调” #forceFlush()
方法。
是不是非常巧妙?!让我直奔代码,大口吃肉,潇洒撸码。
下文的 「2.」、「3.」、「4.」、「5.」 和 《精尽 Netty 源码解析 —— Channel(四)之 write 操作》 非常类似,所以胖友可以快速浏览。真正的差异,从 「6.」 开始。
2. AbstractChannel
AbstractChannel 对 #flush()
方法的实现,代码如下:
|
- 在方法内部,会调用对应的
ChannelPipeline#flush()
方法,将 flush 事件在 pipeline 上传播。详细解析,见 「3. DefaultChannelPipeline」 。- 最终会传播 flush 事件到
head
节点,刷新内存队列,将其中的数据写入到对端。详细解析,见 「5. HeadContext」 。
- 最终会传播 flush 事件到
3. DefaultChannelPipeline
DefaultChannelPipeline#flush()
方法,代码如下:
|
- 在方法内部,会调用
TailContext#flush()
方法,将 flush 事件在 pipeline 中,从尾节点向头节点传播。详细解析,见 「4. TailContext」 。
4. TailContext
TailContext 对 TailContext#flush()
方法的实现,是从 AbstractChannelHandlerContext 抽象类继承,代码如下:
1: |
- 第 4 行:调用
#findContextOutbound()
方法,获得下一个 Outbound 节点。 - 第 7 行:在 EventLoop 的线程中。
- 第 12 至 15 行:调用
AbstractChannelHandlerContext#invokeFlush()()
方法,执行 flush 事件到下一个节点。 - 后续的逻辑,和 《精尽 Netty 源码解析 —— ChannelPipeline(四)之 Outbound 事件的传播》 分享的 bind 事件在 pipeline 中的传播是基本一致的。
- 随着 flush 事件不断的向下一个节点传播,最终会到达 HeadContext 节点。详细解析,见 「5. HeadContext」 。
- 第 12 至 15 行:调用
- 第 16 行:不在 EventLoop 的线程中。
- 第 12 至 21 行:创建 flush 任务。该任务的内部的调用【第 18 行】的代码,和【第 9 行】的代码是一致的。
- 第 23 行:调用
#safeExecute(executor, task, promise, m)
方法,提交到 EventLoop 的线程中,执行该任务。从而实现,在 EventLoop 的线程中,执行 flush 事件到下一个节点。
5. HeadContext
在 pipeline 中,flush 事件最终会到达 HeadContext 节点。而 HeadContext 的 #flush()
方法,会处理该事件,代码如下:
|
- 在方法内部,会调用
AbstractUnsafe#flush()
方法,刷新内存队列,将其中的数据写入到对端。详细解析,见 「6. AbstractUnsafe」 。
6. AbstractUnsafe
AbstractUnsafe#flush()
方法,刷新内存队列,将其中的数据写入到对端。代码如下:
1: |
- 第 5 至 9 行:内存队列为
null
,一般是 Channel 已经关闭,所以直接返回。 - 第 12 行:调用
ChannelOutboundBuffer#addFlush()
方法,标记内存队列开始 flush 。详细解析,见 「8.4 addFlush」 。 第 14 行:调用
#flush0()
方法,执行 flush 操作。代码如下:/**
* 是否正在 flush 中,即正在调用 {@link #flush0()} 中
*/
private boolean inFlush0;
1: ("deprecation")
2: protected void flush0() {
3: // 正在 flush 中,所以直接返回。
4: if (inFlush0) {
5: // Avoid re-entrance
6: return;
7: }
8:
9: // 内存队列为 null ,一般是 Channel 已经关闭,所以直接返回。
10: // 内存队列为空,无需 flush ,所以直接返回
11: final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
12: if (outboundBuffer == null || outboundBuffer.isEmpty()) {
13: return;
14: }
15:
16: // 标记正在 flush 中。
17: inFlush0 = true;
18:
19: // 若未激活,通知 flush 失败
20: // Mark all pending write requests as failure if the channel is inactive.
21: if (!isActive()) {
22: try {
23: if (isOpen()) {
24: outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
25: } else {
26: // Do not trigger channelWritabilityChanged because the channel is closed already.
27: outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
28: }
29: } finally {
30: // 标记不在 flush 中。
31: inFlush0 = false;
32: }
33: return;
34: }
35:
36: // 执行真正的写入到对端
37: try {
38: doWrite(outboundBuffer);
39: } catch (Throwable t) {
40: // TODO 芋艿 细节
41: if (t instanceof IOException && config().isAutoClose()) {
42: /**
43: * Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
44: * failing all flushed messages and also ensure the actual close of the underlying transport
45: * will happen before the promises are notified.
46: *
47: * This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
48: * may still return {@code true} even if the channel should be closed as result of the exception.
49: */
50: close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
51: } else {
52: try {
53: shutdownOutput(voidPromise(), t);
54: } catch (Throwable t2) {
55: close(voidPromise(), t2, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
56: }
57: }
58: } finally {
59: // 标记不在 flush 中。
60: inFlush0 = false;
61: }
62: }inFlush0
字段,是否正在 flush 中,即正在调用#flush0()
中。- 第 3 至 7 行:正在 flush 中,所以直接返回。
- 第 9 至 14 行:
outboundBuffer == null
,内存队列为null
,一般是 Channel 已经关闭,所以直接返回。outboundBuffer.isEmpty()
,内存队列为空,无需 flush ,所以直接返回。
- 第 17 行:设置
inFlush0
为true
,表示正在 flush 中。 - 第 19 至 34 行:调用
#isActive()
方法,发现 Channel 未激活,在根据 Channel 是否打开,调用ChannelOutboundBuffer#failFlushed(Throwable cause, boolean notify)
方法,通知 flush 失败异常。详细解析,见 「8.6 failFlushed」 。- 第 29 至 33 行:最终,设置
inFlush0
为false
,表示结束 flush 操作,最后return
返回。
- 第 29 至 33 行:最终,设置
- 第 38 行:调用
AbstractChannel#doWrite(outboundBuffer)
方法,执行真正的写入到对端。详细解析,见 「7. NioSocketChannel」 。- 第 39 至 57 行:TODO 芋艿 细节
- 第 58 至 61 行:同【第 29 至 33】的代码和目的。
实际上,AbstractNioUnsafe 重写了
#flush0()
方法,代码如下:
protected final void flush0() {
// Flush immediately only when there's no pending flush.
// If there's a pending flush operation, event loop will call forceFlush() later,
// and thus there's no need to call it now.
if (!isFlushPending()) {
super.flush0();
}
}在执行父类 AbstractUnsafe 的
#flush0()
方法时,先调用AbstractNioUnsafe#isFlushPending()
判断,是否已经处于 flush 准备中。代码如下:private boolean isFlushPending() {
SelectionKey selectionKey = selectionKey();
return selectionKey.isValid() // 合法
&& (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0; // 对 SelectionKey.OP_WRITE 事件不感兴趣。
}- 是不是有点懵 x ?在文初,我们提到:“所以在 Netty 的实现中,默认 Channel 是可写的,当写入失败的时候,再去注册
SelectionKey.OP_WRITE
事件。这意味着什么呢?在#flush()
方法中,如果写入数据到 Channel 失败,会通过注册SelectionKey.OP_WRITE
事件,然后在轮询到 Channel 可写 时,再“回调”#forceFlush()
方法”。 - 这就是这段代码的目的,如果处于对
SelectionKey.OP_WRITE
事件感兴趣,说明 Channel 此时是不可写的,那么调用父类 AbstractUnsafe 的#flush0()
方法,也没有意义,所以就不调用。 - 😈 逻辑上,略微有点复杂,胖友好好理解下。
- 是不是有点懵 x ?在文初,我们提到:“所以在 Netty 的实现中,默认 Channel 是可写的,当写入失败的时候,再去注册
7. NioSocketChannel
AbstractChannel#doWrite(ChannelOutboundBuffer in)
抽象方法,执行真正的写入到对端。定义在 AbstractChannel 抽象类中,代码如下:
/** |
NioSocketChannel 对该抽象方法,实现代码如下:
1: |
- 第 3 行:调用
#javaChannel()
方法,获得 Java NIO 原生 SocketChannel 。 第 5 行:调用
ChannelConfig#getWriteSpinCount()
方法,获得自旋写入次数 N 。在【第 6 至 76 行】的代码,我们可以看到,不断自旋写入 N 次,直到完成写入结束。关于该配置项,官方注释如下:/**
* Returns the maximum loop count for a write operation until {@link WritableByteChannel#write(ByteBuffer)} returns a non-zero value.
* It is similar to what a spin lock is used for in concurrency programming.
* It improves memory utilization and write throughput depending on the platform that JVM runs on. The default value is {@code 16}.
*/
int getWriteSpinCount();- 默认值为
DefaultChannelConfig.writeSpinCount = 16
,可配置修改,一般不需要。
- 默认值为
- 第 6 至 76 行:不断自旋写入 N 次,直到完成写入结束。
第 8 行:调用
ChannelOutboundBuffer#isEmpty()
方法,内存队列为空,结束循环,直接返回。第 10 行:因为在 Channel 不可写的时候,会注册
SelectionKey.OP_WRITE
,等待 NIO Channel 可写。而后会”回调”#forceFlush()
方法,该方法内部也会调用#doWrite(ChannelOutboundBuffer in)
方法。所以在完成内部队列的数据向对端写入时候,需要调用#clearOpWrite()
方法,代码如下:protected final void clearOpWrite() {
final SelectionKey key = selectionKey();
// Check first if the key is still valid as it may be canceled as part of the deregistration
// from the EventLoop
// See https://github.com/netty/netty/issues/2104
if (!key.isValid()) { // 合法
return;
}
final int interestOps = key.interestOps();
// 若注册了 SelectionKey.OP_WRITE ,则进行取消
if ((interestOps & SelectionKey.OP_WRITE) != 0) {
key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
}
}- 😈 胖友看下代码注释。
- 第 18 行:调用
NioSocketChannelConfig#getMaxBytesPerGatheringWrite()
方法,获得每次写入的最大字节数。// TODO 芋艿 调整每次写入的最大字节数 - 第 20 行:调用
ChannelOutboundBuffer#nioBuffers(int maxCount, long maxBytes)
方法,从内存队列中,获得要写入的 ByteBuffer 数组。注意,如果内存队列中数据量很大,可能获得的仅仅是一部分数据。详细解析,见 「8.5 nioBuffers」 。- 第 22 行:获得写入的 ByteBuffer 数组的个数。为什么不直接调用数组的
#length()
方法呢?因为返回的 ByteBuffer 数组是预先生成的数组缓存,存在不断重用的情况,所以不能直接使用#length()
方法,而是要调用ChannelOutboundBuffer#nioBufferCount()
方法,获得写入的 ByteBuffer 数组的个数。详细解析,见 「8.5 nioBuffers」 。 - 后续根据
nioBufferCnt
的数值,分成三种情况。
- 第 22 行:获得写入的 ByteBuffer 数组的个数。为什么不直接调用数组的
- (づ ̄3 ̄)づ╭❤~ 第一种,
nioBufferCnt = 0
。 - 芋艿 TODO 1014 扣 doWrite0 的细节,应该是内部的数据为 FileRegion ,可以暂时无视,不影响本文理解。
- (づ ̄3 ̄)づ╭❤~ 第二种,
nioBufferCnt = 1
。 - 第 40 行:调用 Java 原生
SocketChannel#write(ByteBuffer buffer)
方法,执行 NIO write 调用,写入单个 ByteBuffer 对象到对端。 第 42 行:写入字节小于等于 0 ,说明 NIO Channel 不可写,所以注册
SelectionKey.OP_WRITE
,等待 NIO Channel 可写,并返回以结束循环。第 43 行:调用
AbstractNioByteChannel#incompleteWrite(true)
方法,代码如下:protected final void incompleteWrite(boolean setOpWrite) {
// Did not write completely.
// true ,注册对 SelectionKey.OP_WRITE 事件感兴趣
if (setOpWrite) {
setOpWrite();
// false ,取消对 SelectionKey.OP_WRITE 事件感兴趣
} else {
// It is possible that we have set the write OP, woken up by NIO because the socket is writable, and then
// use our write quantum. In this case we no longer want to set the write OP because the socket is still
// writable (as far as we know). We will find out next time we attempt to write if the socket is writable
// and set the write OP if necessary.
clearOpWrite();
// Schedule flush again later so other tasks can be picked up in the meantime
// 立即发起下一次 flush 任务
eventLoop().execute(flushTask); // <1>
}
}setOpWrite
为true
,调用#setOpWrite()
方法,注册对SelectionKey.OP_WRITE
事件感兴趣。代码如下:protected final void setOpWrite() {
final SelectionKey key = selectionKey();
// Check first if the key is still valid as it may be canceled as part of the deregistration
// from the EventLoop
// See https://github.com/netty/netty/issues/2104
if (!key.isValid()) { // 合法
return;
}
final int interestOps = key.interestOps();
// 注册 SelectionKey.OP_WRITE 事件的感兴趣
if ((interestOps & SelectionKey.OP_WRITE) == 0) {
key.interestOps(interestOps | SelectionKey.OP_WRITE);
}
}- 【第 43 行】的代码,就是这种情况。
setOpWrite
为false
,调用#clearOpWrite()
方法,取消对 SelectionKey.OP_WRITE 事件感兴趣。而后,在<1>
处,立即发起下一次 flush 任务。
- 第 47 行:TODO 芋艿 调整每次写入的最大字节数
- 第 49 行:调用
ChannelOutboundBuffer#removeBytes(long writtenBytes)
方法啊,从内存队列中,移除已经写入的数据( 消息 )。详细解析,见 「8.7 removeBytes」 。 - 第 51 行:写入次数减一。
- (づ ̄3 ̄)づ╭❤~ 第三种,
nioBufferCnt > 1
。和【第二种】基本相同,差别是在于【第 60 行】的代码,调用 Java 原生SocketChannel#write(ByteBuffer[] srcs, int offset, int length)
方法,执行 NIO write 调用,写入多个 ByteBuffer 对象到对端。😈 批量一次性写入,提升性能。 - =========== 结束 ===========
- 第 79 行:通过
writeSpinCount < 0
来判断,内存队列中的数据是否未完全写入。从目前逻辑看下来,笔者认为只会返回true
,即内存队列中的数据未完全写入,说明 NIO Channel 不可写,所以注册SelectionKey.OP_WRITE
,等待 NIO Channel 可写。因此,调用#incompleteWrite(true)
方法。- 举个例子,最后一次写入,Channel 的缓冲区还剩下 10 字节可写,内存队列中剩余 90 字节,那么可以成功写入 10 字节,剩余 80 字节。😈 也就说,此时 Channel 不可写落。
7.1 乱入
老艿艿:临时插入下 AbstractNioByteChannel 和 AbstractNioMessageChannel 实现类对
#doWrite(ChannelOutboundBuffer in)
方法的实现。不感兴趣的胖友,可以直接跳过。
AbstractNioByteChannel
虽然,AbstractNioByteChannel 实现了 #doWrite(ChannelOutboundBuffer in)
方法,但是子类 NioSocketChannel 又覆盖实现了该方法,所以可以忽略 AbstractNioByteChannel 的实现方法了。
那么为什么 AbstractNioByteChannel 会实现了 #doWrite(ChannelOutboundBuffer in)
方法呢?因为 NioUdtByteConnectorChannel 和 NioUdtByteRendezvousChannel 会使用到该方法。但是,这两个类已经被标记废弃,因为:
transport udt is deprecated and so the user knows it will be removed in the future. |
- 来自 Netty 官方提交的注释说明。
AbstractNioMessageChannel
虽然,AbstractNioMessageChannel 实现了 #doWrite(ChannelOutboundBuffer in)
方法,但是对于 NioServerSocketChannel 来说,暂时没有意义,因为:
|
- 两个方法,都是直接抛出 UnsupportedOperationException 异常。
那么为什么 AbstractNioMessageChannel 会实现了 #doWrite(ChannelOutboundBuffer in)
方法呢?因为 NioDatagramChannel 和 NioSctpChannel 等等会使用到该方法。感兴趣的胖友,可以自己研究下。
8. ChannelOutboundBuffer
io.netty.channel.ChannelOutboundBuffer
,内存队列。
- 在 write 操作时,将数据写到 ChannelOutboundBuffer 中。
- 在 flush 操作时,将 ChannelOutboundBuffer 的数据写入到对端。
8.1 Entry
在 write 操作时,将数据写到 ChannelOutboundBuffer 中,都会产生一个 Entry 对象。代码如下:
/** |
RECYCLER
静态属性,用于重用 Entry 对象。handle
属性,Recycler 处理器,用于回收 Entry 对象。
next
属性,指向下一条 Entry 。通过它,形成 ChannelOutboundBuffer 内部的链式存储每条写入数据的数据结构。msg
属性,写入的消息( 数据 )。promise
属性,Promise 对象。当数据写入成功后,可以通过它回调通知结果。total
属性,长度,可读字节数。通过#total(Object msg)
方法来计算。代码如下:private static long total(Object msg) {
if (msg instanceof ByteBuf) {
return ((ByteBuf) msg).readableBytes();
}
if (msg instanceof FileRegion) {
return ((FileRegion) msg).count();
}
if (msg instanceof ByteBufHolder) {
return ((ByteBufHolder) msg).content().readableBytes();
}
return -1;
}- 从这个方法,我们看到,
msg
的类型,有 ByteBuf、FileRegion、ByteBufHolder 。
- 从这个方法,我们看到,
process
属性,已写入的字节数。详细解析,见 「8.7.1 process」 。
count
属性,msg
属性转化的 NIO ByteBuffer 的数量。bufs
属性,当count > 0
时使用,表示msg
属性转化的 NIO ByteBuffer 数组。buf
属性,当count = 0
时使用,表示msg
属性转化的 NIO ByteBuffer 对象。
cancelled
属性,是否取消写入对端。pendingSize
属性,每个 Entry 预计占用的内存大小,计算方式为消息(msg
)的字节数 + Entry 对象自身占用内存的大小。
8.1.1 newInstance
#newInstance(Object msg, int size, long total, ChannelPromise promise)
静态方法,创建 Entry 对象。代码如下:
static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) { |
- 通过 Recycler 来重用 Entry 对象。
8.1.2 recycle
#recycle()
方法,回收 Entry 对象,以为下次重用该对象。代码如下:
void recycle() { |
8.1.3 recycleAndGetNext
#recycleAndGetNext()
方法,获得下一个 Entry 对象,并回收当前 Entry 对象。代码如下:
Entry recycleAndGetNext() { |
8.1.4 cancel
#cancel()
方法,标记 Entry 对象,取消写入到对端。在 ChannelOutboundBuffer 里,Entry 数组是通过链式的方式进行组织,而当某个 Entry 对象( 节点 )如果需要取消写入到对端,是通过设置 canceled = true
来标记删除。代码如下:
int cancel() { |
8.2 构造方法
/** |
channel
属性,所属的 Channel 对象。- 链式结构
flushedEntry
属性,第一个( 开始 ) flush Entry 。unflushedEntry
属性,第一个未 flush Entry 。tailEntry
属性,尾 Entry 。flushed
属性, 已 flush 但未写入对端的 Entry 数量。- 指向关系是
Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry)
。这样看,可能有点抽象,下文源码解析详细理解。
NIO_BUFFERS
静态属性,线程对应的 NIO ByteBuffer 数组缓存。在AbstractChannel#doWrite(ChannelOutboundBuffer)
方法中,会调用ChannelOutbound#nioBuffers(int maxCount, long maxBytes)
方法,初始化数组缓存。 详细解析,见 「8.6 nioBuffers」 中。nioBufferCount
属性:NIO ByteBuffer 数组的数组大小。nioBufferSize
属性:NIO ByteBuffer 数组的字节大小。
inFail
属性,正在通知 flush 失败中。详细解析,见 「8.8 failFlushed」 中。- ChannelOutboundBuffer 写入控制相关。😈 详细解析,见 「10. ChannelOutboundBuffer」 。
unwritable
属性,是否不可写。UNWRITABLE_UPDATER
静态属性,unwritable
属性的原子更新器。
totalPendingSize
属性,所有 Entry 预计占用的内存大小,通过Entry.pendingSize
来合计。TOTAL_PENDING_SIZE_UPDATER
静态属性,totalPendingSize
属性的原子更新器。
fireChannelWritabilityChangedTask
属性,触发 Channel 可写的改变的任务。CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD
静态属性,每个 Entry 对象自身占用内存的大小。为什么占用的 96 字节呢?- 16 bytes object header
,对象头,16 字节。- 8 reference fields
,实际是 6 个对象引用字段,6 * 8 = 48 字节。- 2 long fields
,2 个long
字段,2 * 8 = 16 字节。- 2 int fields
,1 个int
字段,2 * 4 = 8 字节。- 1 boolean field
,1 个boolean
字段,1 字节。padding
,补齐 8 字节的整数倍,因此 7 字节。- 因此,合计 96 字节( 64 位的 JVM 虚拟机,并且不考虑压缩 )。
- 如果不理解的胖友,可以看看 《JVM中 对象的内存布局 以及 实例分析》 。
8.3 addMessage
#addMessage(Object msg, int size, ChannelPromise promise)
方法,写入消息( 数据 )到内存队列。注意,promise
只有在真正完成写入到对端操作,才会进行通知。代码如下:
1: /** |
- 第 7 行:调用
#newInstance(Object msg, int size, long total, ChannelPromise promise)
静态方法,创建 Entry 对象。 - 第 11 至 17 行:修改尾节点
tailEntry
为新的 Entry 节点。- 第 8 至 10 行:若
tailEntry
为空,将flushedEntry
也设置为空。防御型编程,实际不会出现,胖友可以忽略。😈 当然,原因在#removeEntry(Entry e)
方法。 - 第 11 至 15 行:若
tailEntry
非空,将原tailEntry.next
指向新 Entry 。 - 第 17 行:更新原
tailEntry
为新 Entry 。
- 第 8 至 10 行:若
- 第 18 至 21 行:若
unflushedEntry
为空,则更新为新 Entry ,此时相当于首节点。 - 第 23 至 26 行:
#incrementPendingOutboundBytes(long size, ...)
方法,增加totalPendingSize
计数。详细解析,见 「10.1 incrementPendingOutboundBytes」 。
可能有点抽象,我们来看看基友【闪电侠】对这块的解析:
FROM 闪电侠 《netty 源码分析之 writeAndFlush 全解析》
初次调用
addMessage
之后,各个指针的情况为
fushedEntry
指向空,unFushedEntry
和tailEntry
都指向新加入的节点第二次调用
addMessage
之后,各个指针的情况为第 n 次调用
addMessage
之后,各个指针的情况为可以看到,调用 n 次
addMessage
,flushedEntry
指针一直指向 NULL ,表示现在还未有节点需要写出到 Socket 缓冲区,而unFushedEntry
之后有 n 个节点,表示当前还有n个节点尚未写出到 Socket 缓冲区中去
8.4 addFlush
#addFlush()
方法,标记内存队列每个 Entry 对象,开始 flush 。代码如下:
老艿艿:总觉得这个方法名取的有点奇怪,胖友可以直接看英文注释。😈 我“翻译”不好,哈哈哈。
1: public void addFlush() { |
- 第 6 至 7 行:若
unflushedEntry
为空,说明每个 Entry 对象已经“标记” flush 。注意,“标记”的方式,不是通过 Entry 对象有一个flushed
字段,而是flushedEntry
属性,指向第一个( 开始 ) flush 的 Entry ,而unflushedEntry
置空。 - 第 8 至 12 行:若
flushedEntry
为空,赋值为unflushedEntry
,用于记录第一个( 开始 ) flush 的 Entry 。 - 第 13 至 26 行:计算需要 flush 的 Entry 数量,并设置每个 Entry 对应的 Promise 不可取消。
- 第 18 至 23 行:
#decrementPendingOutboundBytes(long size, ...)
方法,减少totalPendingSize
计数。
- 第 18 至 23 行:
- 第 30 行:设置
unflushedEntry
为空。
可能有点抽象,我们来看看基友【闪电侠】对这块的解析:
FROM 闪电侠 《netty 源码分析之 writeAndFlush 全解析》
可以结合前面的图来看,首先拿到
unflushedEntry
指针,然后将flushedEntry
指向unflushedEntry
所指向的节点,调用完毕之后,三个指针的情况如下所示
老艿艿:再次切回到老艿艿的频道,呼呼。
当一次需要从内存队列写到对端的数据量非常大,那么可能写着写着 Channel 的缓存区不够,导致 Channel 此时不可写。但是,这一轮 #addFlush(...)
标记的 Entry 对象并没有都写到对端。例如,准备写到对端的 Entry 的数量是 flush = 10
个,结果只写了 6 个,那么就剩下 flush = 4
。
但是的但是,#addMessage(...)
可能又不断写入新的消息( 数据 )到 ChannelOutboundBuffer 中。那会出现什么情况呢?会“分”成两段:
<1>
段:自节点flushedEntry
开始的flush
个 Entry 节点,需要写入到对端。<2>
段:自节点unFlushedEntry
开始的 Entry 节点,需要调用#addFlush()
方法,添加到<1>
段中。
这就很好的解释两个事情:
- 为什么
#addFlush()
方法,命名是以"add"
开头。 - ChannelOutboundBuffer 的链式结构,为什么不是
head
和tail
两个节点,而是flushedEntry
、unFlushedEntry
、flushedEntry
三个节点。在此处,请允许老艿艿爆个粗口:真他 x 的巧妙啊。
8.4.1 size
#size()
方法,获得 flushed
属性。代码如下:
/** |
8.4.2 isEmpty
#isEmpty()
方法,是否为空。代码如下:
/** |
8.5 current
#current()
方法,获得当前要写入对端的消息( 数据 )。代码如下:
/** |
- 即,返回的是
flushedEntry
的消息( 数据 )。
8.6 nioBuffers
#nioBuffers(int maxCount, long maxBytes)
方法,获得当前要写入到对端的 NIO ByteBuffer 数组,并且获得的数组大小不得超过 maxCount
,字节数不得超过 maxBytes
。我们知道,在写入数据到 ChannelOutboundBuffer 时,一般使用的是 Netty ByteBuf 对象,但是写到 NIO SocketChannel 时,则必须使用 NIO ByteBuffer 对象,因此才有了这个方法。考虑到性能,这个方法里会使用到“缓存”,所以看起来会比较绕一丢丢。OK,开始看代码落:
/** |
- 第 4 至 5 行:初始
nioBufferSize
、nioBufferCount
计数。 - 第 6 至 8 行:获得当前线程的 NIO ByteBuffer 数组缓存。
- 关于 InternalThreadLocalMap 和 FastThreadLocal ,胖友可以暂时忽略,后续的文章,详细解析。
第 10 至 11 行:从
flushedEntry
节点,开始向下遍历。调用
#isFlushedEntry(Entry entry)
方法,判断是否为已经“标记”为 flush 的 Entry 节点。代码如下:private boolean isFlushedEntry(Entry e) {
return e != null && e != unflushedEntry;
}e != unflushedEntry
,就是我们在 「8.4 addFlush」 最后部分讲的,思考下。
entry.msg instanceof ByteBuf
,消息( 数据 )类型为 ByteBuf 。实际上,msg
的类型也可能是 FileRegion 。如果 ChannelOutboundBuffer 里的消息都是 FileRegion 类型,那就会导致这个方法返回为空 NIO ByteBuffer 数组。
- 第 13 行:若 Entry 节点已经取消,忽略。
- 第 14 至 18 行:获得消息( 数据 )开始读取位置和可读取的字节数。
- 第 21 行:若无可读取的数据,忽略。
- 第 22 至 37 行:
- 前半段
maxBytes - readableBytes < nioBufferSize
,当前 ByteBuf 可读取的字节数,不能超过maxBytes
。这个比较好理解。 - 后半段
nioBufferCount != 0
,如果第一条数据,就已经超过maxBytes
,那么只能“强行”读取,否则会出现一直无法读取的情况( 因为不能跳过这条 😈 )。
- 前半段
- 第 39 行:增加
nioBufferSize
。 - 第 40 至 45 行:调用
ByteBuf#nioBufferCount()
方法,初始 Entry 节点的count
属性( NIO ByteBuffer 数量)。- 使用
count == -1
的原因是,Entry.count
未初始化时,为-1
。
- 使用
- 第 47 至 51 行:如果超过 NIO ByteBuffer 数组的大小,调用
#expandNioBufferArray(ByteBuffer[] array, int neededSpace, int size)
方法,进行扩容。详细解析,见 「8.6.1 expandNioBufferArray」 。 - 第 52 至 77 行:初始 Entry 节点的
buf
或bufs
属性。- 当
count = 1
时,调用ByteBuf#internalNioBuffer(readerIndex, readableBytes)
方法,获得 NIO ByteBuffer 对象。 - 当
count > 1
时,调用ByteBuf#nioBuffers()
方法,获得 NIO ByteBuffer 数组。 - 通过
nioBuffers[nioBufferCount++] = nioBuf
,将 NIO ByteBuffer 赋值到结果数组nioBuffers
中,并增加nioBufferCount
。
- 当
- 第 79 至 82 行:到达
maxCount
上限,结束循环。老艿艿的想法,这里最好改成nioBufferCount >= maxCount
,是有可能会超过的。 - 第 87 行:下一个 Entry 节点。
- 第 90 至 92 行:设置 ChannelOutboundBuffer 的
nioBufferCount
和nioBufferSize
属性。
8.6.1 expandNioBufferArray
#expandNioBufferArray(ByteBuffer[] array, int neededSpace, int size)
方法,进行 NIO ByteBuff 数组的扩容。代码如下:
private static ByteBuffer[] expandNioBufferArray(ByteBuffer[] array, int neededSpace, int size) { |
- 代码比较简单,胖友自己看下注释。
8.6.2 nioBufferCount
#nioBufferCount()
方法,返回 nioBufferCount
属性。代码如下:
/** |
8.6.3 nioBufferSize
#nioBufferSize()
方法,返回 nioBufferSize
属性。代码如下:
/** |
8.7 removeBytes
#removeBytes(long writtenBytes)
方法,移除已经写入 writtenBytes
字节对应的 Entry 对象 / 对象们。代码如下:
1: public void removeBytes(long writtenBytes) { |
- 第 3 行:循环,移除已经写入
writtenBytes
字节对应的 Entry 对象。- 第 5 行:调用
#current()
方法,获得当前消息( 数据 )。 - 第 12 至 15 行:获得消息( 数据 )开始读取位置和可读取的字节数。
<1>
当前消息( 数据 )已被写完到对端。- 第 21 行:调用
#progress(long amount)
方法,处理当前消息的 Entry 的写入进度。详细解析,见 「8.7.1 progress」 。 - 第 23 行:减小
writtenBytes
。 - 第 26 行:调用
#remove()
方法,移除当前消息对应的 Entry 对象。详细解析,见 「8.7.2 remove」 。 <2》
当前消息( 数据 )未被写完到对端。- 第 31 行:调用
ByteBuf#readerIndex(readerIndex)
方法,标记当前消息的 ByteBuf 的读取位置。 - 第 33 行:调用
#progress(long amount)
方法,处理当前消息的 Entry 的写入进度。 - 第 35 行:
break
,结束循环。
- 第 5 行:调用
- 第 40 行:调用
#clearNioBuffers()
方法,清除 NIO ByteBuff 数组的缓存。详细解析,见 「8.7.4 clearNioBuffers」 。
8.7.1 progress
#progress(long amount)
方法,处理当前消息的 Entry 的写入进度,主要是通知 Promise 消息写入的进度。代码如下:
/** |
- 第 5 行:若
promise
的类型是 ChannelProgressivePromise 类型。 - 第 6 至 8 行:设置 Entry 对象的
progress
属性。 - 第 10 行:调用
ChannelProgressivePromise#tryProgress(progress, total)
方法,通知 ChannelProgressivePromise 进度。
8.7.2 remove
#remove()
方法,移除当前消息对应的 Entry 对象,并 Promise 通知成功。代码如下:
1: public boolean remove() { |
- 第 14 行:调用
#removeEntry(Entry e)
方法,移除指定 Entry 对象。详细解析,见 「8.7.3 removeEntry」 。 - 第 16 行:若 Entry 已取消,则忽略。
- 第 19 行:
ReferenceCountUtil#safeRelease(msg)
方法,释放消息( 数据 )相关的资源。 第 21 行:【重要】调用
#safeSuccess(promise)
方法,通知 Promise 执行成功。此处才是,真正触发Channel#write(...)
或Channel#writeAndFlush(...)
方法,返回的 Promise 的通知。#safeSuccess(promise)
方法的代码如下:private static void safeSuccess(ChannelPromise promise) {
// Only log if the given promise is not of type VoidChannelPromise as trySuccess(...) is expected to return
// false.
PromiseNotificationUtil.trySuccess(promise, null, promise instanceof VoidChannelPromise ? null : logger);
}第 23 行:
#decrementPendingOutboundBytes(long size, ...)
方法,减少totalPendingSize
计数。- 第 28 行:调用
Entry#recycle()
方法,回收 Entry 对象。
8.7.3 removeEntry
#removeEntry(Entry e)
方法,移除指定 Entry 对象。代码如下:
1: private void removeEntry(Entry e) { |
- 第 3 至 9 行:已移除完已 flush 的所有 Entry 节点,置空
flushedEntry
、tailEntry
、unflushedEntry
。 - 第 10 至 13 行:未移除完已 flush 的所有 Entry 节点,
flushedEntry
指向下一个 Entry 对象。
8.7.4 clearNioBuffers
#clearNioBuffers()
方法,清除 NIO ByteBuff 数组的缓存。代码如下:
// Clear all ByteBuffer from the array so these can be GC'ed. |
- 代码比较简单,胖友自己看注释。主要目的是 help gc 。
8.8 failFlushed
#failFlushed(Throwable cause, boolean notify)
方法,写入数据到对端失败,进行后续的处理,详细看代码。代码如下:
1: void failFlushed(Throwable cause, boolean notify) { |
- 第 2 至 10 行:正在通知 flush 失败中,直接返回。
- 第 14 行:标记正在通知 flush 失败中,即
inFail = true
。 - 第 15 至 20 行:循环,调用
#remove0(Throwable cause, boolean notifyWritability)
方法,移除所有已 flush 的 Entry 节点们。详细解析,见 「8. remove0」 中。 - 第 21 至 24 行:标记不在通知 flush 失败中,即
inFail = false
。
8.8.1 remove0
#remove0(Throwable cause, boolean notifyWritability)
方法,移除当前消息对应的 Entry 对象,并 Promise 通知异常。代码如下:
1: private boolean remove0(Throwable cause, boolean notifyWritability) { |
- 第 3 至 8 行:若所有 flush 的 Entry 节点,都已经写到对端,则调用
#clearNioBuffers()
方法,清除 NIO ByteBuff 数组的缓存。 - 第 14 行:调用
#removeEntry(Entry e)
方法,移除指定 Entry 对象。详细解析,见 「8.7.3 removeEntry」 。 - 第 16 行:若 Entry 已取消,则忽略。
- 第 19 行:
ReferenceCountUtil#safeRelease(msg)
方法,释放消息( 数据 )相关的资源。 第 21 行:【重要】调用
#safeFail(promise)
方法,通知 Promise 执行失败。此处才是,真正触发Channel#write(...)
或Channel#writeAndFlush(...)
方法,返回的 Promise 的通知。#safeFail(promise)
方法的代码如下:private static void safeFail(ChannelPromise promise, Throwable cause) {
// Only log if the given promise is not of type VoidChannelPromise as tryFailure(...) is expected to return
// false.
PromiseNotificationUtil.tryFailure(promise, cause, promise instanceof VoidChannelPromise ? null : logger);
}第 23 行:调用
#decrementPendingOutboundBytes(long size, ...)
方法,减少totalPendingSize
计数。- 第 28 行:调用
Entry#recycle()
方法,回收 Entry 对象。
8.9 forEachFlushedMessage
TODO 1015 forEachFlushedMessage 在 netty-transport-native-poll
和 netty-transport-native-kqueue
中使用,在后续的文章解析。
8.10 close
#close(...)
方法,关闭 ChannelOutboundBuffer ,进行后续的处理,详细看代码。代码如下:
void close(ClosedChannelException cause) { |
- 第 3 行:正在通知 flush 失败中:
- 第 5 至 10 行: 提交 EventLoop 的线程中,执行关闭。
- 第 12 行:
return
返回。
- 第 16 行:标记正在通知 flush 失败中,即
inFail = true
。 - 第 28 至 30 行:从
unflushedEntry
节点,开始向下遍历。- 第 31 至 34 行:减少
totalPendingSize
计数。 - 第 36 行:若 Entry 已取消,则忽略。
- 第 38 行:调用
ReferenceCountUtil#safeRelease(msg)
方法,释放消息( 数据 )相关的资源。 - 第 40 行:【重要】调用
#safeFail(promise)
方法,通知 Promise 执行失败。此处才是,真正触发Channel#write(...)
或Channel#writeAndFlush(...)
方法,返回的 Promise 的通知。 - 第 43 行:调用
Entry#recycleAndGetNext()
方法,回收当前节点,并获得下一个 Entry 节点。
- 第 31 至 34 行:减少
- 第 45 至 48 行:标记不在通知 flush 失败中,即
inFail = false
。 - 第 51 行:调用
#clearNioBuffers()
方法,清除 NIO ByteBuff 数组的缓存。
9. NioEventLoop
在上文 「7. NioSocketChannel」 中,在写入到 Channel 到对端,若 TCP 数据发送缓冲区已满,这将导致 Channel 不写可,此时会注册对该 Channel 的 SelectionKey.OP_WRITE
事件感兴趣。从而实现,再在 Channel 可写后,进行强制 flush 。这块的逻辑,在 NioEventLoop#processSelectedKey(SelectionKey k, AbstractNioChannel ch)
中实现,代码如下:
// OP_WRITE 事件就绪 |
通过 Selector 轮询到 Channel 的
OP_WRITE
就绪时,调用AbstractNioUnsafe#forceFlush()
方法,强制 flush 。代码如下:// AbstractNioUnsafe.java
public final void forceFlush() {
// directly call super.flush0() to force a flush now
super.flush0();
}- 后续的逻辑,又回到 「6. AbstractUnsafe」 小节的
#flush0()
流程。 - 在完成强制 flush 之后,会取消对
SelectionKey.OP_WRITE
事件的感兴趣。
- 后续的逻辑,又回到 「6. AbstractUnsafe」 小节的
9.1 如何模拟
配置服务端 ServerBootstrap 的启动参数如下:
.childOption(ChannelOption.SO_SNDBUF, 5) // Socket 参数,TCP 数据发送缓冲区大小。
telnet
到启动的服务端,发送相对长的命令,例如"abcdefghijklmnopqrstuvw11321321321nhdkslk"
。
10. ChannelOutboundBuffer 写入控制
当我们不断调用 #addMessage(Object msg, int size, ChannelPromise promise)
方法,添加消息到 ChannelOutboundBuffer 内存队列中,如果不及时 flush 写到对端( 例如程序一直未调用 Channel#flush()
方法,或者对端接收数据比较慢导致 Channel 不可写 ),可能会导致 OOM 内存溢出。所以,在 ChannelOutboundBuffer 使用 totalPendingSize
属性,存储所有 Entry 预计占用的内存大小( pendingSize
)。
- 在
totalPendingSize
大于高水位阀值时(ChannelConfig.writeBufferHighWaterMark
,默认值为 64 KB ),关闭写开关(unwritable
)。详细解析,见 「10.1 incrementPendingOutboundBytes」 。 - 在
totalPendingSize
小于低水位阀值时(ChannelConfig.writeBufferLowWaterMark
,默认值为 32 KB ),打开写开关(unwritable
)。详细解析,见 「10.2 decrementPendingOutboundBytes」 。
该功能,对应 Github 提交为 《Take memory overhead of ChannelOutboundBuffer / PendingWriteQueue into account》 。
10.1 incrementPendingOutboundBytes
#incrementPendingOutboundBytes(long size, ...)
方法,增加 totalPendingSize
计数。代码如下:
1: /** |
- 第 15 行:增加
totalPendingSize
计数。 第 16 至 19 行:
totalPendingSize
大于高水位阀值时,调用#setUnwritable(boolean invokeLater)
方法,设置为不可写。代码如下:1: private void setUnwritable(boolean invokeLater) {
2: for (;;) {
3: final int oldValue = unwritable;
4: // 或位操作,修改第 0 位 bits 为 1
5: final int newValue = oldValue | 1;
6: // CAS 设置 unwritable 为新值
7: if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
8: // 若之前可写,现在不可写,触发 Channel WritabilityChanged 事件到 pipeline 中。
9: if (oldValue == 0 && newValue != 0) {
10: fireChannelWritabilityChanged(invokeLater);
11: }
12: break;
13: }
14: }
15: }- 第 2 行:
for
循环,直到 CAS 修改成功 - 第 5 行:或位操作,修改第 0 位 bits 为 1 。😈 比较神奇的是,
unwritable
的类型不是boolean
,而是int
类型。通过每个 bits ,来表示哪种类型不可写。感兴趣的胖友,可以看看io.netty.handler.traffic.AbstractTrafficShapingHandler
,使用了第 1、2、3 bits 。 - 第 7 行:CAS 设置
unwritable
为新值。 - 第 8 至 11 行:若之前可写,现在不可写,调用
#fireChannelWritabilityChanged(boolean invokeLater)
方法,触发 Channel WritabilityChanged 事件到 pipeline 中。详细解析,见 「10.3 fireChannelWritabilityChanged」 。
- 第 2 行:
10.1.1 bytesBeforeUnwritable
#bytesBeforeUnwritable()
方法,获得距离不可写还有多少字节数。代码如下:
public long bytesBeforeUnwritable() { |
- 基于高水位阀值来判断。
10.2 decrementPendingOutboundBytes
#decrementPendingOutboundBytes(long size, ...)
方法,减少 totalPendingSize
计数。代码如下:
1: /** |
- 第 15 行:减少
totalPendingSize
计数。 第 16 至 19 行:
totalPendingSize
小于低水位阀值时,调用#setWritable(boolean invokeLater)
方法,设置为可写。代码如下:1: private void setWritable(boolean invokeLater) {
2: for (;;) {
3: final int oldValue = unwritable;
4: // 并位操作,修改第 0 位 bits 为 0
5: final int newValue = oldValue & ~1;
6: // CAS 设置 unwritable 为新值
7: if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
8: // 若之前不可写,现在可写,触发 Channel WritabilityChanged 事件到 pipeline 中。
9: if (oldValue != 0 && newValue == 0) {
10: fireChannelWritabilityChanged(invokeLater);
11: }
12: break;
13: }
14: }
15: }- 第 2 行:
for
循环,直到 CAS 修改成功 - 第 5 行:并位操作,修改第 0 位 bits 为 0 。
- 第 7 行:CAS 设置
unwritable
为新值。 - 第 8 至 11 行:若之前可写,现在不可写,调用
#fireChannelWritabilityChanged(boolean invokeLater)
方法,触发 Channel WritabilityChanged 事件到 pipeline 中。详细解析,见 「10.3 fireChannelWritabilityChanged」 。
- 第 2 行:
10.2.1 bytesBeforeWritable
#bytesBeforeWritable()
方法,获得距离可写还要多少字节数。代码如下:
/** |
- 基于低水位阀值来判断。
10.3 fireChannelWritabilityChanged
#fireChannelWritabilityChanged(boolean invokeLater)
方法,触发 Channel WritabilityChanged 事件到 pipeline 中。代码如下:
private void fireChannelWritabilityChanged(boolean invokeLater) { |
- 根据
invokeLater
的值,分成两种方式,调用ChannelPipeline#fireChannelWritabilityChanged()
方法,触发 Channel WritabilityChanged 事件到 pipeline 中。具体,胖友看下代码注释。 - 后续的流程,就是 《精尽 Netty 源码解析 —— ChannelPipeline(五)之 Inbound 事件的传播》 。
- 通过 Channel WritabilityChanged 事件,配合
io.netty.handler.stream.ChunkedWriteHandler
处理器,实现 ChannelOutboundBuffer 写入的控制,避免 OOM 。ChunkedWriteHandler 的具体代码实现,我们在后续的文章,详细解析。- 所以,有一点要注意,ChannelOutboundBuffer 的
unwritable
属性,仅仅作为一个是否不可写的开关,具体需要配合响应的 ChannelHandler 处理器,才能实现“不可写”的功能。
- 所以,有一点要注意,ChannelOutboundBuffer 的
10.4 isWritable
#isWritable()
方法,是否可写。代码如下:
/** |
- 如果
unwritable
大于 0 ,则表示不可写。😈 一定要注意!!!
10.4.1 getUserDefinedWritability
#getUserDefinedWritability(int index)
方法,获得指定 bits 是否可写。代码如下:
/** |
- 为什么方法名字上会带有
"UserDefined"
呢?因为index
不能使用 0 ,表示只允许使用用户定义("UserDefined"
) bits 位,即[1, 31]
。
10.4.2 setUserDefinedWritability
#setUserDefinedWritability(int index, boolean writable)
方法,设置指定 bits 是否可写。代码如下:
/** |
- 代码比较简单,胖友自己看噢。
666. 彩蛋
比想象中,长的多的多的一篇文章。总的来说,绝大部分细节,都已经扣到,美滋滋。如果有解释不够清晰或错误的细节,一起多多沟通呀。
写完这篇,我简直疯了。。。。
推荐阅读文章: