2024-04-04  阅读(45)
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 本文链接:https://www.skjava.com/mianshi/baodian/detail/1752750059

回答

ChannelPipeline 是 Netty 的核心处理链,用于实现网络事件的动态编排和有序传播。它负责组织和编排各种 ChannelHandler,使他们能够有序地组织在一起,这些 ChannelHandler 用于处理具体的业务逻辑和数据加工逻辑。

每一个 Channel 在创建时都会被赋予一个新的 ChannelPipeline,这个 ChannelPipeline 包含了一系列的 ChannelHandler。当该 Channel 发生 I/O 事件后,这个事件会沿着 ChannelPipeline 中的 ChannelHandler 链进行传播。传播的顺序如下:

  • 对于入站数据(如收到数据),事件会从 ChannelPipeline 的头部开始,依次经过每个 ChannelHandler。
  • 对于出站操作(如,写数据或关闭连接),事件会从 ChannelPipeline 的尾部开始,向前传递给每个 ChannelHandler。

ChannelHandler 是 Netty 的数据加工厂,用于处理实际业务逻辑的处理器,它有两个主要的子类型:ChannelInboundHandler 用于处理入站数据和事件,ChannelOutboundHandler 用于处理出站数据和操作。

所以,ChannelPipeline 的主要作用是管理和执行 ChannelHandler,确保它们按照正确的顺序和方式进行事件处理。

扩展

ChannelPipeline 概述

pipeline 翻译为管道、流水线,在 Netty 这个大工厂中,ChannelPipeline 就像一条流水线,数据流过 ChannelPipeline,被一步一步地加工,最后得到一个成熟的工艺品。

在 Netty 中,ChannelPipeline 是 Netty 的核心处理链,用于实现网络时间的动态编排和有序传播。它负责组织和编排各种 ChannelHandler,使他们能够有序地组织在一起,但实际的数据加工还是由 ChannelHandler 处理。

ChannelPipeline 的内部结构

ChannelPipeline 可以看作是 ChannelHandler 的容器载体,它是由一组 ChannelHandler 有序地组成的双向拦截链。每当新建一个 Channel 时都会新建一个 ChannelPipeline 与之绑定,而且这种绑定关系是永久的,当该 Channel 有 I/O 读写事件发生时,数据会贯穿整个 ChannelPipeline ,由里面的 ChannelHandler 依次拦截和处理。

注:在 Netty 中,ChannelHandler 是没有这种关系,每个 ChannelHandler 是互相独立的,他们在代码实现上是不会组成一个双向链表的,双向链表是由 ChannelHandlerContext 构成的 。

我们知道 ChannelHandler 分为出站 handler 和入站 handler ,但是 ChannelPipeline 并没有将他们分开,而是将出站 handler 和入站handler 混编在一起的,当一个入站事件从 ChannelPipeline 的头部向尾部开始传播的时候,每一个 ChannelHandler 都会判断下一个 ChannelHandler 的类型是否与当前 ChannelHandler 的类型相同,如果是则将事件传播给他,不是则跳过该 ChannelHandler 传递下一个,直到找到跟他相同类型的 ChannelHandler。下图是一个入站的传播路径:

我们一般都是选择入站节点作为头部,出站节点作为尾部的。

ChannelPipeline 也提供了一些 API 用于维护该双向链。

api 描述
addLast() 将该 ChannelHandler 添加到 ChannelPipeline 的末尾
addBefore() 将该ChannelHandler 添加在指定名称的 ChannelHandler 之前
addAfter() 将该ChannelHandler 添加在指定名称的 ChannelHandler 之后
addFirst() 将该 ChannelHandler 添加到 ChannelPipeline 的第一个位置
remove() 删除指定的 ChannelHandler
replace() 替换指定的 ChannelHandler

ChannelHandlerContext

ChannelHandlerContext 用于保存 ChannelHandler 的上下文,它包含了 ChannelHandler 生命周期中的所有事件,如 connectbindreadwrite

为什么会有一个 ChannelHandlerContext 呢?其实这是一种编程思想,我认为是单一职责,一个类只做一件事。试想下 ChannelHandler 是数据加工厂,但是我们现在又要他来维护它与周边 ChannelHandler 的关系,要负责事件的传播,还要维护其生命周期,累不累啊,功能严重耦合。所以我们需要 ChannelHandlerContext 来帮助他更好地工作,它只需要做事,其余的交给 ChannelHandlerContext 了(我靠,立刻脑补了我们的 996)。

ChannelHandlerContext 他代表了 ChannelHandler 和 ChannelPipeline 之间的关联,每当有一个 ChannelHandler 被添加到 ChannelPipeline 中时都会创建一个 ChannelHandlerContext 与之关联,它维护着该 ChannelHandler 与其他 ChannelHandler(同一个 ChannelPipeline) 的之间交互。

我们在初始化 ChannelPipeline 的时候会发现,ChannelPipeline 的双向链表其实是有特定的首尾节点的,其中首节点 HeadContext,尾节点 TailContext,我们所有自定义的 ChannelHandler 节点都是唯一这两个节点的中间。

从上图我们可以看出 HeadContext 既是 InboundHandler,也是 OutboundHandler,所以读事件则是从 HeadContext 开始,写事件也是在 HeadContext 结束。而 TailContext 则只是 OutboundHandler,它会在 ChannelPipeline 调用链的最后一步执行,用于终止 Inbound 的事件传播。TailContext 作为 Outbound 事件传播的第一站,它仅仅只是将 Outbound 事件进行传递。

加入 ChannelHandlerContext 的完整图如下:

ChannelPipeline 的 API

ChannelPipeline 的 API 不仅仅有对 ChannelHandler 的维护功能,还有一些入站和出站的方法。

  • 入站 API
方法 描述
fireChannelRegistered 调用 ChannelPipeline 中下一个 ChannelInboundHandler 的 channelRegistered 方法
fireChannelUnregistered 调用 ChannelPipeline 中下一个 ChannelInboundHandler 的 channelUnregistered 方法
fireChannelActive 调用 ChannelPipeline 中下一个 ChannelInboundHandler 的 channelActive 方法
fireChannelInactive 调用 ChannelPipeline 中下一个 ChannelInboundHandler 的 channelInactive 方法
fireExceptionCaught 调用 ChannelPipeline 中下一个 ChannelInboundHandler 的 exceptionCaught 方法
fireUserEventTriggered 调用 ChannelPipeline 中下一个 ChannelInboundHandler 的 userEventTriggered 方法
fireChannelRead 调用 ChannelPipeline 中下一个 ChannelInboundHandler 的 channelRead 方法
fireChannelReadComplete 调用 ChannelPipeline 中下一个 ChannelInboundHandler 的 channelReadComplete 方法

从这里就可以看出,所有的 fireXx() 方法其实就是将消息传递给下一个节点

  • 出站 API
方法 描述
bind 调用 ChannelPipeline 中下一个 ChannelOutboundHandler 的bind 方法,将 Channel 与本地地址绑定
connect 调用 ChannelPipeline 中下一个 ChannelOutboundHandler 的connect 方法,将 Channel 连接到远程节点
disconnect 调用 ChannelPipeline 中下一个 ChannelOutboundHandler 的disconnect 方法,将 Channel 与远程连接断开
close 调用 ChannelPipeline 中下一个 ChannelOutboundHandler 的close 方法,将 Channel 关闭
deregister 调用 ChannelPipeline 中下一个 ChannelOutboundHandler 的deregister 方法,将 Channel 从其对应的 EventLoop 注销
flush 调用 ChannelPipeline 中下一个 ChannelOutboundHandler 的flush 方法,将 Channel 的数据冲刷到远程节点
read 调用 ChannelPipeline 中下一个 ChannelOutboundHandler 的 read 方法,从 Channel 中读取数据
write 调用 ChannelPipeline 中下一个 ChannelOutboundHandler 的 write 方法,将数据写入 Channel
writeAndFlush 先调用 write 方法,然后调用flush方法,将数据写入并刷回远程节点

出站类的方法都是与 Channel 相关的。

ChannelPipeline 事件传播机制

记住:InboundHandler顺序执行,OutboundHandler逆序执行。

ChannelPipeline 将 ChannelHandler 编排好后,就响应等待 I/O 事件了,但是这个事件是如何传播的呢?大明哥通过一个例子来跟你细说。首先我们需要先构造一个如下图的传播链。

代码如下:

public class ChannelPipelineTest_01_server {
    public static void main(String[] args) {
        new ServerBootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        // 这里一定要按照顺序来添加
                        pipeline.addLast(new InboundHandler("InboundHandler-1",false));
                        pipeline.addLast(new InboundHandler("InboundHandler-2",false));
                        pipeline.addLast(new OutboundHandler("OutboundHandler-1"));
                        pipeline.addLast(new OutboundHandler("OutboundHandler-2"));
                        pipeline.addLast(new InboundHandler("InboundHandler-3",true));
                    }
                })
                .bind(8081);
    }

    private static class InboundHandler extends ChannelInboundHandlerAdapter {
        // handler 的名称
        private String handlerName;

        // 是否写数据
        private Boolean flushFlag;

        public InboundHandler(String handlerName,Boolean flushFlag) {
            this.handlerName = handlerName;
            this.flushFlag = flushFlag;
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("InboundHandler :" + handlerName);
            if (!flushFlag) {
                // 不需要写数据,传递给下一个节点
                ctx.fireChannelRead(msg);
            } else {
                // 写数据,则调用 channel.writeAndFlush()
                System.out.println("==============================");
                ctx.channel().writeAndFlush(msg);
            }
        }
    }

    private static class OutboundHandler extends ChannelOutboundHandlerAdapter {
        // handler 的名称
        private String handlerName;

        public OutboundHandler(String handlerName) {
            this.handlerName = handlerName;
        }

        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            System.out.println("OutboundHandler :" + handlerName);
            super.write(ctx,msg,promise);
        }
    }
}

执行结果如下:

InboundHandler :InboundHandler-1
InboundHandler :InboundHandler-2
InboundHandler :InboundHandler-3
==============================
OutboundHandler :OutboundHandler-2
OutboundHandler :OutboundHandler-1

由执行结果可见,Inbound 事件是有 Head —> Tail,而 Outbound 事件则是由 Tail —> Head,两者传播方向恰好相反。上面例子的运行流程如下:

红色为 Inbound 事件的响应路径,紫色为 Outbound 事件的响应路径。

在 InboundHandler 中,有段代码我们需要注意:

if (!flushFlag) {
    // 不需要写数据,传递给下一个节点
    ctx.fireChannelRead(msg);
} else {
    // 写数据,则调用 channel.writeAndFlush()
    System.out.println("==============================");
    ctx.channel().writeAndFlush(msg);
}

这了有小伙伴会有疑问,为什么传递到下一个节点的时候是调用 ChannelHandlerContext.fireChannelRead(),而写数据的时候调用的是 Channel.writeAndFlush(),其实如果小伙伴去看 API 的时候会发现 ChannelHandlerContext 也是有 writeAndFlush() 的,但是为什么不使用 ChannelHandlerContext 的,而使用 的呢?其实这正是 ChannelHandlerContext 与 Channel 或者 ChannelPipeline 的区别:

  • Channel 或 ChannelPipeline 的方法其影响是会沿着整个 ChannelPipeline 进行传播。
  • 而 ChannelHandlerContext 方法则是从与其相关联的 ChannelHandler 开始,且只会传播给该 ChannelPipeline 种下一个能处理该事件的 ChannelHandler。

有兴趣的小伙伴,可以将代码调整下:

ctx.channel().writeAndFlush(msg);
调整为
ctx.writeAndFlush(msg);

然后再运行下,大明哥就不再演示了。

阅读全文
  • 点赞