多图详解 Netty

什么是 Netty

简单来说 Netty 就是 JBOSS 开源的一个基于 NIO 的网络编程框架。它可以帮助我们快速开发高性能高可靠性的网络 IO 程序。

Netty 在 Java 语言中使用非常广泛,涉及到网络通信的基本上都使用 Netty,很少会直接去使用原生的 NIO 组件或者是其他框架。并且像Dubbo、RocketMQ、Zookeeper、ElasticSearch 这些知名的中间件所使用的网络通讯框架都是基于 Netty 去实现的。

Netty 是在原生 NIO 的基础上发展起来的框架,其中的许多理念都非常像,所以学习 Netty 前需要了解一下原生 NIO 编程。

原生 NIO 编程

在了解原生 NIO 编程之前需要了解一个基础概念 Socket。

Socket

Netty 是基于 TCP 协议的,我们知道 TCP 协议三个重要的特点分别是面向连接、可靠的和字节流。要达成这三点建立连接时需要客户端与服务端达成三个信息的共享,分别是:

  • Socket:包含五个信息:连接使用的协议、本地主机 IP 地址和端口号、远程主机的 IP 地址和端口号
  • 序列号:解决乱序问题
  • 容器大小:用来做流量控制

Socket 就是两台主机之间的逻辑连接的端点,TCP 所说的面向连接,指的就是面向客户端和服务端两个 Socket 之间的连接。

这里要注意的是,服务端会涉及到两种 socket,一种叫做监听 socket ,一种叫做已完成连接 socket 。当监听 Socket 发现连接成功了之后会返回一个已完成连接 socket 文件描述符,用于后续传输数据。

原生 NIO 组件

Netty 底层其实用了很多 Java 原生的 NIO 的组件,Netty 自定义的组件中有些理念也来自于原生的 NIO 组件。因此学习 Netty 之前需要了解一下原生的 NIO 组件的一些知识。

这里主要讲三个非常重要的组件:Channel (通道)、Buffer (缓冲区)、Selector (选择器)。

下图展示了这三个组件在 NIO 模型中发挥的作用:

Buffer (缓冲区)

Buffer 本质上就是一块可以读写数据的内存块,我们在使用的时候可以把它理解成一个数组。

下图是 Buffer 各个类的继承关系:

这里着重讲一下 ByteBuffer ,ByteBuffer 在原生 NIO 编程时使用频率是最高的。下面主要讲一下它的使用。

注意 ByteBuffer 初始化时其实是 创建并返回了一个它的子类 HeapByteBuffer 对象,我们操作的也是它的子类。

首先是初始化,初始化主要通过两种方式:

  • **allocate(int capacity)**:创建 byte 类型的指定长度的缓冲区;
  • wrap(byte[] array):创建 byte 类型的有内容的缓冲区。

在学习数据操作之前,有几个 ByteBuffer 非常重要的参数和方法需要了解一下:

  • position:当前读取或写入的起始坐标;
  • limite:最多可以操作到哪个索引;
  • capacity:缓冲区的总长度;
  • remaining():这个方法返回的是 limit - position 的计算值,代表还有多少空间可以操作。

数据操作主要是两个方法:

  • put():插入字节,它是一个重载方法,可以传入不同形式的字节;
  • get():读取字节,不传参获取 position 位置的字节并让 position + 1,也可以通过参数读取指定位置的字节。

下图是添加字节时各属性值的变化:

ByteBuffer 虽然即支持读也支持写,但同一时间只能是其中一种模式,模式切换需要调用相应的方法。

下图是调用 flip() 方法将写模式切换为读时各属性的变化:

下图调用 clear() 方法将读切换为写时各属性的变化:

Channel (通道)

通常来说 NIO 所有的操作都是由通道开始的,它跟我们平常使用的流(InputStream,OutputStream)有点类似。但也有些区别:

  • 通道可以读也可以写,流是单向的,所以需要输入流输出流;
  • 通道可以异步读写
  • 通道总是基于缓冲区来读写(将数据从通道读取到 buffer 或者将数据以 buffer 的形式写入到通道)

下图是 Channel 的继承关系:

常用的 Channel 主要有四种:

  • FileChannel:用于文件数据的读写;
  • DatagramChannel:用于 UDP 数据的读写;
  • ServerSocketChannel 和 SocketChannel:用于 TCP 数据的读写,前者代表服务端的通道,后者代表客户端。

使用 ServerSocketChannel 和 SocketChannel 进行 NIO 编程与直接使用 ServerSocket 和 Socket 类似,这里就不赘述了。

Selector (选择器)

Selector 是多路复用器的一种,虽然它的性能不是最好的,但它几乎在所有平台上都支持,具有良好的跨平台性。

Selector 是实现一个线程处理多个客户端请求的核心组件, Channel 注册到 Selector 上之后,如果有就绪事件产生,Selector 就会去获取事件然后针对事件进行相应的处理。

Selector 常用方法如下:

  • open() :静态方法,获取一个选择器对象;
  • select():调用后阻塞线程,阻塞期间会监控所有注册的通道,当有就绪事件需要操作时,会将 SelectionKey 放入集合并返回事件数量;
  • select(1000):只阻塞 1000 毫秒,阻塞期间与上面的方法相同;
  • selectedKeys():返回集合中保存的全部 SelectionKey 。

这些方法多次提到了 SelectionKey ,那么 SelectionKey 是什么呢?

SelectionKey 就是用来描述各种就绪事件的类,通过它能获取到当前的就绪事件类型。

SelectionKey 通过 4 个常量来定义 4 种不同的就绪事件:

  • OP_READ:值为 1 << 0,读就绪事件,表示通道中有可读数据,可以执行读操作;
  • OP_WRITE:值为 1 << 2,写就绪事件,表示可以向通道写数据了;
  • OP_CONNECT:值为 1 << 3,连接就绪事件,代表客户端与服务器连接已经建立成功了;
  • OP_ACCEPT: = 1 << 4,接收连接就绪事件,表示服务器监听到了客户端连接。

SelectionKey 通过以下 4 个静态方法判断当前是否是对应的就绪事件:

  • isReadable():是否是读就绪事件;
  • isWritable():是否是写就绪事件;
  • isConnectable():是否是连接就绪事件;
  • isAcceptable():是否是接收连接就绪事件。

原生 NIO 组件编程示例

下面是使用 Selector 、Channel 和 ByteBuffer 进行 NIO 编程的示例。

服务器端代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package com.zephyr.selector;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;

/**
* 服务端-选择器
*/
public class NIOSelectorServer {
public static void main(String[] args) throws IOException {
//打开一个服务端通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//绑定对应的端口号
serverSocketChannel.bind(new InetSocketAddress(9999));
//通道默认是阻塞的,需要设置为非阻塞
serverSocketChannel.configureBlocking(false);
//创建选择器
Selector selector = Selector.open();
//将服务端通道注册到选择器上,并指定注册监听的事件为OP_ACCEPT
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("服务端启动成功...");
while (true) {
//检查选择器是否有事件
int select = selector.select(2000);
if (select == 0) {
continue;
}
//获取事件集合
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
//判断事件是否是客户端连接事件 SelectionKey.isAcceptable()
SelectionKey key = iterator.next();
//得到客户端通道,并将通道注册到选择器上, 并指定监听事件为OP_READ
if (key.isAcceptable()) {
SocketChannel socketChannel = serverSocketChannel.accept();
System.out.println("客户端已连接......" + socketChannel);
//必须设置通道为非阻塞, 因为selector需要轮询监听每个通道的事件
socketChannel.configureBlocking(false);
//并指定监听事件为OP_READ
socketChannel.register(selector, SelectionKey.OP_READ);
}
//判断是否是客户端读就绪事件SelectionKey.isReadable()
if (key.isReadable()) {
//得到客户端通道,读取数据到缓冲区
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int read = socketChannel.read(byteBuffer);
if (read > 0) {
System.out.println("客户端消息:" +
new String(byteBuffer.array(), 0, read,
StandardCharsets.UTF_8));
//给客户端回写数据
socketChannel.write(ByteBuffer.wrap("yo yo yo, hi man".getBytes(StandardCharsets.UTF_8)));
socketChannel.close();
}
}
//从集合中删除对应的事件, 因为防止二次处理.
iterator.remove();
}
}
}
}

客户端代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package com.zephyr.selector;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

/**
* 客户端
*/
public class NIOClient {
public static void main(String[] args) throws IOException {
//打开通道
SocketChannel socketChannel = SocketChannel.open();
//设置连接IP和端口号
socketChannel.connect(new InetSocketAddress("127.0.0.1", 9999));
//写出数据
socketChannel.write(ByteBuffer.wrap("What's up.".getBytes(StandardCharsets.UTF_8)));
//读取服务器写回的数据
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int read=socketChannel.read(readBuffer);
System.out.println("服务端消息:" + new String(readBuffer.array(), 0, read, StandardCharsets.UTF_8));
//释放资源
socketChannel.close(); }
}

为什么需要 Netty

上面讲了原生 NIO 相关的知识,那么问题就来了,既然原生就有完备的 NIO 编程的各个组件,为什么还需要 Netty 呢。

主要原因还是因为原生 NIO 存在一些弊端:

  • NIO 的类库和 API 繁杂:开发者需要熟练掌握 Selector、ServerSocketChannel、SocketChannel、ByteBuffer 等原生组件;
  • 有一定的门槛:必须对多线程和网络编程非常熟悉,才能编写出高质量的 NIO 程序;
  • 开发工作量和难度都非常大:例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常流的处理等等;
  • JDK NIO 的 Bug:臭名昭著的 Epoll Bug,它会导致 Selector 空轮询,最终导致 CPU 100%。

而 Netty 这个框架就很好地解决了这些问题,前三个比较好理解,简单讲一下第 4 个问题是怎么被解决的。

第 4 个问题讲到了 Selector 空轮询的 Bug,那么,什么是空轮询呢?

空轮询是指本来 Selector 调用 select() 方法如果没有就绪事件在设置的时间到之前是阻塞的,但由于 Linux 底层实现有问题,导致在没有就绪事件时也有概率直接返回,而 select() 方法一般都是放在 while (true) 循环里的,这时就会开始不断地空轮询,直到 CPU 使用率飙到 100% 。

Netty 解决这个问题主要分别两步:

  • 检测空轮询:判断阻塞时间小于 timeoutMillis (初始化的超时参数),且 select 执行次数大于阈值;
  • 重建 Selector :新创建一个 Selector 并把旧 Selector 的 Channel 注册到这个 Selector 上,然后关闭这个 Selector;

Netty 线程模型

接着我们学习一下 Netty 的线程模型,了解了 Netty 的线程模型之后我们对 Netty 的整体架构也就有了一个大致的了解。

由于 Netty 的线程模型是基于 Reactor 模型改进而来的,因此先讲讲 Reactor 模型,有助于我们对 Netty 线程模型的理解 。

Reactor 模型

Reactor 模型是指当服务器接收到多个请求时,服务器程序会把它们分派到不同的方法或线程去处理。Reactor 模式也被称作 Dispatcher 模式。它的核心是多路复用器,多路复用器收到事件后会进行分发,这点是网络服务器高并发的关键。

Reactor 模型分为三种:单 Reactor 单线程、单 Reactor 多线程和多 Reactor 多线程。

这三种模型按顺序来看理解起来复杂度不断提升,也会更接近 Netty 的线程模型,下面来分别看看这三种模型。

单 Reactor 单线程

这个最好理解,只有一个线程,只是会把建立连接和处理请求这两种任务分发给不同的类去处理,如下图所示:

整个流程简单来讲就是 Reactor 通过 Selector 监听事件,收到事件使用 dispatch 对事件进行分发,如果是连接事件就由 Acceptor 进行处理,处理完成会创建一个 Handler 对后续业务进行处理。后面的数据请求都会由 Handler 进行处理。

优点:

  • 模型简单,不会有多线程的那些问题

缺点:

  • 性能问题:单线程无法发挥多核 CPU 的性能
  • 可靠性问题:处理业务时往往容易出问题,当 Handler 出问题了,由于只有一个线程,整个节点也挂了

单 Reactor 多线程

这个线程模型针对前面的问题作出了一定的优化,多出了处理业务的线程池,如下图所示:

前面的流程与单 Reactor 单线程是一致的,到 Handler 这一步就不一样了。这个模型 Handler 只负责读取数据和发送数据部分,业务处理交给了 Worker 线程,而 Worker 线程是由 Worker 线程池统一管理的。

优点:

  • 可以充分利用多核 CPU 的处理能力

缺点:

  • 多线程资源共享和访问处理会比较复杂,在主线程处理所有的连接、监听和响应也会出现性能瓶颈

主从 Reactor 多线程

主从 Reactor 多线程模型又在前面的模型基础上做了进一步优化,增加了子 Reactor ,如下图所示:

整个流程大概可以分为以下几步:

  • 主线程的 MainReactor 负责监听连接请求,收到连接请求会由 Acceptor 进行处理,成功建立连接之后 MainReactor 会把连接分派给 SubReactor ,由 SubReactor 监听和处理数据请求;
  • SubReactor 监听到数据请求,会派发给 Handler 处理,Handler 只会处理读取数据和发送数据部分,中间业务处理部分也是放在线程池中完成。

优点:

  • MainReactor 与 SubReactor 职责分明,一个处理连接事件,一个处理数据请求;
  • MainReactor 与 SubReactor 交互逻辑比较简单,MainReactor 单向地将建立好的连接传递出去;
  • 多 Reactor 设计能在高并发场景拥有更好的性能。

缺点:

  • 编程复杂度较高

主从 Reactor 多线程模式是业界非常成熟的服务器程序设计模式,在很多中间件中都使用到了这种模式,像 Nginx、Memcached、Netty 等。这种模式也被称为 1 + M + N 模式,分别代指相对少的连接线程(不一定为 1 ),多个 I/O 线程和多个业务处理线程。

Netty 线程模型

Netty 线程模型是基于主从 Reactor 多线程模型优化而来的,整体架构如下图所示:

Netty 的线程模型主要分为两部分,分别是 BossGroup 和 WorkerGroup,它们都分别管理一个或多个 NioEventLoop。每个 NioEventLoop 对应着一个线程,一个 Selector,一个 Executor 和一个 TaskQueue。

NioEventLoop 可以理解成一个事件循环,当程序启动后每个 NioEventLoop 都会通过 Executor 启动一个线程,开始执行事件循环,在循环中 Selector 会通过 select 方法阻塞并监听就绪事件,当有事件到来时通过 processSeelectedKeys 方法处理 Selector 事件,之后再通过 runAllTasks 方法处理其他的任务。

与前面介绍的 主从 Reactor 多线程模型类似,BossGoup 负责连接事件,当建立连接之后会生成一个 NioSocketChannel 并注册到 WorkGroup 其中一个 NioEventLoop 的 Selector 上。WokerGroup 中的 NioEventLoop 负责处理数据请求,当请求到来时会调用 processSelectedKeys 方法,其中的业务处理会依次经过 Pipeline 中的多个 Handler。

Netty 编程

学习完 Netty 线程模型,我们来看一下使用 Netty 写出来的程序大概是什么样的。

服务端代码

Nettry 服务器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class NettyServer {

public static void main(String[] args) throws InterruptedException {
// 创建 BossGroup
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
// 创建 WorkerGroup
EventLoopGroup workerGroup = new NioEventLoopGroup();
// 创建服务器启动类
ServerBootstrap bootstrap = new ServerBootstrap();
// 添加配置
bootstrap.group(bossGroup, workerGroup) // 设置 BossGroup 和 ChildGroup
.channel(NioServerSocketChannel.class) // 设置 Channel 具体类
.option(ChannelOption.SO_BACKLOG, 128) // 设置连接队列
.childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE) // 设置开启保活机制
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 把自定义 Handler 添加到 pipeline
socketChannel.pipeline().addLast(new NettyServerHandler());
}
});
// 绑定端口号
ChannelFuture channelFuture = bootstrap.bind(new InetSocketAddress(9999)).sync();
System.out.println("服务器启动成功!");
// 阻塞直到通道关闭
channelFuture.channel().closeFuture().sync();
// 优雅地关闭 BossGroup
bossGroup.shutdownGracefully();
// 优雅地关闭 WorkerGroup
workerGroup.shutdownGracefully();
}

}

自定义服务器端 ChannelHandler 代码,只列出了主要几个方法的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class NettyServerHandler implements ChannelInboundHandler {

@Override
public void channelRead(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
ByteBuf byteBuf = (ByteBuf) o;
System.out.println(byteBuf.toString(CharsetUtil.UTF_8));
Channel channel = channelHandlerContext.pipeline().channel();
System.out.println(channel);
}

@Override
public void channelReadComplete(ChannelHandlerContext channelHandlerContext) throws Exception {
channelHandlerContext.writeAndFlush(Unpooled.copiedBuffer("这是服务器的响应信息...".getBytes(CharsetUtil.UTF_8)));
}

@Override
public void channelRegistered(ChannelHandlerContext channelHandlerContext) throws Exception {
System.out.println("通道注册");
}

...

}

客户端代码

Netty 客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class NettyClient {

public static void main(String[] args) throws InterruptedException {
// 创建 EventLoopGroup
EventLoopGroup eventLoopGroup = new NioEventLoopGroup(1);
// 创建启动类
Bootstrap bootstrap = new Bootstrap();
// 设置参数
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class) // 设置 Channel 的类
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 添加自定义 Handler
socketChannel.pipeline().addLast(new NettyClientHandler());
}
});
// 连接服务器
ChannelFuture channelFuture = bootstrap.connect(new InetSocketAddress("127.0.0.1", 9999)).sync();
System.out.println("客户端启动成功!");
// 阻塞直到通道判断
channelFuture.channel().closeFuture().sync();
// 优雅地关闭 EventLoopGroup
eventLoopGroup.shutdownGracefully();
}

}

自定义客户端 ChannelHandler 代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class NettyClientHandler implements ChannelInboundHandler {

@Override
public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
channelHandlerContext.writeAndFlush(Unpooled.copiedBuffer("这是客户端发来的消息", CharsetUtil.UTF_8));
}

@Override
public void channelRead(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
ByteBuf byteBuf = (ByteBuf) o;
System.out.println(byteBuf.toString(CharsetUtil.UTF_8));
}

@Override
public void channelRegistered(ChannelHandlerContext channelHandlerContext) throws Exception {
System.out.println("通道注册");
}

...

}

如果对原生 NIO 编程比较熟悉理解上面的代码应该比较容易,同时也能看出使用 Netty 框架编程的难度是远远小于原生 NIO 的。

下面我们就详细了解一下上面代码涉及的这些 Netty 组件。

Netty 的核心组件

ChannelHandler

ChannelHandler 是一个接口,继承于它的两个接口 ChannelInboundHandler 和 ChannelOutboundHandler 定义了很多事件处理方法,我们可以通过实现这些方法或者重写子类的方法的来实现相应的业务逻辑。

ChannelHandler 的继承关系如图所示:

如果通过实现上述接口来开发,需要实现的方法中常用的有以下几个:

  • public void channelActive(ChannelHandlerContext ctx) 通道就绪事件;
  • public void channelRead(ChannelHandlerContext ctx, Object msg) 通道读取数据事件;
  • public void channelReadComplete(ChannelHandlerContext ctx) 数据读取完毕事件;
  • public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 通道发生异常事件。

但一般开发中自定义 Handler 会直接继承 SimpleChannelInboundHandler ,我们自己必须要实现的就只有

protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) 这个方法,这种开发方式在继承的时候传入泛型指定出入站消息类型,配合编解码器使用会非常的方便。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
public class NettyChatRoomServerHandler extends SimpleChannelInboundHandler<String> {

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("Channel active");
}

@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
System.out.println(msg);
}
}

ChannelHandlerContext

ChannelHandlerContext 是 ChannelHandler 的上下文,它的核心就是 ChannelHandler ,它同时也保存了 Channel、Pipeline、Executor (NioEventLoop) 等信息。

它的继承关系如下图所示:

Netty 中的 Context 分为三种: HeadContext 、TailContext 和 DefaultChannelHandlerContext 。

HeadContext 和 TailContext 比较特殊,它既是 ChannelHandlerContext 也是 ChannelHandler (实现了 Handler 的接口)。

我们通过 ChannelPipeline 的 addLast() 方法添加的 Handler 都会封装成 DefaultChannelHandlerContext 。

ChannelPipeline

ChannelPipeline 是一个接口,我们平常编程用到的一般是它的实现类 DefaultChannelPipeline 。

Pipeline 队列

DefaultChannelPipline 其实就是一个管道,它维护了一个 ChannelHandlerContext 的双链表队列。

在 Pipeline 初始化时会创建头节点和尾节点,它们的类型分别是 HeadContext 和 TailContext,所以整个链表至少有两个节点。

中间的节点类型都是 DefaultChannelHandlerContext 。

链表如图所示:

ChannelHandler 的传递性

前面说过 Handler 分为 InboundHander 和 OutboundHandler ,消息入站时只会访问 InboundHander ,消息出站时只会访问 OutboundHander 。如果既是 InboundHandler 又是 OutboundHandler 出站入站都会访问。

而 InboundHandler 与 OutboundHandler 都具有传递性,不过传递方法有些区别:

  • InboundHander 是向后传递,需要调用 ChannelHandlerContext 的 fireChannel…() ,比如如果是传递 ChannelRead() 方法就要调用 fireChannelRead() ,那么下一个节点的 ChannelRead() 方法就会被调用;
  • OutboundHandler 是向前传递,需要调用 ChannelHanderContext 的同名方法,比如如果是传递 write() 方法调用的也是 write() ,这里下一个节点的 write() 方法就会被调用。

正常我们在开发中对数据的读写使用一个节点就够了,不需要使用这种传递性,这种传递性一般用在编解码器上。

无论是我们写子类自定义的编解码器还是使用 Netty 提供的编解码器,它们内部都会自动调用这些传递方法,开发者对这些是无感知的。

我们了解这些传递性的最大意义在于确定在添加 Handler 到 pipeline 中时(Handle 会被封装成 DefaultChannelHandlerContext 然后添加到队列中去)的顺序:

  • 先添加编解码器,并且解码器在前,编码器在后;
  • 先添加 OutboundHandler ,后添加 InboundHandler。

Pipeline 消息入站

消息入站首先是 Selector 监听到读就绪事件,接着判断就绪事件如果是读事件就调用通道的 read() 方法,通道会把消息读到 ByteBuf 里,然后把 ByteBuf 传递给 Pipeline 自已去处理。

Pipeline 会直接把 ByteBuf 交给 HeadContext 去处理,而 HeadContext 没有具体的处理逻辑,会直接传递给下一个节点去处理。

下图就是 Pipeline 节点的处理顺序:

Pipeline 消息出站

消息出站与入站最大的不同是发起方。入站的消息是通过 Selector 监听到的。而出站是程序主动发起的。

对外写消息有三种方式:

  • 调用 channel 的 writeAndFlush(),它内部会直接调用 pipeline.writeAndFlush(msg),最终会从队列尾部开始调用;
  • 调用 pipeline 的 writeAndFlush(),它内部会直接调用 tail.writeAndFlush(msg),最终也是从队列尾部开始调用;
  • 调用 channelHandlerContext 的 writeAndFlush(),它内部会以当前节点为起点找到下一个 OutboundHandler 让它去处理,最终就是从这个节点的下一个 OutboundHander 开始处理。

下图展示了各个节点处理顺序:

NioEventLoop

NioEventLoop 就是一个事件循环类,几乎所有事件处理都会经过这个类,它的继承关系如下:

NioEventLoopGroup

NioEventLoopGroup 就是 NioEventLoop 组,负责管理 NioEventLoop,当有 Channel 需要注册的时候,NioEventLoopGroup 会轮询找到下一个 NioEventLoop 注册上去。在 NioEventLoopGroup 上作出的配置最终都会作用到 NioEventLoop 上。

ChannelOption

在程序初始化的时候我们可以通过 ChannelOption 对 Channel 设置一些参数,常用的参数有两个:SO_BACKLOG 和 SO_KEEPALIVE。

下面分别讲讲这两个参数 :

SO_BACKLOG

这个参数主要是用来控制 Accept 队列的大小的 (早期的 Linux 内核是控制的 SYN 队列的大小)。

这里展开说一下这两个队列,它们都是由 Linux 内核维护的。一个是保存第一次握手的 SYN 的队列,系统会依次从这个队列取出 SYN 并进行响应,一个是保存三次握手完成后的 Accept 队列,调用 accept 方法就能拿到已完成连接的 socket,反应在 Netty 里面就是返回一个新的 Channel。

SO_KEEPALIVE

这个参数对应的是连接的保活机制 ,如果不设置这个参数,请求完成连接就会被关闭。设置了这个参数之后,连接关闭的条件变成了如果客户端与服务器 2 个小时没有数据交互,那么客户端就会开始发探活数据报文,如果多次发送都没有响应,就断开连接。

ServerBootstrap 和 Bootstrap

服务端和客户端的启动类,负责对 Netty 的各个组件进行配置。

服务器端配置代码如下:

1
2
3
4
5
6
7
8
9
10
11
bootstrap.group(bossGroup, workerGroup) // 设置 BossGroup 和 ChildGroup
.channel(NioServerSocketChannel.class) // 设置 Channel 具体类
.option(ChannelOption.SO_BACKLOG, 128) // 设置连接队列
.childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE) // 设置开启保活机制
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 把自定义 Handler 添加到 pipeline
socketChannel.pipeline().addLast(new NettyServerHandler());
}
});

ChannelFuture

下图是 ChannelFuture 的继承关系

从图中可以看出,它继承的 Future 接口是 Netty 自定义的接口,这个接口同时也继承自 Java 原生的 Future 接口。

在 Netty 中最常用的是 ChannelFuture 的子类 DefaultChannelPromise ,而这个类大部分功能都是由 DefaultPromise 实现的。

DefaultPromise 阻塞线程使用的是 Object 的 wait() 方法,而原生 Future 的子类 FutureTask 阻塞线程使用的是 LockSupport 的 park() 方法。

ChannelFuture 支持添加 ChannelFutureListener ,监听各种事件。

Unpooled

这个类如果我们在使用 Netty 编程时不使用编解码器就会经常用到,它可以通过传入的字符串快速生成一个 ByteBuf (Netty 独有的类,类似于原生的 ByteBuffer,只是它在 ByteBuffer 的基础上做了封装) 对象。常用的方法如下:

1
public static ByteBuf copiedBuffer(CharSequence string, Charset charset)

StringDecoder 和 StringEncoder

这两个类分别是 Netty 提供的解码器和编码器,它们同时也是 ChannelHandler 的子类。有了这两个编解码器,就不再需要与 ByteBuf 打交道,代码写起来也更简洁方便。

StringDecoder

下图是解码器类 StringDecoder 的继承关系,注意它的父类是实现了 ChannelInboundHandler 接口的,作用在消息入站的时候:

如果有特殊需求需要自定义解码器也是可以的,只要实现 MessageToMessageDecoder 接口就可以了。

写法如下:

1
2
3
4
5
6
public class MessageDecoder extends MessageToMessageDecoder<ByteBuf> {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
list.add(byteBuf.toString(CharsetUtil.UTF_8));
}
}

StringEncoder

下图是编码器类 StringEncoder 的继承关系,注意它的父类是实现了 ChannelOutboundHandler 接口的,作用在消息出站:

如果要自定义编码器,实现 MessageToMessageEncoder 接口就行了。

写法如下:

1
2
3
4
5
6
public class MessageEncoder extends MessageToMessageEncoder<String> {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, String s, List<Object> list) throws Exception {
list.add(Unpooled.copiedBuffer(s, CharsetUtil.UTF_8));
}
}

如果嫌为自定义编码器和自定义解码器分别创建一个类太麻烦,还可以直接继承 MessageToMessageCodec 接口。

这个接口继承关系如下,注意它的父类同时实现了 ChannelInboundHandler 和 ChannelOutboundHandler ,作用在消息入站和出站:

写法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class MessageCodec extends MessageToMessageCodec<ByteBuf, String> {

@Override
protected void encode(ChannelHandlerContext channelHandlerContext, String s, List<Object> list) throws Exception {
list.add(Unpooled.copiedBuffer(s, CharsetUtil.UTF_8));
}

@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
list.add(byteBuf.toString(CharsetUtil.UTF_8));
}

}

LineBasedFrameDecoder 与 DelimiterBasedFrameDecoder

这两个类也都是解码器,但它们解决的问题与上面所讲的编解码器不同,这两个类主要是解决粘包拆包的问题。

那么问题来了,什么是粘包和拆包?为什么会出现粘包和拆包呢?

首先来说说什么是粘包和拆包:

在文章开始讲了 TCP 的三个重要的特点:面向连接、可靠的和字节流。而 Netty 底层是基于 TCP 的,它的客户端与服务端交互时发送的数据在传输层都是通过字节流传输的,字节流是没有界线的概念的,这时服务器在读取数据时就可能在一次读取中读取到到客户端分几次发的数据,这就叫粘包。如果客户端发送一次数据,服务器分几次才能完整读到,这就是拆包。

粘包拆包大致如下图所示:

粘包拆包大致有以下几个原因:

  • socket缓冲区与滑动窗口: 在发送数据的时,发送方必须要先确认接收方的窗口没有被填充满,如果没有填满,则可以发送
  • MSS/MTU限制
  • Nagle算法:Nagle算法是为了尽可能发送大块数据,避免网络中充斥着许多小数据块。

Netty 中解决粘包拆包的方法:

  • FixedLengthFrameDecoder:固定长度拆包器,使用固定长度进行拆分;
  • LineBasedFrameDecoder:行拆包器,使用换行符进行拆分;
  • DelimiterBasedFrameDecoder:分隔符拆包器,使用自定义的分隔符进行拆分;
  • LengthFieldBasedFrameDecoder:基于数据包长度的拆包器,基于应用层协议中传过来的长度进行拆分。

最常用的就是中间两个 LineBasedFrameDecoder 和 DelimiterBasedFrameDecoder。

总结

以上就是 Netty 编程相关的知识点。Netty 的组件非常多,可以自定义的地方也非常多,但熟悉这些组件之后使用它们编程会非常方便快捷。