1. 概述
本文我们分享 EventLoop 的处理 IO 事件相关代码的实现。对应如下图的绿条 process selected keys 部分:run
因为在 《精尽 Netty 源码解析 —— EventLoop(四)之 EventLoop 运行》 中,#openSelector()
和 #rebuildSelector()
方法并未做分享,所以我们先来一起看看。
2. SelectorTuple
SelectorTuple ,Selector 元组。代码如下:
SelectorTuple 内嵌在 NioEventLoop
private static final class SelectorTuple { |
3. openSelector
#openSelector()
方法,创建 Selector 对象。代码如下:
1: private SelectorTuple openSelector() { |
- 第 2 至 8 行:创建 Selector 对象,作为
unwrappedSelector
。 - 第 10 至 13 行:禁用 SelectionKey 的优化,则直接返回 SelectorTuple 对象。即,
selector
也使用unwrappedSelector
。 - 第 15 至 28 行:获得 SelectorImpl 类。胖友可以自动过滤掉
AccessController#.doPrivileged(...)
外层代码。在方法内部,调用Class#forName(String name, boolean initialize, ClassLoader loader)
方法,加载sun.nio.ch.SelectorImpl
类。加载成功,则返回该类,否则返回异常。- 第 30 至 39 行: 获得 SelectorImpl 类失败,则直接返回 SelectorTuple 对象。即,
selector
也使用unwrappedSelector
。
- 第 30 至 39 行: 获得 SelectorImpl 类失败,则直接返回 SelectorTuple 对象。即,
第 44 行:创建 SelectedSelectionKeySet 对象。这是 Netty 对 Selector 的
selectionKeys
的优化。关于 SelectedSelectionKeySet 的详细实现,见 「4. SelectedSelectionKeySet」 。- 第 46 至 75 行: 设置 SelectedSelectionKeySet 对象到
unwrappedSelector
中的selectedKeys
和publicSelectedKeys
属性。整个过程,笔者已经添加中文注释,胖友自己看下。 selectedKeys
和publicSelectedKeys
属性在 SelectorImpl 类中,代码如下:protected HashSet<SelectionKey> keys = new HashSet(); // => publicKeys
private Set<SelectionKey> publicKeys;
protected Set<SelectionKey> selectedKeys = new HashSet(); // => publicSelectedKeys
private Set<SelectionKey> publicSelectedKeys;
protected SelectorImpl(SelectorProvider var1) {
super(var1);
if (Util.atBugLevel("1.4")) { // 可以无视
this.publicKeys = this.keys;
this.publicSelectedKeys = this.selectedKeys;
} else {
this.publicKeys = Collections.unmodifiableSet(this.keys);
this.publicSelectedKeys = Util.ungrowableSet(this.selectedKeys);
}
}- 可以看到,
selectedKeys
和publicSelectedKeys
的类型都是 HashSet 。
- 可以看到,
- 第 77 至 83 行:设置 SelectedSelectionKeySet 对象到
unwrappedSelector
中失败,则直接返回 SelectorTuple 对象。即,selector
也使用unwrappedSelector
。
- 第 46 至 75 行: 设置 SelectedSelectionKeySet 对象到
- 第 86 行:设置 SelectedSelectionKeySet 对象到
selectedKeys
中。在下文,我们会看到,是否成功优化 Selector 对象,是通过selectedKeys
是否成功初始化来判断。 - 第 91 行:创建 SelectedSelectionKeySetSelector 对象。这是 Netty 对 Selector 的优化实现类。关于 SelectedSelectionKeySetSelector 的详细实现,见 「5. SelectedSelectionKeySetSelector」 。
- 第 91 行:创建 SelectorTuple 对象。即,
selector
使用 SelectedSelectionKeySetSelector 对象。😈 总算,创建成功优化的selector
对象了。
4. SelectedSelectionKeySet
io.netty.channel.nio.SelectedSelectionKeySet
,继承 AbstractSet 抽象类,已 select 的 NIO SelectionKey 集合。代码如下:
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> { |
- 通过
keys
和size
两个属性,实现可重用的数组。 #add(SelectionKey o)
方法,添加新 select 到就绪事件的 SelectionKey 到keys
中。当超过数组大小上限时,调用#increaseCapacity()
方法,进行两倍扩容。相比 SelectorImpl 中使用的selectedKeys
所使用的 HashSet 的#add(E e)
方法,事件复杂度从O(lgn)
降低到O(1)
。#reset(...)
方法,每次读取使用完数据,调用该方法,进行重置。- 因为
#remove(Object o)
、#contains(Object o)
、#iterator()
不会使用到,索性不进行实现。
5. SelectedSelectionKeySetSelector
io.netty.channel.nio.SelectedSelectionKeySetSelector
,基于 Netty SelectedSelectionKeySet 作为 selectionKeys
的 Selector 实现类。代码如下:
final class SelectedSelectionKeySetSelector extends Selector { |
- 除了 select 相关的 3 个方法,每个实现方法,都是基于 Java NIO Selector 对应的方法的调用。
- select 相关的 3 个方法,在调用对应的 Java NIO Selector 方法之前,会调用
SelectedSelectionKeySet#reset()
方法,重置selectionKeys
。从而实现,每次 select 之后,都是新的已 select 的 NIO SelectionKey 集合。
6. rebuildSelector
#rebuildSelector()
方法,重建 Selector 对象。代码如下:
该方法用于 NIO Selector 发生 epoll bug 时,重建 Selector 对象。
😈 突然又找到一个讨论,可以看看 《JDK 1.7 及以下 NIO 的 epoll bug》 和 《应用服务器中对JDK的epoll空转bug的处理》 。
public void rebuildSelector() { |
- 只允许在 EventLoop 的线程中,调用
#rebuildSelector0()
方法,重建 Selector 对象。
6.1 rebuildSelector0
#rebuildSelector0()
方法,重建 Selector 对象。代码如下:
1: private void rebuildSelector0() { |
- 第 7 行:调用
#openSelector()
方法,创建新的 Selector 对象。 - 第 16 至 52 行:遍历老的 Selector 对象的
selectionKeys
,将注册在 NioEventLoop 上的所有 Channel ,注册到新创建 Selector 对象上。- 第 22 至 24 行:校验 SelectionKey 有效,并且 Java NIO Channel 并未注册在新的 Selector 对象上。
- 第 28 行:调用
SelectionKey#cancel()
方法,取消老的 SelectionKey 。 - 第 30 行:将 Java NIO Channel 注册到新的 Selector 对象上,返回新的 SelectionKey 对象。
- 第 31 至 35 行:修改 Channel 的
selectionKey
指向新的 SelectionKey 对象 - 第 39 至 51 行:当发生异常时候,根据不同的 SelectionKey 的
attachment
来判断处理方式:- 第 41 至 44 行:当
attachment
是 Netty NIO Channel 时,调用Unsafe#close(ChannelPromise promise)
方法,关闭发生异常的 Channel 。 - 第 45 至 50 行:当
attachment
是 Netty NioTask 时,调用#invokeChannelUnregistered(NioTask<SelectableChannel> task, SelectionKey k, Throwable cause)
方法,通知 Channel 取消注册。详细解析,见 「8. NioTask」 。
- 第 41 至 44 行:当
- 第 54 至 56 行:修改
selector
和unwrappedSelector
指向新的 Selector 对象。 - 第 58 至 66 行:调用
Selector#close()
方法,关闭老的 Selector 对象。
总的来说,#rebuildSelector()
方法,相比 #openSelector()
方法,主要是需要将老的 Selector 对象的“数据”复制到新的 Selector 对象上,并关闭老的 Selector 对象。
7. processSelectedKeys
在 #run()
方法中,会调用 #processSelectedKeys()
方法,处理 Channel 新增就绪的 IO 事件。代码如下:
private void processSelectedKeys() { |
- 当
selectedKeys
非空,意味着使用优化的 SelectedSelectionKeySetSelector ,所以调用#processSelectedKeysOptimized()
方法;否则,调用#processSelectedKeysPlain()
方法。
7.1 processSelectedKeysOptimized
#processSelectedKeysOptimized()
方法,基于 Netty SelectedSelectionKeySetSelector ,处理 Channel 新增就绪的 IO 事件。代码如下:
老艿艿:从方法名,我们也可以看出,这是个经过优化的实现。
1: private void processSelectedKeysOptimized() { |
- 第 3 行:循环
selectedKeys
数组。- 第 4 至 7 行:置空,原因见 https://github.com/netty/netty/issues/2363 。
- 第 11 至 13 行:当
attachment
是 Netty NIO Channel 时,调用#processSelectedKey(SelectionKey k, AbstractNioChannel ch)
方法,处理一个 Channel 就绪的 IO 事件。详细解析,见 「7.3 processSelectedKey」 。 - 第 14 至 19 行:当
attachment
是 Netty NioTask 时,调用#processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task)
方法,使用 NioTask 处理一个 Channel 的 IO 事件。详细解析,见 「8. NioTask」 。 - 第 21 至 29 行:TODO 1007 NioEventLoop cancel 方法
7.2 processSelectedKeysPlain
#processSelectedKeysOptimized()
方法,基于 Java NIO 原生 Selecotr ,处理 Channel 新增就绪的 IO 事件。代码如下:
老艿艿:总体和
#processSelectedKeysOptimized()
方法类似。
1: private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) { |
- 第 10 至 11 行:遍历 SelectionKey 迭代器。
- 第 12 至 15 行:获得下一个 SelectionKey 对象,并从迭代器中移除。
- 第 18 至 20 行:当
attachment
是 Netty NIO Channel 时,调用#processSelectedKey(SelectionKey k, AbstractNioChannel ch)
方法,处理一个 Channel 就绪的 IO 事件。详细解析,见 「7.3 processSelectedKey」 。 - 第 21 至 26 行:当
attachment
是 Netty NioTask 时,调用#processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task)
方法,使用 NioTask 处理一个 Channel 的 IO 事件。详细解析,见 「8. NioTask」 。 - 第 33 至 44 行:TODO 1007 NioEventLoop cancel 方法
7.3 processSelectedKey
#processSelectedKey(SelectionKey k, AbstractNioChannel ch)
方法,处理一个 Channel 就绪的 IO 事件。代码如下:
1: private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { |
- 第 2 至 24 行:如果 SelectionKey 是不合法的,则关闭 Channel 。
- 第 30 至 42 行:如果对
OP_CONNECT
事件就绪:- 第 34 至 39 行:移除对
OP_CONNECT
的感兴趣,即不再监听连接事件。 - 【重要】第 41 行:调用
Unsafe#finishConnect()
方法,完成连接。后续的逻辑,对应 《精尽 Netty 源码分析 —— 启动(二)之客户端》 的 「3.6.4 finishConnect」 小节。
- 第 34 至 39 行:移除对
- 第 44 至 50 行:如果对
OP_WRITE
事件就绪,调用Unsafe#forceFlush()
方法,向 Channel 写入数据。在完成写入数据后,会移除对OP_WRITE
的感兴趣。想要提前了解的胖友,可以自己看下AbstractNioByteChannel#clearOpWrite()
和AbstractNioMessageChannel#doWrite(ChannelOutboundBuffer in)
方法。 - 第 52 至 58 行:如果对
OP_READ
或OP_ACCEPT
事件就绪:调用Unsafe#read()
方法,处理读或者者接受客户端连接的事件。
8. NioTask
io.netty.channel.nio.NioTask
,用于自定义 Nio 事件处理接口。对于每个 Nio 事件,可以认为是一个任务( Task ),代码如下:
public interface NioTask<C extends SelectableChannel> { |
#channelReady(C ch, SelectionKey key)
方法,处理 Channel IO 就绪的事件。相当于说,我们可以通过实现该接口方法,实现 「7.3 processSelectedKey」 的逻辑。#channelUnregistered(C ch, Throwable cause)
方法,Channel 取消注册。一般来说,我们可以通过实现该接口方法,关闭 Channel 。
😈 实际上,NioTask 在 Netty 自身中并未有相关的实现类,并且和闪电侠沟通了下,他在项目中,也并未使用。所以对 NioTask 不感兴趣的胖友,可以跳过本小节。另外,NioTask 是在 Allow a user to access the Selector of an EventLoop 中有相关的讨论。
8.1 register
#register(final SelectableChannel ch, final int interestOps, final NioTask<?> task)
方法,注册 Java NIO Channel ( 不一定需要通过 Netty 创建的 Channel )到 Selector 上,相当于说,也注册到了 EventLoop 上。代码如下:
/** |
<1>
处,调用SelectableChannel#register(Selector sel, int ops, Object att)
方法,注册 Java NIO Channel 到 Selector 上。这里我们可以看到,attachment
为 NioTask 对象,而不是 Netty Channel 对象。
8.2 invokeChannelUnregistered
#invokeChannelUnregistered(NioTask<SelectableChannel> task, SelectionKey k, Throwable cause)
方法,执行 Channel 取消注册。代码如下:
private static void invokeChannelUnregistered(NioTask<SelectableChannel> task, SelectionKey k, Throwable cause) { |
- 在方法内部,调用
NioTask#channelUnregistered()
方法,执行 Channel 取消注册。
8.3 processSelectedKey
#processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task)
方法,使用 NioTask ,自定义实现 Channel 处理 Channel IO 就绪的事件。代码如下:
private static void processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task) { |
- 代码比较简单,胖友自己看中文注释。主要是看懂
state
有 3 种情况:0
:未执行。1
:执行成功。2
:执行异常。
666. 彩蛋
简单小文一篇,没什么太大难度的一篇。
如果有不理解的地方,也可以看看下面的文章: