总结netty的知识点

一、NIO vs Netty

先回顾一下同步、异步、阻塞、非阻塞的概念。

原生的JDK基于多路复用IO模型epoll,是同步非阻塞IO,它的API比较复杂,使用难度较大且出现问题之后排查难度大,而且原生NIO有一些已知的BUG,所以不建议使用原生NIO进行开发,建议使用一些生产级别的封装组件,Netty就是其中一个。

netty事件处理流程

Netty事件处理流程.png

netty基本原理

netty基本原理.png

Boss线程池(接收客户端请求)职责如下:

  • (1)接收客户端的连接,初始化Channel参数
  • (2)将链路状态变更时间通知给ChannelPipeline

Worker线程池(处理IO操作)作用是:

  • (1)异步读取通信对端的数据报,发送读事件到ChannelPipeline
  • (2)异步发送消息到通信对端,调用ChannelPipeline的消息发送接口
  • (3)执行系统调用Task
  • (4)执行定时任务Task

通过配置boss和worker线程池的线程个数以及是否共享线程池等方式,netty的线程模型可以在单线程、多线程、主从线程之间切换。见下方reactor模型。

NioEventLoop

NioEventLoopGroup就是一个线程池,NioEventLoop就是池里面的一个线程。

NioEventLoop#run()方法是一个无限循环,主要逻辑分为以下三大步骤:

  • select 选择任务
  • processSelectedKeys 处理Channel感兴趣的就绪IO事件
  • runAllTasks 运行所有普通任务和定时任务

reactor模型

netty是NIO reactor模型的实现,常见的reactor模型有:

  • reactor单线程:一个线程处理所有的事情
  • reactor多线程模型:一个专门的acceptor线程用来监听TCP请求,一个NIO线程池负责消息的读写、编码/解码
  • reactor主从多线程模型:acceptor不是单独的一个线程,而是一个线程池,另外同样有一个线程池负责消息读写、编码/解码

netty可以通过EventLoopGroup参数来控制使用的是哪一个模型

//1. 主从模式,不控制主从的线程池的数量,通常这么用
//NioEventLoopGroup的参数如果不写,默认线程数量是cpu核心数*2
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
    ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 1024)
            .childHandler(new ChildChannelHandler());

    ChannelFuture f = b.bind(port).sync();
    f.channel().closeFuture().sync();
} finally {
    bossGroup.shutdownGracefully();
    workerGroup.shutdownGracefully();
}

//2. 多线程模式,区别仅仅是acceptor线程池的线程数量为1
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
    ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup, workerGroup)
    //...

//3. 单线程模式
EventLoopGroup group = new NioEventLoopGroup(1);
try {
    ServerBootstrap b = new ServerBootstrap();
    b.group(group, group)
    //...

序列化框架

netty默认支持google的protobuf序列化框架,性能比较高

零拷贝

操作系统的零拷贝指的是避免在用户空间与内核空间来回拷贝数据,不太一样的是,Netty的零拷贝指的是避免在用户空间中JVM堆内存和直接内存之间的数据拷贝。

netty深入理解系列-Netty零拷贝的实现原理

通过FileRegion实现零拷贝

Netty的FileRegion底层调用的是NIO FileChannel的transferTo函数,实际上会触发系统级别的sendTo函数,使用的是系统级别的零拷贝

DirectBuffers

Netty的接收和发送ByteBuffer采用DIRECT BUFFERS,使用堆外部的直接内存进行Socket读写,不需要进行字节缓冲区的二次拷贝。如果使用JVM的堆内存进行Socket读写,JVM会将堆内存Buffer拷贝一份到直接内存中,然后才写入Socket中。

通过CompositeByteBuf实现合并bytebuf
ByteBuf header = ...
ByteBuf body = ...
//CompositeByteBuf就是一个逻辑块,
CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();
compositeByteBuf.addComponents(true,header,body);
通过wrap操作实现零拷贝
ByteBuf header = ...
ByteBuf body = ...
//Unpooled提供很多重载方法,将一个或多个buffer包装为一个 ByteBuf对象,从而实现零拷贝。
ByteBuf allByteBuf = Unpooled.wrappedBuffer(header, body);
通过slice操作实现零拷贝
ByteBuf byteBuf = ...
//与Unpooled.wrappedBuffer操作相反
ByteBuf header = byteBuf.slice(0, 5);
ByteBuf body = byteBuf.slice(5, 10);

内存池

在上面零拷贝中提到的Netty可以直接在JVM堆外部的直接内存中使用缓冲区ByteBuf,为了重用缓冲区,避免频繁在直接内存中分配和回收对象,Netty提供了基于内存池的缓冲区重用机制。

//未使用内存池,直接使用堆外直接内存
ByteBuf unpooledBuffer = Unpooled.directBuffer(1024);
//使用内存池性能会好很多
ByteBuf pooledBuffer = PooledByteBufAllocator.DEFAULT.directBuffer(1024);

串行无锁化

NioEventLoop.jpg

NioEventLoop就是Netty框架的Reactor线程,需要处理IO操作,所以内部①有一个多路复用器Selector,另外内部②还有一个volatile的变量Thread(父类SingleThreadEventExecutor中定义了一个volatile的线程private volatile Thread thread;),代表当前线程本身,另外还有一个队列MPSCQ(multiple producer single consumer)。

在netty启动过程中,通过调用inEventLoop()判断volatile thread是不是当前线程,如果是则执行,否则把想要执行的任务放到队列中。

二、Netty in glance

依赖

<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.36.Final</version>
</dependency>

服务端

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class TimeServer {

    public void bind(int port) throws Exception {
        // 配置服务端的NIO线程组,就是reactor线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup(); //用来接收请求
        EventLoopGroup workerGroup = new NioEventLoopGroup(); //用来处理read/write事件
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .childHandler(new ChildChannelHandler()); //读写事件放在ChildChannelHandler中处理
            // 绑定端口,同步等待成功
            ChannelFuture f = b.bind(port).sync();

            // 等待服务端监听端口关闭
            f.channel().closeFuture().sync();
        } finally {
            // 优雅退出,释放线程池资源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel arg0) throws Exception {
            arg0.pipeline().addLast(new TimeServerHandler());
        }

    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {
            port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
            // 采用默认值
            }
        }
        new TimeServer().bind(port);
    }
}

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
public class TimeServerHandler extends ChannelHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //读到客户端发送过来的消息之后的操作
        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        String body = new String(req, "UTF-8");
        System.out.println("The time server receive order : " + body);
        String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date(
            System.currentTimeMillis()).toString() : "BAD ORDER";
        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
        ctx.write(resp);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
	    ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
	    ctx.close();
    }
}

客户端

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class TimeClient {

    public void connect(int port, String host) throws Exception {
        // 配置客户端NIO线程组
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch)
                    throws Exception {
                    ch.pipeline().addLast(new TimeClientHandler());
                }
                });

            // 发起异步连接操作
            ChannelFuture f = b.connect(host, port).sync();

            // 当代客户端链路关闭
            f.channel().closeFuture().sync();
        } finally {
            // 优雅退出,释放NIO线程组
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {
            port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
            // 采用默认值
            }
        }
        new TimeClient().connect(port, "127.0.0.1");
    }
}

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import java.util.logging.Logger;
public class TimeClientHandler extends ChannelHandlerAdapter {

    private static final Logger logger = Logger.getLogger(TimeClientHandler.class.getName());
    private final ByteBuf firstMessage;

    public TimeClientHandler() {
        byte[] req = "QUERY TIME ORDER".getBytes();
        firstMessage = Unpooled.buffer(req.length);
        firstMessage.writeBytes(req);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ctx.writeAndFlush(firstMessage);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        String body = new String(req, "UTF-8");
        System.out.println("Now is : " + body);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // 释放资源
        logger.warning("Unexpected exception from downstream : " + cause.getMessage());
        ctx.close();
    }
}

三、解决TCP粘包/拆包问题

客户端发送的多条消息可能被合并成一条发送到服务端,也可能一条消息被拆分成多条消息发送到服务端,为了解决这个问题,netty提供了几种解码器。

  • LineBasedFrameDecoder //以换行符为标志的解码器,支持配置单行最大长度,超过则抛异常
  • DelimiterBasedFrameDecoder //自定义分隔符解码器
  • FixedLengthFrameDecoder //固定长度字符串解码器
  • StringDecoder //将接收到的消息转换为字符串
//服务端初始化Channel时指定2个解码器LineBasedFrameDecoder+StringDecoder
//这2个解码器配合起来组成了按行切分的文本解码器
private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel arg0) throws Exception {
        arg0.pipeline().addLast(new LineBasedFrameDecoder(1024));
        arg0.pipeline().addLast(new StringDecoder());
        arg0.pipeline().addLast(new TimeServerHandler());
    }
}

//服务端初始化Channel时指定2个解码器DelimiterBasedFrameDecoder+StringDecoder
//通过自定义分隔符"$_"来分割字符串
b.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 100)
            .handler(new LoggingHandler(LogLevel.INFO))
            .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch)
                throws Exception {
                ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
                ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
                ch.pipeline().addLast(new StringDecoder());
                ch.pipeline().addLast(new EchoServerHandler());
            }
            });

//服务端初始化Channel时指定2个解码器FixedLengthFrameDecoder+StringDecoder
//通过固定长度来分割字符串
b.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 100)
            .handler(new LoggingHandler(LogLevel.INFO))
            .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch)
                throws Exception {
                ch.pipeline().addLast(new FixedLengthFrameDecoder(20));
                ch.pipeline().addLast(new StringDecoder());
                ch.pipeline().addLast(new EchoServerHandler());
            }
            });

除去以上3种方式,还有一种是通过在消息头中定义长度字段来标示消息的总长度

//示例通过msgpack进行编码解码,通过LengthFieldBasedFrameDecoder、LengthFieldPrepender添加粘包/半包支持
private class ClientHandlerInit extends ChannelInitializer<SocketChannel> {

        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            //这里设置通过增加包头表示报文长度来避免粘包
            //1) lengthFieldOffset = 0;//长度字段的偏差
            //2) lengthFieldLength = 2;//长度字段占的字节数
            //3) lengthAdjustment = 0;//添加到长度字段的补偿值
            //4) initialBytesToStrip = 0。//从解码帧中第一次去除的字节数
            ch.pipeline().addLast("frameDecoder",new LengthFieldBasedFrameDecoder(65535, 0, 2,0,2));
            //增加msgpack解码器
            ch.pipeline().addLast("msgpack decoder",new MsgpackDecoder());
            //这里设置读取报文的包头长度来避免粘包,长度字段(包头)字节数为2位
            ch.pipeline().addLast("frameEncoder",new LengthFieldPrepender(2));
            //增加msgpack编码器
            ch.pipeline().addLast("msgpack encoder",new MsgpackEncoder());
            ch.pipeline().addLast(new TimeClientHandler());
        }

    }

疑问

为什么NIO是同步非阻塞的但是Netty是异步非阻塞的

NIO(Non-Blocking IO)是同步非阻塞IO模型,Netty底层采用了NIO技术,但是通过一些手段实现了异步的效果,使用者的用法就是异步的。即使是BIO,通过多线程的方式,也可以实现异步的效果。

Netty为什么使用NIO,而不是AIO

the main reasons were:

  • Not faster than NIO (epoll) on unix systems (which is true)
  • There is no daragram suppport
  • Unnecessary threading model (too much abstraction without usage)

future.channel().closeFuture().sync()有什么用

主线程执行到这里就 wait 子线程结束,子线程才是真正监听和接受请求的,closeFuture()是开启了一个channel的监听器,负责监听channel是否关闭的状态,如果监听到channel关闭了,子线程才会释放,syncUninterruptibly()让主线程同步等待子线程结果

默认情况下,netty服务端起多少线程?何时启动?

2*CPU核数个线程

ServerBootstrap#bind#doBind#doBind0 channel.eventLoop().execute

NioEventLoopGroup#register —>MultithreadEventLoopGroup#register(从group中获取到一个NioEventLoop) —>SingleThreadEventLoop#register —>AbstractChannel#AbstractUnsafe#register

//通过inEventLoop方法判断是NioEventLoop线程本身还是用户线程
//如果是用户线程,从这里启动NioEventLoop线程,把任务放到队列,启动NioEventLoop线程之后,由于NioEventLoop继承SingleThreadEventExecutor,所以会触发scheduleExecution,定时执行任务队列中的任务
if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new OneTimeTask() {
SingleThreadEventExecutor#scheduleExecution
protected final void scheduleExecution() {
        updateThread(null); 
        executor.execute(asRunnable);
    }

Netty是如何解决jdk空轮询bug的?

JDK epoll空轮询BUG是什么?

jdk_epoll_bug.png

int selectCnt = 0;
...
long currentTimeNanos = System.nanoTime();
for (;;) {
    // 1.定时任务截止事时间快到了,中断本次轮询
    ...
    // 2.轮询过程中发现有任务加入,中断本次轮询
    ...
    // 3.阻塞式select操作
    selector.select(timeoutMillis);
    selectCnt ++;
    // 4.解决jdk的nio bug
    long time = System.nanoTime();
    if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
        selectCnt = 1;
    } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
            selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {

        rebuildSelector();
        selector = this.selector;
        selector.selectNow();
        selectCnt = 1;
        break;
    }
    currentTimeNanos = time; 
    ...
 }

netty 会在每次进行 selector.select(timeoutMillis) 之前记录一下开始时间currentTimeNanos,在select之后记录一下结束时间,判断select操作是否至少持续了timeoutMillis秒(这里将time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos改成time - currentTimeNanos >= TimeUnit.MILLISECONDS.toNanos(timeoutMillis)或许更好理解一些), 如果持续的时间大于等于timeoutMillis,说明就是一次有效的轮询,重置selectCnt标志,否则,表明该阻塞方法并没有阻塞这么长时间,可能触发了jdk的空轮询bug,当空轮询的次数超过一个阀值的时候,默认是512,就开始重建selector。

Netty如何保证异步串行无锁化?

NioEventLoop的父类SingleThreadEventExecutor中定义了判断执行任务的线程是否为NioEventLoop当前线程,主要是通过volatile变量判断,如果是则用当前的NioEventLoop线程执行任务,如果不是,则把任务放入队列,NioEventLoop线程会周期性的执行队列中的任务。这样保证了任务的单线程串行执行。

private volatile Thread thread;

@Override
public boolean inEventLoop(Thread thread) {
    return thread == this.thread;
}

netty的高性能高在哪里?

基于I/O多路复用模型

零拷贝

基于NIO的Buffer

基于内存池的缓冲区重用机制

无锁化的串行设计理念

I/O操作的异步处理

提供对protobuf等高性能序列化协议支持

可以对TCP进行更加灵活地配置

参考

Netty权威指南(第二版)

Netty权威指南(第二版)源码

[netty]–最通用TCP黏包解决方案:LengthFieldBasedFrameDecoder和LengthFieldPrepender

Netty自定义协议解析原理与应用

SpringBoot使用netty

使用Netty实现HTTP服务器

Netty : 臭名昭著的JDK的NIO bug(空轮询bug)