引言
在前面关于《Netty入门篇》的文章中,我们现已开端对Netty
这个闻名的网络结构有了认知,本章的意图则是接受上文,再对Netty
中的一些进阶知识进行论述,究竟前面的内容中,仅论述了一些Netty
的核心组件,想要真正把握Netty
结构,关于它我们应该具有更为全面的认知。
一、Netty中的粘包半包问题
实践上粘包、半包问题,并不仅仅只在Netty
中存在,凡是依据TCP
协议构建的网络组件,根本都需求面临这两个问题,关于粘包问题,在之前关于《计算机网络与协议簇-TCP沾包》中也曾讲到过:
但其时我写成了沾包,但实践上专业的术语解释为:粘包,这儿我纠正一下,接着再简略说清楚粘包和半包的问题:
粘包:这种现象就好像其名,指通信双方中的一端发送了多个数据包,但在另一端则被读取成了一个数据包,比方客户端发送
123、ABC
两个数据包,但服务端却收成的却是123ABC
这一个数据包。形成这个问题的实质原因,在前面TCP
的章节中讲过,这首要是由于TPC
为了优化传输功率,将多个小包兼并成一个大包发送,一同多个小包之间没有鸿沟切开形成的。
半包:指通信双方中的一端发送一个大的数据包,但在另一端被读取成了多个数据包,例如客户端向服务端发送了一个数据包:
ABCDEFGXYZ
,而服务端则读取成了ABCEFG、XYZ
两个包,这两个包实践上都是一个数据包中的一部分,这个现象则被称之为半包问题(发生这种现象的原因在于:接纳方的数据接纳缓冲区过小导致的)。
上述说到的这两种网络通信的问题具体该怎么处理,这点我们放到后面再细说,先来看看Netty
中的沾包和半包问题。
1.1、Netty的粘包、半包问题演示
这儿也就不多说废话了,结合《Netty入门篇》的知识,快速搭建出一个服务端、客户端的通信事例,如下:
// 演示数据粘包问题的服务端
public class AdhesivePackageServer {
public static void main(String[] args) {
EventLoopGroup group = new NioEventLoopGroup();
ServerBootstrap server = new ServerBootstrap();
server.group(group);
server.channel(NioServerSocketChannel.class);
server.childHandler(new ServerInitializer());
server.bind("127.0.0.1",8888);
}
}
// 演示粘包、半包问题的通用初始化器
public class ServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
// 数据安排稳当事情:当收到客户端数据时会读取通道内的数据
@Override
public void channelReadComplete(ChannelHandlerContext ctx)
throws Exception {
// 在这儿直接输出通道内的数据信息
System.out.println(ctx.channel());
super.channelReadComplete(ctx);
}
});
}
}
// 演示数据粘包问题的客户端
public class AdhesivePackageClient {
public static void main(String[] args) {
EventLoopGroup worker = new NioEventLoopGroup();
Bootstrap client = new Bootstrap();
try {
client.group(worker);
client.channel(NioSocketChannel.class);
client.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
// 在通道准备安排稳当后会触发的事情
@Override
public void channelActive(ChannelHandlerContext ctx)
throws Exception {
// 向服务端发送十次数据,每次发送一个字节!
for (int i = 0; i < 10; i++) {
System.out.println("正在向服务端发送第"+
i +"次数据......");
ByteBuf buffer = ctx.alloc().buffer(1);
buffer.writeBytes(new byte[]{(byte) i});
ctx.writeAndFlush(buffer);
}
}
});
}
});
client.connect("127.0.0.1", 8888).sync();
} catch (Exception e){
e.printStackTrace();
} finally {
worker.shutdownGracefully();
}
}
}
这个事例中的代码也并不难了解,客户端的代码中,会向服务端发送十次数据,而服务端仅仅只做了数据读取的动作而已,接着来看看运转成果:
从运转成果中可显着观测到,客户端发送的十个1Bytes
的数据包,在服务端直接被兼并成了一个10Bytes
的数据包,这显着便是粘包的现象,接着再来看看半包的问题,代码如下:
// 演示半包问题的服务端
public class HalfPackageServer {
public static void main(String[] args) {
EventLoopGroup group = new NioEventLoopGroup();
ServerBootstrap server = new ServerBootstrap();
server.group(group);
server.channel(NioServerSocketChannel.class);
// 调整服务端的接纳窗口巨细为四字节
server.option(ChannelOption.SO_RCVBUF,4);
server.childHandler(new ServerInitializer());
server.bind("127.0.0.1",8888);
}
}
// 演示半包问题的客户端
public class HalfPackageClient {
public static void main(String[] args) {
EventLoopGroup worker = new NioEventLoopGroup();
Bootstrap client = new Bootstrap();
try {
client.group(worker);
client.channel(NioSocketChannel.class);
client.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
// 在通道准备安排稳当后会触发的事情
@Override
public void channelActive(ChannelHandlerContext ctx)
throws Exception {
// 向服务端发送十次数据,每次发送十个字节!
for (int i = 0; i < 10; i++) {
ByteBuf buffer = ctx.alloc().buffer();
buffer.writeBytes(new byte[]
{'a','b','c','d','e','f','g','x','y','z'});
ctx.writeAndFlush(buffer);
}
}
});
}
});
client.connect("127.0.0.1", 8888).sync();
} catch (Exception e){
e.printStackTrace();
} finally {
worker.shutdownGracefully();
}
}
}-
上面的代码中,客户端向服务端发送了十次数据,每次数据会发送10
个字节,而在服务端多加了下述这行代码:
server.option(ChannelOption.SO_RCVBUF,4);
这行代码的作用是调整服务端的接纳窗口巨细为四字节,由于默许的接纳窗口较大,客户端需求一次性发送很多数据才能演示出半包现象,这儿为了便于演示,因而将接纳窗口调小,运转成果如下:
从上述运转成果中,也可以显着观察到半包现象,客户端发送的十个数据包,每个包中的数据都为10
字节,但服务端中,接纳到的数据显着并不契合预期,尤其是第三个数据包,是一个不折不扣的半包现象。
1.2、粘包、半包问题的发生原因
前面简略聊了一下粘包、半包问题,但这些问题究竟是什么原因导致的呢?关于这点前面并未深入探讨,这儿来做一致解说,想要弄了解粘包、半包问题的发生原因,这还得说回TCP
协议,我们还记得之前说过的TCP-滑动窗口嘛?
1.2.1、TCP协议的滑动窗口
由于TCP
是一种可靠性传输协议,所以在网络通信进程中,会选用一问一答的形式,也便是一端发送数据后,必须得到另一端回来ACK
呼应后,才会持续发送后续的数据。但这种一问一答的同步办法,显着会非常影响数据的传输功率。
TCP
协议为了处理传输功率的问题,引入了一种名为滑动窗口的技能,也便是在发送方和接纳方上各有一个缓冲区,这个缓冲区被称为“窗口”,假定发送方的窗口巨细为100KB
,那么发送端的前100KB
数据,无需等候接纳端回来ACK
,可以一向发送,直到发满100KB
数据中止。
假如发送端在发送前
100KB
数据时,接纳端回来了某个数据包的ACK
,那此刻发送端的窗口会一向向下滑动,比方开端窗口规模是0~100KB
,收到ACK
后会滑动到20~120KB、120~220KB....
(实践上窗口的巨细、规模,TCP
会依据网络拥塞程度、ACK
呼应时刻等状况来自动调整)。
一同,除开发送方有窗口外,接纳方也会有一个窗口,接纳方只会读取窗口规模之内的数据,假如超出窗口规模的数据并不会读取,这也就意味着不会对窗口之外的数据包回来ACK
,所以发送方在未收到ACK
时,对应的窗口会中止向后滑动,并在必定时刻后对未回来ACK
的数据进行重发。
关于TCP
的滑动窗口,发送方的窗口起到优化传输功率的作用,而接纳端的窗口起到流量操控的作用。
1.2.2、传输层的MSS与链路层的MTU
了解了滑动窗口的概念后,接着来说说MSS、MTU
这两个概念,MSS
是传输层的最大报文长度约束,而MTU
则是链路层的最大数据包巨细约束,一般MTU
会约束MSS
,比方MTU=1500
,那么MSS
最大只能为1500
减去报文头长度,以TCP
协议为例,MSS
最大为1500-40=1460
。
为什么需求这个约束呢?这是由于网络设备硬件导致的,比方恣意类型的网卡,不或许让一个数据包无限增长,由于网卡会有带宽约束,比方一次性传输一个1GB
的数据包,假如不约束巨细直接发送,这会导致网络呈现阻塞,并且超出网络硬件设备单次传输的最大约束。
所以当一个数据包,超出
MSS
巨细时,TCP
协议会自动切开这个数据包,将该数据包拆分红一个个的小包,然后分批次进行传输,然后完成大文件的传输。
1.2.3、TCP协议的Nagle算法
依据MSS
最大报文约束,可以完成大文件的切开并分批发送,但在网络通信中,还有另一种特别状况,便是极小的数据包传输,由于TCP
的报文头默许会有40
个字节,假如数据只要1
字节,那加上报文头仍旧会发生一个41
字节的数据包。
假如这种体积较小的数据包在传输中经常呈现,这定然会导致网络资源的糟蹋,究竟数据包中只要
1
字节是数据,另外40
个字节是报文头,假如呈现1W
个这样的数据包,也就意味着会发生400MB
的报文头,但实践数据只占10MB
,这显着是不稳当的。
正是由于上述原因,因而TCP
协议中引入了一种名为Nagle
的算法,如若接连几次发送的数据都很小,TCP
会依据算法把多个数据兼并成一个包宣布,然后优化网络传输的功率,并且削减对资源的占用。
1.2.4、使用层的接纳缓冲区和发送缓冲区
关于操作系统的IO
函数而言,网络数据不管是发送也好,仍是接纳也罢,并不会选用“复制”的办法作业,比方现在想要传输一个10MB
的数据,不或许直接将这个数据一次性拷贝到缓冲区内,而是一个一个字节进行传输,举个比方:
假定现在要发送
ABCDEFGXYZ....
这组数据,IO
函数会挨个将每个字节放到发送缓冲区中,会呈现A、B、C、D、E、F....
这个次序挨个写入,而接纳方仍旧如此,读取数据时也会一个个字节读取,以A、B、C、D、E、F....
这个次序读取一个数据包中的数据(实践状况会杂乱一些,或许会按必定单位操作数据,而并不是以单个字节作为单位)。
而使用程序为了发送/接纳数据,通常都需求具有两个缓冲区,即所说的接纳缓冲区和发送缓冲区,一个用来暂存要发送的数据,另一个则用来暂存接纳到的数据,一同这两个缓冲区的巨细,可自行调整其巨细(Netty
默许的接纳/发送缓冲区巨细为1024KB
)。
1.2.5、粘包、半包问题的发生原因
了解了上述几个概念后,接着再来看看粘包和半包就简略很多了,粘包和半包问题,或许会由多方面因素导致,如下:
- 粘包:发送
12345、ABCDE
两个数据包,被接纳成12345ABCDE
一个数据包,多个包粘在一同。- 使用层:接纳方的接纳缓冲区太大,导致读取多个数据包一同输出。
-
TCP
滑动窗口:接纳方窗口较大,导致发送方宣布多个数据包,处理不及时形成粘包。 -
Nagle
算法:由于发送方的数据包体积过小,导致多个数据包兼并成一个包发送。
- 半包:发送
12345ABCDE
一个数据包,被接纳成12345、ABCDE
两个数据包,一个包拆成多个。- 使用层:接纳方缓冲区太小,无法存方发送方的单个数据包,因而拆开读取。
- 滑动窗口:接纳方的窗口太小,无法一次性放下完好数据包,只能读取其间一部分。
-
MSS
约束:发送方的数据包超过MSS
约束,被拆分为多个数据包发送。
上述便是呈现粘包、半包问题的根本原因,更多的是由于TCP
协议形成的,所以想要处理这两个问题,就得自己重写底层的TCP
协议,这关于我们而言并不现实,究竟TCP/IP
协议栈,根本包括各式各样的网络设备,想要从根源上处理粘包、半包问题,重写协议后还得替换掉一切网络设备内部的TCP
完成,现在世界上没有任何一个安排、企业、个人具有这样的影响力。
1.3、粘包、半包问题的处理计划
既然无法在底层从根源上处理问题,那此刻可以换个思路,也便是从使用层动身,粘包、半包问题都是由于数据包与包之间,没有鸿沟切开导致的,那想要处理这样的问题,发送方可以在每个数据包的尾部,自己拼接一个特别分隔符,接纳方读取到数据时,再依据对应的分隔符读取数据即可。
关于其他的一些网络编程的技能栈,我们不做过多延伸,要点来聊一聊Netty
中的粘包、半包问题该怎么处理呢?其实这也并不需求自己动手处理,由于Netty
内部早已内置了相关完成,究竟我们能想到的问题,结构的设计者也早已料到,接着一同来看看Netty
的处理计划吧。
1.3.1、运用短衔接处理粘包问题
关于短衔接我们应该都不生疏,HTTP/1.0
版别中,默许运用的便是TCP
短衔接,这是指客户端在发送一次数据后,就会立马断开与服务端的网络衔接,在客户端断开衔接后,服务端会收到一个-1
的状况码,而我们可以用这个作为音讯(数据)的鸿沟,以此区分不同的数据包,如下:
// 演示通过短衔接处理粘包问题的服务端
public class AdhesivePackageServer {
public static void main(String[] args) {
EventLoopGroup group = new NioEventLoopGroup();
ServerBootstrap server = new ServerBootstrap();
server.group(group);
server.channel(NioServerSocketChannel.class);
server.childHandler(new ServerInitializer());
server.bind("127.0.0.1",8888);
}
}
// 演示通过短衔接处理粘包问题的客户端
public class Client {
public static void main(String[] args) {
for (int i = 0; i < 3; i++) {
sendData();
}
}
private static void sendData(){
EventLoopGroup worker = new NioEventLoopGroup();
Bootstrap client = new Bootstrap();
try {
client.group(worker);
client.channel(NioSocketChannel.class);
client.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
// 在通道准备安排稳当后会触发的事情
@Override
public void channelActive(ChannelHandlerContext ctx)
throws Exception {
// 向服务端发送一个20字节的数据包,然后断开衔接
ByteBuf buffer = ctx.alloc().buffer(1);
buffer.writeBytes(new byte[]
{'0','1','2','3','4',
'5','6','7','8','9',
'A','B','C','D','E',
'M','N','X','Y','Z'});
ctx.writeAndFlush(buffer);
ctx.channel().close();
}
});
}
});
client.connect("127.0.0.1", 8888).sync();
} catch (Exception e){
e.printStackTrace();
} finally {
worker.shutdownGracefully();
}
}
}
服务端的代码,仍旧用之前演示粘包问题的AdhesivePackageServer
,上述只对客户端的代码进行了改造,首要是将创立客户端衔接、发送数据的代码抽象成了一个办法,然后在循环内部调用该办法,运转成果如下:
从运转成果中可以看出,发送的3
个数据包,都未呈现粘包问题,每个数据包之间都是独立切开的。但这种办法处理粘包问题,实践上归于一种“投机取巧”的计划,究竟每个数据包都选用新的衔接发送,在操作系统级别来看,每个数据包都源自于不同的网络套接字,天然会分开读取。
但这种办法无法处理半包问题,例如这儿我们将服务端的接纳缓冲区调小:
// 演示半包问题的服务端
public class HalfPackageServer {
public static void main(String[] args) {
EventLoopGroup group = new NioEventLoopGroup();
ServerBootstrap server = new ServerBootstrap();
server.group(group);
server.channel(NioServerSocketChannel.class);
// 调整服务端的接纳缓冲区巨细为16字节(最小为16,无法设置更小)
server.childOption(ChannelOption.RCVBUF_ALLOCATOR,
new AdaptiveRecvByteBufAllocator(16,16,16));
server.childHandler(new ServerInitializer());
server.bind("127.0.0.1",8888);
}
}
然后再发动这个服务端,接着再发动前面的客户端,作用如下:
从成果中仍旧会发现,多个数据包之间仍是发生了半包问题,由于服务端的接纳缓冲区一次性最大只能存下16Bytes
数据,所以客户端每次发送20Bytes
数据,无法全部存入缓冲区,终究就呈现了一个数据包被拆成多个包读取。
正由于短衔接这种办法,无法很好的处理半包问题,所以一般线上除开特别场景外,不然不会运用短衔接这种形式来单独处理粘包问题,接着看看Netty
中供给的一些处理计划。
1.3.2、定长帧解码器
前面聊到的短衔接办法,处理粘包问题的思路归于投机取巧行为,一同也需求频繁的树立/断开衔接,这无论是从资源利用率、仍是程序执行的功率上来说,都并不稳当,而Netty
中供给了一系列处理粘包、半包问题的完成类,即Netty
的帧解码器,先来看看定长帧解码器,事例如下:
// 通过定长帧解码器处理粘包、半包问题的演示类
public class FixedLengthFrameDecoderDemo {
public static void main(String[] args) {
// 通过Netty供给的测验通道来代替服务端、客户端
EmbeddedChannel channel = new EmbeddedChannel(
// 增加一个定长帧解码器(每条数据以8字节为单位拆包)
new FixedLengthFrameDecoder(8),
new LoggingHandler(LogLevel.DEBUG)
);
// 调用三次发送数据的办法(等价于向服务端发送三次数据)
sendData(channel,"ABCDEGF",8);
sendData(channel,"XYZ",8);
sendData(channel,"12345678",8);
}
private static void sendData(EmbeddedChannel channel, String data, int len){
// 获取发送数据的字节长度
byte[] bytes = data.getBytes();
int dataLength = bytes.length;
// 依据固定长度补齐要发送的数据
String alignString = "";
if (dataLength < len){
int alignLength = len - bytes.length;
for (int i = 1; i <= alignLength; i++) {
alignString = alignString + "*";
}
}
// 拼接上补齐字符,得到终究要发送的音讯数据
String msg = data + alignString;
byte[] msgBytes = msg.getBytes();
// 构建缓冲区,通过channel发送数据
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(8);
buffer.writeBytes(msgBytes);
channel.writeInbound(buffer);
}
}
留意看上述这个事例,在其间就并未搭建服务端、客户端了,而是选用EmbeddedChannel
目标来测验,这个通道是Netty
供给的测验通道,可以依据它来快速搭建测验用例,上述中的:
new EmbeddedChannel(
new FixedLengthFrameDecoder(8),
new LoggingHandler(LogLevel.DEBUG)
);
这段代码,就类似于之前在服务端的pipeline
增加处理器的进程,等价于下述这段代码:
socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(8));
socketChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
了解了EmbeddedChannel
后,接着先来看看运转成果,如下:
留意看上述成果,在该事例中,服务端会以8Bytes
为单位,然后对数据进行分包处理,平均每读取8Bytes
数据,就会将其当作一个数据包。假如客户端发送的一条数据,长度没有8
个字节,在sendData()
办法中则会以*
号补齐。比方上图中,发送了一条XYZ
数据,由于长度只要3
字节,所以会再拼接五个*
号补齐八字节的长度。
这种选用固定长度解析数据的办法,确实可以有用防止粘包、半包问题的呈现,由于每个数据包之间,会以八个字节的长度作为鸿沟,然后切开数据。但这种办法也存在三个致命缺陷:
- ①只适用于传输固定长度规模内的数据场景,并且客户端在发送数据前,还需自己依据长度补齐数据。
- ②假如发送的数据超出固定长度,服务端仍旧会按固定长度分包,所以仍然会存在半包问题。
- ③关于未达到固定长度的数据,还需求额定传输补齐的
*
号字符,会占用不用要的网络资源。
1.3.3、行帧解码器
上面说到的定长帧解码器,由于运用时存在少许约束,运用它来解析数据就并不那么灵活,尤其是针关于一些数据长度可变的场景,显得就有少许乏力,因而Netty
中还供给了行帧解码器,事例如下:
// 通过行帧解码器处理粘包、半包问题的演示类
public class LineFrameDecoderDemo {
public static void main(String[] args) {
// 通过Netty供给的测验通道来代替服务端、客户端
EmbeddedChannel channel = new EmbeddedChannel(
// 增加一个行帧解码器(在超出1024后还未检测到换行符,就会中止读取)
new LineBasedFrameDecoder(1024),
new LoggingHandler(LogLevel.DEBUG)
);
// 调用三次发送数据的办法(等价于向服务端发送三次数据)
sendData(channel,"ABCDEGF");
sendData(channel,"XYZ");
sendData(channel,"12345678");
}
private static void sendData(EmbeddedChannel channel, String data){
// 在要发送的数据完毕,拼接上一个\n换行符(\r\n也可以)
String msg = data + "\n";
// 获取发送数据的字节长度
byte[] msgBytes = msg.getBytes();
// 构建缓冲区,通过channel发送数据
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(8);
buffer.writeBytes(msgBytes);
channel.writeInbound(buffer);
}
}
在上述事例中,我们给服务端增加了一个LineBasedFrameDecoder(1024)
行解码器,其间有个1024
的数字,这是啥意思呢?这个是数据的最大长度约束,究竟在网络接纳进程中,假如一向没有读取到换行符,总不能一向接纳下去,所以当数据的长度超出该值后,Netty
会默许将前面读到的数据分红一个数据包。
一同在发送数据的sendData()
办法中,这回就无需我们自己补齐数据了,只需在每个要发送的数据完毕,手动拼接上一个\n
或\r\n
换行符即可,服务端在读取数据时,会按换行符来作为鸿沟切开,运转成果如下:
从成果中可以看出,每个数据包都是按客户端发送的格式做了解析,并未呈现粘包、半包现象。
1.3.4、分隔符帧解码器
上面聊了以换行符作为分隔符的解码器,但Netty
中还供给了自定义分隔符的解码器,运用这种解码器,能让诸位随心所欲的定义自己的分隔符,事例如下:
public class DelimiterFrameDecoderDemo {
public static void main(String[] args) {
// 自定义一个分隔符(记得要用ByteBuf目标来包装)
ByteBuf delimiter = ByteBufAllocator.DEFAULT.buffer(1);
delimiter.writeByte('*');
// 通过Netty供给的测验通道来代替服务端、客户端
EmbeddedChannel channel = new EmbeddedChannel(
// 增加一个分隔符帧解码器(传入自定义的分隔符)
new DelimiterBasedFrameDecoder(1024,delimiter),
new LoggingHandler(LogLevel.DEBUG)
);
// 调用三次发送数据的办法(等价于向服务端发送三次数据)
sendData(channel,"ABCDEGF");
sendData(channel,"XYZ");
sendData(channel,"12345678");
}
private static void sendData(EmbeddedChannel channel, String data){
// 在要发送的数据完毕,拼接上一个*号(由于前面自定义的分隔符为*号)
String msg = data + "*";
// 获取发送数据的字节长度
byte[] msgBytes = msg.getBytes();
// 构建缓冲区,通过channel发送数据
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(8);
buffer.writeBytes(msgBytes);
channel.writeInbound(buffer);
}
}
这个事例的运转成果与上一个完全相同,不同点则在于换了一个解码器,换成了:
new DelimiterBasedFrameDecoder(1024,delimiter)
而后发送数据的时分,对每个数据的完毕,手动拼接一个*
号作为分隔符即可。
相较于本来的定长解码器,行解码器、自定义分隔符解码器显着更加灵活,由于支撑可变长度的数据,但这两种解码器,仍旧存在少许缺陷:
- ①关于每一个读取到的字节都需求判别一下:是否为完毕的分隔符,这会影响整体功用。
- ②仍旧存在最大长度约束,当数据超出最大长度后,会自动将其分包,在数据传输量较大的状况下,仍旧会导致半包现象呈现。
1.3.5、LTC帧解码器
前面聊过的多个解码器中,无论是哪个,都多多少少会存在少许不完美,因而Netty
终究供给了一款LTC
解码器,这个解码器也归于实践Netty
开发中,使用最为广泛的一种,但了解起来稍微有些杂乱,先来看看它的结构办法:
public class LengthFieldBasedFrameDecoder extends ByteToMessageDecoder {
public LengthFieldBasedFrameDecoder(
int maxFrameLength,
int lengthFieldOffset,
int lengthFieldLength,
int lengthAdjustment,
int initialBytesToStrip) {
this(maxFrameLength,
lengthFieldOffset,
lengthFieldLength,
lengthAdjustment,
initialBytesToStrip, true);
}
// 暂时省略其他参数的结构办法......
}
从上述结构器中可显着看出,LTC
中存在五个参数,看起来都比较长,接着简略解释一下:
-
maxFrameLength
:数据最大长度,答应单个数据包的最大长度,超出长度后会自动分包。 -
lengthFieldOffset
:长度字段偏移量,表明描绘数据长度的信息从第几个字段开端。 -
lengthFieldLength
:长度字段的占位巨细,表明数据中的运用了几个字节描绘正文长度。 -
lengthAdjustment
:长度调整数,表明在长度字段的N
个字节后才是正文数据的开端。 -
initialBytesToStrip
:头部剥离字节数,表明先将数据去掉N
个字节后,再开端读取数据。
上述这种办法描绘五个参数,我们估计了解起来有些困难,那么下面结合Netty
源码中的注释,先把这几个参数完全搞了解再说,先来看个事例:
比方上述这组数据,对应的参数如下:
lengthFieldOffset = 0
lengthFieldLength = 4
lengthAdjustment = 0
initialBytesToStrip = 0
这组参数表明啥意思呢?表明现在这条数据,长度字段从第0
个字节开端,运用4
个字节来描绘数据长度,这时服务端会读取数据的前4
个字节,得到正文数据的长度,然后得知:在第四个字节之后,再往后读十个字节,是一条完好的数据,终究向后读取10
个字节,终究就会读到Hi, ZhuZi.
这条数据。
但上述这种办法对数据解码之后,读取时仍旧会显现长度字段,也便是前四个用来描绘长度的字节也会被读到,因而终究会显现出10Hi, ZhuZi.
这样的格式,那假如想要去掉前面的长度字段怎么办呢?这需求用到initialBytesToStrip
参数,如下:
lengthFieldOffset = 0
lengthFieldLength = 4
lengthAdjustment = 0
initialBytesToStrip = 4
这组参数又是啥意思呢?其实和前面那一组数据没太大的改变,仅仅用initialBytesToStrip
声明要剥离掉前4
个字节,所以数据通过解码后,终究会去掉前面描绘长度的四个字节,仅显现Hi, ZhuZi.
这十个字节的数据。
上述这种形式,其实便是预设了一个长度字段,服务端、客户端之间约定运用N
个字节来描绘数据长度,接着在读取数据时,读取指定个字节,得到本次数据的长度,终究可以正常解码数据。但这种办法只能满意最根本的数据传输,假如在数据中还需求增加一些正文信息,比方附加数据头信息、版别号的状况,又该怎么处理呢?如下:
lengthFieldOffset = 8
lengthFieldLength = 4
lengthAdjustment = 0
initialBytesToStrip = 0
上述这个示例中,假定附加信息占8Bytes
,这儿就需求用到lengthFieldOffset
参数,以此来表明长度字段偏移量是8
,这意味着读取数据时,要从第九个字节开端,往后读四个字节的数据,才可以得到描绘数据长度的字段,然后解析得到10
,终究再往后读取十个字节的数据,读到一条完好的数据。
当然,假如只想要读到正文数据怎么办?如下:
lengthFieldOffset = 8
lengthFieldLength = 4
lengthAdjustment = 0
initialBytesToStrip = 12
仍旧只需求通过initialBytesToStrip
参数,从头部剥离掉前12
个字节即可,这儿的12
个字节,由八字节的附加信息、四字节的长度描绘组成,去掉这两部分,天然就得到了正文数据。
OK,再来看另一种状况,假如长度字段在最前面,附加信息在中心,但我只想要读取正文数据怎么办呢?
lengthFieldOffset = 0
lengthFieldLength = 4
lengthAdjustment = 8
initialBytesToStrip = 12
在这儿我们又用到了lengthAdjustment
这个参数,这个参数是长度调整数的意思,上面的示例中赋值为8
,即表明从长度字段后开端,跳过8
个字节后,才是正文数据的开端。接纳方在解码数据时,首要会从0
开端读取四个字节,得到正文数据的长度为10
,接着会依据lengthAdjustment
参数,跳过中心8
个的字节,终究再往后读10
个字节数据,然后得到终究的正文数据。
OK~,通过上述几个示例的解说后,信任我们对给出的几个参数都有所了解,如若觉得有些晕乎,可回头再多仔细阅读几遍,这样有助于加深对各个参数的印象。但实质上来说,LTC
解码器,便是依据这些参数,来承认一条数据的长度、位置,然后读取到精确的数据,防止粘包、半包的现象发生,接下来上个Demo
了解:
// 通过LTC帧解码器处理粘包、半包问题的演示类
public class LTCDecoderDemo {
public static void main(String[] args) {
// 通过Netty供给的测验通道来代替服务端、客户端
EmbeddedChannel channel = new EmbeddedChannel(
// 增加一个行帧解码器(在超出1024后还未检测到换行符,就会中止读取)
new LengthFieldBasedFrameDecoder(1024,0,4,0,0),
new LoggingHandler(LogLevel.DEBUG)
);
// 调用三次发送数据的办法(等价于向服务端发送三次数据)
sendData(channel,"Hi, ZhuZi.");
}
private static void sendData(EmbeddedChannel channel, String data){
// 获取要发送的数据字节以及长度
byte[] dataBytes = data.getBytes();
int dataLength = dataBytes.length;
// 先将数据长度写入到缓冲区、再将正文数据写入到缓冲区
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
buffer.writeInt(dataLength);
buffer.writeBytes(dataBytes);
// 发送终究组装好的数据
channel.writeInbound(buffer);
}
}
上述事例中创立了一个LTC
解码器,对应的参数值为1024,0,4,0,0
,这别离对应前面的五个参数,如下:
maxFrameLength = 1024
lengthFieldOffset = 0
lengthFieldLength = 4
lengthAdjustment = 0
initialBytesToStrip = 0
这组值意思为:数据的第0~4
个字节是长度字段,用来描绘正文数据的长度,运转成果如下:
作用非常显着,既没有发生粘包、半包问题,并且无需逐一字节判别是否为切开符,这比照之前的几种解码器而言,这种办法的功率显着好上特别特别多。当然,上述成果中,假如想要去掉前面的四个.
,就只需求将initialBytesToStrip = 4
即可,从头部剥离掉四个字节再读取。
1.3.6、粘包、半包处理计划小结
前面介绍了短衔接、定长解码器、行解码器、分隔符解码器以及LTC
解码器这五种计划,其间我们需求紧记的是终究一种,由于其他的计划多少存在一些功用问题,而通过LTC
解码器这种办法处理粘包、半包问题的功率最好,由于无需逐一字节判别音讯鸿沟。
但实践
Netty
开发中,假如其他解码器更契合事务需求,也不用死死追求运用LTC
解码器,究竟技能为事务供给服务,适合自己事务的,才是最好的!
二、Netty的长衔接与心跳机制
关于长衔接、短衔接,这个概念在前面稍有提及,所谓的短衔接便是每次读写数据完成后,立马断开客户端与服务端的网络衔接。而长衔接则是相反的意思,一次数据交互完成后,服务端和客户端之间持续坚持衔接,当后续需再次收/发数据时,可直接复用原有的网络衔接。
长衔接这种模式,在并发较高的状况下可以带来额定的功用收益,由于
Netty
服务端、客户端绑定IP
端口,搭建Channel
通道的进程,放到底层实践上便是TCP
三次握手的进程,同理,客户端、服务端断开衔接的进程,即对应着TCP
的四次挥手。
我们都知道,TCP
三次握手/四次挥手,这个进程无疑是比较“重量级”的,并发状况下,频繁创立、毁掉网络衔接,其资源开支、功用开支会比较大,所以运用长衔接的计划,可以有用削减创立和毁掉网络衔接的动作。
那怎么让Netty
敞开长衔接支撑呢?这需求涉及到之前用过的ChannelOption
这个类,接着来具体讲讲它。
2.1、Netty调整网络参数(ChannelOption)
ChannelOption
是Netty
供给的参数调整类,该类中供给了很多常量,别离对应着底层TCP、UDP、
计算机网络的一些参数,在创立服务端、客户端时,我们可以通过ChannelOption
类来调整网络参数,以此满意不同的事务需求,该类中供给的常量列表如下:
-
ALLOCATOR
:ByteBuf
缓冲区的分配器,默许值为ByteBufAllocator.DEFAULT
。 -
RCVBUF_ALLOCATOR
:通道接纳数据的ByteBuf
分配器,默许为AdaptiveRecvByteBufAllocator.DEFAULT
。 -
MESSAGE_SIZE_ESTIMATOR
:音讯巨细估算器,默许为DefaultMessageSizeEstimator.DEFAULT
。 -
CONNECT_TIMEOUT_MILLIS
:设置客户端的衔接超时时刻,默许为3000ms
,超出会断开衔接。 -
MAX_MESSAGES_PER_READ
:一次Loop
最大读取的音讯数。-
ServerChannel/NioChannel
默许16
,其他类型的Channel
默许为1
。
-
-
WRITE_SPIN_COUNT
:一次Loop
最大写入的音讯数,默许为16
。- 一个数据
16
次还未写完,需求提交一个新的任务给EventLoop
,防止数据量较大的场景阻塞系统。
- 一个数据
-
WRITE_BUFFER_HIGH_WATER_MARK
:写高水位符号,默许为64K
,超出时Channel.isWritable()
回来Flase
。 -
WRITE_BUFFER_LOW_WATER_MARK
:写低水位符号,默许为32K
,超出高水位又下降到低水位时,isWritable()
回来True
。 -
WRITE_BUFFER_WATER_MARK
:写水位符号,假如写的数据量也超出该值,仍旧回来Flase
。 -
ALLOW_HALF_CLOSURE
:一个远程衔接封闭时,是否半关本地衔接,默许为Flase
。-
Flase
表明自动封闭本地衔接,为True
会触发入站处理器的userEventTriggered()
办法。
-
-
AUTO_READ
:自动读取机制,默许为True
,通道上有数据时,自动调用channel.read()
读取数据。 -
AUTO_CLOSE
:自动封闭机制,默许为Flase
,发生错误时不会断开与某个通道的衔接。 -
SO_BROADCAST
:设置播送机制,默许为Flase
,为True
时会敞开Socket
的播送音讯。 -
SO_KEEPALIVE
:敞开长衔接机制,一次数据交互完后不会立马断开衔接。 -
SO_SNDBUF
:发送缓冲区,用于保存要发送的数据,未收到接纳数据的ACK
之前,数据会存在这儿。 -
SO_RCVBUF
:接受缓冲区,用户保存要接受的数据。 -
SO_REUSEADDR
:是否复用IP
地址与端口号,敞开后可重复绑定同一个地址。 -
SO_LINGER
:设置延迟封闭,默许为-1
。-
-1
:表明禁用该功用,当调用close()
办法后会当即回来,底层会先处理完数据。 -
0
:表明禁用该功用,调用后当即回来,底层会直接放弃正在处理的数据。 - 大于
0
的正整数:封闭时等候n
秒,或数据处理完成才正式封闭。
-
-
SO_BACKLOG
:指定服务端的衔接行列长度,当衔接数达到该值时,会拒绝新的衔接恳求。 -
SO_TIMEOUT
:设置接受数据时等候的超时时刻,默许为0
,表明无限等候。 -
IP_TOS
: -
IP_MULTICAST_ADDR
:设置IP
头的Type-of-Service
字段,描绘IP
包的优先级和QoS
选项。 -
IP_MULTICAST_IF
:对应IP
参数IP_MULTICAST_IF
,设置对应地址的网卡为多播模式。 -
IP_MULTICAST_TTL
:对应IP
参数IP_MULTICAST_IF2
,同上但支撑IPv6
。 -
IP_MULTICAST_LOOP_DISABLED
:对应IP
参数IP_MULTICAST_LOOP
,设置本地回环地址的多播模式。 -
TCP_NODELAY
:敞开TCP
的Nagle
算法,会将多个小包兼并成一个大包发送。 -
DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION
:DatagramChannel
注册的EventLoop
即表明已激活。 -
SINGLE_EVENTEXECUTOR_PER_GROUP
:Pipeline
是否由单线程执行,默许为True
,一切处理器由一条线程执行,无需通过线程上下文切换。
上面列出了ChannelOption
类中供给的参数,其间包括了网络通用的参数、TCP
协议、UDP
协议以及IP
协议的参数,其他的我们无需过多关怀,这儿要点留意TCP
协议的两个参数:
-
TCP_NODELAY
:敞开TCP
的Nagle
算法,会将多个小包兼并成一个大包发送。 -
SO_KEEPALIVE
:敞开长衔接机制,一次数据交互完后不会立马断开衔接。
第一个参数便是之前聊到的Nagle
算法,而关于现在要聊的长衔接,便是SO_KEEPALIVE
这个参数,想要让这些参数收效,需求将其装载到对应的服务端/客户端上,Netty
中供给了两个装载参数的办法:
-
option()
:发生在衔接初始化阶段,也便是程序初始化时,就会装载该办法装备的参数。 -
childOption()
:发生在衔接树立之后,这些参数只要等衔接树立后才会被装载。
其实也可以这样了解,option()
办法装备的参数是对大局收效的,而childOption()
装备的参数,是针关于衔接收效的,而想要敞开长衔接装备,只需稍微改造一下服务端/客户端代码即可:
// 服务端代码
server.childOption(ChannelOption.SO_KEEPALIVE, true);
// 客户端代码
client.option(ChannelOption.SO_KEEPALIVE, true);
通过上述的办法敞开长衔接之后,TCP
默许每两小时会发送一次心跳检测,查看对端是否还存活,假如对端由于网络故障导致下线,TCP
会自动断开与对方的衔接。
2.2、Netty的心跳机制
前面聊到了Netty
的长衔接,其实实质上并不是Netty
供给的长衔接完成,而是通过调整参数,凭借传输层TCP
协议供给的长衔接机制,然后完成服务端与客户端的长衔接支撑。不过TCP
尽管供给了长衔接支撑,但其心跳机制并不够完善,Why
?其实答案很简略,由于心跳检测的距离时刻太长了,每隔两小时才检测一次!
或许有人会说:两小时就两小时,这有什么问题吗?其实问题有些大,由于两小时太长了,无法有用检测到机房断电、机器重启、网线拔出、防火墙更新等状况,假定一次心跳完毕后,对端就呈现了这些故障,依托
TCP
自身的心跳频率,需求等到两小时之后才能检测到问题。而这些现已失效的衔接应当及时除掉,不然会长时刻占用服务端资源,究竟服务端的可用衔接数是有限的。
所以,光依托TCP
的心跳机制,这无法保障我们的使用稳健性,因而一般开发中心件也好、通信程序也罢、亦或是RPC
结构等,都会在使用层再自完成一次心跳机制,而所谓的心跳机制,也并不是特别巨大上的东西,完成的思路有两种:
- 服务端自动勘探:每距离必定时刻后,向一切客户端发送一个检测信号,进程如下:
- 假定现在有三个节点,
A
为服务端,B、C
都为客户端。-
A
:你们还活着吗? -
B
:我还活着! -
C
:…..(假定挂掉了,无呼应)
-
-
A
收到了B
的呼应,但C
却未给出呼应,很有或许挂了,A
中止与C
的衔接。
- 假定现在有三个节点,
- 客户端自动奉告:每距离必定时刻后,客户端向服务端发送一个心跳包,进程如下:
- 仍旧是上述那三个节点。
-
B
:我还活着,不要开除我! -
C
:….(假定挂掉了,不发送心跳包) -
A
:收到B
的心跳包,但未收到C
的心跳包,将C
的网络衔接断开。
一般来说,一套健全的心跳机制,都会结合上述两种计划一同完成,也便是客户端定时向服务端发送心跳包,当服务端未收到某个客户端心跳包的状况下,再自意向客户端建议勘探包,这一步首要是做二次承认,防止由于网络拥塞或其他问题,导致本来客户端宣布的心跳包丢失。
2.2.1、心跳机制的完成思路分析
前面叨叨絮絮说了很多,那么在Netty
中该怎么完成呢?其实在Netty
中供给了一个名为IdleStateHandler
的类,它可以对一个通道上的读、写、读/写操作设置定时器,其间首要供给了三种类型的心跳检测:
// 当一个Channel(Socket)在指定时刻后未触发读事情,会触发这个事情
public static final IdleStateEvent READER_IDLE_STATE_EVENT;
// 当一个Channel(Socket)在指定时刻后未触发写事情,会触发这个事情
public static final IdleStateEvent WRITER_IDLE_STATE_EVENT;
// 上述读、写等候事情的结合体
public static final IdleStateEvent ALL_IDLE_STATE_EVENT;
在Netty
中,当一个已树立衔接的通道,超出指定时刻后还没有呈现数据交互,对应的Channel
就会进入搁置Idle
状况,依据不同的Socket/Channel
事情,会进入不同的搁置状况,而不同的搁置状况又会触发不同的搁置事情,也便是上述说到的三种搁置事情,在Netty
顶用IdleStateEvent
事情类来表明。
OK,正是由于
Netty
供给了IdleStateEvent
搁置事情类,所以我们可以依据它来完成心跳机制,但这儿还需求用到《Netty入门篇-入站处理器》中聊到的一个办法:userEventTriggered()
,这个钩子办法,会在通道触发恣意事情后被调用,这也就意味着:只要通道上触发了事情,都会触发该办法执行,搁置事情也不破例!
有了IdleState、userEventTriggered()
这两个根底后,我们就可依据这两个玩意儿,去完成一个简略的心跳机制,最根本的功用完成如下:
- 客户端:在搁置必定时刻后,可以自动给服务端发送心跳包。
- 服务端:可以自动检测到未发送数据包的搁置衔接,并中止衔接。
2.2.2、带有心跳机制的客户端完成
上述这两点功用完成起来并不难,我们首要写一下客户端的完成,如下:
// 心跳机制的客户端处理器
public class HeartbeatClientHandler extends ChannelInboundHandlerAdapter {
// 通用的心跳包数据
private static final ByteBuf HEARTBEAT_DATA =
Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("I am Alive", CharsetUtil.UTF_8));
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object event) throws Exception {
// 假如当时触发的事情是搁置事情
if (event instanceof IdleStateEvent) {
IdleStateEvent idleEvent = (IdleStateEvent) event;
// 假如当时通道触发了写搁置事情
if (idleEvent.state() == IdleState.WRITER_IDLE){
// 表明当时客户端有一段时刻未向服务端发送数据了,
// 为了防止服务端封闭当时衔接,手动发送一个心跳包
ctx.channel().writeAndFlush(HEARTBEAT_DATA.duplicate());
System.out.println("成功向服务端发送心跳包....");
} else {
super.userEventTriggered(ctx, event);
}
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("正在与服务端树立衔接....");
// 树立衔接成功之后,先向服务端发送一条数据
ctx.channel().writeAndFlush("我是会发心跳包的客户端-A!");
super.channelActive(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("服务端自动封闭了衔接....");
super.channelInactive(ctx);
}
}
由于要凭借userEventTriggered()
办法来完成事情监听,所以我们需求定义一个类承继入站处理器,接着在其间做了一个判别,假如当时触发了IdleStateEvent
搁置事情,这也就意味着现在没有向服务端发送数据了,因而需求发送一个心跳包,奉告服务端自己还活着,接着需求将这个处理器加在客户端上面,如下:
// 演示心跳机制的客户端(会发送心跳包)
public class ClientA {
public static void main(String[] args) {
EventLoopGroup worker = new NioEventLoopGroup();
Bootstrap client = new Bootstrap();
try {
client.group(worker);
client.channel(NioSocketChannel.class);
// 打开长衔接装备
client.option(ChannelOption.SO_KEEPALIVE, true);
// 指定一个自定义的初始化器
client.handler(new ClientInitializer());
client.connect("127.0.0.1", 8888).sync();
} catch (Exception e){
e.printStackTrace();
}
}
}
// 客户端的初始化器
public class ClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
// 装备假如3s内未触发写事情,就会触发写搁置事情
pipeline.addLast("IdleStateHandler",
new IdleStateHandler(0,3,0,TimeUnit.SECONDS));
pipeline.addLast("Encoder",new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast("Decoder",new StringDecoder(CharsetUtil.UTF_8));
// 装载自定义的客户端心跳处理器
pipeline.addLast("HeartbeatHandler",new HeartbeatClientHandler());
}
}
客户端的代码根本上和之前的事例差异不大,要点看ClientInitializer
这个初始化器,里边首要加入了一个IdleStateHandler
,参数为0、3、0
,单位是秒,这是啥意思呢?点进源码看看结构函数,如下:
public IdleStateHandler(long readerIdleTime,
long writerIdleTime,
long allIdleTime,
TimeUnit unit) {
this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
}
没错,其实赋值的三个参数,也就别离对应着读操作的搁置事情、写操作的搁置事情、读写操作的搁置事情,假如赋值为0
,表明这些搁置事情不需求关怀,在前面的赋值中,第二个参数writerIdleTime
被我们赋值成了3
,这表明假如客户端通道在三秒内,未触发写事情,就会触发写搁置事情,而后会调用HeartbeatClientHandler.userEventTriggered()
办法,然后向服务端发送一个心跳包。
2.2.3、带有心跳机制的服务端完成
接着再来看看服务端的代码完成,相同需求有一个心跳处理器,如下:
// 心跳机制的服务端处理器
public class HeartbeatServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object event) throws Exception {
// 假如当时触发的事情是搁置事情
if (event instanceof IdleStateEvent) {
IdleStateEvent idleEvent = (IdleStateEvent) event;
// 假如对应的Channel通道触发了读搁置事情
if (idleEvent.state() == IdleState.READER_IDLE){
// 表明对应的客户端没有发送心跳包,则封闭对应的网络衔接
// (心跳包也是一种特别的数据,会触发读事情,有心跳就不会进这步)
ctx.channel().close();
System.out.println("封闭了未发送心跳包的衔接....");
} else {
super.userEventTriggered(ctx, event);
}
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 假如收到的是心跳包,则给客户端做出一个回复
if ("I am Alive".equals(msg)){
ctx.channel().writeAndFlush("I know");
}
System.out.println("收到客户端音讯:" + msg);
super.channelRead(ctx, msg);
}
}
在Server
端的心跳处理器中,相同监听了搁置事情,但这儿监听的是读搁置事情,由于一个通道假如长时刻没有触发读事情,这表明对应的客户端现已很长事情没有发数据了,所以需求封闭对应的客户端衔接。
有小伙伴或许会疑惑:为什么一个客户端通道长时刻未发送数据就需求封闭衔接呀?这不是违背了长衔接的初衷吗?答案并非如此,由于前面在我们的客户端中,在通道长时刻未触发写事情的状况下,会自意向服务端发送心跳包,而心跳包也是一种特别的数据包,仍旧会触发服务端上的读事情,所以凡是正常发送心跳包的衔接,都不会被服务端自动封闭。
OK,接着来看看服务端的完成,其实和前面的客户端差不多:
// 演示心跳机制的服务端
public class Server {
public static void main(String[] args) {
EventLoopGroup group = new NioEventLoopGroup();
ServerBootstrap server = new ServerBootstrap();
server.group(group);
server.channel(NioServerSocketChannel.class);
// 在这儿敞开了长衔接装备,以及装备了自定义的初始化器
server.childOption(ChannelOption.SO_KEEPALIVE, true);
server.childHandler(new ServerInitializer());
server.bind("127.0.0.1",8888);
}
}
// 服务端的初始化器
public class ServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
// 装备假如5s内未触发读事情,就会触发读搁置事情
pipeline.addLast("IdleStateHandler",
new IdleStateHandler(5,0,0,TimeUnit.SECONDS));
pipeline.addLast("Encoder",new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast("Decoder",new StringDecoder(CharsetUtil.UTF_8));
// 装载自定义的服务端心跳处理器
pipeline.addLast("HeartbeatHandler",new HeartbeatServerHandler());
}
}
要点留意看:在服务端装备的是读搁置事情,假如在5s
内未触发读事情,就会触发对应通道的读搁置事情,但这儿是5s
,为何不装备成客户端的3s
呢?由于假如两头的搁置超时时刻装备成相同,就会形成客户端正在发心跳包、服务端正在封闭衔接的这种状况呈现,终究导致心跳机制无法正常作业,关于这点我们也可以自行演示。
2.2.4、普通的客户端完成
终究,为了方便观看作用,这儿我们再创立一个不会发送心跳包的客户端B
,相同打开它的长衔接选项,然后来比照测验作用,如下:
// 演示心跳机制的客户端(不会发送心跳包)
public class ClientB {
public static void main(String[] args) {
EventLoopGroup worker = new NioEventLoopGroup();
Bootstrap client = new Bootstrap();
try {
client.group(worker);
client.channel(NioSocketChannel.class);
client.option(ChannelOption.SO_KEEPALIVE, true);
client.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel)
throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast("Encoder",new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast("Decoder",new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelActive(ChannelHandlerContext ctx)
throws Exception {
// 树立衔接成功之后,先向服务端发送一条数据
ctx.channel().writeAndFlush("我是不会发心跳包的客户端-B!");
}
@Override
public void channelInactive(ChannelHandlerContext ctx)
throws Exception {
System.out.println("由于没发送心跳包,俺被开除啦!");
// 当通道被封闭时,中止前面发动的线程池
worker.shutdownGracefully();
}
});
}
});
client.connect("127.0.0.1", 8888).sync();
} catch (Exception e){
e.printStackTrace();
}
}
}
上述这段代码中,仅构建出了一个最根本的客户端,其间首要干了两件事情:
- ①在衔接树立成功之后,先向服务端发送一条数据。
- ②在衔接(通道)被封闭时,输出一句“俺被开除啦!”的信息,并高雅中止线程池。
除此之外,该客户端并未装载自己完成的客户端心跳处理器,这也就意味着:客户端B
并不会自动给服务端发送心跳包。
2.2.5、Netty心跳机制测验
接着别离发动服务端、客户端A
、客户端B
,然后查看操控台的日志,如下:
从上图的运转成果来看,在三方发动之后,整体进程如下:
-
ClientA
:先与服务端树立衔接,并且在树立衔接之后发送一条数据,后续持续发送心跳包。 -
ClientB
:先与服务端树立衔接,然后在树立衔接成功后发送一条数据,后续不会再发数据。 -
Server
:与ClientA、B
坚持衔接,然后定时检测搁置衔接,封闭未发送心跳包的衔接。
在上述这个进程中,由于ClientB
树立衔接后,未自意向服务端发送心跳包,所以在一段时刻之后,服务端自动将ClientB
的衔接(通道)封闭了,有人会问:明明ClientB
还活着呀,这样做合理吗?
其实这个问题是合理的,由于这儿仅仅模仿线上环境测验,所以
ClientB
没有自动发送数据包,但在线上环境,每个客户端都会定时向服务端发送心跳包,都会为每个客户端装备心跳处理器。在都装备了心跳处理器的状况下,假如一个客户端长时刻没发送心跳包,这意味着这个客户端十有八九凉凉了,所以天然需求将其封闭,防止这类“废弃衔接”占用服务端资源。
不过上述的心跳机制仅完成了最根底的版别,还未完全将其完善,但我这儿就不持续往下完成了,究竟主干现已搭建好了,剩下的仅仅一些细枝末节,我这儿提几点完善思路:
- ①在检测到某个客户端未发送心跳包的状况下,服务端应当自动再建议一个勘探包,二次承认客户端是否真的挂了,这样做的好处在于:可以有用防止网络抖动形成的“客户端假死”现象。
- ②客户端、服务端之间交互的数据包,应当选用一致的格式进行封装,也便是都恪守同一规范包装数据,例如
{msgType:"Heartbeat", msgContent:"...", ...}
。 - ③在客户端被封闭的状况下,凡是不是由于物理因素,如机房断电、网线被拔、机器宕机等状况形成的客户端下线,客户端都必须具有断线重连功用。
将上述三条完善后,才可以被称为是一套相对健全的心跳检测机制,所以我们感兴趣的状况下,可依据前面给出的源码接着完成~
三、Netty进阶篇总结
在这章节中,我们一点一滴的将粘包、半包、解码器、长衔接、心跳机制等一些进阶技能做了具体论述,其实本来计划将这章内容也写在《Netty入门篇》中的,但由于篇幅过长,所以不得不将其拆出来写,看完这篇中止,也就意味着我们对Netty
这个结构有了根本认知。
但实践网络使用、中心件、
RPC
、根底架构等开发进程中,想要用Netty
也并非易事,所以在接下来的章节中,会依据Netty
解说多个实战事例,然后让诸位真正可以将Netty
这个技能栈纳入囊中!