高性能架构的Netty

1、Netty概述

  • 原生NIO存在的问题

    • NIO的类库和API繁杂,使用麻烦:需要熟练掌握Selector、ServerSocketChannel、SocketChannel、ByteBuffer等。
    • 需要具备其他的额外技能:要熟悉Java多线程编程,因为NIO编程涉及到Reactor模式,你必须对多线程和网络编程非常熟悉,才能编写出高质量的NIO程序。
    • 开发工作量和难度都非常大:例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常流的处理等等。
    • JDK NIO的Bug:例如臭名昭著的Epoll Bug,它会导致Selector空轮询,最终导致CPU100%。直到 JDK1.7版本该问题仍旧存在,没有被根本解决。
  • Netty介绍

    • Netty是由JBOSS提供的一个Java开源框架。Netty提供异步(采用多线程,基于多路复用)的、基于事件驱动的网络应用程序框架,用以快速开发高性能、高可靠性的网络IO程序。

    • Netty可以帮助你快速、简单的开发出一个网络应用,相当于简化和流程化了NIO的开发过程。

    • Netty是目前最流行的NIO框架,Netty在互联网领域、大数据分布式计算领域、游戏行业、通信行业等获得了广泛的应用,知名的Elasticsearch、Dubbo框架内部都采用了Netty。

    • 其架构如下图:

    • Netty优点:Netty对JDK自带的NIO的API进行了封装,解决了下述问题。

      • 将一些基础的网络协议定义好直接使用。
      • 解决TCP传输问题,如粘包、半包。
      • 解决了epoll空轮询导致CPU100%的问题。
      • 对API进行增强,使之更易用,如FastThreadLocal=>ThreadLocal,ByteBuf=>ByteBuffer。
      • 设计优雅:适用于各种传输类型的统一API阻塞和非阻塞Socket;基于灵活且可扩展的事件模型,可以清晰地分离关注点;高度可定制的线程模型-单线程,一个或多个线程池。
      • 使用方便:详细记录的Javadoc,用户指南和示例;没有其他依赖项,JDK5(Netty3.x)或 6(Netty4.x)就足够了。
      • 高性能、吞吐量更高:延迟更低;减少资源消耗;最小化不必要的内存复制。
      • 安全:完整的SSL/TLS和StartTLS支持。
      • 社区活跃、不断更新:社区活跃,版本迭代周期短,发现的Bug可以被及时修复,同时,更 多的新功能会被加入。
    • Netty在Java网络应用框架中的地位就好比:Spring框架在JavaEE开发中的地位。以下的框架都使用了 Netty,因为它们有网络通信需求!

      • Cassandra - nosql数据库
      • Spark - 大数据分布式计算框架
      • Hadoop - 大数据分布式存储框架
      • RocketMQ - ali开源的消息队列
      • ElasticSearch - 搜索引擎
      • gRPC - rpc框架
      • Dubbo - rpc框架
      • Spring 5.x - flux api完全抛弃了tomcat,使用netty作为服务器端
      • Zookeeper - 分布式协调框架
  • 最简单的netty客户端和服务端程序:

    • 客户端:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      public class HelloClient {
      public static void main(String[] args) throws InterruptedException {
      // 1、启动类
      new Bootstrap()
      // 2、添加EventLoop
      .group(new NioEventLoopGroup())
      // 3、选择客户端channel实现
      .channel(NioSocketChannel.class)
      // 4、添加处理器
      .handler(new ChannelInitializer<NioSocketChannel>() {
      @Override // 在连接建立后被调用
      protected void initChannel(NioSocketChannel ch) throws Exception {
      ch.pipeline().addLast(new StringEncoder());
      }
      })
      // 5、连接到服务器
      .connect(new InetSocketAddress("localhost", 8080))
      .sync()
      .channel()
      // 6、向服务器发送数据
      .writeAndFlush("hello, world");
      }
      }
    • 服务端:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      public class HelloServer {
      public static void main(String[] args) {
      // 1、启动器,负责组装netty组件,启动服务器
      new ServerBootstrap()
      // 2、NioEventLoopGroup包含BossEventLoop和WorkerEventLoop(selector,thread), 即为group组
      .group(new NioEventLoopGroup())
      // 3、选择服务器的ServerSocketChannel实现
      .channel(NioServerSocketChannel.class)
      // 4、boss负责处理连接,worker(child)负责处理读写,决定了worker(child)能执行哪些操作(handler)
      .childHandler(
      // 5、channel代表和客户端进行数据读写的通道,Initializer为初始化器,ChannelInitializer其实也是个特殊的handler,负责添加别的handler
      new ChannelInitializer<NioSocketChannel>() {
      @Override
      protected void initChannel(NioSocketChannel ch) throws Exception {
      // 6、添加具体handler
      ch.pipeline().addLast(new StringDecoder()); // 将ByteBuf转换为字符串
      ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { // 自定义handler
      @Override // 读事件
      public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
      System.out.println(msg); // 打印上一步转换好的字符串
      }
      });
      }
      })
      // 7、绑定监听端口
      .bind(8080);
      }
      }
    • 流程解析如下图:

      • 把channel理解为数据的通道。
      • 把msg理解为流动的数据,最开始输入是ByteBuf,但经过pipeline的加工,会变成其它类型对象,最后输出又变成ByteBuf。
      • 把handler理解为数据的处理工序。
        • 工序有多道,合在一起就是pipeline,pipeline负责发布事件(读、读取完成…)传播给每个handler,handler对自己感兴趣的事件进行处理(重写了相应事件处理方法)。
        • handler分Inbound和Outbound两类。
      • 把eventLoop(内部使用单线程的线程池实现)理解为处理数据的工人。
        • 工人可以管理多个channel的io操作,并且一旦工人负责了某个channel,就要负责到底(绑定)。
        • 工人既可以执行io操作,也可以进行任务处理,每位工人有任务队列,队列里可以堆放多个channel的待处理任务,任务分为普通任务、定时任务。
        • 工人按照pipeline顺序,依次按照handler的规划(代码)处理数据,可以为每道工序指定不同的工人。

2、Netty组件

2.1 EventLoop

2.1.1 事件循环对象与组

事件循环对象

  • EventLoop(事件循环对象)本质是一个单线程执行器(同时维护了一个Selector),里面有run方法处理Channel上源源不断的io事件。它的继承关系比较复杂:
    • 一条线是继承自j.u.c.ScheduledExecutorService,因此包含了线程池中所有的方法。
    • 另一条线是继承自netty自己的OrderedEventExecutor。
      • 提供了boolean inEventLoop(Thread thread)方法判断一个线程是否属于此EventLoop。
      • 提供了parent方法来看看自己属于哪个EventLoopGroup。

事件循环组

  • EventLoopGroup是一组EventLoop,Channel一般会调用EventLoopGroup的register方法来绑定其中一个EventLoop,后续这个Channel上的io事件都由此EventLoop来处理(保证了io事件处理时的线程安全)。它继承自netty自己的EventExecutorGroup。

    • 实现了Iterable接口提供遍历EventLoop的能力。
    • 另有next方法获取集合中下一个EventLoop。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    @Slf4j
    public class TestEventLoop {
    public static void main(String[] args) throws InterruptedException {
    // 1、创建事件循环组,默认的线程数是当前系统CPU核心数*2
    EventLoopGroup group = new NioEventLoopGroup(2); // 能处理io事件,普通任务或者定时任务
    // EventLoopGroup group = new DefaultEventLoopGroup(); // 只能处理普通任务,定时任务
    // 2、获取下一个事件循环对象
    System.out.println(group.next());// io.netty.channel.nio.NioEventLoop@2ef5e5e3
    System.out.println(group.next());// io.netty.channel.nio.NioEventLoop@36d4b5c
    System.out.println(group.next());// io.netty.channel.nio.NioEventLoop@2ef5e5e3
    System.out.println(group.next());// io.netty.channel.nio.NioEventLoop@36d4b5c

    // 3、执行普通任务
    group.next().execute(() -> {
    try {
    Thread.sleep(1000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    log.debug("测试执行普通任务ok");
    });

    Thread.sleep(2000);

    // 4、执行定时任务
    group.next().scheduleAtFixedRate(() -> {
    log.debug("测试执行定时任务ok");
    }, 0, 1, TimeUnit.SECONDS);
    }
    }

2.1.2 NioEventLoop处理IO事件

1638363708138
  • 客户端:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    @Slf4j
    public class EventLoopClient {
    public static void main(String[] args) throws InterruptedException {
    Channel channel = new Bootstrap()
    .group(new NioEventLoopGroup(1))
    .handler(new ChannelInitializer<NioSocketChannel>() {
    @Override
    protected void initChannel(NioSocketChannel ch) throws Exception {
    ch.pipeline().addLast(new StringEncoder());
    }
    })
    .channel(NioSocketChannel.class).connect("localhost", 8080)
    .sync()
    .channel();

    channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes("1".getBytes()));
    Thread.sleep(2000);
    channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes("2".getBytes()));
    }
    }
  • 服务端:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    @Slf4j
    public class EventLoopServer {
    public static void main(String[] args) {
    // 细分2:创建一个独立的EventLoopGroup,用来处理耗时较长的操作
    EventLoopGroup group = new DefaultEventLoopGroup();
    new ServerBootstrap()
    // 划分为boss和worker
    // 细分1:boss只负责ServerSocketChannel上的accept事件,worker只负责socketChannel上的读写
    // NioServerSocketChannel只会跟一个NioEventLoop绑定
    .group(new NioEventLoopGroup(), new NioEventLoopGroup(2))
    .channel(NioServerSocketChannel.class)
    .childHandler(new ChannelInitializer<NioSocketChannel>() {
    @Override
    protected void initChannel(NioSocketChannel ch) throws Exception {
    ch.pipeline().addLast("handler1", new ChannelInboundHandlerAdapter() {
    @Override // ByteBuf
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ByteBuf buf = (ByteBuf) msg;
    log.debug(buf.toString(Charset.defaultCharset()));
    ctx.fireChannelRead(msg); // 让消息传递给下一个handler
    }
    }).addLast(group, "handler2", new ChannelInboundHandlerAdapter() {
    @Override // ByteBuf
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ByteBuf buf = (ByteBuf) msg;
    log.debug(buf.toString(Charset.defaultCharset()));
    }
    });
    }
    })
    .bind(8080);
    }
    }
  • 先开启服务端,再开启三个客户端后,服务器端打印如下:

    • 一个NioEventLoop可以管理多个SocketChannel,但一个SocketChannel一旦绑定了一个NioEventLoop,那么接下来在此SocketChannel上的读写都由这个NioEventLoop来处理。
  • 那么handler在执行中是如何换人的呢?在handler的执行过程中会调用以下方法:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    // 下一个handler的事件循环是否与当前的事件循环是同一个线程
    EventExecutor executor = next.executor();// 返回下一个handler的eventLoop

    // 判断当前handler中的线程是否和下一个handler中的eventLoop是同个线程,如果是则直接调用
    if (executor.inEventLoop()) {
    next.invokeChannelRead(m);
    }
    // 如果不是则将要执行的代码作为任务提交给下一个事件循环处理(换人)
    else {
    executor.execute(new Runnable() {
    @Override
    public void run() {
    next.invokeChannelRead(m);
    }
    });
    }
    }
    • 如果两个handler绑定的是同一个线程,那么就直接调用。
    • 否则,把要调用的代码封装为一个任务对象,由下一个handler的线程来调用。

2.2 Channel

2.2.1 Channel简介

  • channel的主要作用:

    • close()可以用来关闭channel。
    • closeFuture()用来处理channel的关闭。
      • sync方法作用是同步等待channel关闭。
      • 而addListener方法是异步等待channel关闭。
    • pipeline()方法添加处理器。
    • write()方法将数据写入。
    • writeAndFlush()方法将数据写入并刷出。

2.2.2 ChannelFuture

  • 一段客户端代码拆开后如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    public class NettyClient {
    public static void main(String[] args) throws InterruptedException {
    ChannelFuture channelFuture = new Bootstrap()
    .group(new NioEventLoopGroup())
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<NioSocketChannel>() {
    @Override
    protected void initChannel(NioSocketChannel ch) {
    ch.pipeline().addLast(new StringEncoder());
    }
    })
    // main线程发起调用,真正执行的是NioEventLoop线程,是异步非阻塞的
    .connect("127.0.0.1", 8080); // 1

    //channelFuture.sync(); 阻塞当前线程直至nio线程连接建立完毕
    Channel channel = channelFuture.channel();
    channel.writeAndFlush("hello world!");
    }
    }
  • 将channelFuture.sync()注释后,发现客户端无法向服务端发送数据,原因是connect方法是异步非阻塞的,意味着不等连接建立,方法执行就返回了。因此channelFuture对象中不能立刻获得到正确的Channel对象。实验如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    @Slf4j
    public class NettyClient {
    public static void main(String[] args) throws InterruptedException {
    ChannelFuture channelFuture = new Bootstrap()
    .group(new NioEventLoopGroup())
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<NioSocketChannel>() {
    @Override
    protected void initChannel(NioSocketChannel ch) {
    ch.pipeline().addLast(new StringEncoder());
    }
    })
    .connect("127.0.0.1", 8080);
    log.debug("{}",channelFuture.channel()); // 1
    channelFuture.sync(); // 2
    log.debug("{}",channelFuture.channel()); // 3
    }
    }
    • 执行到1时,连接未建立,打印 [id: 0x7e782185]

    • 执行到2时,sync方法是同步等待连接建立完成。

    • 执行到3时,连接肯定建立了,打印[id: 0x7e782185, L:/127.0.0.1:59164 - R:/127.0.0.1:8080]

  • 除了用sync方法可以让异步操作同步以外,还可以使用回调的方式:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    @Slf4j
    public class NettyClient {
    public static void main(String[] args) throws InterruptedException {
    ChannelFuture channelFuture = new Bootstrap()
    .group(new NioEventLoopGroup())
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<NioSocketChannel>() {
    @Override
    protected void initChannel(NioSocketChannel ch) {
    ch.pipeline().addLast(new StringEncoder());
    }
    })
    .connect("127.0.0.1", 8080); // 1

    channelFuture.addListener(new ChannelFutureListener() {
    @Override
    // 在nio线程连接建立好后就会调用此方法
    public void operationComplete(ChannelFuture future) throws Exception {
    Channel channel = future.channel();
    log.debug("{}", channel);
    channel.writeAndFlush("hello world!");
    }
    });
    }
    }

2.2.3 CloseFuture

  • 可以使用CloseFuture实现在连接关闭后做一些善后操作。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    @Slf4j
    public class CloseFutureClient {
    public static void main(String[] args) throws InterruptedException {
    NioEventLoopGroup group = new NioEventLoopGroup();
    ChannelFuture channelFuture = new Bootstrap()
    .group(group)
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<NioSocketChannel>() {
    @Override // 在连接建立后被调用
    protected void initChannel(NioSocketChannel ch) throws Exception {
    ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
    ch.pipeline().addLast(new StringEncoder());
    }
    })
    .connect(new InetSocketAddress("localhost", 8080));
    Channel channel = channelFuture.sync().channel();
    log.debug("{}", channel);
    new Thread(() -> {
    Scanner scanner = new Scanner(System.in);
    while (true) {
    String line = scanner.nextLine();
    if ("q".equals(line)) {
    channel.close(); // close是异步操作
    // log.debug("处理关闭之后的操作"); // 不能在这里善后
    break;
    }
    channel.writeAndFlush(line);
    }
    }, "input").start();
    // 获取CloseFuture对象
    ChannelFuture closeFuture = channel.closeFuture();
    // 同步处理关闭
    log.debug("waiting close...");
    closeFuture.sync();
    log.debug("处理关闭之后的操作");
    }
    }
  • 还可以使用异步处理的方式:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    @Slf4j
    public class CloseFutureClient {
    public static void main(String[] args) throws InterruptedException {
    NioEventLoopGroup group = new NioEventLoopGroup();
    ChannelFuture channelFuture = new Bootstrap()
    .group(group)
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<NioSocketChannel>() {
    @Override // 在连接建立后被调用
    protected void initChannel(NioSocketChannel ch) throws Exception {
    ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
    ch.pipeline().addLast(new StringEncoder());
    }
    })
    .connect(new InetSocketAddress("localhost", 8080));
    Channel channel = channelFuture.sync().channel();
    log.debug("{}", channel);
    new Thread(() -> {
    Scanner scanner = new Scanner(System.in);
    while (true) {
    String line = scanner.nextLine();
    if ("q".equals(line)) {
    channel.close(); // close是异步操作
    // log.debug("处理关闭之后的操作"); // 不能在这里善后
    break;
    }
    channel.writeAndFlush(line);
    }
    }, "input").start();
    // 获取CloseFuture对
    ChannelFuture closeFuture = channel.closeFuture();
    // 异步处理关闭
    closeFuture.addListener((ChannelFutureListener) future -> {
    log.debug("处理关闭之后的操作");
    // 使用shutdownGracefully方法优雅关闭线程。该方法会首先切换EventLoopGroup到关闭状态从而拒绝新的任务的加入,然后在任务队列的任务都处理完成后,停止线程的运行,从而确保整体应用是在正常有序的状态下退出的。
    group.shutdownGracefully();
    });
    }
    }

2.3 Future与Promise

  • 在异步处理时,经常用到这两个接口:Future和Promise。首先要说明netty中的Future与jdk中的Future同名,但是是两个接口,netty的Future继承自jdk的Future,而Promise又对netty Future进行了扩展。

    功能/名称 jdk Future netty Future Promise
    cancel 取消任务 - -
    isCanceled 任务是否取消 - -
    isDone 任务是否完成,不能区分成功失败 - -
    get 获取任务结果,阻塞等待 - -
    getNow - 获取任务结果,非阻塞,还未产生结果时返回 null -
    await - 等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断 -
    sync - 等待任务结束,如果任务失败,抛出异常 -
    isSuccess - 判断任务是否成功 -
    cause - 获取失败信息,非阻塞,如果没有失败,返回null -
    addLinstener - 添加回调,异步接收结果 -
    setSuccess - - 设置成功结果
    setFailure - - 设置失败结果

2.3.1 jdk Future

  • jdk Future只能同步等待任务结束(或成功、或失败)才能得到结果。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    @Slf4j
    public class TestJdkFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
    // 创建线程池
    ExecutorService service = Executors.newFixedThreadPool(2);
    // 提交任务
    Future<Integer> future = service.submit(new Callable<Integer>() {
    @Override
    public Integer call() throws Exception {
    log.debug("执行计算");
    Thread.sleep(1000);
    return 50;
    }
    });
    // 主线程通过future来获取结果
    log.debug("等待结果");
    log.debug("结果是 {}", future.get());
    }
    }

2.3.2 netty Future

  • netty Future可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束。

    • 同步等待任务结束得到结果。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      @Slf4j
      public class TestNettyFuture {
      public static void main(String[] args) throws ExecutionException, InterruptedException {
      NioEventLoopGroup group = new NioEventLoopGroup();
      // 每个EventLoop中只有一个线程
      EventLoop eventLoop = group.next();
      Future<Integer> future = eventLoop.submit(new Callable<Integer>() {
      @Override
      public Integer call() throws Exception {
      log.debug("执行计算");
      Thread.sleep(1000);
      return 70;
      }
      });
      log.debug("等待结果");
      log.debug("结果是 {}", future.get());
      }
      }

    • 异步方式得到结果。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      @Slf4j
      public class TestNettyFuture {
      public static void main(String[] args) throws ExecutionException, InterruptedException {
      NioEventLoopGroup group = new NioEventLoopGroup();
      // 每个EventLoop中只有一个线程
      EventLoop eventLoop = group.next();
      Future<Integer> future = eventLoop.submit(new Callable<Integer>() {
      @Override
      public Integer call() throws Exception {
      log.debug("执行计算");
      Thread.sleep(1000);
      return 70;
      }
      });
      future.addListener(new GenericFutureListener<Future<? super Integer>>() {
      @Override
      public void operationComplete(Future<? super Integer> future) throws Exception {
      log.debug("接收结果:{}", future.getNow());
      }
      });
      }
      }

2.3.3 netty Promise

  • netty Promise不仅有netty Future的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    @Slf4j
    public class TestNettyPromise {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
    // 准备EventLoop对象
    EventLoop eventLoop = new NioEventLoopGroup().next();
    // 可以主动创建promise, 结果容器
    DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);
    new Thread(() -> {
    // 创建任意一个线程执行计算,计算完毕后向promise填充结果
    log.debug("开始计算...");
    try {
    int i = 1 / 0;
    Thread.sleep(1000);
    promise.setSuccess(80);
    } catch (Exception e) {
    e.printStackTrace();
    promise.setFailure(e);
    }

    }).start();
    // 接收结果的线程
    log.debug("等待结果...");
    log.debug("结果是: {}", promise.get());
    }
    }

2.4 Handler与Pipeline

  • ChannelHandler用来处理Channel上的各种事件,分为入站、出站两种。所有ChannelHandler被连成一串,就是Pipeline。打个比喻,每个Channel是一个产品的加工车间,Pipeline是车间中的流水线,ChannelHandler就是流水线上的各道工序,而后面要讲的ByteBuf是原材料,经过很多工序的加工:先经过一道道入站工序,再经过一道道出站工序最终变成产品。

    • 入站处理器通常是ChannelInboundHandlerAdapter的子类,主要用来读取客户端数据,写回结果。

    • 出站处理器通常是ChannelOutboundHandlerAdapter的子类,主要对写回结果进行加工。

      • 客户端使用之前的CloseFutureClient,服务端代码如下:

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        31
        32
        33
        34
        35
        36
        37
        38
        39
        40
        41
        42
        43
        44
        45
        46
        47
        48
        49
        50
        51
        52
        53
        54
        55
        56
        57
        58
        59
        60
        61
        62
        63
        @Slf4j
        public class TestPipeline {
        public static void main(String[] args) {
        new ServerBootstrap()
        .group(new NioEventLoopGroup())
        .channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer<NioSocketChannel>() {
        @Override
        protected void initChannel(NioSocketChannel ch) throws Exception {
        // 1、通过channel拿到pipeline
        ChannelPipeline pipeline = ch.pipeline();
        // 2、添加处理器head -> h1 -> h2 -> h3 -> h4 -> h5 -> h6 -> tail
        pipeline.addLast("h1", new ChannelInboundHandlerAdapter(){
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.debug("1");
        super.channelRead(ctx, msg);
        }
        });
        pipeline.addLast("h2", new ChannelInboundHandlerAdapter(){
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.debug("2");
        super.channelRead(ctx, msg); // 将数据传递给下个handler,如果不调用,调用链会断开 或者调用ctx.fireChannelRead(student);
        }
        });
        pipeline.addLast("h3", new ChannelInboundHandlerAdapter(){
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.debug("3");
        // 因为h3后没有入站处理器了,所以可以不调用此方法
        super.channelRead(ctx, msg);
        // 为了触发OutboundHandler的执行
        ch.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));
        }
        });
        pipeline.addLast("h4", new ChannelOutboundHandlerAdapter(){
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        log.debug("4");
        super.write(ctx, msg, promise);
        }
        });
        pipeline.addLast("h5", new ChannelOutboundHandlerAdapter(){
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        log.debug("5");
        super.write(ctx, msg, promise);
        }
        });
        pipeline.addLast("h6", new ChannelOutboundHandlerAdapter(){
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        log.debug("6");
        super.write(ctx, msg, promise);
        }
        });
        }
        })
        .bind(8080);
        }
        }

        客户端往服务端发消息后,服务端控制台打印如下:

        可以看到,ChannelInboundHandlerAdapter是按照addLast的顺序执行的,而 ChannelOutboundHandlerAdapter是按照addLast的逆序执行的。实际上ChannelPipeline的实现是一个ChannelHandlerContext(包装了ChannelHandler) 组成的双向链表。

      • 演示msg在各个入站处理器中的互传。

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        31
        32
        33
        34
        35
        36
        37
        38
        39
        40
        41
        42
        43
        44
        45
        46
        47
        48
        49
        50
        51
        52
        53
        54
        55
        56
        57
        58
        59
        60
        61
        62
        63
        64
        65
        66
        67
        68
        69
          @Slf4j
        public class TestPipeline {
        public static void main(String[] args) {
        new ServerBootstrap()
        .group(new NioEventLoopGroup())
        .channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer<NioSocketChannel>() {
        @Override
        protected void initChannel(NioSocketChannel ch) throws Exception {
        // 1、通过channel拿到pipeline
        ChannelPipeline pipeline = ch.pipeline();
        // 2、添加处理器head -> h1 -> h2 -> h3 -> h4 -> h5 -> h6 -> tail
        pipeline.addLast("h1", new ChannelInboundHandlerAdapter() {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.debug("1");
        ByteBuf byteBuf = (ByteBuf) msg;
        String name = byteBuf.toString(Charset.defaultCharset());
        super.channelRead(ctx, name);
        }
        });
        pipeline.addLast("h2", new ChannelInboundHandlerAdapter() {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object name) throws Exception {
        log.debug("2");
        Student student = new Student(name.toString());
        super.channelRead(ctx, student); // 将数据传递给下个handler,如果不调用,调用链会断开。或者调用ctx.fireChannelRead(student);
        }
        });
        pipeline.addLast("h3", new ChannelInboundHandlerAdapter() {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.debug("3,结果是:{},类型是:{}", msg, msg.getClass());
        // super.channelRead(ctx, msg);由于没有下一个入站处理器,所以可以不写
        ch.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));
        }
        });
        pipeline.addLast("h4", new ChannelOutboundHandlerAdapter() {
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        log.debug("4");
        super.write(ctx, msg, promise);
        }
        });
        pipeline.addLast("h5", new ChannelOutboundHandlerAdapter() {
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        log.debug("5");
        super.write(ctx, msg, promise);
        }
        });
        pipeline.addLast("h6", new ChannelOutboundHandlerAdapter() {
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        log.debug("6");
        super.write(ctx, msg, promise);
        }
        });
        }
        })
        .bind(8080);
        }

        @Data
        @AllArgsConstructor
        static class Student {
        private String name;
        }
        }

        如果注释了handler2中的super.channelRead(ctx, student),则服务端只会打印1和2,因为无法传给handler3,也就无法触发写流程逻辑;如果将handler3中的ch.writeAndFlush()换成ctx.writeAndFlush()方法,则只会打印1,2,3,而其余三个出站处理器6,5,4不会打印,因为ctx.writeAndFlush()是从当前节点找上一个出站处理器,而ch.writeAndFlush()是从尾部开始查找。

  • 为了快速测试netty的入站和出站操作,netty提供了便捷的EmbeddedChannel类来模拟入站和出站操作。

    • 模拟入站操作。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      @Slf4j
      public class TestEmbeddedChannel {
      public static void main(String[] args) {
      ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter() {
      @Override
      public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
      log.debug("1");
      super.channelRead(ctx, msg);
      }
      };
      ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter() {
      @Override
      public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
      log.debug("2");
      super.channelRead(ctx, msg);
      }
      };
      ChannelOutboundHandlerAdapter h3 = new ChannelOutboundHandlerAdapter() {
      @Override
      public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
      log.debug("3");
      super.write(ctx, msg, promise);
      }
      };
      ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter() {
      @Override
      public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
      log.debug("4");
      super.write(ctx, msg, promise);
      }
      };
      EmbeddedChannel channel = new EmbeddedChannel(h1, h2, h3, h4);
      // 模拟入站操作
      channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes()));
      }
      }
    • 模拟出站操作。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      @Slf4j
      public class TestEmbeddedChannel {
      public static void main(String[] args) {
      ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter() {
      @Override
      public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
      log.debug("1");
      super.channelRead(ctx, msg);
      }
      };
      ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter() {
      @Override
      public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
      log.debug("2");
      super.channelRead(ctx, msg);
      }
      };
      ChannelOutboundHandlerAdapter h3 = new ChannelOutboundHandlerAdapter() {
      @Override
      public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
      log.debug("3");
      super.write(ctx, msg, promise);
      }
      };
      ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter() {
      @Override
      public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
      log.debug("4");
      super.write(ctx, msg, promise);
      }
      };
      EmbeddedChannel channel = new EmbeddedChannel(h1, h2, h3, h4);
      // 模拟出站操作
      channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("world".getBytes()));
      }
      }

2.5 ByteBuf

  • ByteBuf是对字节数据的封装,它具有以下优势:

    • 池化,可以重用池中ByteBuf实例,更节约内存,减少内存溢出的可能。
    • 读写指针分离,不需要像ByteBuffer一样切换读写模式。
    • 可以自动扩容。
    • 支持链式调用,使用更流畅。
    • 很多地方体现零拷贝,例如slice、duplicate、CompositeByteBuf。

2.5.1 创建

  • 创建一个默认的ByteBuf(池化基于直接内存的ByteBuf),初始容量是10。

    1
    ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    public class TestByteBuf {
    public static void main(String[] args) {
    // 初始容量默认为256
    ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
    log(buf);
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < 32; i++) {
    sb.append("a");
    }
    buf.writeBytes(sb.toString().getBytes());
    log(buf);
    }

    public static void log(ByteBuf buffer) {
    int length = buffer.readableBytes();
    int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
    StringBuilder buf = new StringBuilder(rows * 80 * 2)
    .append("read index:").append(buffer.readerIndex())
    .append(" write index:").append(buffer.writerIndex())
    .append(" capacity:").append(buffer.capacity())
    .append(NEWLINE);
    appendPrettyHexDump(buf, buffer);
    System.out.println(buf.toString());
    }
    }

2.5.2 直接内存与堆内存

  • 可以使用下面的代码来创建池化基于堆的ByteBuf。(读写性能低,分配效率高)

    1
    ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(10);
  • 也可以使用下面的代码来创建池化基于直接内存的ByteBuf。(读写性能高,分配效率低)

    1
    ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(10);
    • 直接内存创建和销毁的代价昂贵,但读写性能高(少一次内存复制),适合配合池化功能一起用。
    • 直接内存对GC压力小,因为这部分内存不受JVM垃圾回收的管理,但也要注意及时主动释放。

2.5.3 池化与非池化

  • 池化的最大意义在于可以重用ByteBuf,优点有:

    • 没有池化,则每次都得创建新的ByteBuf实例,这个操作对直接内存代价昂贵,就算是堆内存,也会增加GC压力。
    • 有了池化,则可以重用池中ByteBuf实例,并且采用了与jemalloc类似的内存分配算法提升分配效率。
    • 高并发时,池化功能更节约内存,减少内存溢出的可能。
  • 池化功能是否开启,可以通过下面的系统环境变量来设置。

    1
    -Dio.netty.allocator.type={unpooled|pooled}
    • 4.1以后,非Android平台默认启用池化实现,Android平台启用非池化实现。

    • 4.1之前,池化功能还不成熟,默认是非池化实现。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      public class TestByteBuf {
      public static void main(String[] args) {
      // 初始容量默认为256
      ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
      System.out.println(buf.getClass());
      buf = ByteBufAllocator.DEFAULT.heapBuffer();
      System.out.println(buf.getClass());
      }
      }
      1638601411261

2.5.4 组成

  • ByteBuf由四部分组成(最开始读写指针都在0位置):

    • 与NIO中的ByteBuffer区别如下:
      • ByteBuffer共用一个读写指针,因此需要在读写模式下切换,而ByteBuf分出了读写指针,十分便利。
      • ByteBuffer没有自动扩容机制,而ByteBuf有。

2.5.5 写入

  • 方法列表如下(还有一类方法是set开头的一系列方法,也可以写入数据,但不会改变写指针位置):

    方法签名 含义 备注
    writeBoolean(boolean value) 写入 boolean 值 用一字节 01|00 代表 true|false
    writeByte(int value) 写入 byte 值
    writeShort(int value) 写入 short 值
    writeInt(int value) 写入 int 值 Big Endian,即 0x250,写入后 00 00 02 50
    writeIntLE(int value) 写入 int 值 Little Endian,即 0x250,写入后 50 02 00 00
    writeLong(long value) 写入 long 值
    writeChar(int value) 写入 char 值
    writeFloat(float value) 写入 float 值
    writeDouble(double value) 写入 double 值
    writeBytes(ByteBuf src) 写入 netty 的 ByteBuf
    writeBytes(byte[] src) 写入 byte[]
    writeBytes(ByteBuffer src) 写入 nio 的 ByteBuffer
    int writeCharSequence(CharSequence sequence, Charset charset) 写入字符串
  • 这些方法的未指明返回值的,其返回值都是ByteBuf,意味着可以链式调用。

  • 网络传输,默认习惯是Big Endian。

1
2
3
4
5
6
7
8
9
10
public class TestByteBuf {
public static void main(String[] args) {
// 初始容量默认为256
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
buf.writeBytes(new byte[]{1, 2, 3, 4});
log(buf);
buf.writeInt(5);
log(buf);
}
}

2.5.6 扩容

  • 扩容规则是:

    • 如果写入后数据大小未超过512,则选择下一个16的整数倍,例如写入后大小为12,则扩容后capacity是16。
    • 如果写入后数据大小超过512,则选择下一个2^n,例如写入后大小为513,则扩容后capacity是2^10=1024(2^9=512已经不够了)。
    • 扩容不能超过max capacity,否则会报错。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    public class TestByteBuf {
    public static void main(String[] args) {
    // 初始容量默认为256
    ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(10);
    buf.writeBytes(new byte[]{1, 2, 3, 4});
    log(buf);
    buf.writeInt(5);
    log(buf);
    // 触发扩容
    buf.writeInt(6);
    log(buf);
    }
    }

2.5.7 读取

  • 例如读了4次,每次一个字节,那么读过的内容,就属于废弃部分了,再读只能读那些尚未读取的部分。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    public class TestByteBuf {
    public static void main(String[] args) {
    // 初始容量默认为256
    ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(10);
    buf.writeBytes(new byte[]{1, 2, 3, 4});
    log(buf);
    buf.writeInt(5);
    log(buf);
    buf.writeInt(6);
    log(buf);
    System.out.println(buf.readByte());
    System.out.println(buf.readByte());
    System.out.println(buf.readByte());
    System.out.println(buf.readByte());
    log(buf);
    }
    }
  • 如果需要重复读取int整数5,可以使用标记mark。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    public class TestByteBuf {
    public static void main(String[] args) {
    // 初始容量默认为256
    ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(10);
    buf.writeBytes(new byte[]{1, 2, 3, 4});
    log(buf);
    buf.writeInt(5);
    log(buf);
    buf.writeInt(6);
    log(buf);
    System.out.println(buf.readByte());
    System.out.println(buf.readByte());
    System.out.println(buf.readByte());
    System.out.println(buf.readByte());
    log(buf);
    buf.markReaderIndex();
    System.out.println(buf.readInt());
    log(buf);
    }
    }

    这时要重复读取的话,重置到标记位置reset。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    public class TestByteBuf {
    public static void main(String[] args) {
    // 初始容量默认为256
    ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(10);
    buf.writeBytes(new byte[]{1, 2, 3, 4});
    log(buf);
    buf.writeInt(5);
    log(buf);
    buf.writeInt(6);
    log(buf);
    System.out.println(buf.readByte());
    System.out.println(buf.readByte());
    System.out.println(buf.readByte());
    System.out.println(buf.readByte());
    log(buf);
    buf.markReaderIndex();
    System.out.println(buf.readInt());
    log(buf);
    buf.resetReaderIndex();
    log(buf);
    }
    }

    还有get()开头的一系列方法,这些方法不会改变read index。

2.5.8 内存释放

  • 由于Netty中有堆外内存的ByteBuf实现,堆外内存最好是手动来释放,而不是等GC垃圾回收。

    • UnpooledHeapByteBuf使用的是JVM内存,只需等GC回收内存即可。
    • UnpooledDirectByteBuf使用的就是直接内存了,需要特殊的方法来回收内存。
    • PooledByteBuf和它的子类使用了池化机制,需要更复杂的规则来回收内存。
  • Netty这里采用了引用计数法来控制回收内存,每个ByteBuf都实现了ReferenceCounted接口

    • 每个ByteBuf对象的初始计数为1。
    • 调用release方法计数减1,如果计数为0,ByteBuf内存被回收。
    • 调用retain方法计数加1,表示调用者没用完之前,其它handler即使调用了release也不会造成回收。
    • 当计数为0时,底层内存会被回收,这时即使ByteBuf对象还在,其各个方法均无法正常使用。
  • 不能简单地在handler里使用try catch finally来释放,因为pipeline的存在,一般需要将ByteBuf传递给下一个ChannelHandler,如果在finally中release了,就失去了传递性(当然,如果在这个ChannelHandler内这个ByteBuf已完成了它的使命,那么便无须再传递),所以基本规则是,谁是最后使用者,谁负责release,详细分析如下:

    • 起点,对于NIO实现来讲,在io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read方法中首次创建ByteBuf放入pipeline(line 163 pipeline.fireChannelRead(byteBuf))。

    • 入站ByteBuf处理原则。

      • 对原始ByteBuf不做处理,调用ctx.fireChannelRead(msg)向后传递,这时无须release。

      • 将原始ByteBuf转换为其它类型的Java对象,这时ByteBuf就没用了,必须release。

      • 如果不调用ctx.fireChannelRead(msg)向后传递,那么也必须release。

      • 注意各种异常,如果ByteBuf没有成功传递到下一个 ChannelHandler,必须release。

      • 假设消息一直向后传,那么TailContext会负责释放未处理消息(原始的ByteBuf)。

      • TailContext释放未处理消息逻辑:

    • 出站ByteBuf处理原则。

      • 出站消息最终都会转为ByteBuf输出,一直向前传,由HeadContext flush后release。

    • 异常处理原则。

      • 有时候不清楚ByteBuf被引用了多少次,但又必须彻底释放,可以循环调用release直到返回true。

2.5.9 slice

  • slice是零拷贝(这里指减少数据复制)的体现之一,对原始ByteBuf进行切片成多个ByteBuf,切片后的ByteBuf并没有发生内存复制,还是使用原始ByteBuf的内存,切片后的ByteBuf维护独立的read,write指针。

    1638607920429
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    public class TestSlice {
    public static void main(String[] args) {
    ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(10);
    buf.writeBytes(new byte[]{'a','b','c','d','e','f','g','h','i','j'});
    log(buf);
    // 在切片过程中,没有发生数据复制
    ByteBuf f1 = buf.slice(0, 5);
    ByteBuf f2 = buf.slice(5, 5);
    log(f1);
    log(f2);
    System.out.println("====================");
    f1.setByte(0, 'b');
    log(f1);
    log(buf);
    }
    }

    切片后如果追加数据会报错。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    public class TestSlice {
    public static void main(String[] args) {
    ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(10);
    buf.writeBytes(new byte[]{'a','b','c','d','e','f','g','h','i','j'});
    log(buf);
    // 在切片过程中,没有发生数据复制
    ByteBuf f1 = buf.slice(0, 5);
    ByteBuf f2 = buf.slice(5, 5);
    log(f1);
    log(f2);
    f1.writeByte(5);
    }
    }

    如果切片后释放了原ByteBuf的内存,再使用切片后的ByteBuf也会报错,因为底层都是同一块内存,可以使用f1.retain()使引用计数加1。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    public class TestSlice {
    public static void main(String[] args) {
    ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(10);
    buf.writeBytes(new byte[]{'a','b','c','d','e','f','g','h','i','j'});
    log(buf);
    // 在切片过程中,没有发生数据复制
    ByteBuf f1 = buf.slice(0, 5);
    ByteBuf f2 = buf.slice(5, 5);
    log(f1);
    log(f2);
    System.out.println("释放原有byteBuf内存");
    buf.release();
    log(f1);
    }
    }

2.5.10 duplicate

  • duplicate是零拷贝(这里指减少数据复制)的体现之一,就好比截取了原始ByteBuf所有内容,并且没有max capacity的限制,也是与原始ByteBuf使用同一块底层内存,只是读写指针是独立的。

    1638622492169

2.5.11 copy

  • 会将底层内存数据进行深拷贝,因此无论读写,都与原始ByteBuf无关。

2.5.12 CompositeByteBuf

  • 也是零拷贝(这里指减少数据复制)的体现之一,可以将多个ByteBuf合并为一个逻辑上的 ByteBuf,避免拷贝。

  • CompositeByteBuf是一个组合的ByteBuf,它内部维护了一个Component数组,每个Component管理一个ByteBuf,记录了这个ByteBuf相对于整体偏移量等信息,代表着整体中某一段的数据。

    • 优点,对外是一个虚拟视图,组合这些ByteBuf不会产生内存复制。
    • 缺点,复杂了很多,多次操作会带来性能的损耗。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    public class TestCompositeByteBuf {
    public static void main(String[] args) {
    ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer();
    buf1.writeBytes(new byte[]{1, 2, 3, 4, 5});

    ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer();
    buf2.writeBytes(new byte[]{6, 7, 8, 9, 10});

    // 以下方式会发生数据复制,对性能造成影响
    // ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
    // buffer.writeBytes(buf1).writeBytes(buf2);
    // log(buffer);
    CompositeByteBuf buffer = ByteBufAllocator.DEFAULT.compositeBuffer();
    // 为true表示自动增长写指针
    buffer.addComponents(true, buf1, buf2);
    log(buffer);
    }
    }

2.5.13 Unpooled

  • Unpooled是一个工具类,类如其名,提供了非池化的ByteBuf创建、组合、复制等操作。它有着和零拷贝相关的wrappedBuffer方法,可以用来包装ByteBuf。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    public class TestUnpooled {
    public static void main(String[] args) {
    ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(5);
    buf1.writeBytes(new byte[]{1, 2, 3, 4, 5});
    ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(5);
    buf2.writeBytes(new byte[]{6, 7, 8, 9, 10});

    // 当包装ByteBuf个数超过一个时, 底层使用了CompositeByteBuf
    ByteBuf buf3 = Unpooled.wrappedBuffer(buf1, buf2);
    System.out.println(ByteBufUtil.prettyHexDump(buf3));
    }
    }

    也可以用来包装普通字节数组,底层也不会有拷贝操作。

    1
    2
    3
    4
    5
    6
    7
    public class TestUnpooled {
    public static void main(String[] args) {
    ByteBuf buf4 = Unpooled.wrappedBuffer(new byte[]{1, 2, 3}, new byte[]{4, 5, 6});
    System.out.println(buf4.getClass());
    System.out.println(ByteBufUtil.prettyHexDump(buf4));
    }
    }

3、Netty架构

3.1 线程模型

  • 目前存在的线程模型有:传统阻塞I/O服务模型Reactor模式
  • 根据Reactor的数量和处理资源池线程的数量不同,有3种典型的实现。
    • 单Reactor单线程。
    • 单Reactor多线程。
    • 主从Reactor多线程。
  • Netty线程模式(Netty主要基于主从Reactor多线程模型做了一定的改进,其中主从Reactor多线程模型有多个Reactor)

3.1.1 传统阻塞I/O服务模型

  • 工作原理图如下:

    • 采用阻塞IO模式获取输入的数据。
    • 每个连接都需要独立的线程完成数据的输入,业务处理,数据返回。
    • 当并发数很大,就会创建大量的线程,占用很大系统资源。
    • 连接创建后,如果当前线程暂时没有数据可读,该线程会阻塞在read操作,造成线程资源浪费。

3.1.2 Reactor模式

  • 针对传统阻塞I/0服务模型的2个缺点,解决方案如下:

    • 基于I/O复用模型:多个连接共用一个阻塞对象,应用程序只需要在一个阻塞对象等待,无需阻塞等待所有连接。当某个连接有新的数据可以处理时,操作系统通知应用程序,线程从阻塞状态返回,开始进行业务处理。Reactor对应的叫法:

      • 反应器模式。
      • 分发者模式(Dispatcher)
      • 通知者模式(Notifier)
    • 基于线程池复用线程资源:不必再为每个连接创建线程,将连接完成后的业务处理任务分配给线程进行处理,一个线程可以处理多个连接的业务。

      • Reactor模式,通过一个或多个输入同时传递给服务处理器(ServiceHandler)的模式(基于事件驱动)
      • 服务器端程序处理传入的多个请求,并将它们同步分派到相应的处理线程,因此Reactor模式也叫Dispatcher模式。
      • Reactor模式使用IO复用监听事件,收到事件后,分发给某个线程(进程),这点就是网络服务器高并发处理关键。
  • Reactor模式中核心组成:

    • Reactor:Reactor在一个单独的线程中运行,负责监听和分发事件,分发给适当的处理程序来对IO事件做出反应。它就像公司的电话接线员,它接听来自客户的电话并将线路转移到适当的联系人。
    • Handlers:处理程序执行I/O事件要完成的实际事件,类似于客户想要与之交谈的公司中的实际官员。Reactor通过调度适当的处理程序来响应I/O事件,处理程序执行非阻塞操作。
  • Reactor模式具有如下的优点:

    • 响应快,不必为单个同步时间所阻塞,虽然Reactor本身依然是同步的。
    • 可以最大程度的避免复杂的多线程及同步问题,并且避免了多线程/进程的切换开销。
    • 扩展性好,可以方便的通过增加Reactor实例个数来充分利用CPU资源。
    • 复用性好,Reactor模型本身与具体事件处理逻辑无关,具有很高的复用性。

3.1.2.1 单Reactor单线程

  • 工作原理示意图:

    • select是前面I/O复用模型介绍的标准网络编程API,可以实现应用程序通过一个阻塞对象监听多路连接请求。
    • Reactor对象通过Select监控客户端请求事件,收到事件后通过Dispatch进行分发。
    • 如果是建立连接请求事件,则由Acceptor通过Accept处理连接请求,然后创建一个Handler对象处理连接完成后的后续业务处理。
    • 如果不是建立连接事件,则Reactor会分发调用连接对应的Handler来响应。
    • Handler会完成Read→业务处理→Send的完整业务流程。
    • 服务器端用一个线程通过多路复用搞定所有的IO操作(包括连接,读、写等),编码简单,清晰明了,但是如果客户端连接数量较多,将无法支撑,前面的NIO案例就属于这种模型。
  • 优点:模型简单,没有多线程、进程通信、竞争的问题,全部都在一个线程中完成。

  • 缺点:

    • 性能问题,只有一个线程,无法完全发挥多核CPU的性能。Handler在处理某个连接上的业务时,整个进程无法处理其他连接事件,很容易导致性能瓶颈。
    • 可靠性问题,线程意外终止,或者进入死循环,会导致整个系统通信模块不可用,不能接收和处理外部消息,造成节点故障。
  • 使用场景:客户端的数量有限,业务处理非常快速,比如Redis在业务处理的时间复杂度O(1)的情况。

3.1.2.2 单Reactor多线程

  • 工作原理示意图:

    • Reactor对象通过Select监控客户端请求事件,收到事件后,通过Dispatch进行分发。
    • 如果建立连接请求,则由Acceptor通过accept处理连接请求,然后创建一个Handler对象处理完成连接后的各种事件。
    • 如果不是连接请求,则由Reactor分发调用连接对应的handler来处理。
    • handler只负责响应事件,不做具体的业务处理,通过read读取数据后,会分发给后面的worker线程池的某个线程处理业务。
    • worker线程池会分配独立线程完成真正的业务,并将结果返回给handler。
    • handler收到响应后,通过send将结果返回给client。
  • 优点:可以充分的利用多核cpu的处理能力。

  • 缺点:多线程数据共享和访问比较复杂,Reactor处理所有的事件的监听和响应,在单线程运行,在高并发场景容易出现性能瓶颈。

3.1.2.3 主从Reactor多线程

  • 工作原理示意图:

    • 针对单Reactor多线程模型中,Reactor在单线程中运行,高并发场景下容易成为性能瓶颈,可以让Reactor在多线程中运行。

      • Reactor主线程的MainReactor对象通过select监听连接事件,收到事件后,通过Acceptor处理连接事件。
      • 当Acceptor处理连接事件后,MainReactor将连接分配给SubReactor。
      • SubReactor将连接加入到连接队列进行监听,并创建handler进行各种事件处理。
      • 当有新事件发生时,SubReactor就会调用对应的handler处理。
      • handler通过read读取数据,分发给后面的worker线程处理。
      • worker线程池分配独立的worker线程进行业务处理,并返回结果。
      • handler收到响应的结果后,再通过send将结果返回给client。
      • Reactor主线程可以对应多个Reactor子线程,即MainRecator可以关联多个SubReactor。
  • 优点:

    • 父线程与子线程的数据交互简单职责明确,父线程只需要接收新连接,子线程完成后续的业务处理。
    • 父线程与子线程的数据交互简单,Reactor主线程只需要把新连接传给子线程,子线程无需返回数据。
  • 缺点:编程复杂度较高。

  • 这种模型在许多项目中广泛使用,包括Nginx主从Reactor多进程模型,Memcached主从多线程,Netty主从多线程模型的支持。

3.2 Netty模型

  • Netty主要基于主从Reactors多线程模型(如图)做了一定的改进,其中主从Reactor多线程模型有多个Reactor。

    • BossGroup线程维护Selector,只关注Accept。
    • 当接收到Accept事件,获取到对应的SocketChannel,封装成NIOScoketChannel并注册到Worker线程(事件循环),并进行维护。
    • 当Worker线程监听到Selector中通道发生自己感兴趣的事件后,就进行处理(就由handler),注意handler已经加入到通道。
  • 进阶图示:

  • 详细图示:

    • Netty抽象出两组线程池:BossGroup专门负责接收客户端的连接,WorkerGroup专门负责网络的读写。
    • BossGroup和WorkerGroup类型都是NioEventLoopGroup。
    • NioEventLoopGroup相当于一个事件循环组,这个组中含有多个事件循环,每一个事件循环是NioEventLoop。
    • NioEventLoop表示一个不断循环的执行处理任务的线程,每个NioEventLoop都有一个Selector,用于监听绑定在其上的socket的网络通讯。
    • NioEventLoopGroup可以有多个线程,即可以含有多个NioEventLoop。
    • BossGroup的每个NioEventLoop循环执行的步骤有3步。
      • 轮询accept事件。
      • 处理accept事件,与client建立连接,生成NioScocketChannel,并将其注册到WorkerGroup的某个NIOEventLoop上的Selector。
      • 处理任务队列的任务,即runAllTasks。
    • WorkerGroup的每个NIOEventLoop循环执行的步骤:
      • 轮询read,write事件。
      • 处理I/O事件,即read,write事件,在对应NioScocketChannel处理。
      • 处理任务队列的任务,即runAllTasks。
    • WorkerGroup的每个NIOEventLoop处理业务时,会使用pipeline(管道),pipeline中包含了channel,即通过pipeline可以获取到对应通道,管道中维护了很多的处理器。
    • 注意:
      • NioEventLoopGroup下包含多个NioEventLoop。
      • 每个NioEventLoop中包含有一个Selector,一个taskQueue。
      • 每个NioEventLoop的Selector上可以注册监听多个NioChannel。
      • 每个NioChannel只会绑定在唯一的NioEventLoop上。
      • 每个NioChannel都绑定有一个自己的ChannelPipeline。
  • Netty实例:

    • Netty服务器在6668端口监听,客户端能发送消息给服务器”hello,服务器~”。

    • 服务器可以回复消息给客户端”hello,客户端~”。

      • 服务器端:

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        31
        32
        33
        34
        35
        36
        37
        38
        39
        40
        41
        42
        43
        44
        45
        46
        47
        48
        49
        50
        51
        52
        53
        54
        55
        56
        57
        58
        59
        60
        61
        62
        63
        64
        65
        66
        67
        68
        69
        70
        71
        72
        73
        74
        75
        76
        77
        78
        79
        80
        public class NettyServer {
        public static void main(String[] args) throws Exception {
        //1. 创建两个线程组 bossGroup 和 workerGroup
        //2. bossGroup 只是处理连接请求 , 真正的和客户端业务处理,会交给 workerGroup完成
        //3. 两个都是无限循环
        //4. bossGroup 和 workerGroup 含有的子线程(NioEventLoop)的个数默认是 cpu逻辑核数 * 2
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
        //创建服务器端的启动对象,配置参数
        ServerBootstrap bootstrap = new ServerBootstrap();
        //使用链式编程来进行设置
        bootstrap.group(bossGroup, workerGroup) //设置两个线程组
        .channel(NioServerSocketChannel.class) //使用NioSocketChannel 作为服务器的通道实现
        .option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列得到连接个数
        .childOption(ChannelOption.SO_KEEPALIVE, true) //设置保持活动连接状态
        .childHandler(new ChannelInitializer<SocketChannel>() {//创建一个通道初始化对象(匿名对象)
        //给pipeline 设置处理器
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
        System.out.println("客户socketChannel hashcode=" + ch.hashCode()); //可以使用一个集合管理 SocketChannel, 在推送消息时,可以将业务加入到各个channel 对应的 NIOEventLoop 的 taskQueue 或者 scheduleTaskQueue
        ch.pipeline().addLast(new NettyServerHandler());
        }
        }); // 给workerGroup 的 EventLoop 对应的管道设置处理器

        System.out.println("服务器 is ready...");

        //绑定一个端口并且同步, 生成了一个 ChannelFuture 对象
        //启动服务器(并绑定端口)
        ChannelFuture cf = bootstrap.bind(6668).sync();
        //对关闭通道进行监听
        cf.channel().closeFuture().sync();
        }finally {
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
        }
        }
        }

        /**
        * 我们自定义一个Handler需要继承netty规定好的某个HandlerAdapter(规范)
        */
        public class NettyServerHandler extends ChannelInboundHandlerAdapter {
        //读取数据事件(这里我们可以读取客户端发送的消息)
        /**
        * @param ctx 上下文对象, 含有管道pipeline、通道channel、地址
        * @param msg 就是客户端发送的数据,默认是Object
        * @throws Exception 异常
        */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        System.out.println("服务器读取线程 " + Thread.currentThread().getName() + " channel =" + ctx.channel());
        System.out.println("server ctx =" + ctx);
        System.out.println("看看channel 和 pipeline的关系");
        Channel channel = ctx.channel();
        ChannelPipeline pipeline = ctx.pipeline(); //本质是一个双向链接, 出站入站

        //将 msg 转成一个 ByteBuf
        //ByteBuf 是 Netty 提供的,不是 NIO 的 ByteBuffer.
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8));
        System.out.println("客户端地址:" + channel.remoteAddress());
        }

        //数据读取完毕
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        //writeAndFlush 是 write + flush
        //将数据写入到缓存,并刷新
        //一般讲,我们对这个发送的数据进行编码
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~", CharsetUtil.UTF_8));
        }

        //处理异常, 一般是需要关闭通道
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
        }
        }
    • 客户端:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      public class NettyClient {
      public static void main(String[] args) throws InterruptedException {
      //客户端需要一个事件循环组
      EventLoopGroup group = new NioEventLoopGroup();
      try {
      //创建客户端启动对象
      //注意客户端使用的不是 ServerBootstrap 而是 Bootstrap
      Bootstrap bootstrap = new Bootstrap();

      //设置相关参数
      bootstrap.group(group) //设置线程组
      .channel(NioSocketChannel.class) // 设置客户端通道的实现类(反射)
      .handler(new ChannelInitializer<SocketChannel>() {
      @Override
      protected void initChannel(SocketChannel ch) throws Exception {
      ch.pipeline().addLast(new NettyClientHandler()); //加入自己的处理器
      }
      });

      System.out.println("客户端 ok..");

      //启动客户端去连接服务器端
      //关于 ChannelFuture 要分析,涉及到netty的异步模型
      ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
      //给关闭通道进行监听
      channelFuture.channel().closeFuture().sync();
      }finally {
      group.shutdownGracefully();
      }
      }
      }

      public class NettyClientHandler extends ChannelInboundHandlerAdapter {
      //当通道就绪就会触发该方法
      @Override
      public void channelActive(ChannelHandlerContext ctx) throws Exception {
      System.out.println("client " + ctx);
      ctx.writeAndFlush(Unpooled.copiedBuffer("hello, server", CharsetUtil.UTF_8));
      }

      //当通道有读取事件时,会触发
      @Override
      public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

      ByteBuf buf = (ByteBuf) msg;
      System.out.println("服务器回复的消息:" + buf.toString(CharsetUtil.UTF_8));
      System.out.println("服务器的地址: "+ ctx.channel().remoteAddress());
      }

      @Override
      public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
      cause.printStackTrace();
      ctx.close();
      }
      }

3.2.1 任务队列

  • NioEventLoop中的TaskQueue有3种典型使用场景:

    • 用户程序自定义的普通任务。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      public class NettyServerHandler extends ChannelInboundHandlerAdapter {
      @Override
      public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
      //比如这里我们有一个非常耗时长的业务-> 异步执行 -> 提交该channel 对应的
      //NIOEventLoop 的 taskQueue中,
      //解决方案1 用户程序自定义的普通任务
      ctx.channel().eventLoop().execute(new Runnable() {
      @Override
      public void run() {
      try {
      Thread.sleep(5 * 1000);
      ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵2", CharsetUtil.UTF_8));
      System.out.println("channel code=" + ctx.channel().hashCode());
      } catch (Exception ex) {
      System.out.println("发生异常" + ex.getMessage());
      }
      }
      });
      System.out.println("go on ...");
      }

      //数据读取完毕
      @Override
      public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

      //writeAndFlush 是 write + flush
      //将数据写入到缓存,并刷新
      //一般讲,我们对这个发送的数据进行编码
      ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵1", CharsetUtil.UTF_8));
      }

      //处理异常, 一般是需要关闭通道
      @Override
      public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
      ctx.close();
      }
      }
  • 用户自定义定时任务。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    //解决方案2 : 用户自定义定时任务 -》 该任务是提交到 scheduleTaskQueue中
    ctx.channel().eventLoop().schedule(new Runnable() {
    @Override
    public void run() {

    try {
    Thread.sleep(5 * 1000);
    ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵4", CharsetUtil.UTF_8));
    System.out.println("channel code=" + ctx.channel().hashCode());
    } catch (Exception ex) {
    System.out.println("发生异常" + ex.getMessage());
    }
    }
    }, 5, TimeUnit.SECONDS);
    System.out.println("go on ...");
    }

    //数据读取完毕
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

    //writeAndFlush 是 write + flush
    //将数据写入到缓存,并刷新
    //一般讲,我们对这个发送的数据进行编码
    ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵1", CharsetUtil.UTF_8));
    }

    //处理异常, 一般是需要关闭通道
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    ctx.close();
    }
    }
  • 非当前Reactor线程调用Channel的各种方法,例如在推送系统的业务线程里面,根据用户的标识,找到对应的Channel引用,然后调用Write类方法向该用户推送消息,就会进入到这种场景。最终的Write会提交到任务队列中后被异步消费。

3.2.2 异步模型

  • 异步的概念和同步相对。当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的组件在完成后,通过状态、通知和回调来通知调用者。

  • Netty中的I/O操作是异步的,包括Bind、Write、Connect等操作会简单的返回一个ChannelFuture。

  • 调用者并不能立刻获得结果,而是通过Future-Listener机制,用户可以方便的主动获取或者通过通知机制获得IO操作结果。

    • 当Future对象刚刚创建时,处于非完成状态,调用者可以通过返回的ChannelFuture来获取操作执行的状态,注册监听函数来执行完成后的操作。

    • 常见有如下操作:

      • 通过isDone方法来判断当前操作是否完成。

      • 通过isSuccess方法来判断已完成的当前操作是否成功。

      • 通过getCause方法来获取已完成的当前操作失败的原因。

      • 通过isCancelled方法来判断已完成的当前操作是否被取消。

      • 通过addListener方法来注册监听器,当操作已完成(isDone方法返回完成),将会通知指定的监听器;如果Future对象已完成,则通知指定的监听器。

      • 例如给前面服务器端可以在绑定端口后进行监听:

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        //绑定一个端口并且同步, 生成了一个 ChannelFuture 对象
        //启动服务器(并绑定端口)
        ChannelFuture cf = bootstrap.bind(6668).sync();

        //给cf 注册监听器,监控我们关心的事件
        cf.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
        if (cf.isSuccess()) {
        System.out.println("监听端口 6668 成功");
        } else {
        System.out.println("监听端口 6668 失败");
        }
        }
        });
  • Netty的异步模型是建立在future和callback之上的。callback就是回调。重点说Future,它的核心思想是:假设一个方法fun,计算过程可能非常耗时,等待fun返回显然不合适。那么可以在调用fun的时候,立马返回一个Future,后续可以通过Future去监控方法fun的处理过程(即:Future-Listener机制)

  • Future说明:

    • 表示异步的执行结果,可以通过它提供的方法来检测执行是否完成,比如检索计算等等。
    • ChannelFuture是一个接口:public interface ChannelFuture extends Future,我们可以添加监听器,当监听的事件发生时,就会通知到监听器。
  • 工作原理示意图:

    • 在使用Netty进行编程时,拦截操作和转换出入站数据只需要您提供callback或利用 future 即可。这使得链式操作简单、高效, 并有利于编写可重用的、通用的代码。
    • Netty框架的目标就是让你的业务逻辑从网络基础应用编码中分离出来、解脱出来。
    • 相比传统阻塞I/O,执行I/O操作后线程会被阻塞住, 直到操作完成;异步处理的好处是不会造成线程阻塞,线程在I/O操作期间可以执行别的程序,在高并发情形下会更稳定和更高的吞吐量。

4、Netty进阶

4.1 粘包与半包现象

  • 粘包现象。

    • 客户端。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      @Slf4j
      public class HelloClient {
      public static void main(String[] args) {
      NioEventLoopGroup worker = new NioEventLoopGroup();
      try {
      Bootstrap bootstrap = new Bootstrap();
      bootstrap.channel(NioSocketChannel.class);
      bootstrap.group(worker);
      bootstrap.handler(new ChannelInitializer<SocketChannel>() {
      @Override
      protected void initChannel(SocketChannel ch) throws Exception {
      log.debug("connected...");
      ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
      // 连接建立成功后会触发此事件
      @Override
      public void channelActive(ChannelHandlerContext ctx) throws Exception {
      log.debug("sending...");
      for (int i = 0; i < 10; i++) {
      ByteBuf buffer = ctx.alloc().buffer();
      buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
      ctx.writeAndFlush(buffer);
      }
      }
      });
      }
      });
      ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();
      channelFuture.channel().closeFuture().sync();
      } catch (InterruptedException e) {
      log.error("client error", e);
      } finally {
      worker.shutdownGracefully();
      }
      }
      }
    • 服务端。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      @Slf4j
      public class HelloServer {
      public static void main(String[] args) {
      NioEventLoopGroup boss = new NioEventLoopGroup(1);
      NioEventLoopGroup worker = new NioEventLoopGroup();
      try {
      ServerBootstrap serverBootstrap = new ServerBootstrap();
      serverBootstrap.channel(NioServerSocketChannel.class);
      serverBootstrap.group(boss, worker);
      serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
      @Override
      protected void initChannel(SocketChannel ch) throws Exception {
      ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
      ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
      // 连接建立成功后会触发此事件
      @Override
      public void channelActive(ChannelHandlerContext ctx) throws Exception {
      log.debug("connected {}", ctx.channel());
      super.channelActive(ctx);
      }

      @Override
      public void channelInactive(ChannelHandlerContext ctx) throws Exception {
      log.debug("disconnect {}", ctx.channel());
      super.channelInactive(ctx);
      }
      });
      }
      });
      ChannelFuture channelFuture = serverBootstrap.bind(8080);
      log.debug("{} binding...", channelFuture.channel());
      channelFuture.sync();
      log.debug("{} bound...", channelFuture.channel());
      channelFuture.channel().closeFuture().sync();
      } catch (InterruptedException e) {
      log.error("server error", e);
      } finally {
      boss.shutdownGracefully();
      worker.shutdownGracefully();
      log.debug("stoped");
      }
      }
      }

      服务器端的某次输出,可以看到一次就接收了160个字节,而非分10次接收,即为粘包现象。原因:

      • 应用层:接收方ByteBuf设置太大(Netty 默认1024)。
      • 滑动窗口:假设发送方256 bytes表示一个完整报文,但由于接收方处理不及时且窗口大小足够大,这256bytes字节就会缓冲在接收方的滑动窗口中,当滑动窗口中缓冲了多个报文就会粘包。
      • Nagle算法:会造成粘包。
  • 半包现象。

    • 客户端希望发送1个消息,这个消息是160字节,代码改为:

      1
      2
      3
      4
      5
      ByteBuf buffer = ctx.alloc().buffer();
      for (int i = 0; i < 10; i++) {
      buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
      }
      ctx.writeAndFlush(buffer);
    • 为现象明显,服务端修改一下接收缓冲区,其它代码不变。

      1
      serverBootstrap.option(ChannelOption.SO_RCVBUF, 10);

      服务器端的某次输出,可以看到接收的消息被分为两节,第一次20字节,第二次140字节。serverBootstrap.option(ChannelOption.SO_RCVBUF, 10)影响的底层接收缓冲区(即滑动窗口)大小,仅决定了netty读取的最小单位,netty实际每次读取的一般是它的整数倍。原因:

      • 应用层:接收方ByteBuf小于实际发送数据量。
      • 滑动窗口:假设接收方的窗口只剩了128bytes,发送方的报文大小是256bytes,这时放不下了,只能先发送前128bytes,等待ack后才能发送剩余部分,这就造成了半包。
      • MSS限制:当发送的数据超过MSS限制后,会将数据切分发送,就会造成半包。

4.2 解决方案

4.2.1 短链接

  • 客户端发送完整数据后,立即关闭连接。即发一个包建立一次连接,这样连接建立到连接断开之间就是消息的边界,缺点效率太低。以解决粘包为例:

    • 客户端:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      @Slf4j
      public class HelloClient {
      public static void main(String[] args) {
      for (int i = 0; i < 5; i++) {
      send();
      }
      }

      private static void send() {
      NioEventLoopGroup worker = new NioEventLoopGroup();
      try {
      Bootstrap bootstrap = new Bootstrap();
      bootstrap.channel(NioSocketChannel.class);
      bootstrap.group(worker);
      bootstrap.handler(new ChannelInitializer<SocketChannel>() {
      @Override
      protected void initChannel(SocketChannel ch) throws Exception {
      log.debug("connected...");
      ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
      // 连接建立成功后会触发此事件
      @Override
      public void channelActive(ChannelHandlerContext ctx) throws Exception {
      log.debug("sending...");
      ByteBuf buffer = ctx.alloc().buffer();
      buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
      ctx.writeAndFlush(buffer);
      // 发完即关
      ctx.channel().close();
      }
      });
      }
      });
      ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();
      channelFuture.channel().closeFuture().sync();
      } catch (InterruptedException e) {
      log.error("client error", e);
      } finally {
      worker.shutdownGracefully();
      }
      }
      }
    • 服务端:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      @Slf4j
      public class HelloServer {
      public static void main(String[] args) {
      NioEventLoopGroup boss = new NioEventLoopGroup(1);
      NioEventLoopGroup worker = new NioEventLoopGroup();
      try {
      ServerBootstrap serverBootstrap = new ServerBootstrap();
      serverBootstrap.channel(NioServerSocketChannel.class);
      serverBootstrap.group(boss, worker);
      serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
      @Override
      protected void initChannel(SocketChannel ch) throws Exception {
      ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
      }
      });
      ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
      channelFuture.channel().closeFuture().sync();
      } catch (InterruptedException e) {
      log.error("server error", e);
      } finally {
      boss.shutdownGracefully();
      worker.shutdownGracefully();
      log.debug("stoped");
      }
      }
      }
  • 半包用这种办法还是不好解决,因为接收方的缓冲区大小是有限的。

    • 客户端在上方基础上多传输两个字节的数据,服务端修改netty接收缓冲区的大小,使得客户端一次性发送的数据大于此缓冲区,服务端代码如下:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      @Slf4j
      public class HelloServer {
      public static void main(String[] args) {
      NioEventLoopGroup boss = new NioEventLoopGroup(1);
      NioEventLoopGroup worker = new NioEventLoopGroup();
      try {
      ServerBootstrap serverBootstrap = new ServerBootstrap();
      serverBootstrap.channel(NioServerSocketChannel.class);
      // 调整系统的接收缓冲区大小(滑动窗口)
      // serverBootstrap.option(ChannelOption.SO_RCVBUF, 10);
      // 调整netty的接收缓冲区(ByteBuf)
      serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(16, 16, 16));
      serverBootstrap.group(boss, worker);
      serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
      @Override
      protected void initChannel(SocketChannel ch) throws Exception {
      ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
      }
      });
      ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
      channelFuture.channel().closeFuture().sync();
      } catch (InterruptedException e) {
      log.error("server error", e);
      } finally {
      boss.shutdownGracefully();
      worker.shutdownGracefully();
      log.debug("stoped");
      }
      }
      }

4.2.2 固定长度

  • 每一条消息采用固定长度,缺点是浪费空间。下面假设客户端每次发送10个字节的完整数据,服务端添加个解码器FixedLengthFrameDecoder,并固定长度为10,这样服务端就会每10个字节截取一次。

    • 客户端:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
        public class Client {
      static final Logger log = LoggerFactory.getLogger(Client.class);

      public static void main(String[] args) {
      send();
      System.out.println("finish");
      }

      public static byte[] fill10Bytes(char c, int len) {
      byte[] bytes = new byte[10];
      Arrays.fill(bytes, (byte) '_');
      for (int i = 0; i < len; i++) {
      bytes[i] = (byte) c;
      }
      System.out.println(new String(bytes));
      return bytes;
      }

      private static void send() {
      NioEventLoopGroup worker = new NioEventLoopGroup();
      try {
      Bootstrap bootstrap = new Bootstrap();
      bootstrap.channel(NioSocketChannel.class);
      bootstrap.group(worker);
      bootstrap.handler(new ChannelInitializer<SocketChannel>() {
      @Override
      protected void initChannel(SocketChannel ch) {
      ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
      ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
      // 会在连接channel建立成功后,会触发active事件
      @Override
      public void channelActive(ChannelHandlerContext ctx) {
      ByteBuf buf = ctx.alloc().buffer();
      char c = '0';
      Random r = new Random();
      for (int i = 0; i < 10; i++) {
      byte[] bytes = fill10Bytes(c, r.nextInt(10) + 1);
      c++;
      buf.writeBytes(bytes);
      }
      ctx.writeAndFlush(buf);
      }
      });
      }
      });
      ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();
      channelFuture.channel().closeFuture().sync();
      } catch (InterruptedException e) {
      log.error("client error", e);
      } finally {
      worker.shutdownGracefully();
      }
      }
      }
    • 服务端:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      @Slf4j
      public class Server {
      void start() {
      NioEventLoopGroup boss = new NioEventLoopGroup(1);
      NioEventLoopGroup worker = new NioEventLoopGroup();
      try {
      ServerBootstrap serverBootstrap = new ServerBootstrap();
      serverBootstrap.channel(NioServerSocketChannel.class);
      // 调整系统的接收缓冲区大小(滑动窗口)
      // serverBootstrap.option(ChannelOption.SO_RCVBUF, 10);
      // 调整netty的接收缓冲区(ByteBuf)
      serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(16, 16, 16));
      serverBootstrap.group(boss, worker);
      serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
      @Override
      protected void initChannel(SocketChannel ch) throws Exception {
      // FixedLengthFrameDecoder需要放在LoggingHandler前面
      ch.pipeline().addLast(new FixedLengthFrameDecoder(10));
      ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
      }
      });
      ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
      channelFuture.channel().closeFuture().sync();
      } catch (InterruptedException e) {
      log.error("server error", e);
      } finally {
      boss.shutdownGracefully();
      worker.shutdownGracefully();
      log.debug("stoped");
      }
      }

      public static void main(String[] args) {
      new Server().start();
      }
      }
    1
    2
    3
    4
    5
    6

    {% asset_img 1638709147178.png 1638709147178 %}

    {% asset_img 1638709176649.png 1638709176649 %}

    可以看出即使客户端出现了粘包现象,但是服务端能够正确读取数据。

4.2.3 固定分隔符

  • 每一条消息采用分隔符,例如\n,缺点是需要转义。

    • 客户端在每条消息之后,加入\n分隔符。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      public class Client {
      static final Logger log = LoggerFactory.getLogger(Client.class);

      public static void main(String[] args) {
      send();
      System.out.println("finish");
      }

      public static StringBuilder makeString(char c, int len) {
      StringBuilder sb = new StringBuilder(len + 2);
      for (int i = 0; i < len; i++) {
      sb.append(c);
      }
      sb.append("\n");
      return sb;
      }

      private static void send() {
      NioEventLoopGroup worker = new NioEventLoopGroup();
      try {
      Bootstrap bootstrap = new Bootstrap();
      bootstrap.channel(NioSocketChannel.class);
      bootstrap.group(worker);
      bootstrap.handler(new ChannelInitializer<SocketChannel>() {
      @Override
      protected void initChannel(SocketChannel ch) {
      ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
      ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
      // 会在连接channel建立成功后,会触发active事件
      @Override
      public void channelActive(ChannelHandlerContext ctx) {
      ByteBuf buf = ctx.alloc().buffer();
      char c = '0';
      Random r = new Random();
      for (int i = 0; i < 10; i++) {
      StringBuilder sb = makeString(c, r.nextInt(256) + 1);
      c++;
      buf.writeBytes(sb.toString().getBytes());
      }
      ctx.writeAndFlush(buf);
      }
      });
      }
      });
      ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();
      channelFuture.channel().closeFuture().sync();
      } catch (InterruptedException e) {
      log.error("client error", e);
      } finally {
      worker.shutdownGracefully();
      }
      }
      }
    • 服务端加入解码器LineBasedFrameDecoder,即以\n或\r\n作为分隔符,如果超出指定长度仍未出现分隔符,则抛出异常。

      除此之外还可以使用自定义分隔符解码器DelimiterBasedFrameDecoder:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      @Slf4j
      public class Server {
      void start() {
      NioEventLoopGroup boss = new NioEventLoopGroup(1);
      NioEventLoopGroup worker = new NioEventLoopGroup();
      try {
      ServerBootstrap serverBootstrap = new ServerBootstrap();
      serverBootstrap.channel(NioServerSocketChannel.class);
      // 调整系统的接收缓冲区大小(滑动窗口)
      // serverBootstrap.option(ChannelOption.SO_RCVBUF, 10);
      // 调整netty的接收缓冲区(ByteBuf)
      serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(16, 16, 16));
      serverBootstrap.group(boss, worker);
      serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
      @Override
      protected void initChannel(SocketChannel ch) throws Exception {
      ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
      ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
      }
      });
      ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
      channelFuture.channel().closeFuture().sync();
      } catch (InterruptedException e) {
      log.error("server error", e);
      } finally {
      boss.shutdownGracefully();
      worker.shutdownGracefully();
      log.debug("stoped");
      }
      }

      public static void main(String[] args) {
      new Server().start();
      }
      }

4.2.4 预设长度

  • 每一条消息分为head和body,head中包含body的长度。即在发送消息前,先约定用定长字节表示接下来数据的长度。

    • 客户端在发送消息前,先约定用定长字节表示接下来数据的长度。这里使用到解码器LengthFieldBasedFrameDecoder。

    • 这里使用EmbeddedChannel进行测试。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      public class TestLengthFieldDecoder {
      public static void main(String[] args) {
      EmbeddedChannel channel = new EmbeddedChannel(
      /**
      * int maxFrameLength,
      * int lengthFieldOffset,
      * int lengthFieldLength,
      * int lengthAdjustment,
      * int initialBytesToStrip
      */
      new LengthFieldBasedFrameDecoder(
      1024, 0, 4, 0,0),
      new LoggingHandler(LogLevel.DEBUG)
      );

      // 客户端发4个字节的内容长度
      ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
      send(buffer, "Hello, world");
      send(buffer, "Hi!");
      channel.writeInbound(buffer);
      }

      private static void send(ByteBuf buffer, String content) {
      byte[] bytes = content.getBytes(); // 实际内容
      int length = bytes.length; // 实际内容长度
      buffer.writeInt(length);
      buffer.writeBytes(bytes);
      }
      }

      如果要剥离头部,则方法调用改为:

      1
      new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4)

      如果在消息长度和内容间加了个字节代表版本号,即:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      public class TestLengthFieldDecoder {
      public static void main(String[] args) {
      EmbeddedChannel channel = new EmbeddedChannel(
      /**
      * int maxFrameLength,
      * int lengthFieldOffset,
      * int lengthFieldLength,
      * int lengthAdjustment,
      * int initialBytesToStrip
      */
      new LengthFieldBasedFrameDecoder(
      1024, 0, 4, 1,0),
      new LoggingHandler(LogLevel.DEBUG)
      );

      // 客户端发4个字节的内容长度
      ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
      send(buffer, "Hello, world");
      send(buffer, "Hi!");
      channel.writeInbound(buffer);
      }

      private static void send(ByteBuf buffer, String content) {
      byte[] bytes = content.getBytes(); // 实际内容
      int length = bytes.length; // 实际内容长度
      buffer.writeInt(length);
      buffer.writeByte(1); // 代表版本1
      buffer.writeBytes(bytes);
      }
      }

4.3 协议设计与解析

4.3.1 Redis协议

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
@Slf4j
public class TestRedis {
/*
set name zhangsan
*3:字符串数组长度
$3:set命令长度
set:set命令
$4:key长度
name:key实际值
$8:value长度
zhangsan:value实际值
*/
public static void main(String[] args) {
final byte[] LINE = {13, 10};// 回车和换行
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(worker);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new LoggingHandler());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) {
ByteBuf buf = ctx.alloc().buffer();
buf.writeBytes("*3".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("$3".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("set".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("$4".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("name".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("$8".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("zhangsan".getBytes());
buf.writeBytes(LINE);
ctx.writeAndFlush(buf);
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println(buf.toString(Charset.defaultCharset()));
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("localhost", 6379).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("client error", e);
} finally {
worker.shutdownGracefully();
}
}
}
  • 先启动redis,再启动此客户端后,控制台打印如下:

    发现key=name,value=zhangsan的数据成功插入redis,并且redis返回”ok”。

4.3.2 Http协议

  • netty已经帮我们封装好专门处理HTTP请求的编解码器HttpServerCodec。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    @Slf4j
    public class TestHttp {
    public static void main(String[] args) {
    NioEventLoopGroup boss = new NioEventLoopGroup();
    NioEventLoopGroup worker = new NioEventLoopGroup();
    try {
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    serverBootstrap.channel(NioServerSocketChannel.class);
    serverBootstrap.group(boss, worker);
    serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
    ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
    ch.pipeline().addLast(new HttpServerCodec());
    // 只关注HttpRequest类型的消息
    ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {
    // 获取请求
    log.debug(msg.uri());

    // 返回响应
    DefaultFullHttpResponse response =
    new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);

    byte[] bytes = "<h1>Hello, world!</h1>".getBytes();

    response.headers().setInt(CONTENT_LENGTH, bytes.length);
    response.content().writeBytes(bytes);

    // 写回响应
    ctx.writeAndFlush(response);
    }
    });
    /*ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    log.debug("{}", msg.getClass());

    if (msg instanceof HttpRequest) { // 请求行,请求头

    } else if (msg instanceof HttpContent) { //请求体

    }
    }
    });*/
    }
    });
    ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
    channelFuture.channel().closeFuture().sync();
    } catch (InterruptedException e) {
    log.error("server error", e);
    } finally {
    boss.shutdownGracefully();
    worker.shutdownGracefully();
    }
    }
    }
  • 运行此服务端程序后,浏览器访问8080端口,服务端控制台显示如下:

    浏览器显示如下:

    1638882959362

4.3.3 自定义协议

  • 自定义协议需要考虑的要素如下:
    • 魔数,用来在第一时间判定是否是无效数据包。
    • 版本号,可以支持协议的升级。
    • 序列化算法,消息正文到底采用哪种序列化反序列化方式,可以由此扩展,例如:json、protobuf、hessian、jdk。
    • 指令类型,是登录、注册、单聊、群聊等跟业务相关。
    • 请求序号,为了双工通信,提供异步能力。
    • 正文长度。
    • 消息正文。

根据上面的要素,设计一个登录请求消息和登录响应消息,并使用Netty完成收发。

  • 自定义编解码器。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    @Slf4j
    public class MessageCodec extends ByteToMessageCodec<Message> {
    @Override
    public void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
    // 1、4个字节的魔数
    out.writeBytes(new byte[]{1, 2, 3, 4});
    // 2、1个字节的版本
    out.writeByte(1);
    // 3、1个字节的序列化方式,0代表jdk, 1代表json
    out.writeByte(0);
    // 4、1个字节的指令类型
    out.writeByte(msg.getMessageType());
    // 5、4个字节的请求序号
    out.writeInt(msg.getSequenceId());
    // 无意义,对齐填充
    out.writeByte(0xff);
    // 6、获取内容的字节数组
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    ObjectOutputStream oos = new ObjectOutputStream(bos);
    oos.writeObject(msg);
    byte[] bytes = bos.toByteArray();
    // 7、4个字节的正文长度
    out.writeInt(bytes.length);
    // 8、写入正文内容
    out.writeBytes(bytes);
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    int magicNum = in.readInt();
    byte version = in.readByte();
    byte serializerType = in.readByte();
    byte messageType = in.readByte();
    int sequenceId = in.readInt();
    in.readByte();
    int length = in.readInt();
    byte[] bytes = new byte[length];
    in.readBytes(bytes, 0, length);
    ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
    Message message = (Message) ois.readObject();
    log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);
    log.debug("{}", message);
    // 将解析好的对象加入集合中给下个handler调用
    out.add(message);
    }
    }

    测试代码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    public class TestMessageCodec {
    public static void main(String[] args) throws Exception {
    EmbeddedChannel channel = new EmbeddedChannel(
    new LoggingHandler(),
    // 解决粘包半包问题
    new LengthFieldBasedFrameDecoder(
    1024, 12, 4, 0, 0),
    new MessageCodec()
    );
    // encode
    LoginRequestMessage message = new LoginRequestMessage("zhangsan", "123");
    channel.writeOutbound(message);

    // decode
    ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
    // 将message的内容填充到buf中,后面将buf解码
    new MessageCodec().encode(null, message, buf);
    channel.writeInbound(buf);
    }
    }

    测试结果分析如下:

  • 验证LengthFieldBasedFrameDecoder解决了半包问题:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    public class TestMessageCodec {
    public static void main(String[] args) throws Exception {
    EmbeddedChannel channel = new EmbeddedChannel(
    new LoggingHandler(),
    // 解决粘包半包问题
    new LengthFieldBasedFrameDecoder(
    1024, 12, 4, 0, 0),
    new MessageCodec()
    );
    LoginRequestMessage message = new LoginRequestMessage("zhangsan", "123");
    // decode
    ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
    // 将message的内容填充到buf中,后面将buf解码
    new MessageCodec().encode(null, message, buf);

    ByteBuf s1 = buf.slice(0, 100);
    ByteBuf s2 = buf.slice(100, buf.readableBytes() - 100);
    s1.retain(); // 引用计数加一,即1+1=2
    channel.writeInbound(s1); // writeInbound执行后会调用release方法释放内存,即2-1=1
    channel.writeInbound(s2);
    }
    }

    如果不加LengthFieldBasedFrameDecoder则会报错如下:

handler共享

  • 当handler不保存状态时,就可以安全地在多线程下被共享。

  • netty将可以共享的handler上会作个标记,即加了@Sharable注解,例如LoggingHandler。

  • 但要注意对于编解码器类,不能继承ByteToMessageCodec或CombinedChannelDuplexHandler父类,他们的构造方法对@Sharable有限制。所以上面我们自定义的MessageCodec不能被共享,因为其继承了ByteToMessageCodec类。

    所以为了让MessageCodec能够共享(确保编解码器不会保存状态),我们可以继承另一个父类MessageToMessageCodec。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    @Slf4j
    @ChannelHandler.Sharable
    /**
    * 必须和LengthFieldBasedFrameDecoder一起使用,确保接到的ByteBuf消息是完整的
    */
    public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception {
    ByteBuf out = ctx.alloc().buffer();
    // 1、4个字节的魔数
    out.writeBytes(new byte[]{1, 2, 3, 4});
    // 2、1个字节的版本
    out.writeByte(1);
    // 3、1个字节的序列化方式,0代表jdk, 1代表json
    out.writeByte(0);
    // 4、1个字节的指令类型
    out.writeByte(msg.getMessageType());
    // 5、4个字节的请求序号
    out.writeInt(msg.getSequenceId());
    // 无意义,对齐填充
    out.writeByte(0xff);
    // 6、获取内容的字节数组
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    ObjectOutputStream oos = new ObjectOutputStream(bos);
    oos.writeObject(msg);
    byte[] bytes = bos.toByteArray();
    // 7、4个字节的正文长度
    out.writeInt(bytes.length);
    // 8、写入正文内容
    out.writeBytes(bytes);
    outList.add(out);
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    int magicNum = in.readInt();
    byte version = in.readByte();
    byte serializerType = in.readByte();
    byte messageType = in.readByte();
    int sequenceId = in.readInt();
    in.readByte();
    int length = in.readInt();
    byte[] bytes = new byte[length];
    in.readBytes(bytes, 0, length);
    ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
    Message message = (Message) ois.readObject();
    log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);
    log.debug("{}", message);
    // 将解析好的对象加入集合中给下个handler调用
    out.add(message);
    }
    }

5、聊天室案例

5.1 聊天室业务介绍

  • 消息类型抽象父类。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    @Data
    public abstract class Message implements Serializable {

    /**
    * 根据消息类型字节,获得对应的消息 class
    * @param messageType 消息类型字节
    * @return 消息 class
    */
    public static Class<? extends Message> getMessageClass(int messageType) {
    return messageClasses.get(messageType);
    }

    private int sequenceId;

    private int messageType;

    public abstract int getMessageType();

    public static final int LoginRequestMessage = 0;
    public static final int LoginResponseMessage = 1;
    public static final int ChatRequestMessage = 2;
    public static final int ChatResponseMessage = 3;
    public static final int GroupCreateRequestMessage = 4;
    public static final int GroupCreateResponseMessage = 5;
    public static final int GroupJoinRequestMessage = 6;
    public static final int GroupJoinResponseMessage = 7;
    public static final int GroupQuitRequestMessage = 8;
    public static final int GroupQuitResponseMessage = 9;
    public static final int GroupChatRequestMessage = 10;
    public static final int GroupChatResponseMessage = 11;
    public static final int GroupMembersRequestMessage = 12;
    public static final int GroupMembersResponseMessage = 13;
    public static final int PingMessage = 14;
    public static final int PongMessage = 15;
    /**
    * 请求类型 byte 值
    */
    public static final int RPC_MESSAGE_TYPE_REQUEST = 101;
    /**
    * 响应类型 byte 值
    */
    public static final int RPC_MESSAGE_TYPE_RESPONSE = 102;

    private static final Map<Integer, Class<? extends Message>> messageClasses = new HashMap<>();

    static {
    messageClasses.put(LoginRequestMessage, LoginRequestMessage.class);
    messageClasses.put(LoginResponseMessage, LoginResponseMessage.class);
    messageClasses.put(ChatRequestMessage, ChatRequestMessage.class);
    messageClasses.put(ChatResponseMessage, ChatResponseMessage.class);
    messageClasses.put(GroupCreateRequestMessage, GroupCreateRequestMessage.class);
    messageClasses.put(GroupCreateResponseMessage, GroupCreateResponseMessage.class);
    messageClasses.put(GroupJoinRequestMessage, GroupJoinRequestMessage.class);
    messageClasses.put(GroupJoinResponseMessage, GroupJoinResponseMessage.class);
    messageClasses.put(GroupQuitRequestMessage, GroupQuitRequestMessage.class);
    messageClasses.put(GroupQuitResponseMessage, GroupQuitResponseMessage.class);
    messageClasses.put(GroupChatRequestMessage, GroupChatRequestMessage.class);
    messageClasses.put(GroupChatResponseMessage, GroupChatResponseMessage.class);
    messageClasses.put(GroupMembersRequestMessage, GroupMembersRequestMessage.class);
    messageClasses.put(GroupMembersResponseMessage, GroupMembersResponseMessage.class);
    messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class);
    messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class);
    }
    }
  • 用户管理接口与实现类。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    /**
    * 用户管理接口
    */
    public interface UserService {

    /**
    * 登录
    * @param username 用户名
    * @param password 密码
    * @return 登录成功返回 true, 否则返回 false
    */
    boolean login(String username, String password);
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    public class UserServiceMemoryImpl implements UserService {
    private Map<String, String> allUserMap = new ConcurrentHashMap<>();

    {
    allUserMap.put("zhangsan", "123");
    allUserMap.put("lisi", "123");
    allUserMap.put("wangwu", "123");
    allUserMap.put("zhaoliu", "123");
    allUserMap.put("qianqi", "123");
    }

    @Override
    public boolean login(String username, String password) {
    String pass = allUserMap.get(username);
    if (pass == null) {
    return false;
    }
    return pass.equals(password);
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    public abstract class UserServiceFactory {

    private static UserService userService = new UserServiceMemoryImpl();

    public static UserService getUserService() {
    return userService;
    }
    }
  • 会话管理接口与实现。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    /**
    * 会话管理接口
    */
    public interface Session {

    /**
    * 绑定会话
    * @param channel 哪个 channel 要绑定会话
    * @param username 会话绑定用户
    */
    void bind(Channel channel, String username);

    /**
    * 解绑会话
    * @param channel 哪个 channel 要解绑会话
    */
    void unbind(Channel channel);

    /**
    * 获取属性
    * @param channel 哪个 channel
    * @param name 属性名
    * @return 属性值
    */
    Object getAttribute(Channel channel, String name);

    /**
    * 设置属性
    * @param channel 哪个 channel
    * @param name 属性名
    * @param value 属性值
    */
    void setAttribute(Channel channel, String name, Object value);

    /**
    * 根据用户名获取 channel
    * @param username 用户名
    * @return channel
    */
    Channel getChannel(String username);
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    public class SessionMemoryImpl implements Session {

    private final Map<String, Channel> usernameChannelMap = new ConcurrentHashMap<>();
    private final Map<Channel, String> channelUsernameMap = new ConcurrentHashMap<>();
    private final Map<Channel,Map<String,Object>> channelAttributesMap = new ConcurrentHashMap<>();

    @Override
    public void bind(Channel channel, String username) {
    usernameChannelMap.put(username, channel);
    channelUsernameMap.put(channel, username);
    channelAttributesMap.put(channel, new ConcurrentHashMap<>());
    }

    @Override
    public void unbind(Channel channel) {
    String username = channelUsernameMap.remove(channel);
    usernameChannelMap.remove(username);
    channelAttributesMap.remove(channel);
    }

    @Override
    public Object getAttribute(Channel channel, String name) {
    return channelAttributesMap.get(channel).get(name);
    }

    @Override
    public void setAttribute(Channel channel, String name, Object value) {
    channelAttributesMap.get(channel).put(name, value);
    }

    @Override
    public Channel getChannel(String username) {
    return usernameChannelMap.get(username);
    }

    @Override
    public String toString() {
    return usernameChannelMap.toString();
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    public abstract class SessionFactory {

    private static Session session = new SessionMemoryImpl();

    public static Session getSession() {
    return session;
    }
    }

  • 聊天组会话管理接口与实现。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    /**
    * 聊天组会话管理接口
    */
    public interface GroupSession {

    /**
    * 创建一个聊天组, 如果不存在才能创建成功, 否则返回 null
    * @param name 组名
    * @param members 成员
    * @return 成功时返回组对象, 失败返回 null
    */
    Group createGroup(String name, Set<String> members);

    /**
    * 加入聊天组
    * @param name 组名
    * @param member 成员名
    * @return 如果组不存在返回 null, 否则返回组对象
    */
    Group joinMember(String name, String member);

    /**
    * 移除组成员
    * @param name 组名
    * @param member 成员名
    * @return 如果组不存在返回 null, 否则返回组对象
    */
    Group removeMember(String name, String member);

    /**
    * 移除聊天组
    * @param name 组名
    * @return 如果组不存在返回 null, 否则返回组对象
    */
    Group removeGroup(String name);

    /**
    * 获取组成员
    * @param name 组名
    * @return 成员集合, 如果群不存在或没有成员会返回 empty set
    */
    Set<String> getMembers(String name);

    /**
    * 获取组成员的 channel 集合, 只有在线的 channel 才会返回
    * @param name 组名
    * @return 成员 channel 集合
    */
    List<Channel> getMembersChannel(String name);
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    public class GroupSessionMemoryImpl implements GroupSession {
    private final Map<String, Group> groupMap = new ConcurrentHashMap<>();

    @Override
    public Group createGroup(String name, Set<String> members) {
    Group group = new Group(name, members);
    return groupMap.putIfAbsent(name, group);
    }

    @Override
    public Group joinMember(String name, String member) {
    return groupMap.computeIfPresent(name, (key, value) -> {
    value.getMembers().add(member);
    return value;
    });
    }

    @Override
    public Group removeMember(String name, String member) {
    return groupMap.computeIfPresent(name, (key, value) -> {
    value.getMembers().remove(member);
    return value;
    });
    }

    @Override
    public Group removeGroup(String name) {
    return groupMap.remove(name);
    }

    @Override
    public Set<String> getMembers(String name) {
    return groupMap.getOrDefault(name, Group.EMPTY_GROUP).getMembers();
    }

    @Override
    public List<Channel> getMembersChannel(String name) {
    return getMembers(name).stream()
    .map(member -> SessionFactory.getSession().getChannel(member))
    .filter(Objects::nonNull)
    .collect(Collectors.toList());
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    public abstract class GroupSessionFactory {

    private static GroupSession session = new GroupSessionMemoryImpl();

    public static GroupSession getGroupSession() {
    return session;
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    /**
    * 聊天组,即聊天室
    */
    public class Group {
    // 聊天室名称
    private String name;
    // 聊天室成员
    private Set<String> members;

    public static final Group EMPTY_GROUP = new Group("empty", Collections.emptySet());

    public Group(String name, Set<String> members) {
    this.name = name;
    this.members = members;
    }
    }

  • 自定义编解码器类。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    public class ProtocolFrameDecoder extends LengthFieldBasedFrameDecoder {

    public ProtocolFrameDecoder() {
    this(1024, 12, 4, 0, 0);
    }

    public ProtocolFrameDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) {
    super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);
    }
    }

5.2 聊天室业务-登录

  • 登录请求消息实现类。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    @Data
    @ToString(callSuper = true)
    public class LoginRequestMessage extends Message {
    private String username;
    private String password;

    public LoginRequestMessage() {
    }

    public LoginRequestMessage(String username, String password) {
    this.username = username;
    this.password = password;
    }

    @Override
    public int getMessageType() {
    return LoginRequestMessage;
    }
    }
  • 登录响应消息实现类。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    @Data
    @ToString(callSuper = true)
    public abstract class AbstractResponseMessage extends Message {
    private boolean success;
    private String reason;

    public AbstractResponseMessage() {
    }

    public AbstractResponseMessage(boolean success, String reason) {
    this.success = success;
    this.reason = reason;
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    @Data
    @ToString(callSuper = true)
    public class LoginResponseMessage extends AbstractResponseMessage {

    public LoginResponseMessage(boolean success, String reason) {
    super(success, reason);
    }

    @Override
    public int getMessageType() {
    return LoginResponseMessage;
    }
    }
  • 登录请求处理器:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    @ChannelHandler.Sharable
    public class LoginRequestMessageHandler extends SimpleChannelInboundHandler<LoginRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, LoginRequestMessage msg) throws Exception {
    String username = msg.getUsername();
    String password = msg.getPassword();
    boolean login = UserServiceFactory.getUserService().login(username, password);
    LoginResponseMessage message;
    if (login) {
    // 将用户信息保存到服务器,即把用户名和channel做个绑定
    SessionFactory.getSession().bind(ctx.channel(), username);
    message = new LoginResponseMessage(true, "登录成功");
    } else {
    message = new LoginResponseMessage(false, "用户名或密码不正确");
    }
    ctx.writeAndFlush(message);
    }
    }
  • 服务器端:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    @Slf4j
    public class ChatServer {
    public static void main(String[] args) {
    NioEventLoopGroup boss = new NioEventLoopGroup();
    NioEventLoopGroup worker = new NioEventLoopGroup();
    LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
    MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
    // 登录请求处理器
    LoginRequestMessageHandler LOGIN_HANDLER = new LoginRequestMessageHandler();
    try {
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    serverBootstrap.channel(NioServerSocketChannel.class);
    serverBootstrap.group(boss, worker);
    serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
    ch.pipeline().addLast(new ProtocolFrameDecoder());
    // ch.pipeline().addLast(LOGGING_HANDLER);
    ch.pipeline().addLast(MESSAGE_CODEC);
    ch.pipeline().addLast(LOGIN_HANDLER);
    }
    });
    Channel channel = serverBootstrap.bind(8080).sync().channel();
    channel.closeFuture().sync();
    } catch (InterruptedException e) {
    log.error("server error", e);
    } finally {
    boss.shutdownGracefully();
    worker.shutdownGracefully();
    }
    }
    }
  • 客户端:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    @Slf4j
    public class ChatClient {
    public static void main(String[] args) {
    NioEventLoopGroup group = new NioEventLoopGroup();
    LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
    MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
    Scanner scanner = new Scanner(System.in);
    // 用于NIO线程和system in线程的相互通信
    CountDownLatch WAIT_FOR_LOGIN = new CountDownLatch(1);
    // 表示是否登录成功
    AtomicBoolean LOGIN = new AtomicBoolean(false);
    // 表示是否退出
    AtomicBoolean EXIT = new AtomicBoolean(false);
    try {
    Bootstrap bootstrap = new Bootstrap();
    bootstrap.channel(NioSocketChannel.class);
    bootstrap.group(group);
    bootstrap.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
    ch.pipeline().addLast(new ProtocolFrameDecoder());
    // ch.pipeline().addLast(LOGGING_HANDLER);
    ch.pipeline().addLast(MESSAGE_CODEC);
    ch.pipeline().addLast("client handler", new ChannelInboundHandlerAdapter() {
    // 在连接建立后触发active事件
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    // 负责接收用户在控制台的输入,负责向服务器发送各种消息
    new Thread(() -> {
    System.out.println("请输入用户名:");
    String username = scanner.nextLine();
    if(EXIT.get()){
    return;
    }
    System.out.println("请输入密码:");
    String password = scanner.nextLine();
    if(EXIT.get()){
    return;
    }
    // 构造消息对象
    LoginRequestMessage message = new LoginRequestMessage(username, password);
    System.out.println(message);
    // 发送消息
    ctx.writeAndFlush(message);
    System.out.println("等待后续操作...");
    try {
    System.in.read();
    } catch (IOException e) {
    e.printStackTrace();
    }
    try {
    // 阻塞直至登录成功与否消息返回
    WAIT_FOR_LOGIN.await();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    // 如果登录失败
    if (!LOGIN.get()) {
    ctx.channel().close();
    return;
    }
    while (true) {
    System.out.println("==================================");
    System.out.println("send [username] [content]");
    System.out.println("gsend [group name] [content]");
    System.out.println("gcreate [group name] [m1,m2,m3...]");
    System.out.println("gmembers [group name]");
    System.out.println("gjoin [group name]");
    System.out.println("gquit [group name]");
    System.out.println("quit");
    System.out.println("==================================");
    String command = null;
    try {
    command = scanner.nextLine();
    } catch (Exception e) {
    break;
    }
    if(EXIT.get()){
    return;
    }
    }

    }, "system in").start();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    log.debug("msg: {}", msg);
    if ((msg instanceof LoginResponseMessage)) {
    LoginResponseMessage response = (LoginResponseMessage) msg;
    if (response.isSuccess()) {
    // 如果登录成功
    LOGIN.set(true);
    }
    // 唤醒system in线程
    WAIT_FOR_LOGIN.countDown();
    }
    }
    });
    }
    });
    Channel channel = bootstrap.connect("localhost", 8080).sync().channel();
    channel.closeFuture().sync();
    } catch (Exception e) {
    log.error("client error", e);
    } finally {
    group.shutdownGracefully();
    }
    }
    }
  • 启动服务端再启动客户端,假设输入错误的账号密码,客户端打印如下:

    服务器端打印如下:

  • 假设输入正确的账号密码,客户端打印如下:

    服务器端打印如下:

5.3 聊天室业务-单聊

  • 单聊请求消息实现类。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    @Data
    @ToString(callSuper = true)
    public class ChatRequestMessage extends Message {
    private String content;
    private String to;
    private String from;

    public ChatRequestMessage() {
    }

    public ChatRequestMessage(String from, String to, String content) {
    this.from = from;
    this.to = to;
    this.content = content;
    }

    @Override
    public int getMessageType() {
    return ChatRequestMessage;
    }
    }
  • 单聊响应消息实现类。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    @Data
    @ToString(callSuper = true)
    public class ChatResponseMessage extends AbstractResponseMessage {

    private String from;
    private String content;

    public ChatResponseMessage(boolean success, String reason) {
    super(success, reason);
    }

    public ChatResponseMessage(String from, String content) {
    this.from = from;
    this.content = content;
    }

    @Override
    public int getMessageType() {
    return ChatResponseMessage;
    }
    }
  • 单聊处理器。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    @ChannelHandler.Sharable
    public class ChatRequestMessageHandler extends SimpleChannelInboundHandler<ChatRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ChatRequestMessage msg) throws Exception {
    String to = msg.getTo();
    Channel channel = SessionFactory.getSession().getChannel(to);
    // 对方在线
    if (channel != null) {
    channel.writeAndFlush(new ChatResponseMessage(msg.getFrom(), msg.getContent()));
    }
    // 对方不在线
    else {
    ctx.writeAndFlush(new ChatResponseMessage(false, "对方用户不存在或者不在线"));
    }
    }
    }
  • 服务器端添加单聊处理器。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    @Slf4j
    public class ChatServer {
    public static void main(String[] args) {
    …………
    // 单聊处理器
    ChatRequestMessageHandler CHAT_HANDLER = new ChatRequestMessageHandler();
    …………
    try {
    …………
    serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
    …………
    // 添加单聊处理器
    ch.pipeline().addLast(CHAT_HANDLER);
    …………
    }
    });
    …………
    }
    …………
    }
    }
  • 客户端添加单聊处理。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    @Slf4j
    public class ChatClient {
    public static void main(String[] args) {
    …………
    try {
    …………
    bootstrap.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
    …………
    ch.pipeline().addLast("client handler", new ChannelInboundHandlerAdapter() {
    // 在连接建立后触发active事件
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    // 负责接收用户在控制台的输入,负责向服务器发送各种消息
    new Thread(() -> {
    …………
    while (true) {
    …………
    String[] s = command.split(" ");
    switch (s[0]){
    case "send":
    ctx.writeAndFlush(new ChatRequestMessage(username, s[1], s[2]));
    break;
    }
    }
    }, "system in").start();
    }
    …………
    });
    }
    });
    …………
    }
    …………
    }
    }
  • 启动服务端再启动两个客户端并输入正确的账号密码成功登录,此时两个客户端分别打印如下:

  • zhangsan发送给lisi消息:

  • lisi收到消息:

5.4 聊天室业务-群聊

5.4.1 创建群聊

  • 群聊创建请求消息实现类。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    @Data
    @ToString(callSuper = true)
    public class GroupCreateRequestMessage extends Message {
    private String groupName;
    private Set<String> members;

    public GroupCreateRequestMessage(String groupName, Set<String> members) {
    this.groupName = groupName;
    this.members = members;
    }

    @Override
    public int getMessageType() {
    return GroupCreateRequestMessage;
    }
    }
  • 群聊创建响应消息实现类。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    @Data
    @ToString(callSuper = true)
    public class GroupCreateResponseMessage extends AbstractResponseMessage {

    public GroupCreateResponseMessage(boolean success, String reason) {
    super(success, reason);
    }

    @Override
    public int getMessageType() {
    return GroupCreateResponseMessage;
    }
    }
  • 创建群聊处理器。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    @ChannelHandler.Sharable
    public class GroupCreateRequestMessageHandler extends SimpleChannelInboundHandler<GroupCreateRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, GroupCreateRequestMessage msg) throws Exception {
    String groupName = msg.getGroupName();
    Set<String> members = msg.getMembers();
    // 群管理器
    GroupSession groupSession = GroupSessionFactory.getGroupSession();
    Group group = groupSession.createGroup(groupName, members);
    if (group == null) {
    // 发生成功消息
    ctx.writeAndFlush(new GroupCreateResponseMessage(true, groupName + "创建成功"));
    // 发送拉群消息
    List<Channel> channels = groupSession.getMembersChannel(groupName);
    for (Channel channel : channels) {
    channel.writeAndFlush(new GroupCreateResponseMessage(true, "您已被拉入" + groupName));
    }
    } else {
    ctx.writeAndFlush(new GroupCreateResponseMessage(false, groupName + "已经存在"));
    }
    }
    }
  • 服务器端添加创建群聊处理器。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    @Slf4j
    public class ChatServer {
    public static void main(String[] args) {
    …………
    // 创建群聊处理器
    GroupCreateRequestMessageHandler GROUP_CREATE_HANDLER = new GroupCreateRequestMessageHandler();
    …………
    try {
    …………
    serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
    …………
    // 添加创建群聊处理器
    ch.pipeline().addLast(GROUP_CREATE_HANDLER);
    …………
    }
    });
    …………
    }
    …………
    }
    }
  • 客户端添加创建群聊处理。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    @Slf4j
    public class ChatClient {
    public static void main(String[] args) {
    …………
    try {
    …………
    bootstrap.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
    …………
    ch.pipeline().addLast("client handler", new ChannelInboundHandlerAdapter() {
    // 在连接建立后触发active事件
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    // 负责接收用户在控制台的输入,负责向服务器发送各种消息
    new Thread(() -> {
    …………
    while (true) {
    …………
    String[] s = command.split(" ");
    switch (s[0]){
    case "gcreate":
    Set<String> set = new HashSet<>(Arrays.asList(s[2].split(",")));
    set.add(username); // 加入自己
    ctx.writeAndFlush(new GroupCreateRequestMessage(s[1], set));
    break;
    }
    }
    }, "system in").start();
    }
    …………
    });
    }
    });
    …………
    }
    …………
    }
    }
  • 先启动服务端,再启动三个客户端并登录完毕,由zhangsan拉入lisi和wangwu进入群聊后三个客户端控制台打印如下:

  • 服务器端打印如下:

5.4.2 发送群聊消息

  • 群聊聊天请求消息实现类。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    @Data
    @ToString(callSuper = true)
    public class GroupChatRequestMessage extends Message {
    private String content;
    private String groupName;
    private String from;

    public GroupChatRequestMessage(String from, String groupName, String content) {
    this.content = content;
    this.groupName = groupName;
    this.from = from;
    }

    @Override
    public int getMessageType() {
    return GroupChatRequestMessage;
    }
    }
  • 群聊聊天响应消息实现类。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    @Data
    @ToString(callSuper = true)
    public class ChatResponseMessage extends AbstractResponseMessage {

    private String from;
    private String content;

    public ChatResponseMessage(boolean success, String reason) {
    super(success, reason);
    }

    public ChatResponseMessage(String from, String content) {
    this.from = from;
    this.content = content;
    }

    @Override
    public int getMessageType() {
    return ChatResponseMessage;
    }
    }
  • 群聊发送消息处理器。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    @ChannelHandler.Sharable
    public class GroupChatRequestMessageHandler extends SimpleChannelInboundHandler<GroupChatRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, GroupChatRequestMessage msg) throws Exception {
    List<Channel> channels = GroupSessionFactory.getGroupSession()
    .getMembersChannel(msg.getGroupName());
    // 向群内所有的成员发消息
    for (Channel channel : channels) {
    channel.writeAndFlush(new GroupChatResponseMessage(msg.getFrom(), msg.getContent()));
    }
    }
    }
  • 服务器端添加创建群聊处理器。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    @Slf4j
    public class ChatServer {
    public static void main(String[] args) {
    …………
    // 往群聊发送消息处理器
    GroupChatRequestMessageHandler GROUP_CHAT_HANDLER = new GroupChatRequestMessageHandler();
    …………
    try {
    …………
    serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
    …………
    // 添加群聊发送消息处理器
    ch.pipeline().addLast(GROUP_CHAT_HANDLER);
    …………
    }
    });
    …………
    }
    …………
    }
    }
  • 客户端添加创建群聊处理。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    @Slf4j
    public class ChatClient {
    public static void main(String[] args) {
    …………
    try {
    …………
    bootstrap.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
    …………
    ch.pipeline().addLast("client handler", new ChannelInboundHandlerAdapter() {
    // 在连接建立后触发active事件
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    // 负责接收用户在控制台的输入,负责向服务器发送各种消息
    new Thread(() -> {
    …………
    while (true) {
    …………
    String[] s = command.split(" ");
    switch (s[0]){
    case "gsend":
    ctx.writeAndFlush(new GroupChatRequestMessage(username, s[1], s[2]));
    break;
    }
    }
    }, "system in").start();
    }
    …………
    });
    }
    });
    …………
    }
    …………
    }
    }

5.4.3 加入群聊

  • 加入群聊请求消息实现类。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    @Data
    @ToString(callSuper = true)
    public class GroupJoinRequestMessage extends Message {
    private String groupName;

    private String username;

    public GroupJoinRequestMessage(String username, String groupName) {
    this.groupName = groupName;
    this.username = username;
    }

    @Override
    public int getMessageType() {
    return GroupJoinRequestMessage;
    }
    }
  • 加入群聊响应消息实现类。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    @Data
    @ToString(callSuper = true)
    public class GroupJoinResponseMessage extends AbstractResponseMessage {

    public GroupJoinResponseMessage(boolean success, String reason) {
    super(success, reason);
    }

    @Override
    public int getMessageType() {
    return GroupJoinResponseMessage;
    }
    }
  • 加入群聊处理器。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    @ChannelHandler.Sharable
    public class GroupJoinRequestMessageHandler extends SimpleChannelInboundHandler<GroupJoinRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, GroupJoinRequestMessage msg) throws Exception {
    Group group = GroupSessionFactory.getGroupSession().joinMember(msg.getGroupName(), msg.getUsername());
    if (group != null) {
    ctx.writeAndFlush(new GroupJoinResponseMessage(true, msg.getGroupName() + "群加入成功"));
    } else {
    ctx.writeAndFlush(new GroupJoinResponseMessage(true, msg.getGroupName() + "群不存在"));
    }
    }
    }
  • 服务器端添加加入群聊处理器。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    @Slf4j
    public class ChatServer {
    public static void main(String[] args) {
    …………
    // 加群处理器
    GroupJoinRequestMessageHandler GROUP_JOIN_HANDLER = new GroupJoinRequestMessageHandler();
    …………
    try {
    …………
    serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
    …………
    ch.pipeline().addLast(GROUP_JOIN_HANDLER);
    …………
    }
    });
    …………
    }
    …………
    }
    }
  • 客户端添加加入群聊处理。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    @Slf4j
    public class ChatClient {
    public static void main(String[] args) {
    …………
    try {
    …………
    bootstrap.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
    …………
    ch.pipeline().addLast("client handler", new ChannelInboundHandlerAdapter() {
    // 在连接建立后触发active事件
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    // 负责接收用户在控制台的输入,负责向服务器发送各种消息
    new Thread(() -> {
    …………
    while (true) {
    …………
    String[] s = command.split(" ");
    switch (s[0]){
    case "gjoin":
    ctx.writeAndFlush(new GroupJoinRequestMessage(username, s[1]));
    break;
    }
    }
    }, "system in").start();
    }
    …………
    });
    }
    });
    …………
    }
    …………
    }
    }

5.4.4 退出群聊

  • 退出群聊请求消息实现类。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    @Data
    @ToString(callSuper = true)
    public class GroupQuitRequestMessage extends Message {
    private String groupName;

    private String username;

    public GroupQuitRequestMessage(String username, String groupName) {
    this.groupName = groupName;
    this.username = username;
    }

    @Override
    public int getMessageType() {
    return GroupQuitRequestMessage;
    }
    }
  • 退出群聊响应消息实现类。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    @Data
    @ToString(callSuper = true)
    public class GroupQuitResponseMessage extends AbstractResponseMessage {
    public GroupQuitResponseMessage(boolean success, String reason) {
    super(success, reason);
    }

    @Override
    public int getMessageType() {
    return GroupQuitResponseMessage;
    }
    }
  • 退出群聊处理器。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    @ChannelHandler.Sharable
    public class GroupQuitRequestMessageHandler extends SimpleChannelInboundHandler<GroupQuitRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, GroupQuitRequestMessage msg) throws Exception {
    Group group = GroupSessionFactory.getGroupSession().removeMember(msg.getGroupName(), msg.getUsername());
    if (group != null) {
    ctx.writeAndFlush(new GroupQuitResponseMessage(true, "已退出群" + msg.getGroupName()));
    } else {
    ctx.writeAndFlush(new GroupQuitResponseMessage(true, msg.getGroupName() + "群不存在"));
    }
    }
    }
  • 服务器端添加退出群聊处理器。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    @Slf4j
    public class ChatServer {
    public static void main(String[] args) {
    …………
    // 退群处理器
    GroupQuitRequestMessageHandler GROUP_QUIT_HANDLER = new GroupQuitRequestMessageHandler();
    …………
    try {
    …………
    serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
    …………
    ch.pipeline().addLast(GROUP_QUIT_HANDLER);
    …………
    }
    });
    …………
    }
    …………
    }
    }
  • 客户端添加退出群聊处理。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    @Slf4j
    public class ChatClient {
    public static void main(String[] args) {
    …………
    try {
    …………
    bootstrap.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
    …………
    ch.pipeline().addLast("client handler", new ChannelInboundHandlerAdapter() {
    // 在连接建立后触发active事件
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    // 负责接收用户在控制台的输入,负责向服务器发送各种消息
    new Thread(() -> {
    …………
    while (true) {
    …………
    String[] s = command.split(" ");
    switch (s[0]){
    case "gquit":
    ctx.writeAndFlush(new GroupQuitRequestMessage(username, s[1]));
    break;
    }
    }
    }, "system in").start();
    }
    …………
    });
    }
    });
    …………
    }
    …………
    }
    }

5.4.5 获取群员

  • 获取聊员请求消息实现类。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    @Data
    @ToString(callSuper = true)
    public class GroupMembersRequestMessage extends Message {
    private String groupName;

    public GroupMembersRequestMessage(String groupName) {
    this.groupName = groupName;
    }

    @Override
    public int getMessageType() {
    return GroupMembersRequestMessage;
    }
    }
  • 获取聊员响应消息实现类。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    @Data
    @ToString(callSuper = true)
    public class GroupMembersResponseMessage extends Message {

    private Set<String> members;

    public GroupMembersResponseMessage(Set<String> members) {
    this.members = members;
    }

    @Override
    public int getMessageType() {
    return GroupMembersResponseMessage;
    }
    }
  • 获取群成员信息处理器。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    @ChannelHandler.Sharable
    public class GroupMembersRequestMessageHandler extends SimpleChannelInboundHandler<GroupMembersRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, GroupMembersRequestMessage msg) throws Exception {
    Set<String> members = GroupSessionFactory.getGroupSession()
    .getMembers(msg.getGroupName());
    ctx.writeAndFlush(new GroupMembersResponseMessage(members));
    }
    }
  • 服务器端省略。

  • 客户端。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    @Slf4j
    public class ChatClient {
    public static void main(String[] args) {
    …………
    try {
    …………
    bootstrap.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
    …………
    ch.pipeline().addLast("client handler", new ChannelInboundHandlerAdapter() {
    // 在连接建立后触发active事件
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    // 负责接收用户在控制台的输入,负责向服务器发送各种消息
    new Thread(() -> {
    …………
    while (true) {
    …………
    String[] s = command.split(" ");
    switch (s[0]){
    case "gmembers":
    ctx.writeAndFlush(new GroupMembersRequestMessage(s[1]));
    break;
    }
    }
    }, "system in").start();
    }
    …………
    });
    }
    });
    …………
    }
    …………
    }
    }

5.5 聊天室业务-退出

  • 退出登录处理器。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    @Slf4j
    @ChannelHandler.Sharable
    public class QuitHandler extends ChannelInboundHandlerAdapter {
    // 当连接断开时触发inactive事件
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    SessionFactory.getSession().unbind(ctx.channel());
    log.debug("{} 已经断开", ctx.channel());
    }

    // 当出现异常时触发
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    SessionFactory.getSession().unbind(ctx.channel());
    log.debug("{} 已经异常断开 异常是{}", ctx.channel(), cause.getMessage());
    }
    }
  • 服务器端省略。

  • 客户端。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    @Slf4j
    public class ChatClient {
    public static void main(String[] args) {
    …………
    try {
    …………
    bootstrap.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
    …………
    ch.pipeline().addLast("client handler", new ChannelInboundHandlerAdapter() {
    // 在连接建立后触发active事件
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    // 负责接收用户在控制台的输入,负责向服务器发送各种消息
    new Thread(() -> {
    …………
    while (true) {
    …………
    String[] s = command.split(" ");
    switch (s[0]){
    case "quit":
    ctx.channel().close();
    return;
    }
    }
    }, "system in").start();
    }
    …………
    });
    }
    });
    …………
    }
    …………
    }
    }

5.6 聊天室业务-空闲检测

  • 在网络编程中可能会出现连接假死的现象,可能的原因如下:

    • 网络设备出现故障,例如网卡,机房等,底层的TCP连接已经断开了,但应用程序没有感知到,仍然占用着资源。(TCP连接只是主机运输层维护的状态,如果物理层断开,运输层是无法感知的,也就是说物理层不维护TCP连接)
    • 公网网络不稳定,出现丢包。如果连续出现丢包,这时现象就是客户端数据发不出去,服务端也一直收不到数据,就这么一直耗着。
    • 应用程序线程阻塞,无法进行数据读写。
  • 会导致如下问题:

    • 假死的连接占用的资源不能自动释放。
    • 向假死的连接发送数据,得到的反馈是发送超时。
  • 服务器端解决:怎么判断客户端连接是否假死呢?如果能收到客户端数据,说明没有假死。因此策略就可以定为,每隔一段时间就检查这段时间内是否接收到客户端数据,没有就可以判定为连接假死。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    ch.pipeline().addLast(MESSAGE_CODEC);

    // 用来判断是不是读空闲时间过长,或写空闲时间过长
    // 5s内如果没有收到channel的数据,会触发一个IdleState#READER_IDLE事件
    ch.pipeline().addLast(new IdleStateHandler(5, 0, 0));
    // ChannelDuplexHandler可以同时作为入站和出站处理器
    ch.pipeline().addLast(new ChannelDuplexHandler() {
    // 用来触发特殊事件
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception{
    IdleStateEvent event = (IdleStateEvent) evt;
    // 触发了读空闲事件
    if (event.state() == IdleState.READER_IDLE) {
    log.debug("已经5s没有读到数据了");
    ctx.channel().close();
    }
    }
    });

    ch.pipeline().addLast(LOGIN_HANDLER);
  • 客户端定时心跳:客户端可以定时向服务器端发送数据,只要这个时间间隔小于服务器定义的空闲检测的时间间隔,那么就能防止前面提到的误判,客户端可以定义如下心跳处理器。

    1
    2
    3
    4
    5
    6
    public class PingMessage extends Message {
    @Override
    public int getMessageType() {
    return PingMessage;
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    ch.pipeline().addLast(MESSAGE_CODEC);

    // 用来判断是不是读空闲时间过长,或写空闲时间过长
    // 3s内如果没有向服务器写数据,会触发一个IdleState#WRITER_IDLE事件
    ch.pipeline().addLast(new IdleStateHandler(0, 3, 0));
    // ChannelDuplexHandler可以同时作为入站和出站处理器
    ch.pipeline().addLast(new ChannelDuplexHandler() {
    // 用来触发特殊事件
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception{
    IdleStateEvent event = (IdleStateEvent) evt;
    // 触发了写空闲事件
    if (event.state() == IdleState.WRITER_IDLE) {
    log.debug("3s没有写数据了,发送一个心跳包");
    ctx.writeAndFlush(new PingMessage());
    }
    }
    });