1. 概述
本文我们分享 EventLoop 的具体代码实现。因为 EventLoop 涉及的代码量较大,所以笔者会分成好几篇文章分别分享。而本文,我们来分享 EventLoop 的初始化。
但是要将 EventLoop 拆出“初始化”部分的内容,笔者又觉得是件非常困难的事情。所以本文希望能达到如下的效果:
- 理解 EventLoop 有哪些属性
- 创建 EventLoop 的过程
- Channel 注册到 EventLoop 的过程
- EventLoop 的任务提交。
- 虽然任务的提交,比较接近任务的执行,但是考虑到胖友可以更容易的理解 EventLoop ,所以放在本文。
2. 类结构图
EventLoopGroup 的整体类结构如下图:
- 红框部分,为 EventLoopGroup 相关的类关系。其他部分,为 EventLoop 相关的类关系。
- 因为我们实际上使用的是 NioEventLoopGroup 和 NioEventLoop ,所以笔者省略了其它相关的类,例如 OioEventLoopGroup、EmbeddedEventLoop 等等。
下面,我们逐层看看每个接口和类的实现代码。
3. EventExecutor
io.netty.util.concurrent.EventExecutor
,继承 EventExecutorGroup 接口,事件执行器接口。代码如下:
// ========== 实现自 EventExecutorGroup 接口 ========== |
- 接口定义的方法比较简单,已经添加中文注释,胖友自己看下。
4. OrderedEventExecutor
io.netty.util.concurrent.OrderedEventExecutor
,继承 EventExecutor 接口,有序的事件执行器接口。代码如下:
/** |
- 没有定义任何方法,仅仅是一个标记接口,表示该执行器会有序 / 串行的方式执行。
5. EventLoop
io.netty.channel.EventLoop
,继承 OrderedEventExecutor 和 EventLoopGroup 接口,EventLoop 接口。代码如下:
/** |
#parent()
接口方法,覆写方法的返回类型为 EventLoopGroup 。- 接口上的英文注释,意思如下:
- EventLoop 将会处理注册在其上的 Channel 的所有 IO 操作。
- 通常,一个 EventLoop 上可以注册不只一个 Channel 。当然,这个也取决于具体的实现。
6. AbstractEventExecutor
io.netty.util.concurrent.AbstractEventExecutor
,实现 EventExecutor 接口,继承 AbstractExecutorService 抽象类,EventExecutor 抽象类。
6.1 构造方法
/** |
6.2 parent
#parent()
方法,获得所属 EventExecutorGroup 。代码如下:
|
6.3 next
#next()
方法,获得自己。代码如下:
|
6.4 inEventLoop()
#inEventLoop()
方法,判断当前线程是否在 EventLoop 线程中。代码如下:
|
- 具体的
#inEventLoop(Thread thread)
方法,需要在子类实现。因为 AbstractEventExecutor 类还体现不出它所拥有的线程。
6.5 iterator
#iterator()
方法,代码如下:
|
6.6 newPromise 和 newProgressivePromise
#newPromise()
和 #newProgressivePromise()
方法,分别创建 DefaultPromise 和 DefaultProgressivePromise 对象。代码如下:
|
- 我们可以看到,创建的 Promise 对象,都会传入自身作为 EventExecutor 。关于 Promise 相关的,我们在后续文章详细解析。实在想了解,也可以看看 《Netty 源码笔记 —— 第四章 Future 和 Promise》 。
6.7 newSucceededFuture 和 newFailedFuture
#newSucceededFuture(V result)
和 #newFailedFuture(Throwable cause)
方法,分别创建成功结果和异常的 Future 对象。代码如下:
|
- 创建的 Future 对象,会传入自身作为 EventExecutor ,并传入
result
或cause
分别作为成功结果和异常。
6.8 newTaskFor
#newTaskFor(...)
方法,创建 PromiseTask 对象。代码如下:
|
- 创建的 PromiseTask 对象,会传入自身作为 EventExecutor ,并传入 Runnable + Value 或 Callable 作为任务( Task )。
6.9 submit
#submit(...)
方法,提交任务。代码如下:
|
- 每个方法的实现上,是调用父类 AbstractExecutorService 的实现。
6.10 schedule
#schedule(...)
方法,都不支持,交给子类 AbstractScheduledEventExecutor 实现。代码如下:
|
6.11 safeExecute
#safeExecute(Runnable task)
静态方法,安全的执行任务。代码如下:
protected static void safeExecute(Runnable task) { |
- 所谓“安全”指的是,当任务执行发生异常时,仅仅打印告警日志。
6.12 shutdown
#shutdown()
方法,关闭执行器。代码如下:
|
- 具体的
#shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit)
和#shutdown()
方法的实现,在子类中。
7. AbstractScheduledEventExecutor
io.netty.util.concurrent.AbstractScheduledEventExecutor
,继承 AbstractEventExecutor 抽象类,支持定时任务的 EventExecutor 的抽象类。
详细解析,见 《精尽 Netty 源码解析 —— EventLoop(七)之 EventLoop 处理定时任务》 。
8. SingleThreadEventExecutor
io.netty.util.concurrent.SingleThreadEventExecutor
,实现 OrderedEventExecutor 接口,继承 AbstractScheduledEventExecutor 抽象类,基于单线程的 EventExecutor 抽象类,即一个 EventExecutor 对应一个线程。
8.1 构造方法
/** |
- 属性比较多,我们耐心往下看。
taskQueue
属性,任务队列。addTaskWakesUp
属性,添加任务到taskQueue
队列时,是否唤醒thread
线程。详细解析,见 「8.11 execute」 。maxPendingTasks
属性,最大等待执行任务数量,即taskQueue
队列大小。rejectedExecutionHandler
属性,拒绝执行处理器。在taskQueue
队列超过最大任务数量时,怎么拒绝处理新提交的任务。
thread
属性,线程。在 SingleThreadEventExecutor 中,任务是提交到taskQueue
队列中,而执行在thread
线程中。threadProperties
属性,线程属性。详细解析,见 「8.15 threadProperties」 。executor
属性,执行器。通过它创建thread
线程。详细解析,见 「8.11 execute」 。interrupted
属性,线程是否打断。详细解析,详细解析,见 「8.14 interruptThread」 。lastExecutionTime
属性,最后执行时间。state
属性,线程状态。SingleThreadEventExecutor 在实现上,thread
的初始化采用延迟启动的方式,只有在第一个任务时,executor
才会执行并创建该线程,从而节省资源。目前thread
线程有 5 种状态,代码如下:private static final int ST_NOT_STARTED = 1; // 未开始
private static final int ST_STARTED = 2; // 已开始
private static final int ST_SHUTTING_DOWN = 3; // 正在关闭中
private static final int ST_SHUTDOWN = 4; // 已关闭
private static final int ST_TERMINATED = 5; // 已经终止
- 构造方法,虽然比较多,但是很简单,胖友自己看下。
8.2 newTaskQueue
#newTaskQueue(int maxPendingTasks)
方法,创建任务队列。代码如下:
/** |
- 方法上有一大段注释,简单的说,这个方法默认返回的是 LinkedBlockingQueue 阻塞队列。如果子类有更好的队列选择( 例如非阻塞队列 ),可以重写该方法。在下文,我们会看到它的子类 NioEventLoop ,就重写了这个方法。
8.3 inEventLoop
#inEventLoop(Thread thread)
方法,判断指定线程是否是 EventLoop 线程。代码如下:
|
8.4 offerTask
#offerTask(Runnable task)
方法,添加任务到队列中。若添加失败,则返回 false
。代码如下:
final boolean offerTask(Runnable task) { |
- 注意,即使对于 BlockingQueue 的
#offer(E e)
方法,也不是阻塞的!
8.5 addTask
#offerTask(Runnable task)
方法,在 #offerTask(Runnable task)
的方法的基础上,若添加任务到队列中失败,则进行拒绝任务。代码如下:
protected void addTask(Runnable task) { |
- 调用
#reject(task)
方法,拒绝任务。详细解析,见 「8.6 reject」 。 - 该方法是
void
,无返回值。
8.6 removeTask
#removeTask(Runnable task)
方法,移除指定任务。代码如下:
protected boolean removeTask(Runnable task) { |
8.7 peekTask
#peekTask()
方法,返回队头的任务,但是不移除。代码如下:
protected Runnable peekTask() { |
8.8 hasTasks
#hasTasks()
方法,队列中是否有任务。代码如下:
protected boolean hasTasks() { |
8.9 pendingTasks
#pendingTasks()
方法,获得队列中的任务数。代码如下:
public int pendingTasks() { |
8.10 reject
#reject(Runnable task)
方法,拒绝任务。代码如下:
protected final void reject(Runnable task) { |
- 调用
RejectedExecutionHandler#rejected(Runnable task, SingleThreadEventExecutor executor)
方法,拒绝该任务。
#reject()
方法,拒绝任何任务,用于 SingleThreadEventExecutor 已关闭( #isShutdown()
方法返回的结果为 true
)的情况。代码如下:
protected static void reject() { |
8.10.1 RejectedExecutionHandler
io.netty.util.concurrent.RejectedExecutionHandler
,拒绝执行处理器接口。代码如下:
/** |
8.10.2 RejectedExecutionHandlers
io.netty.util.concurrent.RejectedExecutionHandlers
,RejectedExecutionHandler 实现类枚举,目前有 2 种实现类。
第一种
private static final RejectedExecutionHandler REJECT = new RejectedExecutionHandler() { |
- 通过
#reject()
方法,返回REJECT
实现类的对象。该实现在拒绝时,直接抛出 RejectedExecutionException 异常。 - 默认情况下,使用这种实现。
第二种
public static RejectedExecutionHandler backoff(final int retries, long backoffAmount, TimeUnit unit) { |
- 通过
#backoff(final int retries, long backoffAmount, TimeUnit unit)
方法,创建带多次尝试添加到任务队列的 RejectedExecutionHandler 实现类。 - 代码已经添加中文注释,胖友自己理解下,比较简单的。
8.11 execute
#execute(Runnable task)
方法,执行一个任务。但是方法名无法很完整的体现出具体的方法实现,甚至有一些出入,所以我们直接看源码,代码如下:
1: |
- 第 8 行:调用
#inEventLoop()
方法,获得当前是否在 EventLoop 的线程中。 - 第 10 行:调用
#addTask(Runnable task)
方法,添加任务到队列中。 - 第 11 行:非 EventLoop 的线程
- 第 13 行:调用
#startThread()
方法,启动 EventLoop 独占的线程,即thread
属性。详细解析,见 「8.12 startThread」 。 - 第 14 至 17 行:若已经关闭,则移除任务,并拒绝执行。
- 第 13 行:调用
第 20 至 23 行:调用
#wakeup(boolean inEventLoop)
方法,唤醒线程。详细解析,见 「8.13 wakeup」 。- 等等,第 21 行的
!addTaskWakesUp
有点奇怪,不是说好的addTaskWakesUp
表示“添加任务时,是否唤醒线程”?!但是,怎么使用!
取反了。这样反倒变成了,“添加任务时,是否【不】唤醒线程”。具体的原因是为什么呢?笔者 Google、Github Netty Issue、和基佬讨论,都未找到解答。目前笔者的理解是:addTaskWakesUp
真正的意思是,“添加任务后,任务是否会自动导致线程唤醒”。为什么呢?- 对于 Nio 使用的 NioEventLoop ,它的线程执行任务是基于 Selector 监听感兴趣的事件,所以当任务添加到
taskQueue
队列中时,线程是无感知的,所以需要调用#wakeup(boolean inEventLoop)
方法,进行主动的唤醒。 - 对于 Oio 使用的 ThreadPerChannelEventLoop ,它的线程执行是基于
taskQueue
队列监听( 阻塞拉取 )事件和任务,所以当任务添加到taskQueue
队列中时,线程是可感知的,相当于说,进行被动的唤醒。 - 感谢闪电侠,证实我的理解是正确的。参见:
- 对于 Nio 使用的 NioEventLoop ,它的线程执行任务是基于 Selector 监听感兴趣的事件,所以当任务添加到
调用
#wakesUpForTask(task)
方法,判断该任务是否需要唤醒线程。代码如下:protected boolean wakesUpForTask(Runnable task) {
return true;
}- 默认返回
true
。在 「9. SingleThreadEventLoop」 中,我们会看到对该方法的重写。
- 默认返回
- 等等,第 21 行的
8.12 startThread
#startThread()
方法,启动 EventLoop 独占的线程,即 thread
属性。代码如下:
1: private void doStartThread() { |
- 第 2 行:断言,保证
thread
为空。 - 第 3 行 至 72 行:调用
Executor#execute(Runnable runnable)
方法,执行任务。下面,我们来详细解析。 - 第 8 行:赋值当前的线程给
thread
属性。这就是,每个 SingleThreadEventExecutor 独占的线程的创建方式。 - 第 10 至 13 行:如果当前线程已经被标记打断,则进行打断操作。为什么会有这样的逻辑呢?详细解析,见 「8.14 interruptThread」 。
第 18 行:调用
#updateLastExecutionTime()
方法,更新最后执行时间。代码如下:/**
* Updates the internal timestamp that tells when a submitted task was executed most recently.
* {@link #runAllTasks()} and {@link #runAllTasks(long)} updates this timestamp automatically, and thus there's
* usually no need to call this method. However, if you take the tasks manually using {@link #takeTask()} or
* {@link #pollTask()}, you have to call this method at the end of task execution loop for accurate quiet period
* checks.
*/
protected void updateLastExecutionTime() {
lastExecutionTime = ScheduledFutureTask.nanoTime();
}- 英文注释,自己看。😈
- 第 21 行:调用
SingleThreadEventExecutor#run()
方法,执行任务。详细解析,见 8.X run 。 - 第 25 至 69 行:TODO 1006 EventLoop 优雅关闭
- 第 55 行:调用
#cleanup()
方法,清理释放资源。详细解析,见 8.X cleanup 。
8.13 wakeup
#wakeup(boolean inEventLoop)
方法,唤醒线程。代码如下:
protected void wakeup(boolean inEventLoop) { |
<1>
处的!inEventLoop
代码段,判断不在 EventLoop 的线程中。因为,如果在 EventLoop 线程中,意味着线程就在执行中,不必要唤醒。<2>
处,调用Queue#offer(E e)
方法,添加任务到队列中。而添加的任务是WAKEUP_TASK
,代码如下:private static final Runnable WAKEUP_TASK = new Runnable() {
public void run() {
// Do nothing.
}
};- 这是一个空的 Runnable 实现类。仅仅用于唤醒基于
taskQueue
阻塞拉取的 EventLoop 实现类。 对于 NioEventLoop 会重写该方法,代码如下:
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
}- 通过 NIO Selector 唤醒。
- 这是一个空的 Runnable 实现类。仅仅用于唤醒基于
8.14 interruptThread
#interruptThread()
方法,打断 EventLoop 的线程。代码如下:
protected void interruptThread() { |
- 因为 EventLoop 的线程是延迟启动,所以可能
thread
并未创建,此时通过interrupted
标记打断。之后在#startThread()
方法中,创建完线程后,再进行打断,也就是说,“延迟打断”。
8.15 threadProperties
#threadProperties()
方法,获得 EventLoop 的线程属性。代码如下:
1: public final ThreadProperties threadProperties() { |
- 第 2 至 3 行:获得 ThreadProperties 对象。若不存在,则进行创建 ThreadProperties 对象。
- 第 4 至 5 行:获得 EventLoop 的线程。因为线程是延迟启动的,所以会出现线程为空的情况。若线程为空,则需要进行创建。
- 第 15 行:调用 DefaultThreadProperties 对象。
- 第 16 至 19 行:CAS 修改
threadProperties
属性。
- 第 22 行:返回
threadProperties
。
8.15.1 ThreadProperties
io.netty.util.concurrent.ThreadProperties
,线程属性接口。代码如下:
Thread.State state(); |
8.15.2 DefaultThreadProperties
DefaultThreadProperties 实现 ThreadProperties 接口,默认线程属性实现类。代码如下:
DefaultThreadProperties 内嵌在 SingleThreadEventExecutor 中。
private static final class DefaultThreadProperties implements ThreadProperties { |
- 我们可以看到,每个实现方法,实际上就是对被包装的线程
t
的方法的封装。 - 那为什么
#threadProperties()
方法不直接返回thread
呢?因为如果直接返回thread
,调用方可以调用到该变量的其他方法,这个是我们不希望看到的。
8.16 run
#run()
方法,它是一个抽象方法,由子类实现,如何执行 taskQueue
队列中的任务。代码如下:
protected abstract void run(); |
SingleThreadEventExecutor 提供了很多执行任务的方法,方便子类在实现自定义运行任务的逻辑时:
- [x]
#runAllTasks()
- [x]
#runAllTasks(long timeoutNanos)
- [x]
#runAllTasksFrom(Queue<Runnable> taskQueue)
- [x]
#afterRunningAllTasks()
- [x]
#pollTask()
- [x]
#pollTaskFrom(Queue<Runnable> taskQueue)
#takeTask()
#fetchFromScheduledTaskQueue()
#delayNanos(long currentTimeNanos)
详细解析,见 《精尽 Netty 源码解析 —— EventLoop(四)之 EventLoop 运行》 。
8.17 cleanup
#cleanup()
方法,清理释放资源。代码如下:
/** |
- 目前该方法为空的。在子类 NioEventLoop 中,我们会看到它覆写该方法,关闭 NIO Selector 对象。
8.18 invokeAll
#invokeAll(...)
方法,在 EventExecutor 中执行多个普通任务。代码如下:
|
调用
#throwIfInEventLoop(String method)
方法,判断若在 EventLoop 的线程中调用该方法,抛出 RejectedExecutionException 异常。代码如下:private void throwIfInEventLoop(String method) {
if (inEventLoop()) {
throw new RejectedExecutionException("Calling " + method + " from within the EventLoop is not allowed");
}
}调用父类 AbstractScheduledEventExecutor 的
#invokeAll(tasks, ...)
方法,执行多个普通任务。在该方法内部,会调用#execute(Runnable task)
方法,执行任务。调用栈如下图:invokeAll => execute 的流程
8.19 invokeAny
和
#invokeAll(...)
方法,类似。
#invokeAll(...)
方法,在 EventExecutor 中执行多个普通任务,有一个执行完成即可。代码如下:
|
- 调用
#throwIfInEventLoop(String method)
方法,判断若在 EventLoop 的线程中调用该方法,抛出 RejectedExecutionException 异常。 - 调用父类 AbstractScheduledEventExecutor 的
#invokeAny(tasks, ...)
方法,执行多个普通任务,有一个执行完成即可。在该方法内部,会调用#execute(Runnable task)
方法,执行任务。调用栈如下图:invokeAny => execute 的流程
8.20 shutdown
如下是优雅关闭,我们在 TODO 1006 EventLoop 优雅关闭
#addShutdownHook(final Runnable task)
#removeShutdownHook(final Runnable task)
#runShutdownHooks()
#shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit)
#shutdown()
#terminationFuture()
#isShuttingDown()
#isShutdown()
#isTerminated()
#confirmShutdown()
#awaitTermination(long timeout, TimeUnit unit)
9. SingleThreadEventLoop
io.netty.channel.SingleThreadEventLoop
,实现 EventLoop 接口,继承 SingleThreadEventExecutor 抽象类,基于单线程的 EventLoop 抽象类,主要增加了 Channel 注册到 EventLoop 上。
9.1 构造方法
/** |
- 新增了一条
tailTasks
队列,执行的顺序在taskQueue
之后。详细解析,见 《精尽 Netty 源码解析 —— EventLoop(六)之 EventLoop 处理普通任务》 。 - 构造方法比较简单,胖友自己看下就可以了。
9.2 parent
#parent()
方法,获得所属 EventLoopGroup 。代码如下:
|
- 覆盖父类方法,将返回值转换成 EventLoopGroup 类。
9.3 next
#next()
方法,获得自己。代码如下:
|
- 覆盖父类方法,将返回值转换成 EventLoop 类。
9.4 register
#register(Channel channel)
方法,注册 Channel 到 EventLoop 上。代码如下:
|
- 将 Channel 和 EventLoop 创建一个 DefaultChannelPromise 对象。通过这个 DefaultChannelPromise 对象,我们就能实现对异步注册过程的监听。
调用
#register(final ChannelPromise promise)
方法,注册 Channel 到 EventLoop 上。代码如下:
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
// 注册 Channel 到 EventLoop 上
promise.channel().unsafe().register(this, promise);
// 返回 ChannelPromise 对象
return promise;
}- 在方法内部,我们就看到在 《精尽 Netty 源码分析 —— 启动(一)之服务端》 的 「3.14.3 注册 Channel 到 EventLoopGroup」 章节,熟悉的内容,调用
AbstractUnsafe#register(EventLoop eventLoop, final ChannelPromise promise)
方法,注册 Channel 到 EventLoop 上。
- 在方法内部,我们就看到在 《精尽 Netty 源码分析 —— 启动(一)之服务端》 的 「3.14.3 注册 Channel 到 EventLoopGroup」 章节,熟悉的内容,调用
9.5 hasTasks
#hasTasks()
方法,队列中是否有任务。代码如下:
|
- 基于两个队列来判断是否还有任务。
9.6 pendingTasks
#pendingTasks()
方法,获得队列中的任务数。代码如下:
|
- 计算两个队列的任务之和。
9.7 executeAfterEventLoopIteration
#executeAfterEventLoopIteration(Runnable task)
方法,执行一个任务。但是方法名无法很完整的体现出具体的方法实现,甚至有一些出入,所以我们直接看源码,代码如下:
1: |
- 第 4 至 7 行:SingleThreadEventLoop 关闭时,拒绝任务。
- 第 10 行:调用
Queue#offer(E e)
方法,添加任务到队列中。- 第 12 行:若添加失败,调用
#reject(Runnable task)
方法,拒绝任务。
- 第 12 行:若添加失败,调用
- 第 15 至 18 行:唤醒线程。
- 第 16 行:SingleThreadEventLoop 重写了
#wakesUpForTask(Runnable task)
方法。详细解析,见 「9.9 wakesUpForTask」 。
- 第 16 行:SingleThreadEventLoop 重写了
9.8 removeAfterEventLoopIterationTask
#removeAfterEventLoopIterationTask(Runnable task)
方法,移除指定任务。代码如下:
|
9.9 wakesUpForTask
#wakesUpForTask(task)
方法,判断该任务是否需要唤醒线程。代码如下:
|
- 当任务类型为 NonWakeupRunnable ,则不进行唤醒线程。
9.9.1 NonWakeupRunnable
NonWakeupRunnable 实现 Runnable 接口,用于标记不唤醒线程的任务。代码如下:
/** |
9.10 afterRunningAllTasks
#afterRunningAllTasks()
方法,在运行完所有任务后,执行 tailTasks
队列中的任务。代码如下:
protected void afterRunningAllTasks() { |
- 调用
#runAllTasksFrom(queue)
方法,执行tailTasks
队列中的所有任务。
10. NioEventLoop
io.netty.channel.nio.NioEventLoop
,继承 SingleThreadEventLoop 抽象类,NIO EventLoop 实现类,实现对注册到其中的 Channel 的就绪的 IO 事件,和对用户提交的任务进行处理。
详细解析,见 《精尽 Netty 源码解析 —— EventLoop(四)之 EventLoop 运行》 。
666. 彩蛋
自顶向下的过了下 EventLoop 相关的类和方法。因为仅涉及 EventLoop 初始化相关的内容,所以对于 EventLoop 运行相关的内容,就不得不省略了。
那么,饥渴难耐的我们,《精尽 Netty 源码解析 —— EventLoop(四)之 EventLoop 运行》 ,走起!
推荐阅读如下文章:
- 永顺 《Netty 源码分析之 三 我就是大名鼎鼎的 EventLoop(一)》 的 「NioEventLoop」 小节。
- Hypercube 《自顶向下深入分析Netty(四)—— EventLoop-2》