1. 概述
本文我们分享 EventLoop 的运行相关代码的实现。
因为 EventLoop 的运行主要是通过 NioEventLoop 的 #run()
方法实现,考虑到内容相对的完整性,在 《精尽 Netty 源码解析 —— EventLoop(三)之 EventLoop 初始化》 一文中,我们并未分享 NioEventLoop 的初始化,所以本文也会分享这部分的内容。
OK ,还是老样子,自上而下的方式,一起来看看 NioEventLoop 的代码实现。
老艿艿,本文的重点在 「2.9 run」 和 「2.12 select」 中。
2. NioEventLoop
io.netty.channel.nio.NioEventLoop
,继承 SingleThreadEventLoop 抽象类,NIO EventLoop 实现类,实现对注册到其中的 Channel 的就绪的 IO 事件,和对用户提交的任务进行处理。
2.1 static
在 static
代码块中,初始化了 NioEventLoop 的静态属性们。代码如下:
/** |
CLEANUP_INTERVAL
属性,TODO 1007 NioEventLoop cancelDISABLE_KEYSET_OPTIMIZATION
属性,是否禁用 SelectionKey 的优化,默认开启。详细解析,见 《精尽 Netty 源码解析 —— EventLoop(五)之 EventLoop 处理 IO 事件》 。SELECTOR_AUTO_REBUILD_THRESHOLD
属性,NIO Selector 空轮询该 N 次后,重建新的 Selector 对象,用以解决 JDK NIO 的 epoll 空轮询 Bug 。MIN_PREMATURE_SELECTOR_RETURNS
属性,少于该 N 值,不开启空轮询重建新的 Selector 对象的功能。
<1>
处,解决Selector#open()
方法,发生 NullPointException 异常。详细解析,见 http://bugs.sun.com/view_bug.do?bug_id=6427854 和 https://github.com/netty/netty/issues/203 。<2>
处,初始化SELECTOR_AUTO_REBUILD_THRESHOLD
属性。默认 512 。
2.2 构造方法
/** |
- Selector 相关:
unwrappedSelector
属性,未包装的 NIO Selector 对象。selector
属性,包装的 NIO Selector 对象。Netty 对 NIO Selector 做了优化。详细解析,见 《精尽 Netty 源码解析 —— EventLoop(五)之 EventLoop 处理 IO 事件》 。selectedKeys
属性,注册的 NIO SelectionKey 集合。Netty 自己实现,经过优化。详细解析,见 《精尽 Netty 源码解析 —— EventLoop(五)之 EventLoop 处理 IO 事件》 。provider
属性,NIO SelectorProvider 对象,用于创建 NIO Selector 对象。- 在
<1>
处,调用#openSelector()
方法,创建 NIO Selector 对象。
wakenUp
属性,唤醒标记。因为唤醒方法Selector#wakeup()
开销比较大,通过该标识,减少调用。详细解析,见 「2.8 wakeup」 。selectStrategy
属性,Select 策略。详细解析,见 「2.10 SelectStrategy」 。ioRatio
属性,在 NioEventLoop 中,会三种类型的任务:1) Channel 的就绪的 IO 事件;2) 普通任务;3) 定时任务。而ioRatio
属性,处理 Channel 的就绪的 IO 事件,占处理任务的总时间的比例。- 取消 SelectionKey 相关:
cancelledKeys
属性, 取消 SelectionKey 的数量。TODO 1007 NioEventLoop cancelneedsToSelectAgain
属性,是否需要再次 select Selector 对象。TODO 1007 NioEventLoop cancel
2.3 openSelector
#openSelector()
方法,创建 NIO Selector 对象。
考虑到让本文更专注在 EventLoop 的逻辑,并且不影响对本文的理解,所以暂时不讲解它的具体实现。详细解析,见 《精尽 Netty 源码解析 —— EventLoop(五)之 EventLoop 处理 IO 事件》 。
2.4 rebuildSelector
#rebuildSelector()
方法,重建 NIO Selector 对象。
考虑到让本文更专注在 EventLoop 的逻辑,并且不影响对本文的理解,所以暂时不讲解它的具体实现。详细解析,见 《精尽 Netty 源码解析 —— EventLoop(五)之 EventLoop 处理 IO 事件》 。
2.5 newTaskQueue
#newTaskQueue(int maxPendingTasks)
方法,创建任务队列。代码如下:
该方法覆写父类的该方法。
|
调用
PlatformDependent#newMpscQueue(...)
方法,创建 mpsc 队列。我们来看看代码注释对 mpsc 队列的描述:Create a new { Queue} which is safe to use for multiple producers (different threads) and a single consumer (one thread!).
- mpsc 是 multiple producers and a single consumer 的缩写。
- mpsc 是对多线程生产任务,单线程消费任务的消费,恰好符合 NioEventLoop 的情况。
- 详细解析,见后续文章。当然,着急的胖友,可以先看看 《原理剖析(第 012 篇)Netty 之无锁队列 MpscUnboundedArrayQueue 原理分析》 。
2.6 pendingTasks
#pendingTasks()
方法,获得待执行的任务数量。代码如下:
该方法覆写父类的该方法。
|
- 因为 MpscQueue 仅允许单消费,所以获得队列的大小,仅允许在 EventLoop 的线程中调用。
2.7 setIoRatio
#setIoRatio(int ioRatio)
方法,设置 ioRatio
属性。代码如下:
/** |
2.8 wakeup
#wakeup(boolean inEventLoop)
方法,唤醒线程。代码如下:
|
<1>
处,因为 NioEventLoop 的线程阻塞,主要是调用Selector#select(long timeout)
方法,阻塞等待有 Channel 感兴趣的 IO 事件,或者超时。所以需要调用Selector#wakeup()
方法,进行唤醒 Selector 。<2>
处,因为Selector#wakeup()
方法的唤醒操作是开销比较大的操作,并且每次重复调用相当于重复唤醒。所以,通过wakenUp
属性,通过 CAS 修改false => true
,保证有且仅有进行一次唤醒。- 当然,详细的解析,可以结合 「2.9 run」 一起看,这样会更加清晰明了。
2.9 run
#run()
方法,NioEventLoop 运行,处理任务。这是本文最重要的方法。代码如下:
1: |
- 第 3 行:“死”循环,直到 NioEventLoop 关闭,即【第 78 至 89 行】的代码。
- 第 5 行:调用
SelectStrategy#calculateStrategy(IntSupplier selectSupplier, boolean hasTasks)
方法,获得使用的 select 策略。详细解析,胖友先跳到 「2.10 SelectStrategy」 中研究。😈 看完回来。- 我们知道
SelectStrategy#calculateStrategy(...)
方法,有 3 种返回的情况。 - 第 6 至 7 行:第一种,
SelectStrategy.CONTINUE
,默认实现下,不存在这个情况。 - 第 8 至 44 行:第二种,
SelectStrategy.SELECT
,进行 Selector 阻塞 select 。- 第 11 行:重置
wakeUp
标识为false
,并返回修改前的值。 - 第 11 行:调用
#select(boolean oldWakeUp)
方法,选择( 查询 )任务。直接看这个方法不能完全表达出该方法的用途,所以详细解析,见 「2.12 select」 。 - 第 41 至 44 行:若唤醒标识
wakeup
为true
时,调用Selector#wakeup()
方法,唤醒 Selector 。可能看到此处,很多胖友会和我一样,一脸懵逼。实际上,耐下性子,答案在上面的英文注释中。笔者来简单解析下:- 1)在
wakenUp.getAndSet(false)
和#select(boolean oldWakeUp)
之间,在标识wakeUp
设置为false
时,在#select(boolean oldWakeUp)
方法中,正在调用Selector#select(...)
方法,处于阻塞中。 - 2)此时,有另外的线程调用了
#wakeup()
方法,会将标记wakeUp
设置为true
,并唤醒Selector#select(...)
方法的阻塞等待。 - 3)标识
wakeUp
为true
,所以再有另外的线程调用#wakeup()
方法,都无法唤醒Selector#select(...)
。为什么呢?因为#wakeup()
的 CAS 修改false => true
会失败,导致无法调用Selector#wakeup()
方法。 - 解决方式:所以在
#select(boolean oldWakeUp)
执行完后,增加了【第 41 至 44 行】来解决。 - 😈😈😈 整体比较绕,胖友结合实现代码 + 英文注释,再好好理解下。
- 1)在
- 第 11 行:重置
- 第 46 行:第三种,
>= 0
,已经有可以处理的任务,直接向下。
- 我们知道
- 第 49 至 51 行:TODO 1007 NioEventLoop cancel 方法
- 第 53 至 74 行:根据
ioRatio
的配置不同,分成略有差异的 2 种:- 第一种,
ioRatio
为 100 ,则不考虑时间占比的分配。- 第 57 行:调用
#processSelectedKeys()
方法,处理 Channel 感兴趣的就绪 IO 事件。详细解析,见 《精尽 Netty 源码解析 —— EventLoop(五)之 EventLoop 处理 IO 事件》 。 - 第 58 至 62 行:调用
#runAllTasks()
方法,运行所有普通任务和定时任务,不限制时间。详细解析,见 《精尽 Netty 源码解析 —— EventLoop(五)之 EventLoop 处理 IO 事件》 。
- 第 57 行:调用
- 第二种,
ioRatio
为< 100
,则考虑时间占比的分配。- 第 64 行:记录当前时间。
- 第 67 行:和【第 57 行】的代码一样。
- 第 71 至 72 行:🙂 比较巧妙的方式,是不是和胖友之前认为的不太一样。它是以
#processSelectedKeys()
方法的执行时间作为基准,计算#runAllTasks(long timeoutNanos)
方法可执行的时间。 - 第 72 行:调用 #runAllTasks(long timeoutNanos)` 方法,运行所有普通任务和定时任务,限制时间。
- 第一种,
第 75 至 77 行:当发生异常时,调用
#handleLoopException(Throwable t)
方法,处理异常。代码如下:private static void handleLoopException(Throwable t) {
logger.warn("Unexpected exception in the selector loop.", t);
// Prevent possible consecutive immediate failures that lead to
// excessive CPU consumption.
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Ignore.
}
}第 78 至 89 行:TODO 1006 EventLoop 优雅关闭
- 总的来说,
#run()
的执行过程,就是如下一张图:run
2.10 SelectStrategy
io.netty.channel.SelectStrategy
,选择( select )策略接口。代码如下:
public interface SelectStrategy { |
calculateStrategy(IntSupplier selectSupplier, boolean hasTasks)
接口方法有 3 种返回的情况:SELECT
,-1
,表示使用阻塞 select 的策略。CONTINUE
,-2
,表示需要进行重试的策略。实际上,默认情况下,不会返回CONTINUE
的策略。>= 0
,表示不需要 select ,目前已经有可以执行的任务了。
2.10.1 DefaultSelectStrategy
io.netty.channel.DefaultSelectStrategy
,实现 SelectStrategy 接口,默认选择策略实现类。代码如下:
final class DefaultSelectStrategy implements SelectStrategy { |
当
hasTasks
为true
,表示当前已经有任务,所以调用IntSupplier#get()
方法,返回当前 Channel 新增的 IO 就绪事件的数量。代码如下:private final IntSupplier selectNowSupplier = new IntSupplier() {
public int get() throws Exception {
return selectNow();
}
};io.netty.util.IntSupplier
,代码如下:public interface IntSupplier {
/**
* Gets a result.
*
* @return a result
*/
int get() throws Exception;
}- 类似 Java 自带的
Callable<Int>
。
- 类似 Java 自带的
- IntSupplier 在 NioEventLoop 中的实现为
selectNowSupplier
属性。在它的内部会调用#selectNow()
方法。详细解析,见 「2.11 selectNow」 。 - 实际上,这里不调用
IntSupplier#get()
方法,也是可以的。只不过考虑到,可以通过#selectNow()
方法,无阻塞的 select Channel 是否有感兴趣的就绪事件。
- 当
hasTasks
为false
时,直接返回SelectStrategy.SELECT
,进行阻塞 select Channel 感兴趣的就绪 IO 事件。
2.11 selectNow
#selectNow()
方法,代码如下:
int selectNow() throws IOException { |
<1>
处,调用Selector#selectorNow()
方法,立即( 无阻塞 )返回 Channel 新增的感兴趣的就绪 IO 事件数量。<2>
处,若唤醒标识wakeup
为true
时,调用Selector#wakeup()
方法,唤醒 Selector 。因为<1>
处的Selector#selectorNow()
会使用我们对 Selector 的唤醒,所以需要进行复原。有一个冷知道,可能有胖友不知道:注意,如果有其它线程调用了
#wakeup()
方法,但当前没有线程阻塞在#select()
方法上,下个调用#select()
方法的线程会立即被唤醒。😈 有点神奇。
2.12 select
#select(boolean oldWakenUp)
方法,选择( 查询 )任务。这是本文最重要的方法。代码如下:
1: private void select(boolean oldWakenUp) throws IOException { |
- 第 3 行:获得使用的 Selector 对象,不需要每次访问使用
volatile
修饰的selector
属性。 - 第 6 行:获得 select 操作的计数器。主要用于记录 Selector 空轮询次数,所以每次在正在轮询完成( 例如:轮询超时 ),则重置
selectCnt
为 1 。 - 第 8 行:记录当前时间,单位:纳秒。
- 第 10 行:计算 select 操作的截止时间,单位:纳秒。
#delayNanos(currentTimeNanos)
方法返回的为下一个定时任务距离现在的时间,如果不存在定时任务,则默认返回 1000 ms 。该方法的详细解析,见后续文章。
- 第 12 行:“死”循环,直到符合如下任一一种情况后结束:
- select 操作超时,对应【第 18 至 24 行】。
- 若有新的任务加入,对应【第 26 至 37 行】。
- 查询到任务或者唤醒,对应【第 45 至 51 行】。
- 线程被异常打断,对应【第 52 至 66 行】。
- 发生 NIO 空轮询的 Bug 后重建 Selector 对象后,对应【第 75 至 93 行】。
- 第 16 行:计算本次 select 的超时时长,单位:毫秒。因为【第 40 行】的
Selector#select(timeoutMillis)
方法,可能因为各种情况结束,所以需要循环,并且每次重新计算超时时间。至于+ 500000L
和/ 1000000L
的用途,看下代码注释。 - 第 17 至 24 行:如果超过 select 超时时长,则结束 select 。
- 第 19 至 21 行:如果是首次 select ,则调用
Selector#selectNow()
方法,获得非阻塞的 Channel 感兴趣的就绪的 IO 事件,并重置selectCnt
为 1 。
- 第 19 至 21 行:如果是首次 select ,则调用
- 第 26 至 37 行:若有新的任务加入。这里实际要分成两种情况:
- 第一种,提交的任务的类型是 NonWakeupRunnable ,那么它并不会调用
#wakeup()
方法,原因胖友自己看#execute(Runnable task)
思考下。Netty 在#select()
方法的设计上,能尽快执行任务。此时如果标记wakeup
为false
,说明符合这种情况,直接结束 select 。 - 第二种,提交的任务的类型不是 NonWakeupRunnable ,那么在
#run()
方法的【第 8 至 11 行】的wakenUp.getAndSet(false)
之前,发起了一次#wakeup()
方法,那么因为wakenUp.getAndSet(false)
会将标记wakeUp
设置为false
,所以就能满足hasTasks() && wakenUp.compareAndSet(false, true)
的条件。- 这个解释,就和【第 27 至 28 行】的英文注释
So we need to check task queue again before executing select operation.If we don't, the task might be pended until select operation was timed out.
有出入了?这是为什么呢?因为 Selector 被提前 wakeup 了,所以下一次 Selector 的 select 是被直接唤醒结束的。
- 这个解释,就和【第 27 至 28 行】的英文注释
- 第 33 行:虽然已经发现任务,但是还是调用
Selector#selectNow()
方法,非阻塞的获取一次 Channel 新增的就绪的 IO 事件。 - 对应 Github 的代码提交为 https://github.com/lightningMan/netty/commit/f44f3e7926f1676315ae86d0f18bdd9b95681d9f 。
- 第一种,提交的任务的类型是 NonWakeupRunnable ,那么它并不会调用
- 第 40 行:调用
Selector#select(timeoutMillis)
方法,阻塞 select ,获得 Channel 新增的就绪的 IO 事件的数量。 - 第 42 行:select 计数器加 1 。
- 第 44 至 51 行:如果满足下面任一一个条件,结束 select :
selectedKeys != 0
时,表示有 Channel 新增的就绪的 IO 事件,所以结束 select ,很好理解。oldWakenUp || wakenUp.get()
时,表示 Selector 被唤醒,所以结束 select 。hasTasks() || hasScheduledTasks()
,表示有普通任务或定时任务,所以结束 select 。- 那么剩余的情况,主要是 select 超时或者发生空轮询,即【第 68 至 93 行】的代码。
- 第 52 至 66 行:线程被打断。一般情况下不会出现,出现基本是 bug ,或者错误使用。感兴趣的胖友,可以看看 https://github.com/netty/netty/issues/2426 。
- 第 69 行:记录当前时间。
- 第 70 至 73 行:若满足
time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos
,说明到达此处时,Selector 是超时 select ,那么是正常的,所以重置selectCnt
为 1 。 - 第 74 至 93 行:不符合 select 超时的提交,若 select 次数到达重建 Selector 对象的上限,进行重建。这就是 Netty 判断发生 NIO Selector 空轮询的方式,N ( 默认 512 )次 select 并未阻塞超时这么长,那么就认为发生 NIO Selector 空轮询。过多的 NIO Selector 将会导致 CPU 100% 。
- 第 82 行:调用
#rebuildSelector()
方法,重建 Selector 对象。 - 第 84 行:重新获得使用的 Selector 对象。
- 第 86 至 90 行:同【第 20 至 21 行】的代码。
- 第 92 行:结束 select 。
- 第 82 行:调用
- 第 70 至 73 行:若满足
- 第 95 行:记录新的当前时间,用于【第 16 行】,重新计算本次 select 的超时时长。
666. 彩蛋
总的来说还是比较简单的,比较困难的,在于对标记 wakeup
的理解。真的是,细思极恐!!!感谢在理解过程中,闪电侠和大表弟普架的帮助。
推荐阅读文章:
老艿艿:全文的 NIO Selector 空轮询,指的是 epoll cpu 100% 的 bug 。