高性能架构的Netty
1、Netty概述
原生NIO存在的问题
- NIO的类库和API繁杂,使用麻烦:需要熟练掌握Selector、ServerSocketChannel、SocketChannel、ByteBuffer等。
- 需要具备其他的额外技能:要熟悉Java多线程编程,因为NIO编程涉及到Reactor模式,你必须对多线程和网络编程非常熟悉,才能编写出高质量的NIO程序。
- 开发工作量和难度都非常大:例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常流的处理等等。
- JDK NIO的Bug:例如臭名昭著的Epoll Bug,它会导致Selector空轮询,最终导致CPU100%。直到 JDK1.7版本该问题仍旧存在,没有被根本解决。
Netty介绍
Netty是由JBOSS提供的一个Java开源框架。Netty提供异步(采用多线程,基于多路复用)的、基于事件驱动的网络应用程序框架,用以快速开发高性能、高可靠性的网络IO程序。
Netty可以帮助你快速、简单的开发出一个网络应用,相当于简化和流程化了NIO的开发过程。
Netty是目前最流行的NIO框架,Netty在互联网领域、大数据分布式计算领域、游戏行业、通信行业等获得了广泛的应用,知名的Elasticsearch、Dubbo框架内部都采用了Netty。
其架构如下图:
Netty优点
:Netty对JDK自带的NIO的API进行了封装,解决了下述问题。- 将一些基础的网络协议定义好直接使用。
- 解决TCP传输问题,如粘包、半包。
- 解决了epoll空轮询导致CPU100%的问题。
- 对API进行增强,使之更易用,如FastThreadLocal=>ThreadLocal,ByteBuf=>ByteBuffer。
- 设计优雅:适用于各种传输类型的统一API阻塞和非阻塞Socket;基于灵活且可扩展的事件模型,可以清晰地分离关注点;高度可定制的线程模型-单线程,一个或多个线程池。
- 使用方便:详细记录的Javadoc,用户指南和示例;没有其他依赖项,JDK5(Netty3.x)或 6(Netty4.x)就足够了。
- 高性能、吞吐量更高:延迟更低;减少资源消耗;最小化不必要的内存复制。
- 安全:完整的SSL/TLS和StartTLS支持。
- 社区活跃、不断更新:社区活跃,版本迭代周期短,发现的Bug可以被及时修复,同时,更 多的新功能会被加入。
Netty在Java网络应用框架中的地位就好比:Spring框架在JavaEE开发中的地位。以下的框架都使用了 Netty,因为它们有网络通信需求!
- Cassandra - nosql数据库
- Spark - 大数据分布式计算框架
- Hadoop - 大数据分布式存储框架
- RocketMQ - ali开源的消息队列
- ElasticSearch - 搜索引擎
- gRPC - rpc框架
- Dubbo - rpc框架
- Spring 5.x - flux api完全抛弃了tomcat,使用netty作为服务器端
- Zookeeper - 分布式协调框架
最简单的netty客户端和服务端程序:
客户端:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23public class HelloClient {
public static void main(String[] args) throws InterruptedException {
// 1、启动类
new Bootstrap()
// 2、添加EventLoop
.group(new NioEventLoopGroup())
// 3、选择客户端channel实现
.channel(NioSocketChannel.class)
// 4、添加处理器
.handler(new ChannelInitializer<NioSocketChannel>() {
// 在连接建立后被调用
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder());
}
})
// 5、连接到服务器
.connect(new InetSocketAddress("localhost", 8080))
.sync()
.channel()
// 6、向服务器发送数据
.writeAndFlush("hello, world");
}
}服务端:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28public class HelloServer {
public static void main(String[] args) {
// 1、启动器,负责组装netty组件,启动服务器
new ServerBootstrap()
// 2、NioEventLoopGroup包含BossEventLoop和WorkerEventLoop(selector,thread), 即为group组
.group(new NioEventLoopGroup())
// 3、选择服务器的ServerSocketChannel实现
.channel(NioServerSocketChannel.class)
// 4、boss负责处理连接,worker(child)负责处理读写,决定了worker(child)能执行哪些操作(handler)
.childHandler(
// 5、channel代表和客户端进行数据读写的通道,Initializer为初始化器,ChannelInitializer其实也是个特殊的handler,负责添加别的handler
new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) throws Exception {
// 6、添加具体handler
ch.pipeline().addLast(new StringDecoder()); // 将ByteBuf转换为字符串
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { // 自定义handler
// 读事件
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(msg); // 打印上一步转换好的字符串
}
});
}
})
// 7、绑定监听端口
.bind(8080);
}
}流程解析如下图:
- 把channel理解为数据的通道。
- 把msg理解为流动的数据,最开始输入是ByteBuf,但经过pipeline的加工,会变成其它类型对象,最后输出又变成ByteBuf。
- 把handler理解为数据的处理工序。
- 工序有多道,合在一起就是pipeline,pipeline负责发布事件(读、读取完成…)传播给每个handler,handler对自己感兴趣的事件进行处理(重写了相应事件处理方法)。
- handler分Inbound和Outbound两类。
- 把eventLoop(内部使用单线程的线程池实现)理解为处理数据的工人。
- 工人可以管理多个channel的io操作,并且一旦工人负责了某个channel,就要负责到底(绑定)。
- 工人既可以执行io操作,也可以进行任务处理,每位工人有任务队列,队列里可以堆放多个channel的待处理任务,任务分为普通任务、定时任务。
- 工人按照pipeline顺序,依次按照handler的规划(代码)处理数据,可以为每道工序指定不同的工人。
2、Netty组件
2.1 EventLoop
2.1.1 事件循环对象与组
事件循环对象
- EventLoop(事件循环对象)本质是一个单线程执行器(同时维护了一个Selector),里面有run方法处理Channel上源源不断的io事件。它的继承关系比较复杂:
- 一条线是继承自j.u.c.ScheduledExecutorService,因此包含了线程池中所有的方法。
- 另一条线是继承自netty自己的OrderedEventExecutor。
- 提供了boolean inEventLoop(Thread thread)方法判断一个线程是否属于此EventLoop。
- 提供了parent方法来看看自己属于哪个EventLoopGroup。
事件循环组
EventLoopGroup是一组EventLoop,Channel一般会调用EventLoopGroup的register方法来绑定其中一个EventLoop,后续这个Channel上的io事件都由此EventLoop来处理(保证了io事件处理时的线程安全)。它继承自netty自己的EventExecutorGroup。
- 实现了Iterable接口提供遍历EventLoop的能力。
- 另有next方法获取集合中下一个EventLoop。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class TestEventLoop {
public static void main(String[] args) throws InterruptedException {
// 1、创建事件循环组,默认的线程数是当前系统CPU核心数*2
EventLoopGroup group = new NioEventLoopGroup(2); // 能处理io事件,普通任务或者定时任务
// EventLoopGroup group = new DefaultEventLoopGroup(); // 只能处理普通任务,定时任务
// 2、获取下一个事件循环对象
System.out.println(group.next());// io.netty.channel.nio.NioEventLoop@2ef5e5e3
System.out.println(group.next());// io.netty.channel.nio.NioEventLoop@36d4b5c
System.out.println(group.next());// io.netty.channel.nio.NioEventLoop@2ef5e5e3
System.out.println(group.next());// io.netty.channel.nio.NioEventLoop@36d4b5c
// 3、执行普通任务
group.next().execute(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("测试执行普通任务ok");
});
Thread.sleep(2000);
// 4、执行定时任务
group.next().scheduleAtFixedRate(() -> {
log.debug("测试执行定时任务ok");
}, 0, 1, TimeUnit.SECONDS);
}
}
2.1.2 NioEventLoop处理IO事件
客户端:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class EventLoopClient {
public static void main(String[] args) throws InterruptedException {
Channel channel = new Bootstrap()
.group(new NioEventLoopGroup(1))
.handler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder());
}
})
.channel(NioSocketChannel.class).connect("localhost", 8080)
.sync()
.channel();
channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes("1".getBytes()));
Thread.sleep(2000);
channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes("2".getBytes()));
}
}服务端:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class EventLoopServer {
public static void main(String[] args) {
// 细分2:创建一个独立的EventLoopGroup,用来处理耗时较长的操作
EventLoopGroup group = new DefaultEventLoopGroup();
new ServerBootstrap()
// 划分为boss和worker
// 细分1:boss只负责ServerSocketChannel上的accept事件,worker只负责socketChannel上的读写
// NioServerSocketChannel只会跟一个NioEventLoop绑定
.group(new NioEventLoopGroup(), new NioEventLoopGroup(2))
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast("handler1", new ChannelInboundHandlerAdapter() {
// ByteBuf
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
log.debug(buf.toString(Charset.defaultCharset()));
ctx.fireChannelRead(msg); // 让消息传递给下一个handler
}
}).addLast(group, "handler2", new ChannelInboundHandlerAdapter() {
// ByteBuf
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
log.debug(buf.toString(Charset.defaultCharset()));
}
});
}
})
.bind(8080);
}
}先开启服务端,再开启三个客户端后,服务器端打印如下:
- 一个NioEventLoop可以管理多个SocketChannel,但一个SocketChannel一旦绑定了一个NioEventLoop,那么接下来在此SocketChannel上的读写都由这个NioEventLoop来处理。
那么handler在执行中是如何换人的呢?在handler的执行过程中会调用以下方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
// 下一个handler的事件循环是否与当前的事件循环是同一个线程
EventExecutor executor = next.executor();// 返回下一个handler的eventLoop
// 判断当前handler中的线程是否和下一个handler中的eventLoop是同个线程,如果是则直接调用
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
}
// 如果不是则将要执行的代码作为任务提交给下一个事件循环处理(换人)
else {
executor.execute(new Runnable() {
public void run() {
next.invokeChannelRead(m);
}
});
}
}- 如果两个handler绑定的是同一个线程,那么就直接调用。
- 否则,把要调用的代码封装为一个任务对象,由下一个handler的线程来调用。
2.2 Channel
2.2.1 Channel简介
channel的主要作用:
- close()可以用来关闭channel。
- closeFuture()用来处理channel的关闭。
- sync方法作用是同步等待channel关闭。
- 而addListener方法是异步等待channel关闭。
- pipeline()方法添加处理器。
- write()方法将数据写入。
- writeAndFlush()方法将数据写入并刷出。
2.2.2 ChannelFuture
一段客户端代码拆开后如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19public class NettyClient {
public static void main(String[] args) throws InterruptedException {
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new StringEncoder());
}
})
// main线程发起调用,真正执行的是NioEventLoop线程,是异步非阻塞的
.connect("127.0.0.1", 8080); // 1
//channelFuture.sync(); 阻塞当前线程直至nio线程连接建立完毕
Channel channel = channelFuture.channel();
channel.writeAndFlush("hello world!");
}
}将channelFuture.sync()注释后,发现客户端无法向服务端发送数据,原因是connect方法是异步非阻塞的,意味着不等连接建立,方法执行就返回了。因此channelFuture对象中不能立刻获得到正确的Channel对象。实验如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new StringEncoder());
}
})
.connect("127.0.0.1", 8080);
log.debug("{}",channelFuture.channel()); // 1
channelFuture.sync(); // 2
log.debug("{}",channelFuture.channel()); // 3
}
}执行到1时,连接未建立,打印
[id: 0x7e782185]
。执行到2时,sync方法是同步等待连接建立完成。
执行到3时,连接肯定建立了,打印
[id: 0x7e782185, L:/127.0.0.1:59164 - R:/127.0.0.1:8080]
。
除了用sync方法可以让异步操作同步以外,还可以使用回调的方式:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new StringEncoder());
}
})
.connect("127.0.0.1", 8080); // 1
channelFuture.addListener(new ChannelFutureListener() {
// 在nio线程连接建立好后就会调用此方法
public void operationComplete(ChannelFuture future) throws Exception {
Channel channel = future.channel();
log.debug("{}", channel);
channel.writeAndFlush("hello world!");
}
});
}
}
2.2.3 CloseFuture
可以使用CloseFuture实现在连接关闭后做一些善后操作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class CloseFutureClient {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
ChannelFuture channelFuture = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
// 在连接建立后被调用
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost", 8080));
Channel channel = channelFuture.sync().channel();
log.debug("{}", channel);
new Thread(() -> {
Scanner scanner = new Scanner(System.in);
while (true) {
String line = scanner.nextLine();
if ("q".equals(line)) {
channel.close(); // close是异步操作
// log.debug("处理关闭之后的操作"); // 不能在这里善后
break;
}
channel.writeAndFlush(line);
}
}, "input").start();
// 获取CloseFuture对象
ChannelFuture closeFuture = channel.closeFuture();
// 同步处理关闭
log.debug("waiting close...");
closeFuture.sync();
log.debug("处理关闭之后的操作");
}
}还可以使用异步处理的方式:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class CloseFutureClient {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
ChannelFuture channelFuture = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
// 在连接建立后被调用
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost", 8080));
Channel channel = channelFuture.sync().channel();
log.debug("{}", channel);
new Thread(() -> {
Scanner scanner = new Scanner(System.in);
while (true) {
String line = scanner.nextLine();
if ("q".equals(line)) {
channel.close(); // close是异步操作
// log.debug("处理关闭之后的操作"); // 不能在这里善后
break;
}
channel.writeAndFlush(line);
}
}, "input").start();
// 获取CloseFuture对
ChannelFuture closeFuture = channel.closeFuture();
// 异步处理关闭
closeFuture.addListener((ChannelFutureListener) future -> {
log.debug("处理关闭之后的操作");
// 使用shutdownGracefully方法优雅关闭线程。该方法会首先切换EventLoopGroup到关闭状态从而拒绝新的任务的加入,然后在任务队列的任务都处理完成后,停止线程的运行,从而确保整体应用是在正常有序的状态下退出的。
group.shutdownGracefully();
});
}
}
2.3 Future与Promise
在异步处理时,经常用到这两个接口:Future和Promise。首先要说明netty中的Future与jdk中的Future同名,但是是两个接口,netty的Future继承自jdk的Future,而Promise又对netty Future进行了扩展。
功能/名称 jdk Future netty Future Promise cancel 取消任务 - - isCanceled 任务是否取消 - - isDone 任务是否完成,不能区分成功失败 - - get 获取任务结果,阻塞等待 - - getNow - 获取任务结果,非阻塞,还未产生结果时返回 null - await - 等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断 - sync - 等待任务结束,如果任务失败,抛出异常 - isSuccess - 判断任务是否成功 - cause - 获取失败信息,非阻塞,如果没有失败,返回null - addLinstener - 添加回调,异步接收结果 - setSuccess - - 设置成功结果 setFailure - - 设置失败结果
2.3.1 jdk Future
jdk Future只能同步等待任务结束(或成功、或失败)才能得到结果。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class TestJdkFuture {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建线程池
ExecutorService service = Executors.newFixedThreadPool(2);
// 提交任务
Future<Integer> future = service.submit(new Callable<Integer>() {
public Integer call() throws Exception {
log.debug("执行计算");
Thread.sleep(1000);
return 50;
}
});
// 主线程通过future来获取结果
log.debug("等待结果");
log.debug("结果是 {}", future.get());
}
}
2.3.2 netty Future
netty Future可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束。
同步等待任务结束得到结果。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class TestNettyFuture {
public static void main(String[] args) throws ExecutionException, InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
// 每个EventLoop中只有一个线程
EventLoop eventLoop = group.next();
Future<Integer> future = eventLoop.submit(new Callable<Integer>() {
public Integer call() throws Exception {
log.debug("执行计算");
Thread.sleep(1000);
return 70;
}
});
log.debug("等待结果");
log.debug("结果是 {}", future.get());
}
}异步方式得到结果。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class TestNettyFuture {
public static void main(String[] args) throws ExecutionException, InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
// 每个EventLoop中只有一个线程
EventLoop eventLoop = group.next();
Future<Integer> future = eventLoop.submit(new Callable<Integer>() {
public Integer call() throws Exception {
log.debug("执行计算");
Thread.sleep(1000);
return 70;
}
});
future.addListener(new GenericFutureListener<Future<? super Integer>>() {
public void operationComplete(Future<? super Integer> future) throws Exception {
log.debug("接收结果:{}", future.getNow());
}
});
}
}
2.3.3 netty Promise
netty Promise不仅有netty Future的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class TestNettyPromise {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 准备EventLoop对象
EventLoop eventLoop = new NioEventLoopGroup().next();
// 可以主动创建promise, 结果容器
DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);
new Thread(() -> {
// 创建任意一个线程执行计算,计算完毕后向promise填充结果
log.debug("开始计算...");
try {
int i = 1 / 0;
Thread.sleep(1000);
promise.setSuccess(80);
} catch (Exception e) {
e.printStackTrace();
promise.setFailure(e);
}
}).start();
// 接收结果的线程
log.debug("等待结果...");
log.debug("结果是: {}", promise.get());
}
}
2.4 Handler与Pipeline
ChannelHandler用来处理Channel上的各种事件,分为入站、出站两种。所有ChannelHandler被连成一串,就是Pipeline。打个比喻,每个Channel是一个产品的加工车间,Pipeline是车间中的流水线,ChannelHandler就是流水线上的各道工序,而后面要讲的ByteBuf是原材料,经过很多工序的加工:先经过一道道入站工序,再经过一道道出站工序最终变成产品。
入站处理器通常是ChannelInboundHandlerAdapter的子类,主要用来读取客户端数据,写回结果。
出站处理器通常是ChannelOutboundHandlerAdapter的子类,主要对写回结果进行加工。
客户端使用之前的CloseFutureClient,服务端代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public class TestPipeline {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) throws Exception {
// 1、通过channel拿到pipeline
ChannelPipeline pipeline = ch.pipeline();
// 2、添加处理器head -> h1 -> h2 -> h3 -> h4 -> h5 -> h6 -> tail
pipeline.addLast("h1", new ChannelInboundHandlerAdapter(){
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("1");
super.channelRead(ctx, msg);
}
});
pipeline.addLast("h2", new ChannelInboundHandlerAdapter(){
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("2");
super.channelRead(ctx, msg); // 将数据传递给下个handler,如果不调用,调用链会断开 或者调用ctx.fireChannelRead(student);
}
});
pipeline.addLast("h3", new ChannelInboundHandlerAdapter(){
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("3");
// 因为h3后没有入站处理器了,所以可以不调用此方法
super.channelRead(ctx, msg);
// 为了触发OutboundHandler的执行
ch.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));
}
});
pipeline.addLast("h4", new ChannelOutboundHandlerAdapter(){
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("4");
super.write(ctx, msg, promise);
}
});
pipeline.addLast("h5", new ChannelOutboundHandlerAdapter(){
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("5");
super.write(ctx, msg, promise);
}
});
pipeline.addLast("h6", new ChannelOutboundHandlerAdapter(){
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("6");
super.write(ctx, msg, promise);
}
});
}
})
.bind(8080);
}
}
客户端往服务端发消息后,服务端控制台打印如下:
可以看到,ChannelInboundHandlerAdapter是按照addLast的顺序执行的,而 ChannelOutboundHandlerAdapter是按照addLast的逆序执行的。实际上ChannelPipeline的实现是一个ChannelHandlerContext(包装了ChannelHandler) 组成的双向链表。
演示msg在各个入站处理器中的互传。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
public class TestPipeline {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) throws Exception {
// 1、通过channel拿到pipeline
ChannelPipeline pipeline = ch.pipeline();
// 2、添加处理器head -> h1 -> h2 -> h3 -> h4 -> h5 -> h6 -> tail
pipeline.addLast("h1", new ChannelInboundHandlerAdapter() {
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("1");
ByteBuf byteBuf = (ByteBuf) msg;
String name = byteBuf.toString(Charset.defaultCharset());
super.channelRead(ctx, name);
}
});
pipeline.addLast("h2", new ChannelInboundHandlerAdapter() {
public void channelRead(ChannelHandlerContext ctx, Object name) throws Exception {
log.debug("2");
Student student = new Student(name.toString());
super.channelRead(ctx, student); // 将数据传递给下个handler,如果不调用,调用链会断开。或者调用ctx.fireChannelRead(student);
}
});
pipeline.addLast("h3", new ChannelInboundHandlerAdapter() {
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("3,结果是:{},类型是:{}", msg, msg.getClass());
// super.channelRead(ctx, msg);由于没有下一个入站处理器,所以可以不写
ch.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));
}
});
pipeline.addLast("h4", new ChannelOutboundHandlerAdapter() {
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("4");
super.write(ctx, msg, promise);
}
});
pipeline.addLast("h5", new ChannelOutboundHandlerAdapter() {
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("5");
super.write(ctx, msg, promise);
}
});
pipeline.addLast("h6", new ChannelOutboundHandlerAdapter() {
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("6");
super.write(ctx, msg, promise);
}
});
}
})
.bind(8080);
}
static class Student {
private String name;
}
}如果注释了handler2中的super.channelRead(ctx, student),则服务端只会打印1和2,因为无法传给handler3,也就无法触发写流程逻辑;如果将handler3中的ch.writeAndFlush()换成ctx.writeAndFlush()方法,则只会打印1,2,3,而其余三个出站处理器6,5,4不会打印,因为ctx.writeAndFlush()是从当前节点找上一个出站处理器,而ch.writeAndFlush()是从尾部开始查找。
为了快速测试netty的入站和出站操作,netty提供了便捷的EmbeddedChannel类来模拟入站和出站操作。
模拟入站操作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class TestEmbeddedChannel {
public static void main(String[] args) {
ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter() {
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("1");
super.channelRead(ctx, msg);
}
};
ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter() {
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("2");
super.channelRead(ctx, msg);
}
};
ChannelOutboundHandlerAdapter h3 = new ChannelOutboundHandlerAdapter() {
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("3");
super.write(ctx, msg, promise);
}
};
ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter() {
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("4");
super.write(ctx, msg, promise);
}
};
EmbeddedChannel channel = new EmbeddedChannel(h1, h2, h3, h4);
// 模拟入站操作
channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes()));
}
}模拟出站操作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class TestEmbeddedChannel {
public static void main(String[] args) {
ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter() {
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("1");
super.channelRead(ctx, msg);
}
};
ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter() {
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("2");
super.channelRead(ctx, msg);
}
};
ChannelOutboundHandlerAdapter h3 = new ChannelOutboundHandlerAdapter() {
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("3");
super.write(ctx, msg, promise);
}
};
ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter() {
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("4");
super.write(ctx, msg, promise);
}
};
EmbeddedChannel channel = new EmbeddedChannel(h1, h2, h3, h4);
// 模拟出站操作
channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("world".getBytes()));
}
}
2.5 ByteBuf
ByteBuf是对字节数据的封装,它具有以下优势:
- 池化,可以重用池中ByteBuf实例,更节约内存,减少内存溢出的可能。
- 读写指针分离,不需要像ByteBuffer一样切换读写模式。
- 可以自动扩容。
- 支持链式调用,使用更流畅。
- 很多地方体现零拷贝,例如slice、duplicate、CompositeByteBuf。
2.5.1 创建
创建一个默认的ByteBuf(池化基于直接内存的ByteBuf),初始容量是10。
1
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25public class TestByteBuf {
public static void main(String[] args) {
// 初始容量默认为256
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
log(buf);
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 32; i++) {
sb.append("a");
}
buf.writeBytes(sb.toString().getBytes());
log(buf);
}
public static void log(ByteBuf buffer) {
int length = buffer.readableBytes();
int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
StringBuilder buf = new StringBuilder(rows * 80 * 2)
.append("read index:").append(buffer.readerIndex())
.append(" write index:").append(buffer.writerIndex())
.append(" capacity:").append(buffer.capacity())
.append(NEWLINE);
appendPrettyHexDump(buf, buffer);
System.out.println(buf.toString());
}
}
2.5.2 直接内存与堆内存
可以使用下面的代码来创建池化基于堆的ByteBuf。(读写性能低,分配效率高)
1
ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(10);
也可以使用下面的代码来创建池化基于直接内存的ByteBuf。(读写性能高,分配效率低)
1
ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(10);
- 直接内存创建和销毁的代价昂贵,但读写性能高(少一次内存复制),适合配合池化功能一起用。
- 直接内存对GC压力小,因为这部分内存不受JVM垃圾回收的管理,但也要注意及时主动释放。
2.5.3 池化与非池化
池化的最大意义在于可以重用ByteBuf,优点有:
- 没有池化,则每次都得创建新的ByteBuf实例,这个操作对直接内存代价昂贵,就算是堆内存,也会增加GC压力。
- 有了池化,则可以重用池中ByteBuf实例,并且采用了与jemalloc类似的内存分配算法提升分配效率。
- 高并发时,池化功能更节约内存,减少内存溢出的可能。
池化功能是否开启,可以通过下面的系统环境变量来设置。
1
-Dio.netty.allocator.type={unpooled|pooled}
4.1以后,非Android平台默认启用池化实现,Android平台启用非池化实现。
4.1之前,池化功能还不成熟,默认是非池化实现。
1
2
3
4
5
6
7
8
9public class TestByteBuf {
public static void main(String[] args) {
// 初始容量默认为256
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
System.out.println(buf.getClass());
buf = ByteBufAllocator.DEFAULT.heapBuffer();
System.out.println(buf.getClass());
}
}
2.5.4 组成
ByteBuf由四部分组成(最开始读写指针都在0位置):
- 与NIO中的ByteBuffer区别如下:
- ByteBuffer共用一个读写指针,因此需要在读写模式下切换,而ByteBuf分出了读写指针,十分便利。
- ByteBuffer没有自动扩容机制,而ByteBuf有。
- 与NIO中的ByteBuffer区别如下:
2.5.5 写入
方法列表如下(还有一类方法是set开头的一系列方法,也可以写入数据,但不会改变写指针位置):
方法签名 含义 备注 writeBoolean(boolean value) 写入 boolean 值 用一字节 01|00 代表 true|false writeByte(int value) 写入 byte 值 writeShort(int value) 写入 short 值 writeInt(int value) 写入 int 值 Big Endian,即 0x250,写入后 00 00 02 50 writeIntLE(int value) 写入 int 值 Little Endian,即 0x250,写入后 50 02 00 00 writeLong(long value) 写入 long 值 writeChar(int value) 写入 char 值 writeFloat(float value) 写入 float 值 writeDouble(double value) 写入 double 值 writeBytes(ByteBuf src) 写入 netty 的 ByteBuf writeBytes(byte[] src) 写入 byte[] writeBytes(ByteBuffer src) 写入 nio 的 ByteBuffer int writeCharSequence(CharSequence sequence, Charset charset) 写入字符串 这些方法的未指明返回值的,其返回值都是ByteBuf,意味着可以链式调用。
网络传输,默认习惯是Big Endian。
1 | public class TestByteBuf { |
2.5.6 扩容
扩容规则是:
- 如果写入后数据大小未超过512,则选择下一个16的整数倍,例如写入后大小为12,则扩容后capacity是16。
- 如果写入后数据大小超过512,则选择下一个2^n,例如写入后大小为513,则扩容后capacity是2^10=1024(2^9=512已经不够了)。
- 扩容不能超过max capacity,否则会报错。
1
2
3
4
5
6
7
8
9
10
11
12
13public class TestByteBuf {
public static void main(String[] args) {
// 初始容量默认为256
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(10);
buf.writeBytes(new byte[]{1, 2, 3, 4});
log(buf);
buf.writeInt(5);
log(buf);
// 触发扩容
buf.writeInt(6);
log(buf);
}
}
2.5.7 读取
例如读了4次,每次一个字节,那么读过的内容,就属于废弃部分了,再读只能读那些尚未读取的部分。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17public class TestByteBuf {
public static void main(String[] args) {
// 初始容量默认为256
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(10);
buf.writeBytes(new byte[]{1, 2, 3, 4});
log(buf);
buf.writeInt(5);
log(buf);
buf.writeInt(6);
log(buf);
System.out.println(buf.readByte());
System.out.println(buf.readByte());
System.out.println(buf.readByte());
System.out.println(buf.readByte());
log(buf);
}
}如果需要重复读取int整数5,可以使用标记mark。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20public class TestByteBuf {
public static void main(String[] args) {
// 初始容量默认为256
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(10);
buf.writeBytes(new byte[]{1, 2, 3, 4});
log(buf);
buf.writeInt(5);
log(buf);
buf.writeInt(6);
log(buf);
System.out.println(buf.readByte());
System.out.println(buf.readByte());
System.out.println(buf.readByte());
System.out.println(buf.readByte());
log(buf);
buf.markReaderIndex();
System.out.println(buf.readInt());
log(buf);
}
}这时要重复读取的话,重置到标记位置reset。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22public class TestByteBuf {
public static void main(String[] args) {
// 初始容量默认为256
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(10);
buf.writeBytes(new byte[]{1, 2, 3, 4});
log(buf);
buf.writeInt(5);
log(buf);
buf.writeInt(6);
log(buf);
System.out.println(buf.readByte());
System.out.println(buf.readByte());
System.out.println(buf.readByte());
System.out.println(buf.readByte());
log(buf);
buf.markReaderIndex();
System.out.println(buf.readInt());
log(buf);
buf.resetReaderIndex();
log(buf);
}
}还有get()开头的一系列方法,这些方法不会改变read index。
2.5.8 内存释放
由于Netty中有堆外内存的ByteBuf实现,堆外内存最好是手动来释放,而不是等GC垃圾回收。
- UnpooledHeapByteBuf使用的是JVM内存,只需等GC回收内存即可。
- UnpooledDirectByteBuf使用的就是直接内存了,需要特殊的方法来回收内存。
- PooledByteBuf和它的子类使用了池化机制,需要更复杂的规则来回收内存。
Netty这里采用了引用计数法来控制回收内存,每个ByteBuf都实现了ReferenceCounted接口。
- 每个ByteBuf对象的初始计数为1。
- 调用release方法计数减1,如果计数为0,ByteBuf内存被回收。
- 调用retain方法计数加1,表示调用者没用完之前,其它handler即使调用了release也不会造成回收。
- 当计数为0时,底层内存会被回收,这时即使ByteBuf对象还在,其各个方法均无法正常使用。
不能简单地在handler里使用try catch finally来释放,因为pipeline的存在,一般需要将ByteBuf传递给下一个ChannelHandler,如果在finally中release了,就失去了传递性(当然,如果在这个ChannelHandler内这个ByteBuf已完成了它的使命,那么便无须再传递),所以基本规则是,谁是最后使用者,谁负责release,详细分析如下:
起点,对于NIO实现来讲,在io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read方法中首次创建ByteBuf放入pipeline(line 163 pipeline.fireChannelRead(byteBuf))。
入站ByteBuf处理原则。
对原始ByteBuf不做处理,调用ctx.fireChannelRead(msg)向后传递,这时无须release。
将原始ByteBuf转换为其它类型的Java对象,这时ByteBuf就没用了,必须release。
如果不调用ctx.fireChannelRead(msg)向后传递,那么也必须release。
注意各种异常,如果ByteBuf没有成功传递到下一个 ChannelHandler,必须release。
假设消息一直向后传,那么TailContext会负责释放未处理消息(原始的ByteBuf)。
TailContext释放未处理消息逻辑:
出站ByteBuf处理原则。
出站消息最终都会转为ByteBuf输出,一直向前传,由HeadContext flush后release。
异常处理原则。
- 有时候不清楚ByteBuf被引用了多少次,但又必须彻底释放,可以循环调用release直到返回true。
2.5.9 slice
slice是零拷贝(这里指减少数据复制)的体现之一,对原始ByteBuf进行切片成多个ByteBuf,切片后的ByteBuf并没有发生内存复制,还是使用原始ByteBuf的内存,切片后的ByteBuf维护独立的read,write指针。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16public class TestSlice {
public static void main(String[] args) {
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(10);
buf.writeBytes(new byte[]{'a','b','c','d','e','f','g','h','i','j'});
log(buf);
// 在切片过程中,没有发生数据复制
ByteBuf f1 = buf.slice(0, 5);
ByteBuf f2 = buf.slice(5, 5);
log(f1);
log(f2);
System.out.println("====================");
f1.setByte(0, 'b');
log(f1);
log(buf);
}
}切片后如果追加数据会报错。
1
2
3
4
5
6
7
8
9
10
11
12
13public class TestSlice {
public static void main(String[] args) {
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(10);
buf.writeBytes(new byte[]{'a','b','c','d','e','f','g','h','i','j'});
log(buf);
// 在切片过程中,没有发生数据复制
ByteBuf f1 = buf.slice(0, 5);
ByteBuf f2 = buf.slice(5, 5);
log(f1);
log(f2);
f1.writeByte(5);
}
}如果切片后释放了原ByteBuf的内存,再使用切片后的ByteBuf也会报错,因为底层都是同一块内存,可以使用f1.retain()使引用计数加1。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15public class TestSlice {
public static void main(String[] args) {
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(10);
buf.writeBytes(new byte[]{'a','b','c','d','e','f','g','h','i','j'});
log(buf);
// 在切片过程中,没有发生数据复制
ByteBuf f1 = buf.slice(0, 5);
ByteBuf f2 = buf.slice(5, 5);
log(f1);
log(f2);
System.out.println("释放原有byteBuf内存");
buf.release();
log(f1);
}
}
2.5.10 duplicate
duplicate是零拷贝(这里指减少数据复制)的体现之一,就好比截取了原始ByteBuf所有内容,并且没有max capacity的限制,也是与原始ByteBuf使用同一块底层内存,只是读写指针是独立的。
2.5.11 copy
- 会将底层内存数据进行深拷贝,因此无论读写,都与原始ByteBuf无关。
2.5.12 CompositeByteBuf
也是零拷贝(这里指减少数据复制)的体现之一,可以将多个ByteBuf合并为一个逻辑上的 ByteBuf,避免拷贝。
CompositeByteBuf是一个组合的ByteBuf,它内部维护了一个Component数组,每个Component管理一个ByteBuf,记录了这个ByteBuf相对于整体偏移量等信息,代表着整体中某一段的数据。
- 优点,对外是一个虚拟视图,组合这些ByteBuf不会产生内存复制。
- 缺点,复杂了很多,多次操作会带来性能的损耗。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18public class TestCompositeByteBuf {
public static void main(String[] args) {
ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer();
buf1.writeBytes(new byte[]{1, 2, 3, 4, 5});
ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer();
buf2.writeBytes(new byte[]{6, 7, 8, 9, 10});
// 以下方式会发生数据复制,对性能造成影响
// ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
// buffer.writeBytes(buf1).writeBytes(buf2);
// log(buffer);
CompositeByteBuf buffer = ByteBufAllocator.DEFAULT.compositeBuffer();
// 为true表示自动增长写指针
buffer.addComponents(true, buf1, buf2);
log(buffer);
}
}
2.5.13 Unpooled
Unpooled是一个工具类,类如其名,提供了非池化的ByteBuf创建、组合、复制等操作。它有着和零拷贝相关的wrappedBuffer方法,可以用来包装ByteBuf。
1
2
3
4
5
6
7
8
9
10
11
12public class TestUnpooled {
public static void main(String[] args) {
ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(5);
buf1.writeBytes(new byte[]{1, 2, 3, 4, 5});
ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(5);
buf2.writeBytes(new byte[]{6, 7, 8, 9, 10});
// 当包装ByteBuf个数超过一个时, 底层使用了CompositeByteBuf
ByteBuf buf3 = Unpooled.wrappedBuffer(buf1, buf2);
System.out.println(ByteBufUtil.prettyHexDump(buf3));
}
}也可以用来包装普通字节数组,底层也不会有拷贝操作。
1
2
3
4
5
6
7public class TestUnpooled {
public static void main(String[] args) {
ByteBuf buf4 = Unpooled.wrappedBuffer(new byte[]{1, 2, 3}, new byte[]{4, 5, 6});
System.out.println(buf4.getClass());
System.out.println(ByteBufUtil.prettyHexDump(buf4));
}
}
3、Netty架构
3.1 线程模型
- 目前存在的线程模型有:传统阻塞I/O服务模型和Reactor模式。
- 根据Reactor的数量和处理资源池线程的数量不同,有3种典型的实现。
- 单Reactor单线程。
- 单Reactor多线程。
- 主从Reactor多线程。
- Netty线程模式(Netty主要基于主从Reactor多线程模型做了一定的改进,其中主从Reactor多线程模型有多个Reactor)
3.1.1 传统阻塞I/O服务模型
工作原理图如下:
- 采用阻塞IO模式获取输入的数据。
- 每个连接都需要独立的线程完成数据的输入,业务处理,数据返回。
- 当并发数很大,就会创建大量的线程,占用很大系统资源。
- 连接创建后,如果当前线程暂时没有数据可读,该线程会阻塞在read操作,造成线程资源浪费。
3.1.2 Reactor模式
针对传统阻塞I/0服务模型的2个缺点,解决方案如下:
基于I/O复用模型:多个连接共用一个阻塞对象,应用程序只需要在一个阻塞对象等待,无需阻塞等待所有连接。当某个连接有新的数据可以处理时,操作系统通知应用程序,线程从阻塞状态返回,开始进行业务处理。Reactor对应的叫法:
- 反应器模式。
- 分发者模式(Dispatcher)
- 通知者模式(Notifier)
基于线程池复用线程资源:不必再为每个连接创建线程,将连接完成后的业务处理任务分配给线程进行处理,一个线程可以处理多个连接的业务。
- Reactor模式,通过一个或多个输入同时传递给服务处理器(ServiceHandler)的模式(基于事件驱动)
- 服务器端程序处理传入的多个请求,并将它们同步分派到相应的处理线程,因此Reactor模式也叫Dispatcher模式。
- Reactor模式使用IO复用监听事件,收到事件后,分发给某个线程(进程),这点就是网络服务器高并发处理关键。
Reactor模式中核心组成:
- Reactor:Reactor在一个单独的线程中运行,负责监听和分发事件,分发给适当的处理程序来对IO事件做出反应。它就像公司的电话接线员,它接听来自客户的电话并将线路转移到适当的联系人。
- Handlers:处理程序执行I/O事件要完成的实际事件,类似于客户想要与之交谈的公司中的实际官员。Reactor通过调度适当的处理程序来响应I/O事件,处理程序执行非阻塞操作。
Reactor模式具有如下的优点:
- 响应快,不必为单个同步时间所阻塞,虽然Reactor本身依然是同步的。
- 可以最大程度的避免复杂的多线程及同步问题,并且避免了多线程/进程的切换开销。
- 扩展性好,可以方便的通过增加Reactor实例个数来充分利用CPU资源。
- 复用性好,Reactor模型本身与具体事件处理逻辑无关,具有很高的复用性。
3.1.2.1 单Reactor单线程
工作原理示意图:
- select是前面I/O复用模型介绍的标准网络编程API,可以实现应用程序通过一个阻塞对象监听多路连接请求。
- Reactor对象通过Select监控客户端请求事件,收到事件后通过Dispatch进行分发。
- 如果是建立连接请求事件,则由Acceptor通过Accept处理连接请求,然后创建一个Handler对象处理连接完成后的后续业务处理。
- 如果不是建立连接事件,则Reactor会分发调用连接对应的Handler来响应。
- Handler会完成Read→业务处理→Send的完整业务流程。
- 服务器端用一个线程通过多路复用搞定所有的IO操作(包括连接,读、写等),编码简单,清晰明了,但是如果客户端连接数量较多,将无法支撑,前面的NIO案例就属于这种模型。
优点:模型简单,没有多线程、进程通信、竞争的问题,全部都在一个线程中完成。
缺点:
- 性能问题,只有一个线程,无法完全发挥多核CPU的性能。Handler在处理某个连接上的业务时,整个进程无法处理其他连接事件,很容易导致性能瓶颈。
- 可靠性问题,线程意外终止,或者进入死循环,会导致整个系统通信模块不可用,不能接收和处理外部消息,造成节点故障。
使用场景:客户端的数量有限,业务处理非常快速,比如Redis在业务处理的时间复杂度O(1)的情况。
3.1.2.2 单Reactor多线程
工作原理示意图:
- Reactor对象通过Select监控客户端请求事件,收到事件后,通过Dispatch进行分发。
- 如果建立连接请求,则由Acceptor通过accept处理连接请求,然后创建一个Handler对象处理完成连接后的各种事件。
- 如果不是连接请求,则由Reactor分发调用连接对应的handler来处理。
- handler只负责响应事件,不做具体的业务处理,通过read读取数据后,会分发给后面的worker线程池的某个线程处理业务。
- worker线程池会分配独立线程完成真正的业务,并将结果返回给handler。
- handler收到响应后,通过send将结果返回给client。
优点:可以充分的利用多核cpu的处理能力。
缺点:多线程数据共享和访问比较复杂,Reactor处理所有的事件的监听和响应,在单线程运行,在高并发场景容易出现性能瓶颈。
3.1.2.3 主从Reactor多线程
工作原理示意图:
针对单Reactor多线程模型中,Reactor在单线程中运行,高并发场景下容易成为性能瓶颈,可以让Reactor在多线程中运行。
- Reactor主线程的MainReactor对象通过select监听连接事件,收到事件后,通过Acceptor处理连接事件。
- 当Acceptor处理连接事件后,MainReactor将连接分配给SubReactor。
- SubReactor将连接加入到连接队列进行监听,并创建handler进行各种事件处理。
- 当有新事件发生时,SubReactor就会调用对应的handler处理。
- handler通过read读取数据,分发给后面的worker线程处理。
- worker线程池分配独立的worker线程进行业务处理,并返回结果。
- handler收到响应的结果后,再通过send将结果返回给client。
- Reactor主线程可以对应多个Reactor子线程,即MainRecator可以关联多个SubReactor。
优点:
- 父线程与子线程的数据交互简单职责明确,父线程只需要接收新连接,子线程完成后续的业务处理。
- 父线程与子线程的数据交互简单,Reactor主线程只需要把新连接传给子线程,子线程无需返回数据。
缺点:编程复杂度较高。
这种模型在许多项目中广泛使用,包括Nginx主从Reactor多进程模型,Memcached主从多线程,Netty主从多线程模型的支持。
3.2 Netty模型
Netty主要基于主从Reactors多线程模型(如图)做了一定的改进,其中主从Reactor多线程模型有多个Reactor。
- BossGroup线程维护Selector,只关注Accept。
- 当接收到Accept事件,获取到对应的SocketChannel,封装成NIOScoketChannel并注册到Worker线程(事件循环),并进行维护。
- 当Worker线程监听到Selector中通道发生自己感兴趣的事件后,就进行处理(就由handler),注意handler已经加入到通道。
进阶图示:
详细图示:
- Netty抽象出两组线程池:BossGroup专门负责接收客户端的连接,WorkerGroup专门负责网络的读写。
- BossGroup和WorkerGroup类型都是NioEventLoopGroup。
- NioEventLoopGroup相当于一个事件循环组,这个组中含有多个事件循环,每一个事件循环是NioEventLoop。
- NioEventLoop表示一个不断循环的执行处理任务的线程,每个NioEventLoop都有一个Selector,用于监听绑定在其上的socket的网络通讯。
- NioEventLoopGroup可以有多个线程,即可以含有多个NioEventLoop。
- BossGroup的每个NioEventLoop循环执行的步骤有3步。
- 轮询accept事件。
- 处理accept事件,与client建立连接,生成NioScocketChannel,并将其注册到WorkerGroup的某个NIOEventLoop上的Selector。
- 处理任务队列的任务,即runAllTasks。
- WorkerGroup的每个NIOEventLoop循环执行的步骤:
- 轮询read,write事件。
- 处理I/O事件,即read,write事件,在对应NioScocketChannel处理。
- 处理任务队列的任务,即runAllTasks。
- WorkerGroup的每个NIOEventLoop处理业务时,会使用pipeline(管道),pipeline中包含了channel,即通过pipeline可以获取到对应通道,管道中维护了很多的处理器。
- 注意:
- NioEventLoopGroup下包含多个NioEventLoop。
- 每个NioEventLoop中包含有一个Selector,一个taskQueue。
- 每个NioEventLoop的Selector上可以注册监听多个NioChannel。
- 每个NioChannel只会绑定在唯一的NioEventLoop上。
- 每个NioChannel都绑定有一个自己的ChannelPipeline。
Netty实例:
Netty服务器在6668端口监听,客户端能发送消息给服务器”hello,服务器~”。
服务器可以回复消息给客户端”hello,客户端~”。
服务器端:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80public class NettyServer {
public static void main(String[] args) throws Exception {
//1. 创建两个线程组 bossGroup 和 workerGroup
//2. bossGroup 只是处理连接请求 , 真正的和客户端业务处理,会交给 workerGroup完成
//3. 两个都是无限循环
//4. bossGroup 和 workerGroup 含有的子线程(NioEventLoop)的个数默认是 cpu逻辑核数 * 2
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//创建服务器端的启动对象,配置参数
ServerBootstrap bootstrap = new ServerBootstrap();
//使用链式编程来进行设置
bootstrap.group(bossGroup, workerGroup) //设置两个线程组
.channel(NioServerSocketChannel.class) //使用NioSocketChannel 作为服务器的通道实现
.option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列得到连接个数
.childOption(ChannelOption.SO_KEEPALIVE, true) //设置保持活动连接状态
.childHandler(new ChannelInitializer<SocketChannel>() {//创建一个通道初始化对象(匿名对象)
//给pipeline 设置处理器
protected void initChannel(SocketChannel ch) throws Exception {
System.out.println("客户socketChannel hashcode=" + ch.hashCode()); //可以使用一个集合管理 SocketChannel, 在推送消息时,可以将业务加入到各个channel 对应的 NIOEventLoop 的 taskQueue 或者 scheduleTaskQueue
ch.pipeline().addLast(new NettyServerHandler());
}
}); // 给workerGroup 的 EventLoop 对应的管道设置处理器
System.out.println("服务器 is ready...");
//绑定一个端口并且同步, 生成了一个 ChannelFuture 对象
//启动服务器(并绑定端口)
ChannelFuture cf = bootstrap.bind(6668).sync();
//对关闭通道进行监听
cf.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
/**
* 我们自定义一个Handler需要继承netty规定好的某个HandlerAdapter(规范)
*/
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
//读取数据事件(这里我们可以读取客户端发送的消息)
/**
* @param ctx 上下文对象, 含有管道pipeline、通道channel、地址
* @param msg 就是客户端发送的数据,默认是Object
* @throws Exception 异常
*/
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("服务器读取线程 " + Thread.currentThread().getName() + " channel =" + ctx.channel());
System.out.println("server ctx =" + ctx);
System.out.println("看看channel 和 pipeline的关系");
Channel channel = ctx.channel();
ChannelPipeline pipeline = ctx.pipeline(); //本质是一个双向链接, 出站入站
//将 msg 转成一个 ByteBuf
//ByteBuf 是 Netty 提供的,不是 NIO 的 ByteBuffer.
ByteBuf buf = (ByteBuf) msg;
System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8));
System.out.println("客户端地址:" + channel.remoteAddress());
}
//数据读取完毕
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//writeAndFlush 是 write + flush
//将数据写入到缓存,并刷新
//一般讲,我们对这个发送的数据进行编码
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~", CharsetUtil.UTF_8));
}
//处理异常, 一般是需要关闭通道
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
客户端:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55public class NettyClient {
public static void main(String[] args) throws InterruptedException {
//客户端需要一个事件循环组
EventLoopGroup group = new NioEventLoopGroup();
try {
//创建客户端启动对象
//注意客户端使用的不是 ServerBootstrap 而是 Bootstrap
Bootstrap bootstrap = new Bootstrap();
//设置相关参数
bootstrap.group(group) //设置线程组
.channel(NioSocketChannel.class) // 设置客户端通道的实现类(反射)
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyClientHandler()); //加入自己的处理器
}
});
System.out.println("客户端 ok..");
//启动客户端去连接服务器端
//关于 ChannelFuture 要分析,涉及到netty的异步模型
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
//给关闭通道进行监听
channelFuture.channel().closeFuture().sync();
}finally {
group.shutdownGracefully();
}
}
}
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
//当通道就绪就会触发该方法
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client " + ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, server", CharsetUtil.UTF_8));
}
//当通道有读取事件时,会触发
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("服务器回复的消息:" + buf.toString(CharsetUtil.UTF_8));
System.out.println("服务器的地址: "+ ctx.channel().remoteAddress());
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
3.2.1 任务队列
NioEventLoop中的TaskQueue有3种典型使用场景:
用户程序自定义的普通任务。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37public class NettyServerHandler extends ChannelInboundHandlerAdapter {
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//比如这里我们有一个非常耗时长的业务-> 异步执行 -> 提交该channel 对应的
//NIOEventLoop 的 taskQueue中,
//解决方案1 用户程序自定义的普通任务
ctx.channel().eventLoop().execute(new Runnable() {
public void run() {
try {
Thread.sleep(5 * 1000);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵2", CharsetUtil.UTF_8));
System.out.println("channel code=" + ctx.channel().hashCode());
} catch (Exception ex) {
System.out.println("发生异常" + ex.getMessage());
}
}
});
System.out.println("go on ...");
}
//数据读取完毕
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//writeAndFlush 是 write + flush
//将数据写入到缓存,并刷新
//一般讲,我们对这个发送的数据进行编码
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵1", CharsetUtil.UTF_8));
}
//处理异常, 一般是需要关闭通道
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
用户自定义定时任务。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36public class NettyServerHandler extends ChannelInboundHandlerAdapter {
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//解决方案2 : 用户自定义定时任务 -》 该任务是提交到 scheduleTaskQueue中
ctx.channel().eventLoop().schedule(new Runnable() {
public void run() {
try {
Thread.sleep(5 * 1000);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵4", CharsetUtil.UTF_8));
System.out.println("channel code=" + ctx.channel().hashCode());
} catch (Exception ex) {
System.out.println("发生异常" + ex.getMessage());
}
}
}, 5, TimeUnit.SECONDS);
System.out.println("go on ...");
}
//数据读取完毕
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//writeAndFlush 是 write + flush
//将数据写入到缓存,并刷新
//一般讲,我们对这个发送的数据进行编码
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵1", CharsetUtil.UTF_8));
}
//处理异常, 一般是需要关闭通道
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}非当前Reactor线程调用Channel的各种方法,例如在推送系统的业务线程里面,根据用户的标识,找到对应的Channel引用,然后调用Write类方法向该用户推送消息,就会进入到这种场景。最终的Write会提交到任务队列中后被异步消费。
3.2.2 异步模型
异步的概念和同步相对。当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的组件在完成后,通过状态、通知和回调来通知调用者。
Netty中的I/O操作是异步的,包括Bind、Write、Connect等操作会简单的返回一个ChannelFuture。
调用者并不能立刻获得结果,而是通过Future-Listener机制,用户可以方便的主动获取或者通过通知机制获得IO操作结果。
当Future对象刚刚创建时,处于非完成状态,调用者可以通过返回的ChannelFuture来获取操作执行的状态,注册监听函数来执行完成后的操作。
常见有如下操作:
通过isDone方法来判断当前操作是否完成。
通过isSuccess方法来判断已完成的当前操作是否成功。
通过getCause方法来获取已完成的当前操作失败的原因。
通过isCancelled方法来判断已完成的当前操作是否被取消。
通过addListener方法来注册监听器,当操作已完成(isDone方法返回完成),将会通知指定的监听器;如果Future对象已完成,则通知指定的监听器。
例如给前面服务器端可以在绑定端口后进行监听:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15//绑定一个端口并且同步, 生成了一个 ChannelFuture 对象
//启动服务器(并绑定端口)
ChannelFuture cf = bootstrap.bind(6668).sync();
//给cf 注册监听器,监控我们关心的事件
cf.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
if (cf.isSuccess()) {
System.out.println("监听端口 6668 成功");
} else {
System.out.println("监听端口 6668 失败");
}
}
});
Netty的异步模型是建立在future和callback之上的。callback就是回调。重点说Future,它的核心思想是:假设一个方法fun,计算过程可能非常耗时,等待fun返回显然不合适。那么可以在调用fun的时候,立马返回一个Future,后续可以通过Future去监控方法fun的处理过程(即:Future-Listener机制)
Future说明:
- 表示异步的执行结果,可以通过它提供的方法来检测执行是否完成,比如检索计算等等。
- ChannelFuture是一个接口:public interface ChannelFuture extends Future,我们可以添加监听器,当监听的事件发生时,就会通知到监听器。
工作原理示意图:
- 在使用Netty进行编程时,拦截操作和转换出入站数据只需要您提供callback或利用 future 即可。这使得链式操作简单、高效, 并有利于编写可重用的、通用的代码。
- Netty框架的目标就是让你的业务逻辑从网络基础应用编码中分离出来、解脱出来。
- 相比传统阻塞I/O,执行I/O操作后线程会被阻塞住, 直到操作完成;异步处理的好处是不会造成线程阻塞,线程在I/O操作期间可以执行别的程序,在高并发情形下会更稳定和更高的吞吐量。
4、Netty进阶
4.1 粘包与半包现象
粘包现象。
客户端。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class HelloClient {
public static void main(String[] args) {
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(worker);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
log.debug("connected...");
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
// 连接建立成功后会触发此事件
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.debug("sending...");
for (int i = 0; i < 10; i++) {
ByteBuf buffer = ctx.alloc().buffer();
buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
ctx.writeAndFlush(buffer);
}
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("client error", e);
} finally {
worker.shutdownGracefully();
}
}
}服务端。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class HelloServer {
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
// 连接建立成功后会触发此事件
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.debug("connected {}", ctx.channel());
super.channelActive(ctx);
}
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.debug("disconnect {}", ctx.channel());
super.channelInactive(ctx);
}
});
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8080);
log.debug("{} binding...", channelFuture.channel());
channelFuture.sync();
log.debug("{} bound...", channelFuture.channel());
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("server error", e);
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
log.debug("stoped");
}
}
}服务器端的某次输出,可以看到一次就接收了160个字节,而非分10次接收,即为粘包现象。原因:
- 应用层:接收方ByteBuf设置太大(Netty 默认1024)。
- 滑动窗口:假设发送方256 bytes表示一个完整报文,但由于接收方处理不及时且窗口大小足够大,这256bytes字节就会缓冲在接收方的滑动窗口中,当滑动窗口中缓冲了多个报文就会粘包。
- Nagle算法:会造成粘包。
半包现象。
客户端希望发送1个消息,这个消息是160字节,代码改为:
1
2
3
4
5ByteBuf buffer = ctx.alloc().buffer();
for (int i = 0; i < 10; i++) {
buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
}
ctx.writeAndFlush(buffer);为现象明显,服务端修改一下接收缓冲区,其它代码不变。
1
serverBootstrap.option(ChannelOption.SO_RCVBUF, 10);
服务器端的某次输出,可以看到接收的消息被分为两节,第一次20字节,第二次140字节。serverBootstrap.option(ChannelOption.SO_RCVBUF, 10)影响的底层接收缓冲区(即滑动窗口)大小,仅决定了netty读取的最小单位,netty实际每次读取的一般是它的整数倍。原因:
- 应用层:接收方ByteBuf小于实际发送数据量。
- 滑动窗口:假设接收方的窗口只剩了128bytes,发送方的报文大小是256bytes,这时放不下了,只能先发送前128bytes,等待ack后才能发送剩余部分,这就造成了半包。
- MSS限制:当发送的数据超过MSS限制后,会将数据切分发送,就会造成半包。
4.2 解决方案
4.2.1 短链接
客户端发送完整数据后,立即关闭连接。即发一个包建立一次连接,这样连接建立到连接断开之间就是消息的边界,缺点效率太低。以
解决粘包
为例:客户端:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class HelloClient {
public static void main(String[] args) {
for (int i = 0; i < 5; i++) {
send();
}
}
private static void send() {
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(worker);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
log.debug("connected...");
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
// 连接建立成功后会触发此事件
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.debug("sending...");
ByteBuf buffer = ctx.alloc().buffer();
buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
ctx.writeAndFlush(buffer);
// 发完即关
ctx.channel().close();
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("client error", e);
} finally {
worker.shutdownGracefully();
}
}
}服务端:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class HelloServer {
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("server error", e);
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
log.debug("stoped");
}
}
}
半包用这种办法还是不好解决
,因为接收方的缓冲区大小是有限的。客户端在上方基础上多传输两个字节的数据,服务端修改netty接收缓冲区的大小,使得客户端一次性发送的数据大于此缓冲区,服务端代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class HelloServer {
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
// 调整系统的接收缓冲区大小(滑动窗口)
// serverBootstrap.option(ChannelOption.SO_RCVBUF, 10);
// 调整netty的接收缓冲区(ByteBuf)
serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(16, 16, 16));
serverBootstrap.group(boss, worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("server error", e);
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
log.debug("stoped");
}
}
}
4.2.2 固定长度
每一条消息采用固定长度,缺点是浪费空间。下面假设客户端每次发送10个字节的完整数据,服务端添加个解码器FixedLengthFrameDecoder,并固定长度为10,这样服务端就会每10个字节截取一次。
客户端:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54public class Client {
static final Logger log = LoggerFactory.getLogger(Client.class);
public static void main(String[] args) {
send();
System.out.println("finish");
}
public static byte[] fill10Bytes(char c, int len) {
byte[] bytes = new byte[10];
Arrays.fill(bytes, (byte) '_');
for (int i = 0; i < len; i++) {
bytes[i] = (byte) c;
}
System.out.println(new String(bytes));
return bytes;
}
private static void send() {
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(worker);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
// 会在连接channel建立成功后,会触发active事件
public void channelActive(ChannelHandlerContext ctx) {
ByteBuf buf = ctx.alloc().buffer();
char c = '0';
Random r = new Random();
for (int i = 0; i < 10; i++) {
byte[] bytes = fill10Bytes(c, r.nextInt(10) + 1);
c++;
buf.writeBytes(bytes);
}
ctx.writeAndFlush(buf);
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("client error", e);
} finally {
worker.shutdownGracefully();
}
}
}服务端:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class Server {
void start() {
NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
// 调整系统的接收缓冲区大小(滑动窗口)
// serverBootstrap.option(ChannelOption.SO_RCVBUF, 10);
// 调整netty的接收缓冲区(ByteBuf)
serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(16, 16, 16));
serverBootstrap.group(boss, worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
// FixedLengthFrameDecoder需要放在LoggingHandler前面
ch.pipeline().addLast(new FixedLengthFrameDecoder(10));
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("server error", e);
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
log.debug("stoped");
}
}
public static void main(String[] args) {
new Server().start();
}
}
1
2
3
4
5
6
{% asset_img 1638709147178.png 1638709147178 %}
{% asset_img 1638709176649.png 1638709176649 %}
可以看出即使客户端出现了粘包现象,但是服务端能够正确读取数据。
4.2.3 固定分隔符
每一条消息采用分隔符,例如\n,缺点是需要转义。
客户端在每条消息之后,加入\n分隔符。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53public class Client {
static final Logger log = LoggerFactory.getLogger(Client.class);
public static void main(String[] args) {
send();
System.out.println("finish");
}
public static StringBuilder makeString(char c, int len) {
StringBuilder sb = new StringBuilder(len + 2);
for (int i = 0; i < len; i++) {
sb.append(c);
}
sb.append("\n");
return sb;
}
private static void send() {
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(worker);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
// 会在连接channel建立成功后,会触发active事件
public void channelActive(ChannelHandlerContext ctx) {
ByteBuf buf = ctx.alloc().buffer();
char c = '0';
Random r = new Random();
for (int i = 0; i < 10; i++) {
StringBuilder sb = makeString(c, r.nextInt(256) + 1);
c++;
buf.writeBytes(sb.toString().getBytes());
}
ctx.writeAndFlush(buf);
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("client error", e);
} finally {
worker.shutdownGracefully();
}
}
}服务端加入解码器LineBasedFrameDecoder,即以\n或\r\n作为分隔符,如果超出指定长度仍未出现分隔符,则抛出异常。
除此之外还可以使用自定义分隔符解码器DelimiterBasedFrameDecoder:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class Server {
void start() {
NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
// 调整系统的接收缓冲区大小(滑动窗口)
// serverBootstrap.option(ChannelOption.SO_RCVBUF, 10);
// 调整netty的接收缓冲区(ByteBuf)
serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(16, 16, 16));
serverBootstrap.group(boss, worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("server error", e);
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
log.debug("stoped");
}
}
public static void main(String[] args) {
new Server().start();
}
}
4.2.4 预设长度
每一条消息分为head和body,head中包含body的长度。即在发送消息前,先约定用定长字节表示接下来数据的长度。
客户端在发送消息前,先约定用定长字节表示接下来数据的长度。这里使用到解码器LengthFieldBasedFrameDecoder。
这里使用EmbeddedChannel进行测试。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29public class TestLengthFieldDecoder {
public static void main(String[] args) {
EmbeddedChannel channel = new EmbeddedChannel(
/**
* int maxFrameLength,
* int lengthFieldOffset,
* int lengthFieldLength,
* int lengthAdjustment,
* int initialBytesToStrip
*/
new LengthFieldBasedFrameDecoder(
1024, 0, 4, 0,0),
new LoggingHandler(LogLevel.DEBUG)
);
// 客户端发4个字节的内容长度
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
send(buffer, "Hello, world");
send(buffer, "Hi!");
channel.writeInbound(buffer);
}
private static void send(ByteBuf buffer, String content) {
byte[] bytes = content.getBytes(); // 实际内容
int length = bytes.length; // 实际内容长度
buffer.writeInt(length);
buffer.writeBytes(bytes);
}
}如果要剥离头部,则方法调用改为:
1
new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4)
如果在消息长度和内容间加了个字节代表版本号,即:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30public class TestLengthFieldDecoder {
public static void main(String[] args) {
EmbeddedChannel channel = new EmbeddedChannel(
/**
* int maxFrameLength,
* int lengthFieldOffset,
* int lengthFieldLength,
* int lengthAdjustment,
* int initialBytesToStrip
*/
new LengthFieldBasedFrameDecoder(
1024, 0, 4, 1,0),
new LoggingHandler(LogLevel.DEBUG)
);
// 客户端发4个字节的内容长度
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
send(buffer, "Hello, world");
send(buffer, "Hi!");
channel.writeInbound(buffer);
}
private static void send(ByteBuf buffer, String content) {
byte[] bytes = content.getBytes(); // 实际内容
int length = bytes.length; // 实际内容长度
buffer.writeInt(length);
buffer.writeByte(1); // 代表版本1
buffer.writeBytes(bytes);
}
}
4.3 协议设计与解析
4.3.1 Redis协议
1 |
|
先启动redis,再启动此客户端后,控制台打印如下:
发现key=name,value=zhangsan的数据成功插入redis,并且redis返回”ok”。
4.3.2 Http协议
netty已经帮我们封装好专门处理HTTP请求的编解码器HttpServerCodec。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public class TestHttp {
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new HttpServerCodec());
// 只关注HttpRequest类型的消息
ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {
protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {
// 获取请求
log.debug(msg.uri());
// 返回响应
DefaultFullHttpResponse response =
new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);
byte[] bytes = "<h1>Hello, world!</h1>".getBytes();
response.headers().setInt(CONTENT_LENGTH, bytes.length);
response.content().writeBytes(bytes);
// 写回响应
ctx.writeAndFlush(response);
}
});
/*ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("{}", msg.getClass());
if (msg instanceof HttpRequest) { // 请求行,请求头
} else if (msg instanceof HttpContent) { //请求体
}
}
});*/
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("server error", e);
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}运行此服务端程序后,浏览器访问8080端口,服务端控制台显示如下:
浏览器显示如下:
4.3.3 自定义协议
- 自定义协议需要考虑的要素如下:
- 魔数,用来在第一时间判定是否是无效数据包。
- 版本号,可以支持协议的升级。
- 序列化算法,消息正文到底采用哪种序列化反序列化方式,可以由此扩展,例如:json、protobuf、hessian、jdk。
- 指令类型,是登录、注册、单聊、群聊等跟业务相关。
- 请求序号,为了双工通信,提供异步能力。
- 正文长度。
- 消息正文。
根据上面的要素,设计一个登录请求消息和登录响应消息,并使用Netty完成收发。
自定义编解码器。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class MessageCodec extends ByteToMessageCodec<Message> {
public void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
// 1、4个字节的魔数
out.writeBytes(new byte[]{1, 2, 3, 4});
// 2、1个字节的版本
out.writeByte(1);
// 3、1个字节的序列化方式,0代表jdk, 1代表json
out.writeByte(0);
// 4、1个字节的指令类型
out.writeByte(msg.getMessageType());
// 5、4个字节的请求序号
out.writeInt(msg.getSequenceId());
// 无意义,对齐填充
out.writeByte(0xff);
// 6、获取内容的字节数组
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(msg);
byte[] bytes = bos.toByteArray();
// 7、4个字节的正文长度
out.writeInt(bytes.length);
// 8、写入正文内容
out.writeBytes(bytes);
}
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int magicNum = in.readInt();
byte version = in.readByte();
byte serializerType = in.readByte();
byte messageType = in.readByte();
int sequenceId = in.readInt();
in.readByte();
int length = in.readInt();
byte[] bytes = new byte[length];
in.readBytes(bytes, 0, length);
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
Message message = (Message) ois.readObject();
log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);
log.debug("{}", message);
// 将解析好的对象加入集合中给下个handler调用
out.add(message);
}
}测试代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20public class TestMessageCodec {
public static void main(String[] args) throws Exception {
EmbeddedChannel channel = new EmbeddedChannel(
new LoggingHandler(),
// 解决粘包半包问题
new LengthFieldBasedFrameDecoder(
1024, 12, 4, 0, 0),
new MessageCodec()
);
// encode
LoginRequestMessage message = new LoginRequestMessage("zhangsan", "123");
channel.writeOutbound(message);
// decode
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
// 将message的内容填充到buf中,后面将buf解码
new MessageCodec().encode(null, message, buf);
channel.writeInbound(buf);
}
}测试结果分析如下:
验证LengthFieldBasedFrameDecoder解决了半包问题:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22public class TestMessageCodec {
public static void main(String[] args) throws Exception {
EmbeddedChannel channel = new EmbeddedChannel(
new LoggingHandler(),
// 解决粘包半包问题
new LengthFieldBasedFrameDecoder(
1024, 12, 4, 0, 0),
new MessageCodec()
);
LoginRequestMessage message = new LoginRequestMessage("zhangsan", "123");
// decode
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
// 将message的内容填充到buf中,后面将buf解码
new MessageCodec().encode(null, message, buf);
ByteBuf s1 = buf.slice(0, 100);
ByteBuf s2 = buf.slice(100, buf.readableBytes() - 100);
s1.retain(); // 引用计数加一,即1+1=2
channel.writeInbound(s1); // writeInbound执行后会调用release方法释放内存,即2-1=1
channel.writeInbound(s2);
}
}如果不加LengthFieldBasedFrameDecoder则会报错如下:
handler共享
当handler不保存状态时,就可以安全地在多线程下被共享。
netty将可以共享的handler上会作个标记,即加了@Sharable注解,例如LoggingHandler。
但要注意对于编解码器类,不能继承ByteToMessageCodec或CombinedChannelDuplexHandler父类,他们的构造方法对@Sharable有限制。所以上面我们自定义的MessageCodec不能被共享,因为其继承了ByteToMessageCodec类。
所以为了让MessageCodec能够共享(确保编解码器不会保存状态),我们可以继承另一个父类MessageToMessageCodec。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
.Sharable
/**
* 必须和LengthFieldBasedFrameDecoder一起使用,确保接到的ByteBuf消息是完整的
*/
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {
protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception {
ByteBuf out = ctx.alloc().buffer();
// 1、4个字节的魔数
out.writeBytes(new byte[]{1, 2, 3, 4});
// 2、1个字节的版本
out.writeByte(1);
// 3、1个字节的序列化方式,0代表jdk, 1代表json
out.writeByte(0);
// 4、1个字节的指令类型
out.writeByte(msg.getMessageType());
// 5、4个字节的请求序号
out.writeInt(msg.getSequenceId());
// 无意义,对齐填充
out.writeByte(0xff);
// 6、获取内容的字节数组
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(msg);
byte[] bytes = bos.toByteArray();
// 7、4个字节的正文长度
out.writeInt(bytes.length);
// 8、写入正文内容
out.writeBytes(bytes);
outList.add(out);
}
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int magicNum = in.readInt();
byte version = in.readByte();
byte serializerType = in.readByte();
byte messageType = in.readByte();
int sequenceId = in.readInt();
in.readByte();
int length = in.readInt();
byte[] bytes = new byte[length];
in.readBytes(bytes, 0, length);
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
Message message = (Message) ois.readObject();
log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);
log.debug("{}", message);
// 将解析好的对象加入集合中给下个handler调用
out.add(message);
}
}
5、聊天室案例
5.1 聊天室业务介绍
消息类型抽象父类。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public abstract class Message implements Serializable {
/**
* 根据消息类型字节,获得对应的消息 class
* @param messageType 消息类型字节
* @return 消息 class
*/
public static Class<? extends Message> getMessageClass(int messageType) {
return messageClasses.get(messageType);
}
private int sequenceId;
private int messageType;
public abstract int getMessageType();
public static final int LoginRequestMessage = 0;
public static final int LoginResponseMessage = 1;
public static final int ChatRequestMessage = 2;
public static final int ChatResponseMessage = 3;
public static final int GroupCreateRequestMessage = 4;
public static final int GroupCreateResponseMessage = 5;
public static final int GroupJoinRequestMessage = 6;
public static final int GroupJoinResponseMessage = 7;
public static final int GroupQuitRequestMessage = 8;
public static final int GroupQuitResponseMessage = 9;
public static final int GroupChatRequestMessage = 10;
public static final int GroupChatResponseMessage = 11;
public static final int GroupMembersRequestMessage = 12;
public static final int GroupMembersResponseMessage = 13;
public static final int PingMessage = 14;
public static final int PongMessage = 15;
/**
* 请求类型 byte 值
*/
public static final int RPC_MESSAGE_TYPE_REQUEST = 101;
/**
* 响应类型 byte 值
*/
public static final int RPC_MESSAGE_TYPE_RESPONSE = 102;
private static final Map<Integer, Class<? extends Message>> messageClasses = new HashMap<>();
static {
messageClasses.put(LoginRequestMessage, LoginRequestMessage.class);
messageClasses.put(LoginResponseMessage, LoginResponseMessage.class);
messageClasses.put(ChatRequestMessage, ChatRequestMessage.class);
messageClasses.put(ChatResponseMessage, ChatResponseMessage.class);
messageClasses.put(GroupCreateRequestMessage, GroupCreateRequestMessage.class);
messageClasses.put(GroupCreateResponseMessage, GroupCreateResponseMessage.class);
messageClasses.put(GroupJoinRequestMessage, GroupJoinRequestMessage.class);
messageClasses.put(GroupJoinResponseMessage, GroupJoinResponseMessage.class);
messageClasses.put(GroupQuitRequestMessage, GroupQuitRequestMessage.class);
messageClasses.put(GroupQuitResponseMessage, GroupQuitResponseMessage.class);
messageClasses.put(GroupChatRequestMessage, GroupChatRequestMessage.class);
messageClasses.put(GroupChatResponseMessage, GroupChatResponseMessage.class);
messageClasses.put(GroupMembersRequestMessage, GroupMembersRequestMessage.class);
messageClasses.put(GroupMembersResponseMessage, GroupMembersResponseMessage.class);
messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class);
messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class);
}
}用户管理接口与实现类。
1
2
3
4
5
6
7
8
9
10
11
12
13/**
* 用户管理接口
*/
public interface UserService {
/**
* 登录
* @param username 用户名
* @param password 密码
* @return 登录成功返回 true, 否则返回 false
*/
boolean login(String username, String password);
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20public class UserServiceMemoryImpl implements UserService {
private Map<String, String> allUserMap = new ConcurrentHashMap<>();
{
allUserMap.put("zhangsan", "123");
allUserMap.put("lisi", "123");
allUserMap.put("wangwu", "123");
allUserMap.put("zhaoliu", "123");
allUserMap.put("qianqi", "123");
}
public boolean login(String username, String password) {
String pass = allUserMap.get(username);
if (pass == null) {
return false;
}
return pass.equals(password);
}
}1
2
3
4
5
6
7
8public abstract class UserServiceFactory {
private static UserService userService = new UserServiceMemoryImpl();
public static UserService getUserService() {
return userService;
}
}会话管理接口与实现。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41/**
* 会话管理接口
*/
public interface Session {
/**
* 绑定会话
* @param channel 哪个 channel 要绑定会话
* @param username 会话绑定用户
*/
void bind(Channel channel, String username);
/**
* 解绑会话
* @param channel 哪个 channel 要解绑会话
*/
void unbind(Channel channel);
/**
* 获取属性
* @param channel 哪个 channel
* @param name 属性名
* @return 属性值
*/
Object getAttribute(Channel channel, String name);
/**
* 设置属性
* @param channel 哪个 channel
* @param name 属性名
* @param value 属性值
*/
void setAttribute(Channel channel, String name, Object value);
/**
* 根据用户名获取 channel
* @param username 用户名
* @return channel
*/
Channel getChannel(String username);
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40public class SessionMemoryImpl implements Session {
private final Map<String, Channel> usernameChannelMap = new ConcurrentHashMap<>();
private final Map<Channel, String> channelUsernameMap = new ConcurrentHashMap<>();
private final Map<Channel,Map<String,Object>> channelAttributesMap = new ConcurrentHashMap<>();
public void bind(Channel channel, String username) {
usernameChannelMap.put(username, channel);
channelUsernameMap.put(channel, username);
channelAttributesMap.put(channel, new ConcurrentHashMap<>());
}
public void unbind(Channel channel) {
String username = channelUsernameMap.remove(channel);
usernameChannelMap.remove(username);
channelAttributesMap.remove(channel);
}
public Object getAttribute(Channel channel, String name) {
return channelAttributesMap.get(channel).get(name);
}
public void setAttribute(Channel channel, String name, Object value) {
channelAttributesMap.get(channel).put(name, value);
}
public Channel getChannel(String username) {
return usernameChannelMap.get(username);
}
public String toString() {
return usernameChannelMap.toString();
}
}1
2
3
4
5
6
7
8
9public abstract class SessionFactory {
private static Session session = new SessionMemoryImpl();
public static Session getSession() {
return session;
}
}聊天组会话管理接口与实现。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50/**
* 聊天组会话管理接口
*/
public interface GroupSession {
/**
* 创建一个聊天组, 如果不存在才能创建成功, 否则返回 null
* @param name 组名
* @param members 成员
* @return 成功时返回组对象, 失败返回 null
*/
Group createGroup(String name, Set<String> members);
/**
* 加入聊天组
* @param name 组名
* @param member 成员名
* @return 如果组不存在返回 null, 否则返回组对象
*/
Group joinMember(String name, String member);
/**
* 移除组成员
* @param name 组名
* @param member 成员名
* @return 如果组不存在返回 null, 否则返回组对象
*/
Group removeMember(String name, String member);
/**
* 移除聊天组
* @param name 组名
* @return 如果组不存在返回 null, 否则返回组对象
*/
Group removeGroup(String name);
/**
* 获取组成员
* @param name 组名
* @return 成员集合, 如果群不存在或没有成员会返回 empty set
*/
Set<String> getMembers(String name);
/**
* 获取组成员的 channel 集合, 只有在线的 channel 才会返回
* @param name 组名
* @return 成员 channel 集合
*/
List<Channel> getMembersChannel(String name);
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43public class GroupSessionMemoryImpl implements GroupSession {
private final Map<String, Group> groupMap = new ConcurrentHashMap<>();
public Group createGroup(String name, Set<String> members) {
Group group = new Group(name, members);
return groupMap.putIfAbsent(name, group);
}
public Group joinMember(String name, String member) {
return groupMap.computeIfPresent(name, (key, value) -> {
value.getMembers().add(member);
return value;
});
}
public Group removeMember(String name, String member) {
return groupMap.computeIfPresent(name, (key, value) -> {
value.getMembers().remove(member);
return value;
});
}
public Group removeGroup(String name) {
return groupMap.remove(name);
}
public Set<String> getMembers(String name) {
return groupMap.getOrDefault(name, Group.EMPTY_GROUP).getMembers();
}
public List<Channel> getMembersChannel(String name) {
return getMembers(name).stream()
.map(member -> SessionFactory.getSession().getChannel(member))
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
}1
2
3
4
5
6
7
8public abstract class GroupSessionFactory {
private static GroupSession session = new GroupSessionMemoryImpl();
public static GroupSession getGroupSession() {
return session;
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17/**
* 聊天组,即聊天室
*/
public class Group {
// 聊天室名称
private String name;
// 聊天室成员
private Set<String> members;
public static final Group EMPTY_GROUP = new Group("empty", Collections.emptySet());
public Group(String name, Set<String> members) {
this.name = name;
this.members = members;
}
}自定义编解码器类。
1
2
3
4
5
6
7
8
9
10public class ProtocolFrameDecoder extends LengthFieldBasedFrameDecoder {
public ProtocolFrameDecoder() {
this(1024, 12, 4, 0, 0);
}
public ProtocolFrameDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);
}
}
5.2 聊天室业务-登录
登录请求消息实现类。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class LoginRequestMessage extends Message {
private String username;
private String password;
public LoginRequestMessage() {
}
public LoginRequestMessage(String username, String password) {
this.username = username;
this.password = password;
}
public int getMessageType() {
return LoginRequestMessage;
}
}登录响应消息实现类。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public abstract class AbstractResponseMessage extends Message {
private boolean success;
private String reason;
public AbstractResponseMessage() {
}
public AbstractResponseMessage(boolean success, String reason) {
this.success = success;
this.reason = reason;
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
public class LoginResponseMessage extends AbstractResponseMessage {
public LoginResponseMessage(boolean success, String reason) {
super(success, reason);
}
public int getMessageType() {
return LoginResponseMessage;
}
}登录请求处理器:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18.Sharable
public class LoginRequestMessageHandler extends SimpleChannelInboundHandler<LoginRequestMessage> {
protected void channelRead0(ChannelHandlerContext ctx, LoginRequestMessage msg) throws Exception {
String username = msg.getUsername();
String password = msg.getPassword();
boolean login = UserServiceFactory.getUserService().login(username, password);
LoginResponseMessage message;
if (login) {
// 将用户信息保存到服务器,即把用户名和channel做个绑定
SessionFactory.getSession().bind(ctx.channel(), username);
message = new LoginResponseMessage(true, "登录成功");
} else {
message = new LoginResponseMessage(false, "用户名或密码不正确");
}
ctx.writeAndFlush(message);
}
}服务器端:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class ChatServer {
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
// 登录请求处理器
LoginRequestMessageHandler LOGIN_HANDLER = new LoginRequestMessageHandler();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProtocolFrameDecoder());
// ch.pipeline().addLast(LOGGING_HANDLER);
ch.pipeline().addLast(MESSAGE_CODEC);
ch.pipeline().addLast(LOGIN_HANDLER);
}
});
Channel channel = serverBootstrap.bind(8080).sync().channel();
channel.closeFuture().sync();
} catch (InterruptedException e) {
log.error("server error", e);
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}客户端:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
public class ChatClient {
public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup();
LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
Scanner scanner = new Scanner(System.in);
// 用于NIO线程和system in线程的相互通信
CountDownLatch WAIT_FOR_LOGIN = new CountDownLatch(1);
// 表示是否登录成功
AtomicBoolean LOGIN = new AtomicBoolean(false);
// 表示是否退出
AtomicBoolean EXIT = new AtomicBoolean(false);
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(group);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProtocolFrameDecoder());
// ch.pipeline().addLast(LOGGING_HANDLER);
ch.pipeline().addLast(MESSAGE_CODEC);
ch.pipeline().addLast("client handler", new ChannelInboundHandlerAdapter() {
// 在连接建立后触发active事件
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 负责接收用户在控制台的输入,负责向服务器发送各种消息
new Thread(() -> {
System.out.println("请输入用户名:");
String username = scanner.nextLine();
if(EXIT.get()){
return;
}
System.out.println("请输入密码:");
String password = scanner.nextLine();
if(EXIT.get()){
return;
}
// 构造消息对象
LoginRequestMessage message = new LoginRequestMessage(username, password);
System.out.println(message);
// 发送消息
ctx.writeAndFlush(message);
System.out.println("等待后续操作...");
try {
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
try {
// 阻塞直至登录成功与否消息返回
WAIT_FOR_LOGIN.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
// 如果登录失败
if (!LOGIN.get()) {
ctx.channel().close();
return;
}
while (true) {
System.out.println("==================================");
System.out.println("send [username] [content]");
System.out.println("gsend [group name] [content]");
System.out.println("gcreate [group name] [m1,m2,m3...]");
System.out.println("gmembers [group name]");
System.out.println("gjoin [group name]");
System.out.println("gquit [group name]");
System.out.println("quit");
System.out.println("==================================");
String command = null;
try {
command = scanner.nextLine();
} catch (Exception e) {
break;
}
if(EXIT.get()){
return;
}
}
}, "system in").start();
}
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("msg: {}", msg);
if ((msg instanceof LoginResponseMessage)) {
LoginResponseMessage response = (LoginResponseMessage) msg;
if (response.isSuccess()) {
// 如果登录成功
LOGIN.set(true);
}
// 唤醒system in线程
WAIT_FOR_LOGIN.countDown();
}
}
});
}
});
Channel channel = bootstrap.connect("localhost", 8080).sync().channel();
channel.closeFuture().sync();
} catch (Exception e) {
log.error("client error", e);
} finally {
group.shutdownGracefully();
}
}
}启动服务端再启动客户端,假设输入错误的账号密码,客户端打印如下:
服务器端打印如下:
假设输入正确的账号密码,客户端打印如下:
服务器端打印如下:
5.3 聊天室业务-单聊
单聊请求消息实现类。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class ChatRequestMessage extends Message {
private String content;
private String to;
private String from;
public ChatRequestMessage() {
}
public ChatRequestMessage(String from, String to, String content) {
this.from = from;
this.to = to;
this.content = content;
}
public int getMessageType() {
return ChatRequestMessage;
}
}单聊响应消息实现类。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class ChatResponseMessage extends AbstractResponseMessage {
private String from;
private String content;
public ChatResponseMessage(boolean success, String reason) {
super(success, reason);
}
public ChatResponseMessage(String from, String content) {
this.from = from;
this.content = content;
}
public int getMessageType() {
return ChatResponseMessage;
}
}单聊处理器。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16.Sharable
public class ChatRequestMessageHandler extends SimpleChannelInboundHandler<ChatRequestMessage> {
protected void channelRead0(ChannelHandlerContext ctx, ChatRequestMessage msg) throws Exception {
String to = msg.getTo();
Channel channel = SessionFactory.getSession().getChannel(to);
// 对方在线
if (channel != null) {
channel.writeAndFlush(new ChatResponseMessage(msg.getFrom(), msg.getContent()));
}
// 对方不在线
else {
ctx.writeAndFlush(new ChatResponseMessage(false, "对方用户不存在或者不在线"));
}
}
}服务器端添加单聊处理器。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class ChatServer {
public static void main(String[] args) {
…………
// 单聊处理器
ChatRequestMessageHandler CHAT_HANDLER = new ChatRequestMessageHandler();
…………
try {
…………
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
…………
// 添加单聊处理器
ch.pipeline().addLast(CHAT_HANDLER);
…………
}
});
…………
}
…………
}
}客户端添加单聊处理。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class ChatClient {
public static void main(String[] args) {
…………
try {
…………
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
…………
ch.pipeline().addLast("client handler", new ChannelInboundHandlerAdapter() {
// 在连接建立后触发active事件
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 负责接收用户在控制台的输入,负责向服务器发送各种消息
new Thread(() -> {
…………
while (true) {
…………
String[] s = command.split(" ");
switch (s[0]){
case "send":
ctx.writeAndFlush(new ChatRequestMessage(username, s[1], s[2]));
break;
}
}
}, "system in").start();
}
…………
});
}
});
…………
}
…………
}
}启动服务端再启动两个客户端并输入正确的账号密码成功登录,此时两个客户端分别打印如下:
zhangsan发送给lisi消息:
lisi收到消息:
5.4 聊天室业务-群聊
5.4.1 创建群聊
群聊创建请求消息实现类。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class GroupCreateRequestMessage extends Message {
private String groupName;
private Set<String> members;
public GroupCreateRequestMessage(String groupName, Set<String> members) {
this.groupName = groupName;
this.members = members;
}
public int getMessageType() {
return GroupCreateRequestMessage;
}
}群聊创建响应消息实现类。
1
2
3
4
5
6
7
8
9
10
11
12
13
public class GroupCreateResponseMessage extends AbstractResponseMessage {
public GroupCreateResponseMessage(boolean success, String reason) {
super(success, reason);
}
public int getMessageType() {
return GroupCreateResponseMessage;
}
}创建群聊处理器。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22.Sharable
public class GroupCreateRequestMessageHandler extends SimpleChannelInboundHandler<GroupCreateRequestMessage> {
protected void channelRead0(ChannelHandlerContext ctx, GroupCreateRequestMessage msg) throws Exception {
String groupName = msg.getGroupName();
Set<String> members = msg.getMembers();
// 群管理器
GroupSession groupSession = GroupSessionFactory.getGroupSession();
Group group = groupSession.createGroup(groupName, members);
if (group == null) {
// 发生成功消息
ctx.writeAndFlush(new GroupCreateResponseMessage(true, groupName + "创建成功"));
// 发送拉群消息
List<Channel> channels = groupSession.getMembersChannel(groupName);
for (Channel channel : channels) {
channel.writeAndFlush(new GroupCreateResponseMessage(true, "您已被拉入" + groupName));
}
} else {
ctx.writeAndFlush(new GroupCreateResponseMessage(false, groupName + "已经存在"));
}
}
}服务器端添加创建群聊处理器。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class ChatServer {
public static void main(String[] args) {
…………
// 创建群聊处理器
GroupCreateRequestMessageHandler GROUP_CREATE_HANDLER = new GroupCreateRequestMessageHandler();
…………
try {
…………
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
…………
// 添加创建群聊处理器
ch.pipeline().addLast(GROUP_CREATE_HANDLER);
…………
}
});
…………
}
…………
}
}客户端添加创建群聊处理。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class ChatClient {
public static void main(String[] args) {
…………
try {
…………
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
…………
ch.pipeline().addLast("client handler", new ChannelInboundHandlerAdapter() {
// 在连接建立后触发active事件
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 负责接收用户在控制台的输入,负责向服务器发送各种消息
new Thread(() -> {
…………
while (true) {
…………
String[] s = command.split(" ");
switch (s[0]){
case "gcreate":
Set<String> set = new HashSet<>(Arrays.asList(s[2].split(",")));
set.add(username); // 加入自己
ctx.writeAndFlush(new GroupCreateRequestMessage(s[1], set));
break;
}
}
}, "system in").start();
}
…………
});
}
});
…………
}
…………
}
}先启动服务端,再启动三个客户端并登录完毕,由zhangsan拉入lisi和wangwu进入群聊后三个客户端控制台打印如下:
服务器端打印如下:
5.4.2 发送群聊消息
群聊聊天请求消息实现类。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class GroupChatRequestMessage extends Message {
private String content;
private String groupName;
private String from;
public GroupChatRequestMessage(String from, String groupName, String content) {
this.content = content;
this.groupName = groupName;
this.from = from;
}
public int getMessageType() {
return GroupChatRequestMessage;
}
}群聊聊天响应消息实现类。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class ChatResponseMessage extends AbstractResponseMessage {
private String from;
private String content;
public ChatResponseMessage(boolean success, String reason) {
super(success, reason);
}
public ChatResponseMessage(String from, String content) {
this.from = from;
this.content = content;
}
public int getMessageType() {
return ChatResponseMessage;
}
}群聊发送消息处理器。
1
2
3
4
5
6
7
8
9
10
11
12.Sharable
public class GroupChatRequestMessageHandler extends SimpleChannelInboundHandler<GroupChatRequestMessage> {
protected void channelRead0(ChannelHandlerContext ctx, GroupChatRequestMessage msg) throws Exception {
List<Channel> channels = GroupSessionFactory.getGroupSession()
.getMembersChannel(msg.getGroupName());
// 向群内所有的成员发消息
for (Channel channel : channels) {
channel.writeAndFlush(new GroupChatResponseMessage(msg.getFrom(), msg.getContent()));
}
}
}服务器端添加创建群聊处理器。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class ChatServer {
public static void main(String[] args) {
…………
// 往群聊发送消息处理器
GroupChatRequestMessageHandler GROUP_CHAT_HANDLER = new GroupChatRequestMessageHandler();
…………
try {
…………
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
…………
// 添加群聊发送消息处理器
ch.pipeline().addLast(GROUP_CHAT_HANDLER);
…………
}
});
…………
}
…………
}
}客户端添加创建群聊处理。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class ChatClient {
public static void main(String[] args) {
…………
try {
…………
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
…………
ch.pipeline().addLast("client handler", new ChannelInboundHandlerAdapter() {
// 在连接建立后触发active事件
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 负责接收用户在控制台的输入,负责向服务器发送各种消息
new Thread(() -> {
…………
while (true) {
…………
String[] s = command.split(" ");
switch (s[0]){
case "gsend":
ctx.writeAndFlush(new GroupChatRequestMessage(username, s[1], s[2]));
break;
}
}
}, "system in").start();
}
…………
});
}
});
…………
}
…………
}
}
5.4.3 加入群聊
加入群聊请求消息实现类。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class GroupJoinRequestMessage extends Message {
private String groupName;
private String username;
public GroupJoinRequestMessage(String username, String groupName) {
this.groupName = groupName;
this.username = username;
}
public int getMessageType() {
return GroupJoinRequestMessage;
}
}加入群聊响应消息实现类。
1
2
3
4
5
6
7
8
9
10
11
12
13
public class GroupJoinResponseMessage extends AbstractResponseMessage {
public GroupJoinResponseMessage(boolean success, String reason) {
super(success, reason);
}
public int getMessageType() {
return GroupJoinResponseMessage;
}
}加入群聊处理器。
1
2
3
4
5
6
7
8
9
10
11
12.Sharable
public class GroupJoinRequestMessageHandler extends SimpleChannelInboundHandler<GroupJoinRequestMessage> {
protected void channelRead0(ChannelHandlerContext ctx, GroupJoinRequestMessage msg) throws Exception {
Group group = GroupSessionFactory.getGroupSession().joinMember(msg.getGroupName(), msg.getUsername());
if (group != null) {
ctx.writeAndFlush(new GroupJoinResponseMessage(true, msg.getGroupName() + "群加入成功"));
} else {
ctx.writeAndFlush(new GroupJoinResponseMessage(true, msg.getGroupName() + "群不存在"));
}
}
}服务器端添加加入群聊处理器。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class ChatServer {
public static void main(String[] args) {
…………
// 加群处理器
GroupJoinRequestMessageHandler GROUP_JOIN_HANDLER = new GroupJoinRequestMessageHandler();
…………
try {
…………
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
…………
ch.pipeline().addLast(GROUP_JOIN_HANDLER);
…………
}
});
…………
}
…………
}
}客户端添加加入群聊处理。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class ChatClient {
public static void main(String[] args) {
…………
try {
…………
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
…………
ch.pipeline().addLast("client handler", new ChannelInboundHandlerAdapter() {
// 在连接建立后触发active事件
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 负责接收用户在控制台的输入,负责向服务器发送各种消息
new Thread(() -> {
…………
while (true) {
…………
String[] s = command.split(" ");
switch (s[0]){
case "gjoin":
ctx.writeAndFlush(new GroupJoinRequestMessage(username, s[1]));
break;
}
}
}, "system in").start();
}
…………
});
}
});
…………
}
…………
}
}
5.4.4 退出群聊
退出群聊请求消息实现类。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class GroupQuitRequestMessage extends Message {
private String groupName;
private String username;
public GroupQuitRequestMessage(String username, String groupName) {
this.groupName = groupName;
this.username = username;
}
public int getMessageType() {
return GroupQuitRequestMessage;
}
}退出群聊响应消息实现类。
1
2
3
4
5
6
7
8
9
10
11
12
public class GroupQuitResponseMessage extends AbstractResponseMessage {
public GroupQuitResponseMessage(boolean success, String reason) {
super(success, reason);
}
public int getMessageType() {
return GroupQuitResponseMessage;
}
}退出群聊处理器。
1
2
3
4
5
6
7
8
9
10
11
12.Sharable
public class GroupQuitRequestMessageHandler extends SimpleChannelInboundHandler<GroupQuitRequestMessage> {
protected void channelRead0(ChannelHandlerContext ctx, GroupQuitRequestMessage msg) throws Exception {
Group group = GroupSessionFactory.getGroupSession().removeMember(msg.getGroupName(), msg.getUsername());
if (group != null) {
ctx.writeAndFlush(new GroupQuitResponseMessage(true, "已退出群" + msg.getGroupName()));
} else {
ctx.writeAndFlush(new GroupQuitResponseMessage(true, msg.getGroupName() + "群不存在"));
}
}
}服务器端添加退出群聊处理器。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class ChatServer {
public static void main(String[] args) {
…………
// 退群处理器
GroupQuitRequestMessageHandler GROUP_QUIT_HANDLER = new GroupQuitRequestMessageHandler();
…………
try {
…………
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
…………
ch.pipeline().addLast(GROUP_QUIT_HANDLER);
…………
}
});
…………
}
…………
}
}客户端添加退出群聊处理。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class ChatClient {
public static void main(String[] args) {
…………
try {
…………
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
…………
ch.pipeline().addLast("client handler", new ChannelInboundHandlerAdapter() {
// 在连接建立后触发active事件
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 负责接收用户在控制台的输入,负责向服务器发送各种消息
new Thread(() -> {
…………
while (true) {
…………
String[] s = command.split(" ");
switch (s[0]){
case "gquit":
ctx.writeAndFlush(new GroupQuitRequestMessage(username, s[1]));
break;
}
}
}, "system in").start();
}
…………
});
}
});
…………
}
…………
}
}
5.4.5 获取群员
获取聊员请求消息实现类。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class GroupMembersRequestMessage extends Message {
private String groupName;
public GroupMembersRequestMessage(String groupName) {
this.groupName = groupName;
}
public int getMessageType() {
return GroupMembersRequestMessage;
}
}获取聊员响应消息实现类。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class GroupMembersResponseMessage extends Message {
private Set<String> members;
public GroupMembersResponseMessage(Set<String> members) {
this.members = members;
}
public int getMessageType() {
return GroupMembersResponseMessage;
}
}获取群成员信息处理器。
1
2
3
4
5
6
7
8
9.Sharable
public class GroupMembersRequestMessageHandler extends SimpleChannelInboundHandler<GroupMembersRequestMessage> {
protected void channelRead0(ChannelHandlerContext ctx, GroupMembersRequestMessage msg) throws Exception {
Set<String> members = GroupSessionFactory.getGroupSession()
.getMembers(msg.getGroupName());
ctx.writeAndFlush(new GroupMembersResponseMessage(members));
}
}服务器端省略。
客户端。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class ChatClient {
public static void main(String[] args) {
…………
try {
…………
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
…………
ch.pipeline().addLast("client handler", new ChannelInboundHandlerAdapter() {
// 在连接建立后触发active事件
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 负责接收用户在控制台的输入,负责向服务器发送各种消息
new Thread(() -> {
…………
while (true) {
…………
String[] s = command.split(" ");
switch (s[0]){
case "gmembers":
ctx.writeAndFlush(new GroupMembersRequestMessage(s[1]));
break;
}
}
}, "system in").start();
}
…………
});
}
});
…………
}
…………
}
}
5.5 聊天室业务-退出
退出登录处理器。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
.Sharable
public class QuitHandler extends ChannelInboundHandlerAdapter {
// 当连接断开时触发inactive事件
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
SessionFactory.getSession().unbind(ctx.channel());
log.debug("{} 已经断开", ctx.channel());
}
// 当出现异常时触发
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
SessionFactory.getSession().unbind(ctx.channel());
log.debug("{} 已经异常断开 异常是{}", ctx.channel(), cause.getMessage());
}
}服务器端省略。
客户端。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class ChatClient {
public static void main(String[] args) {
…………
try {
…………
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
…………
ch.pipeline().addLast("client handler", new ChannelInboundHandlerAdapter() {
// 在连接建立后触发active事件
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 负责接收用户在控制台的输入,负责向服务器发送各种消息
new Thread(() -> {
…………
while (true) {
…………
String[] s = command.split(" ");
switch (s[0]){
case "quit":
ctx.channel().close();
return;
}
}
}, "system in").start();
}
…………
});
}
});
…………
}
…………
}
}
5.6 聊天室业务-空闲检测
在网络编程中可能会出现
连接假死
的现象,可能的原因如下:- 网络设备出现故障,例如网卡,机房等,底层的TCP连接已经断开了,但应用程序没有感知到,仍然占用着资源。(TCP连接只是主机运输层维护的状态,如果物理层断开,运输层是无法感知的,也就是说物理层不维护TCP连接)
- 公网网络不稳定,出现丢包。如果连续出现丢包,这时现象就是客户端数据发不出去,服务端也一直收不到数据,就这么一直耗着。
- 应用程序线程阻塞,无法进行数据读写。
会导致如下问题:
- 假死的连接占用的资源不能自动释放。
- 向假死的连接发送数据,得到的反馈是发送超时。
服务器端解决:怎么判断客户端连接是否假死呢?如果能收到客户端数据,说明没有假死。因此策略就可以定为,每隔一段时间就检查这段时间内是否接收到客户端数据,没有就可以判定为连接假死。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20ch.pipeline().addLast(MESSAGE_CODEC);
// 用来判断是不是读空闲时间过长,或写空闲时间过长
// 5s内如果没有收到channel的数据,会触发一个IdleState#READER_IDLE事件
ch.pipeline().addLast(new IdleStateHandler(5, 0, 0));
// ChannelDuplexHandler可以同时作为入站和出站处理器
ch.pipeline().addLast(new ChannelDuplexHandler() {
// 用来触发特殊事件
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception{
IdleStateEvent event = (IdleStateEvent) evt;
// 触发了读空闲事件
if (event.state() == IdleState.READER_IDLE) {
log.debug("已经5s没有读到数据了");
ctx.channel().close();
}
}
});
ch.pipeline().addLast(LOGIN_HANDLER);客户端定时心跳:客户端可以定时向服务器端发送数据,只要这个时间间隔小于服务器定义的空闲检测的时间间隔,那么就能防止前面提到的误判,客户端可以定义如下心跳处理器。
1
2
3
4
5
6public class PingMessage extends Message {
public int getMessageType() {
return PingMessage;
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18ch.pipeline().addLast(MESSAGE_CODEC);
// 用来判断是不是读空闲时间过长,或写空闲时间过长
// 3s内如果没有向服务器写数据,会触发一个IdleState#WRITER_IDLE事件
ch.pipeline().addLast(new IdleStateHandler(0, 3, 0));
// ChannelDuplexHandler可以同时作为入站和出站处理器
ch.pipeline().addLast(new ChannelDuplexHandler() {
// 用来触发特殊事件
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception{
IdleStateEvent event = (IdleStateEvent) evt;
// 触发了写空闲事件
if (event.state() == IdleState.WRITER_IDLE) {
log.debug("3s没有写数据了,发送一个心跳包");
ctx.writeAndFlush(new PingMessage());
}
}
});