1. 概述
本文我们来分享,添加 ChannelHandler 到 pipeline 中的代码具体实现。
在 《精尽 Netty 源码解析 —— ChannelPipeline(一)之初始化》 中,我们看到 ChannelPipeline 定义了一大堆添加 ChannelHandler 的接口方法:
ChannelPipeline addFirst(String name, ChannelHandler handler); |
- 考虑到实际当中,我们使用
#addLast(ChannelHandler... handlers)
方法较多,所以本文只分享这个方法的具体实现。
2. addLast
#addLast(ChannelHandler... handlers)
方法,添加任意数量的 ChannelHandler 对象。代码如下:
|
<1>
处,调用#addLast(EventExecutorGroup group, String name, ChannelHandler handler)
方法,添加一个 ChannelHandler 对象到 pipeline 中。
#addLast(EventExecutorGroup group, String name, ChannelHandler handler)
方法,代码如下:
1: |
- 第 5 行:
synchronized
同步,为了防止多线程并发操作 pipeline 底层的双向链表。 - 第 7 行:调用
#checkMultiplicity(ChannelHandler)
方法,校验是否重复的 ChannelHandler 。详细解析,见 「3. checkMultiplicity」 。 - 第 11 行:调用
#filterName(String name, ChannelHandler handler)
方法,获得 ChannelHandler 的名字。详细解析,见 「4. filterName」 。 - 第 11 行:调用
#newContext(EventExecutorGroup group, String name, ChannelHandler handler)
方法,创建 DefaultChannelHandlerContext 节点。详细解析,见 「5. newContext」 。 - 第 14 行:
#addLast0(AbstractChannelHandlerContext newCtx)
方法,添加到最后一个节点。详细解析,见 「6. addLast0」 。 - ========== 后续分成 3 种情况 ==========
<1>
- 第 20 行:Channel 并未注册。这种情况,发生于 ServerBootstrap 启动的过程中。在
ServerBootstrap#init(Channel channel)
方法中,会添加 ChannelInitializer 对象到 pipeline 中,恰好此时 Channel 并未注册。 - 第 22 行:调用
AbstractChannelHandlerContext#setAddPending()
方法,设置 AbstractChannelHandlerContext 准备添加中。 - 第 24 行:调用
#callHandlerCallbackLater(AbstractChannelHandlerContext, added)
方法,添加 PendingHandlerAddedTask 回调。在 Channel 注册完成后,执行该回调。详细解析,见 「8. PendingHandlerCallback」 。 <2>
- 第 30 行:不在 EventLoop 的线程中。
- 第 32 行:调用
AbstractChannelHandlerContext#setAddPending()
方法,设置 AbstractChannelHandlerContext 准备添加中。 - 第 34 至 39 行:提交 EventLoop 中,调用
#callHandlerAdded0(AbstractChannelHandlerContext)
方法,执行回调 ChannelHandler 添加完成( added )事件。详细解析,见 「7. callHandlerAdded0」 。 <3>
- 这种情况,是
<2>
在 EventLoop 的线程中的版本。也因为此,已经确认在 EventLoop 的线程中,所以不需要在synchronized
中。 - 第 45 行:和【第 37 行】的代码一样,调用
#callHandlerAdded0(AbstractChannelHandlerContext)
方法,执行回调 ChannelHandler 添加完成( added )事件。
3. checkMultiplicity
#checkMultiplicity(ChannelHandler handler)
方法,校验是否重复的 ChannelHandler 。代码如下:
private static void checkMultiplicity(ChannelHandler handler) { |
- 在 pipeline 中,一个创建的 ChannelHandler 对象,如果不使用 Netty
@Sharable
注解,则只能添加到一个 Channel 的 pipeline 中。所以,如果我们想要重用一个 ChannelHandler 对象( 例如在 Spring 环境中 ),则必须给这个 ChannelHandler 添加@Sharable
注解。
例如,在 Dubbo 的 com.alibaba.dubbo.remoting.transport.netty.NettyHandler
处理器,它就使用了 @Sharable
注解。
4. filterName
#filterName(String name, ChannelHandler handler)
方法,获得 ChannelHandler 的名字。代码如下:
private String filterName(String name, ChannelHandler handler) { |
<1>
处,若未传入默认的名字name
,则调用#generateName(ChannelHandler)
方法,根据 ChannelHandler 生成一个唯一的名字。详细解析,见 「4.1 generateName」 。<2>
处,若已传入默认的名字name
,则调用#checkDuplicateName(String name)
方法,校验名字唯一。详细解析,见 「4.2 checkDuplicateName」 。
4.1 generateName
#generateName(ChannelHandler)
方法,根据 ChannelHandler 生成一个唯一名字。代码如下:
1: private String generateName(ChannelHandler handler) { |
- 第 2 至 5 行:从缓存
nameCaches
中,查询是否已经生成默认名字。- 若未生成过,调用
#generateName0(ChannelHandler)
方法,进行生成。而后,添加到缓存nameCaches
中。
- 若未生成过,调用
第 15 行:调用
#context0(String name)
方法,判断是否存在相同名字的节点。代码如下:private AbstractChannelHandlerContext context0(String name) {
AbstractChannelHandlerContext context = head.next;
// 顺序向下遍历节点,判断是否有指定名字的节点。如果有,则返回该节点。
while (context != tail) {
if (context.name().equals(name)) {
return context;
}
context = context.next;
}
return null;
}- 顺序向下遍历节点,判断是否有指定名字的节点。如果有,则返回该节点。
- 第 15 至 25 行:若存在相同名字的节点,则使用基础名字 + 编号,循环生成,直到一个名字是唯一的,然后结束循环。
4.2 checkDuplicateName
#checkDuplicateName(String name)
方法,校验名字唯一。代码如下:
private void checkDuplicateName(String name) { |
- 通过调用
#context0(String name)
方法,获得指定名字的节点。若存在节点,意味着不唯一,抛出 IllegalArgumentException 异常。
5. newContext
#newContext(EventExecutorGroup group, String name, ChannelHandler handler)
方法,创建 DefaultChannelHandlerContext 节点。而这个节点,内嵌传入的 ChannelHandler 参数。代码如下:
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) { |
<1>
处,调用#childExecutor(EventExecutorGroup group)
方法,创建子执行器。代码如下:private EventExecutor childExecutor(EventExecutorGroup group) {
// <1> 不创建子执行器
if (group == null) {
return null;
}
// <2> 根据配置项 SINGLE_EVENTEXECUTOR_PER_GROUP ,每个 Channel 从 EventExecutorGroup 获得不同 EventExecutor 执行器
Boolean pinEventExecutor = channel.config().getOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP);
if (pinEventExecutor != null && !pinEventExecutor) {
return group.next();
}
// <3> 通过 childExecutors 缓存实现,一个 Channel 从 EventExecutorGroup 获得相同 EventExecutor 执行器
Map<EventExecutorGroup, EventExecutor> childExecutors = this.childExecutors;
if (childExecutors == null) {
// Use size of 4 as most people only use one extra EventExecutor.
childExecutors = this.childExecutors = new IdentityHashMap<EventExecutorGroup, EventExecutor>(4);
}
// Pin one of the child executors once and remember it so that the same child executor
// is used to fire events for the same channel.
EventExecutor childExecutor = childExecutors.get(group);
// 缓存不存在,进行 从 EventExecutorGroup 获得 EventExecutor 执行器
if (childExecutor == null) {
childExecutor = group.next();
childExecutors.put(group, childExecutor); // 进行缓存
}
return childExecutor;
}- 一共有三种情况:
<1>
,当不传入 EventExecutorGroup 时,不创建子执行器。即,使用 Channel 所注册的 EventLoop 作为执行器。对于我们日常使用,基本完全都是这种情况。所以,下面两种情况,胖友不理解也是没关系的。<2>
,根据配置项ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP
,每个 Channel 从 EventExecutorGroup 获得不同 EventExecutor 执行器。<3>
,通过childExecutors
缓存实现,每个 Channel 从 EventExecutorGroup 获得相同 EventExecutor 执行器。是否获得相同的 EventExecutor 执行器,这就是<2>
、<3>
的不同。
- 一共有三种情况:
- 注意,创建的是 DefaultChannelHandlerContext 对象。
6. addLast0
#addLast0(AbstractChannelHandlerContext newCtx)
方法,添加到最后一个节点。注意,实际上,是添加到 tail
节点之前。代码如下:
private void addLast0(AbstractChannelHandlerContext newCtx) { |
FROM 闪电侠 《Netty 源码分析之pipeline(一)》
7. callHandlerAdded0
#callHandlerAdded0(AbstractChannelHandlerContext)
方法,执行回调 ChannelHandler 添加完成( added )事件。代码如下:
1: private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) { |
- 第 6 行:调用
AbstractChannelHandlerContext#setAddComplete()
方法,设置 AbstractChannelHandlerContext 已添加。 - 第 8 行:调用
ChannelHandler#handlerAdded(AbstractChannelHandlerContext)
方法,回调 ChannelHandler 添加完成( added )事件。一般来说,通过这个方法,来初始化 ChannelHandler 。注意,因为这个方法的执行在 EventLoop 的线程中,所以要尽量避免执行时间过长。 - 第 9 行:发生异常。
- 第 10 至 24 行:移除该节点( ChannelHandler )。详细解析,见 《精尽 Netty 源码解析 —— ChannelPipeline(三)之移除 ChannelHandler》 。
- 😈 所以,
ChannelHandler#handlerAdded(AbstractChannelHandlerContext)
方法的执行异常时,将被移除。
- 😈 所以,
- 第 26 至 35 行:触发异常的传播。详细解析,见 《精尽 Netty 源码解析 —— ChannelPipeline(六)之异常事件的传播》 。
- 第 10 至 24 行:移除该节点( ChannelHandler )。详细解析,见 《精尽 Netty 源码解析 —— ChannelPipeline(三)之移除 ChannelHandler》 。
8. PendingHandlerCallback
PendingHandlerCallback ,实现 Runnable 接口,等待添加 ChannelHandler 回调抽象类。代码如下:
PendingHandlerCallback 是 DefaultChannelPipeline 的内部静态类。
private abstract static class PendingHandlerCallback implements Runnable { |
- 通过
ctx
和next
字段,形成回调链。 #execute()
抽象方法,通过实现它,执行回调逻辑。
为什么会有 PendingHandlerCallback 呢?
因为 ChannelHandler 添加到 pipeline 中,会触发 ChannelHandler 的添加完成( added )事件,并且该事件需要在 Channel 所属的 EventLoop 中执行。
但是 Channel 并未注册在 EventLoop 上时,需要暂时将“触发 ChannelHandler 的添加完成( added )事件”的逻辑,作为一个 PendingHandlerCallback 进行“缓存”。在 Channel 注册到 EventLoop 上时,进行回调执行。
PendingHandlerCallback 有两个实现类:
- PendingHandlerAddedTask
- PendingHandlerRemovedTask
本文只分享 PendingHandlerAddedTask 的代码实现。
8.1 PendingHandlerAddedTask
PendingHandlerAddedTask 实现 PendingHandlerCallback 抽象类,用于回调添加 ChannelHandler 节点。代码如下:
private final class PendingHandlerAddedTask extends PendingHandlerCallback { |
- 在
#execute()
实现方法中,我们可以看到,和#addLast(EventExecutorGroup group, String name, ChannelHandler handler)
方法的【第 28 至 45 行】的代码比较类似,目的是,在 EventLoop 的线程中,执行#callHandlerAdded0(AbstractChannelHandlerContext)
方法,回调 ChannelHandler 添加完成( added )事件。 <1>
处,为什么 PendingHandlerAddedTask 可以直接提交到 EventLoop 中呢?因为 PendingHandlerAddedTask 是个 Runnable ,这也就是为什么 PendingHandlerCallback 实现 Runnable 接口的原因。
老艿艿:下面开始分享的方法,属于 DefaultChannelPipeline 类。
8.2 callHandlerCallbackLater
#callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added)
方法,添加 PendingHandlerCallback 回调。代码如下:
/** |
added
方法参数,表示是否是添加 ChannelHandler 的回调。所以在【第 5 行】的代码,根据added
是否为true
,创建 PendingHandlerAddedTask 或 PendingHandlerRemovedTask 对象。在本文中,当然创建的是 PendingHandlerAddedTask 。- 第 7 至 17 行:将创建的 PendingHandlerCallback 对象,“添加”到
pendingHandlerCallbackHead
中。
8.3 invokeHandlerAddedIfNeeded
#invokeHandlerAddedIfNeeded()
方法,执行在 PendingHandlerCallback 中的 ChannelHandler 添加完成( added )事件。它被两个方法所调用:
AbstractUnsafe#register0(ChannelPromise promise)
方法HeadContext#channelRegistered(ChannelHandlerContext ctx)
方法。
#invokeHandlerAddedIfNeeded()
方法,代码如下:
/** |
<1>
处,仅有首次注册有效(firstRegistration = true
) 时。而后,标记firstRegistration = false
。- 这也就是笔者为什么说,
HeadContext#channelRegistered(ChannelHandlerContext ctx)
方法对这个方法的调用,是没有效果的。
- 这也就是笔者为什么说,
<2>
处,调用#callHandlerAddedForAllHandlers()
方法,执行在 PendingHandlerCallback 中的 ChannelHandler 添加完成( added )事件。代码如下:1: private void callHandlerAddedForAllHandlers() {
2: final PendingHandlerCallback pendingHandlerCallbackHead;
3: // 获得 pendingHandlerCallbackHead
4: synchronized (this) {
5: assert !registered;
6:
7: // This Channel itself was registered.
8: registered = true; // 标记已注册
9:
10: pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
11: // Null out so it can be GC'ed.
12: this.pendingHandlerCallbackHead = null; // 置空,help gc
13: }
14:
15: // 顺序向下,执行 PendingHandlerCallback 的回调
16: // This must happen outside of the synchronized(...) block as otherwise handlerAdded(...) may be called while
17: // holding the lock and so produce a deadlock if handlerAdded(...) will try to add another handler from outside
18: // the EventLoop.
19: PendingHandlerCallback task = pendingHandlerCallbackHead;
20: while (task != null) {
21: task.execute();
22: task = task.next;
23: }
24: }- 第 3 至 13 行:获得
pendingHandlerCallbackHead
变量。- 第 8 行:标记
registered = true
,表示已注册。 - 第 10 至 12 行:置空对象的
pendingHandlerCallbackHead
属性,help GC 。 - 使用
synchronized
的原因,和#addLast(EventExecutorGroup group, String name, ChannelHandler handler)
的【第 16 至 26 行】的代码需要对pendingHandlerCallbackHead
互斥,避免并发修改的问题。
- 第 8 行:标记
- 第 15 至 23 行:顺序循环向下,调用
PendingHandlerCallback#execute()
方法,执行 PendingHandlerCallback 的回调,从而将 ChannelHandler 添加到 pipeline 中。- 这里不适用
synchronized
的原因,看英文注释哈。
- 这里不适用
- 第 3 至 13 行:获得
666. 彩蛋
添加 ChannelHandler 到 pipeline 中的代码,大部分的比较简单。比较复杂的可能是,「8. PendingHandlerCallback」 中,调用的过程涉及回调,所以理解上稍微可能困难。胖友可以多多调试进行解决噢。
推荐阅读文章: