总结netty的知识点
一、NIO vs Netty
先回顾一下同步、异步、阻塞、非阻塞的概念。
原生的JDK基于多路复用IO模型epoll,是同步非阻塞IO,它的API比较复杂,使用难度较大且出现问题之后排查难度大,而且原生NIO有一些已知的BUG,所以不建议使用原生NIO进行开发,建议使用一些生产级别的封装组件,Netty就是其中一个。
netty事件处理流程
netty基本原理
Boss线程池(接收客户端请求)职责如下:
- (1)接收客户端的连接,初始化Channel参数
- (2)将链路状态变更时间通知给ChannelPipeline
Worker线程池(处理IO操作)作用是:
- (1)异步读取通信对端的数据报,发送读事件到ChannelPipeline
- (2)异步发送消息到通信对端,调用ChannelPipeline的消息发送接口
- (3)执行系统调用Task
- (4)执行定时任务Task
通过配置boss和worker线程池的线程个数以及是否共享线程池等方式,netty的线程模型可以在单线程、多线程、主从线程之间切换。见下方reactor模型。
NioEventLoop
NioEventLoopGroup就是一个线程池,NioEventLoop就是池里面的一个线程。
NioEventLoop#run()方法是一个无限循环,主要逻辑分为以下三大步骤:
- select 选择任务
- processSelectedKeys 处理Channel感兴趣的就绪IO事件
- runAllTasks 运行所有普通任务和定时任务
reactor模型
netty是NIO reactor模型的实现,常见的reactor模型有:
- reactor单线程:一个线程处理所有的事情
- reactor多线程模型:一个专门的acceptor线程用来监听TCP请求,一个NIO线程池负责消息的读写、编码/解码
- reactor主从多线程模型:acceptor不是单独的一个线程,而是一个线程池,另外同样有一个线程池负责消息读写、编码/解码
netty可以通过EventLoopGroup参数来控制使用的是哪一个模型
//1. 主从模式,不控制主从的线程池的数量,通常这么用
//NioEventLoopGroup的参数如果不写,默认线程数量是cpu核心数*2
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChildChannelHandler());
ChannelFuture f = b.bind(port).sync();
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
//2. 多线程模式,区别仅仅是acceptor线程池的线程数量为1
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
//...
//3. 单线程模式
EventLoopGroup group = new NioEventLoopGroup(1);
try {
ServerBootstrap b = new ServerBootstrap();
b.group(group, group)
//...
序列化框架
netty默认支持google的protobuf序列化框架,性能比较高
零拷贝
操作系统的零拷贝指的是避免在用户空间与内核空间来回拷贝数据,不太一样的是,Netty的零拷贝指的是避免在用户空间中JVM堆内存和直接内存之间的数据拷贝。
通过FileRegion实现零拷贝
Netty的FileRegion底层调用的是NIO FileChannel的transferTo函数,实际上会触发系统级别的sendTo
函数,使用的是系统级别的零拷贝
DirectBuffers
Netty的接收和发送ByteBuffer采用DIRECT BUFFERS,使用堆外部的直接内存进行Socket读写,不需要进行字节缓冲区的二次拷贝。如果使用JVM的堆内存进行Socket读写,JVM会将堆内存Buffer拷贝一份到直接内存中,然后才写入Socket中。
通过CompositeByteBuf实现合并bytebuf
ByteBuf header = ...
ByteBuf body = ...
//CompositeByteBuf就是一个逻辑块,
CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();
compositeByteBuf.addComponents(true,header,body);
通过wrap操作实现零拷贝
ByteBuf header = ...
ByteBuf body = ...
//Unpooled提供很多重载方法,将一个或多个buffer包装为一个 ByteBuf对象,从而实现零拷贝。
ByteBuf allByteBuf = Unpooled.wrappedBuffer(header, body);
通过slice操作实现零拷贝
ByteBuf byteBuf = ...
//与Unpooled.wrappedBuffer操作相反
ByteBuf header = byteBuf.slice(0, 5);
ByteBuf body = byteBuf.slice(5, 10);
内存池
在上面零拷贝中提到的Netty可以直接在JVM堆外部的直接内存中使用缓冲区ByteBuf,为了重用缓冲区,避免频繁在直接内存中分配和回收对象,Netty提供了基于内存池的缓冲区重用机制。
//未使用内存池,直接使用堆外直接内存
ByteBuf unpooledBuffer = Unpooled.directBuffer(1024);
//使用内存池性能会好很多
ByteBuf pooledBuffer = PooledByteBufAllocator.DEFAULT.directBuffer(1024);
串行无锁化
NioEventLoop就是Netty框架的Reactor线程,需要处理IO操作,所以内部①有一个多路复用器Selector,另外内部②还有一个volatile的变量Thread(父类SingleThreadEventExecutor中定义了一个volatile的线程private volatile Thread thread;
),代表当前线程本身,另外还有一个队列MPSCQ(multiple producer single consumer)。
在netty启动过程中,通过调用inEventLoop()
判断volatile thread是不是当前线程,如果是则执行,否则把想要执行的任务放到队列中。
二、Netty in glance
依赖
<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.36.Final</version>
</dependency>
服务端
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class TimeServer {
public void bind(int port) throws Exception {
// 配置服务端的NIO线程组,就是reactor线程组
EventLoopGroup bossGroup = new NioEventLoopGroup(); //用来接收请求
EventLoopGroup workerGroup = new NioEventLoopGroup(); //用来处理read/write事件
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChildChannelHandler()); //读写事件放在ChildChannelHandler中处理
// 绑定端口,同步等待成功
ChannelFuture f = b.bind(port).sync();
// 等待服务端监听端口关闭
f.channel().closeFuture().sync();
} finally {
// 优雅退出,释放线程池资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel arg0) throws Exception {
arg0.pipeline().addLast(new TimeServerHandler());
}
}
public static void main(String[] args) throws Exception {
int port = 8080;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
// 采用默认值
}
}
new TimeServer().bind(port);
}
}
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
public class TimeServerHandler extends ChannelHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//读到客户端发送过来的消息之后的操作
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
System.out.println("The time server receive order : " + body);
String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date(
System.currentTimeMillis()).toString() : "BAD ORDER";
ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
ctx.write(resp);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.close();
}
}
客户端
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class TimeClient {
public void connect(int port, String host) throws Exception {
// 配置客户端NIO线程组
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch)
throws Exception {
ch.pipeline().addLast(new TimeClientHandler());
}
});
// 发起异步连接操作
ChannelFuture f = b.connect(host, port).sync();
// 当代客户端链路关闭
f.channel().closeFuture().sync();
} finally {
// 优雅退出,释放NIO线程组
group.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port = 8080;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
// 采用默认值
}
}
new TimeClient().connect(port, "127.0.0.1");
}
}
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import java.util.logging.Logger;
public class TimeClientHandler extends ChannelHandlerAdapter {
private static final Logger logger = Logger.getLogger(TimeClientHandler.class.getName());
private final ByteBuf firstMessage;
public TimeClientHandler() {
byte[] req = "QUERY TIME ORDER".getBytes();
firstMessage = Unpooled.buffer(req.length);
firstMessage.writeBytes(req);
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.writeAndFlush(firstMessage);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
System.out.println("Now is : " + body);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// 释放资源
logger.warning("Unexpected exception from downstream : " + cause.getMessage());
ctx.close();
}
}
三、解决TCP粘包/拆包问题
客户端发送的多条消息可能被合并成一条发送到服务端,也可能一条消息被拆分成多条消息发送到服务端,为了解决这个问题,netty提供了几种解码器。
- LineBasedFrameDecoder //以换行符为标志的解码器,支持配置单行最大长度,超过则抛异常
- DelimiterBasedFrameDecoder //自定义分隔符解码器
- FixedLengthFrameDecoder //固定长度字符串解码器
- StringDecoder //将接收到的消息转换为字符串
//服务端初始化Channel时指定2个解码器LineBasedFrameDecoder+StringDecoder
//这2个解码器配合起来组成了按行切分的文本解码器
private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel arg0) throws Exception {
arg0.pipeline().addLast(new LineBasedFrameDecoder(1024));
arg0.pipeline().addLast(new StringDecoder());
arg0.pipeline().addLast(new TimeServerHandler());
}
}
//服务端初始化Channel时指定2个解码器DelimiterBasedFrameDecoder+StringDecoder
//通过自定义分隔符"$_"来分割字符串
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch)
throws Exception {
ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new EchoServerHandler());
}
});
//服务端初始化Channel时指定2个解码器FixedLengthFrameDecoder+StringDecoder
//通过固定长度来分割字符串
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch)
throws Exception {
ch.pipeline().addLast(new FixedLengthFrameDecoder(20));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new EchoServerHandler());
}
});
除去以上3种方式,还有一种是通过在消息头中定义长度字段来标示消息的总长度
//示例通过msgpack进行编码解码,通过LengthFieldBasedFrameDecoder、LengthFieldPrepender添加粘包/半包支持
private class ClientHandlerInit extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//这里设置通过增加包头表示报文长度来避免粘包
//1) lengthFieldOffset = 0;//长度字段的偏差
//2) lengthFieldLength = 2;//长度字段占的字节数
//3) lengthAdjustment = 0;//添加到长度字段的补偿值
//4) initialBytesToStrip = 0。//从解码帧中第一次去除的字节数
ch.pipeline().addLast("frameDecoder",new LengthFieldBasedFrameDecoder(65535, 0, 2,0,2));
//增加msgpack解码器
ch.pipeline().addLast("msgpack decoder",new MsgpackDecoder());
//这里设置读取报文的包头长度来避免粘包,长度字段(包头)字节数为2位
ch.pipeline().addLast("frameEncoder",new LengthFieldPrepender(2));
//增加msgpack编码器
ch.pipeline().addLast("msgpack encoder",new MsgpackEncoder());
ch.pipeline().addLast(new TimeClientHandler());
}
}
疑问
为什么NIO是同步非阻塞的但是Netty是异步非阻塞的
NIO(Non-Blocking IO)是同步非阻塞IO模型,Netty底层采用了NIO技术,但是通过一些手段实现了异步的效果,使用者的用法就是异步的。即使是BIO,通过多线程的方式,也可以实现异步的效果。
Netty为什么使用NIO,而不是AIO
the main reasons were:
- Not faster than NIO (epoll) on unix systems (which is true)
- There is no daragram suppport
- Unnecessary threading model (too much abstraction without usage)
future.channel().closeFuture().sync()有什么用
主线程执行到这里就 wait 子线程结束,子线程才是真正监听和接受请求的,closeFuture()是开启了一个channel的监听器,负责监听channel是否关闭的状态,如果监听到channel关闭了,子线程才会释放,syncUninterruptibly()让主线程同步等待子线程结果
默认情况下,netty服务端起多少线程?何时启动?
2*CPU核数个线程
ServerBootstrap#bind#doBind#doBind0 channel.eventLoop().execute
NioEventLoopGroup#register —>MultithreadEventLoopGroup#register(从group中获取到一个NioEventLoop) —>SingleThreadEventLoop#register —>AbstractChannel#AbstractUnsafe#register
//通过inEventLoop方法判断是NioEventLoop线程本身还是用户线程
//如果是用户线程,从这里启动NioEventLoop线程,把任务放到队列,启动NioEventLoop线程之后,由于NioEventLoop继承SingleThreadEventExecutor,所以会触发scheduleExecution,定时执行任务队列中的任务
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new OneTimeTask() {
SingleThreadEventExecutor#scheduleExecution
protected final void scheduleExecution() {
updateThread(null);
executor.execute(asRunnable);
}
Netty是如何解决jdk空轮询bug的?
JDK epoll空轮询BUG是什么?
int selectCnt = 0;
...
long currentTimeNanos = System.nanoTime();
for (;;) {
// 1.定时任务截止事时间快到了,中断本次轮询
...
// 2.轮询过程中发现有任务加入,中断本次轮询
...
// 3.阻塞式select操作
selector.select(timeoutMillis);
selectCnt ++;
// 4.解决jdk的nio bug
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
rebuildSelector();
selector = this.selector;
selector.selectNow();
selectCnt = 1;
break;
}
currentTimeNanos = time;
...
}
netty 会在每次进行 selector.select(timeoutMillis) 之前记录一下开始时间currentTimeNanos,在select之后记录一下结束时间,判断select操作是否至少持续了timeoutMillis秒(这里将time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos改成time - currentTimeNanos >= TimeUnit.MILLISECONDS.toNanos(timeoutMillis)或许更好理解一些), 如果持续的时间大于等于timeoutMillis,说明就是一次有效的轮询,重置selectCnt标志,否则,表明该阻塞方法并没有阻塞这么长时间,可能触发了jdk的空轮询bug,当空轮询的次数超过一个阀值的时候,默认是512,就开始重建selector。
Netty如何保证异步串行无锁化?
NioEventLoop的父类SingleThreadEventExecutor中定义了判断执行任务的线程是否为NioEventLoop当前线程,主要是通过volatile变量判断,如果是则用当前的NioEventLoop线程执行任务,如果不是,则把任务放入队列,NioEventLoop线程会周期性的执行队列中的任务。这样保证了任务的单线程串行执行。
private volatile Thread thread;
@Override
public boolean inEventLoop(Thread thread) {
return thread == this.thread;
}
netty的高性能高在哪里?
基于I/O多路复用模型
零拷贝
基于NIO的Buffer
基于内存池的缓冲区重用机制
无锁化的串行设计理念
I/O操作的异步处理
提供对protobuf等高性能序列化协议支持
可以对TCP进行更加灵活地配置
参考
[netty]–最通用TCP黏包解决方案:LengthFieldBasedFrameDecoder和LengthFieldPrepender