1. 概述
本文我们分享 EventLoop 的执行任务相关代码的实现。对应如下图的紫条 run tasks 部分:run
EventLoop 执行的任务分成普通任务和定时任务,考虑到内容切分的更细粒度,本文近仅仅分享【普通任务】的部分。
2. runAllTasks 带超时
在 #run()
方法中,会调用 #runAllTasks(long timeoutNanos)
方法,执行所有任务直到完成所有,或者超过执行时间上限。代码如下:
|
- 方法的返回值,表示是否执行过任务。因为,任务队列可能为空,那么就会返回
false
,表示没有执行过任务。 - 第 3 行:调用
#fetchFromScheduledTaskQueue()
方法,将定时任务队列scheduledTaskQueue
到达可执行的任务,添加到任务队列taskQueue
中。通过这样的方式,定时任务得以被执行。详细解析,见 《精尽 Netty 源码解析 —— EventLoop(七)之 EventLoop 处理定时任务》 。 - 第 5 行:首次调用
#pollTask()
方法,获得队头的任务。详细解析,胖友先跳到 「4. pollTask」 。- 第 6 至 11 行:获取不到任务,结束执行,并返回
false
。- 第 9 行:调用
#afterRunningAllTasks()
方法,执行所有任务完成的后续方法。详细解析,见 「5. afterRunningAllTasks」 。
- 第 9 行:调用
- 第 6 至 11 行:获取不到任务,结束执行,并返回
- 第 14 行:计算执行任务截止时间。其中,
ScheduledFutureTask#nanoTime()
方法,我们可以暂时理解成,获取当前的时间,单位为纳秒。详细解析,见 《精尽 Netty 源码解析 —— EventLoop(七)之 EventLoop 处理定时任务》 。 - 第 17 至 46 行:循环执行任务。
- 第 20 行:【重要】调用
#safeExecute(Runnable task)
方法,执行任务。 - 第 23 行:计算
runTasks
加一。 - 第 29 至 36 行:每隔 64 个任务检查一次时间,因为
System#nanoTime()
是相对费时的操作。也因此,超过执行时间上限是“近似的”,而不是绝对准确。- 第 31 行:调用
ScheduledFutureTask#nanoTime()
方法,获取当前的时间。 - 第 32 至 35 行:超过执行时间上限,结束执行。
- 第 31 行:调用
- 第 39 行:再次调用
#pollTask()
方法,获得队头的任务。- 第 41 至 45 行:获取不到,结束执行。
- 第 43 行:调用
ScheduledFutureTask#nanoTime()
方法,获取当前的时间,作为最终的.lastExecutionTime
,即【第 52 行】的代码。
- 第 20 行:【重要】调用
- 第 49 行:调用
#afterRunningAllTasks()
方法,执行所有任务完成的后续方法。 - 第 53 行:返回
true
,表示有执行任务。
3. runAllTasks
在 #run()
方法中,会调用 #runAllTasks()
方法,执行所有任务直到完成所有。代码如下:
1: protected boolean runAllTasks() { |
- 第 4 行:
ranAtLeastOne
,标记是否执行过任务。 第 6 至 14 行:调用
#fetchFromScheduledTaskQueue()
方法,将定时任务队列scheduledTaskQueue
到达可执行的任务,添加到任务队列taskQueue
中。但是实际上,任务队列taskQueue
是有队列大小上限的,因此使用while
循环,直到没有到达可执行的任务为止。第 10 行:调用
#runAllTasksFrom(taskQueue)
方法,执行任务队列中的所有任务。代码如下:protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
// 获得队头的任务
Runnable task = pollTaskFrom(taskQueue);
// 获取不到,结束执行,返回 false
if (task == null) {
return false;
}
for (;;) {
// 执行任务
safeExecute(task);
// 获得队头的任务
task = pollTaskFrom(taskQueue);
// 获取不到,结束执行,返回 true
if (task == null) {
return true;
}
}
}- 代码比较简单,和
#runAllTasks(long timeoutNanos))
方法的代码,大体是相似的。
- 代码比较简单,和
- 第 12 行:若有任务被执行,则标记
ranAtLeastOne
为true
。
- 第 16 至 19 行:如果执行过任务,则设置最后执行时间。
- 第 22 行:调用
#afterRunningAllTasks()
方法,执行所有任务完成的后续方法。 - 第 23 行:返回是否执行过任务。和
#runAllTasks(long timeoutNanos))
方法的返回是一致的。
4. pollTask
#pollTask()
方法,获得队头的任务。代码如下:
protected Runnable pollTask() { |
<1>
处,调用Queue#poll()
方法,获得并移除队首元素。如果获得不到,返回 null 。注意,这个操作是非阻塞的。如果胖友不知道,请 Google 重新学习下。<2>
处,因为获得的任务可能是WAKEUP_TASK
,所以需要通过循环来跳过。
5. afterRunningAllTasks
在 《精尽 Netty 源码解析 —— EventLoop(三)之 EventLoop 初始化》 的 「9.10 afterRunningAllTasks」 中,#afterRunningAllTasks()
方法,执行所有任务完成的后续方法。代码如下:
// SingleThreadEventLoop.java |
- 在方法内部,会调用
#runAllTasksFrom(tailTasks)
方法,执行任务队列tailTasks
的任务。
那么,可能很多胖友会和我有一样的疑问,到底什么样的任务,适合添加到 tailTasks
中呢?笔者请教了自己的好基友,闪电侠,来解答了这个问题。他实现了批量提交写入功能的 Handler ,代码如下:
public class BatchFlushHandler extends ChannelOutboundHandlerAdapter { |
- 代码可能略微有一丢丢难懂,不过笔者已经添加中文注释,胖友可以自己理解下。
为什么这样做会有好处呢?在 《蚂蚁通信框架实践》 的 「5. 批量解包与批量提交」 有相关分享。
如此能减少
pipeline
的执行次数,同时提升吞吐量。这个模式在低并发场景,并没有什么优势,而在高并发场景下对提升吞吐量有不小的性能提升。
666. 彩蛋
美滋滋,比较简单。又是一个失眠的夜晚。