本文正在参加「金石计划 . 分割6万现金大奖」
引言
现如今的开发环境中,分布式/微服务架构大行其道,而分布式/微服务的根基在于网络编程,而Netty
恰恰是Java网络编程范畴的无冕之王。Netty
这个结构信任我们定然听说过,其在Java网络编程中的地位,比方JavaEE
中的Spring
。
当然,这样去聊它我们或许无法实践感触出它的重要性,那先来看看依据Netty
构建的运用:
调查上述列出的开源组件,一眼望去简直满是各个范畴中大名鼎鼎的结构,而这些组件都是依据Netty
构建的,包含中间件、大数据、离线核算、分布式、RPC、No-SQL
等各个方向….,很显着的可感知出Netty
的地位之高,因而假如要打造一款Java
高功用的网络通讯程序、想要真实熟知分布式架构的底层原理,Netty
成为了每个Java
开发进阶必需求掌握的中心技能之一。
Netty
的重要性不言而知,但网上相关的大部分视频、文章、书本等材料却形形色色,很难真实协助我们构建出一套完整的体系,本文的意图便是带诸位走入依据Netty
的网络世界,在真实意义上为诸君构建一套Netty
的常识储备。
本文会先从概念开端,到根底入门、中心组件顺次翻开,后续结合多个实战事例全面详解
Netty
的运用。但在学习之前,我们最好有Java-IO
体系、多路复用模型等相关常识的储备,如若未曾具有请先移步:《Java-IO机制全解》、《多路复用模型剖析》两文,前者是有必要,后者则暂时无需掌握,由于在后续的《Netty
源码篇》中才会触及。
一、初识Netty的根底概念与快速入门
留意看:上图中右边这位黑眼圈堪比熊猫眼的哥们,从他头顶的发量就能显着感触出其技能强度,他!!!周围的这位才是Netty
结构的原作者Trustin Lee
(韩国人),一起他也是另一个著名网络结构Mina
的中心主程之一,现任职于Apple
苹果集团……,不过多介绍作者了,总归是一位网络方面的大牛。
要点来聊聊我们的主角:Netty
结构,其实这个结构是依据Java原生NIO
技能的进一步封装,在其间对Java-NIO
技能做了进一步增强,作者充分结合了Reactor
线程模型,将Netty
变为了一个依据异步事情驱动的网络结构,Netty
从诞生至今共发布了五个大版别,但现在最常用的反而并非是最新的5.x
系列,而是4.x
系列的版别,原因在于Netty
自身便是依据Java-NIO
封装的,而JDK
自身又很安稳,再加上5.x
版别并未有太大的功用差异,因而4.x
系列才是干流。
再回过头来考虑一个问题:为什么Netty
要二次封装原生NIO
呢?信任看过NIO
源码的小伙伴都清楚,原生的NIO
规划的特别繁琐,而且还存在一系列安全隐患,因而Netty
则是抱着简化NIO
、解决隐患、提高功用等意图而研发的。
不过有意思的一点在于:
Netty
尽管是依据Java-NIO
封装的结构,但实践运用起来却跟之前聊到的Java-AIO(NIO2)
技能有些相似。
1.1、Netty的入门实例
上面扯了不少Netty
的概念,现在就直接先实操一番快速入门,终究编程讲究施展出真理,首要第一步则是增加对应的依靠,如下:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.43.Final</version>
</dependency>
然后先创立NettyServer
服务端,代码如下:
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
// 创立两个EventLoopGroup,boss:处理衔接事情,worker处理I/O事情
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
// 创立一个ServerBootstrap服务端(同之前的ServerSocket相似)
ServerBootstrap server = new ServerBootstrap();
try {
// 将前面创立的两个EventLoopGroup绑定在server上
server.group(boss,worker)
// 指定服务端的通道为Nio类型
.channel(NioServerSocketChannel.class)
// 为到来的客户端Socket增加处理器
.childHandler(new ChannelInitializer<NioSocketChannel>() {
// 这个只会履行一次(首要是用于增加更多的处理器)
@Override
protected void initChannel(NioSocketChannel ch) {
// 增加一个字符解码处理器:对客户端的数据解码
ch.pipeline().addLast(
new StringDecoder(CharsetUtil.UTF_8));
// 增加一个入站处理器,对收到的数据进行处理
ch.pipeline().addLast(
new SimpleChannelInboundHandler<String>() {
// 读取事情的回调办法
@Override
protected void channelRead0(ChannelHandlerContext
ctx,String msg) {
System.out.println("收到客户端信息:" + msg);
}
});
}
});
// 为当时服务端绑定IP与端口地址(sync是同步堵塞至衔接成功中止)
ChannelFuture cf = server.bind("127.0.0.1",8888).sync();
// 封闭服务端的办法(之后不会在这儿封闭)
cf.channel().closeFuture().sync();
}finally {
// 优雅中止之前创立的两个Group
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
紧接着再构建一个NettyClient
客户端,代码如下:
public class NettyClient {
public static void main(String[] args) {
// 由于无需处理衔接事情,所以只需求创立一个EventLoopGroup
EventLoopGroup worker = new NioEventLoopGroup();
// 创立一个客户端(同之前的Socket、SocketChannel)
Bootstrap client = new Bootstrap();
try {
client.group(worker)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc)
throws Exception {
// 增加一个编码处理器,对数据编码为UTF-8格局
sc.pipeline().addLast(new
StringEncoder(CharsetUtil.UTF_8));
}
});
// 与指定的地址树立衔接
ChannelFuture cf = client.connect("127.0.0.1", 8888).sync();
// 树立衔接成功后,向服务端发送数据
System.out.println("正在向服务端发送信息......");
cf.channel().writeAndFlush("我是<竹子爱熊猫>!");
} catch (Exception e){
e.printStackTrace();
} finally {
worker.shutdownGracefully();
}
}
}
先看运转成果吧,操控台输出如下:
NettyServer操控台输出:
收到客户端信息:我是<竹子爱熊猫>!
NettyClient操控台输出:
正在向服务端发送信息......
从成果中很简略看出这个事例中做了什么事情,其实无非便是利用Netty
完结了简略的对端通讯,完结的功用很简略,但关于未学习过Netty
技能的小伙伴,在代码方面估量有少许懵,那么接下来简略的解说一下代码。
但在此之前先声明一点:
Netty
是支撑链式编程的一个结构,也便是如上述中的代码调用,一切的办法都能够一直用.
连下去,所以在Netty
的运用中会见到许多的这类写法。
上述事例的代码,说杂乱呢也其实并不难,信任认真看完了之前《Java-IO篇》的小伙伴多少都能看懂代码,不了解的当地估量就在于其间呈现的几个新的概念:EventLoopGroup、ServerBootstrap、childHandler
,我们先关于这些概念做简略解说,后续会要点剖析:
-
EventLoopGroup
:能够了解成之前的Selector
挑选器,但结合了线程池(后续详细剖析)。 -
ServerBootstrap/Bootstrap
:相似于之前的ServerSocketChannel/SocketChannel
。 -
childHandler
:这个是新概念,能够了解成过滤器,在之前的Servlet
编程中,新恳求到来都会通过一个个的过滤器,而这个处理器也相似于之前的过滤器,新衔接到来时,也会通过增加好的一系列处理器。
OK~,关于上述几个新概念有了简略认知后,接着把上面事例的完整流程剖析一下:
- ①先创立两个
EventLoopGroup
事情组,然后创立一个ServerBootstrap
服务端。 - ②将创立的两个事情组
boss、worker
绑定在服务端上,并指定服务端通道为NIO
类型。 - ③在
server
上增加处理器,对新到来的Socket
衔接进行处理,在这儿首要分为两类:-
ChannelInitializer
:衔接到来时履行,首要是用于增加更多的处理器(只触发一次)。 -
addLast()
:通过该办法增加的处理器不会立马履行,而是依据处理器类型择机履行。
-
- ④为创立好的服务端绑定
IP
及端口号,调用sync()
意思是堵塞至绑定成功中止。 - ⑤再创立一个
EventLoopGroup
事情组,并创立一个Bootstrap
客户端。 - ⑥将事情组绑定在客户端上,由于无需处理衔接事情,所以只需求一个事情组。
- ⑦指定
Channel
通道类型为NIO
、增加处理器…..(同服务端相似) - ⑧与前面服务端绑定的地址树立衔接,由于默许是异步的,也要调用
sync()
堵塞。 - ⑨树立衔接后,客户端将数据写入到通道预备发送,首要会先通过增加好的编码处理器,将数据的格局设为
UTF-8
。 - ⑩服务器收到数据后,会先通过解码处理器,然后再去到入站处理,履行对应的
Read()
办法逻辑。 - ⑪客户端完结数据发送后,先封闭通道,再优雅封闭创立好的事情组。
- ⑫同理,服务端作业完结后,先封闭通道再中止事情组。
结合上述的流程,再去看一遍给出的事例源码,信任诸位应该能够彻底了解。不过需求留意的一点是:Netty
的大部分操作都是异步的,比方地址绑定、客户端衔接等。比方调用connect()
办法与服务端树立衔接时,主线程会把这个作业交给事情组中的线程去完结,所以此刻假如主线程直接去向通道中写入数据,有几率会呈现报错,由于实践生产环境中,或许由于网络推迟导致衔接树立的时刻有些长,此刻通道并未树立成功,因而测验发送数据时就会有问题,这点与之前的Java-AIO
通讯事例中,客户端树立衔接要调用.get()
办法是同理。
到这儿,你对
Netty
结构现已入门了,接着我们要点聊聊Netty
中的一些中心组件。
二、Netty结构中心组件:发动器与事情组
关于Netty
有了根本的认知后,接下来渐渐的了解这个结构吧,先顺次来看看其间的一些中心组件,了解这些组件及作用后,才干真实意义上的“玩转Netty
”。
2.1、发动器-ServerBootstrap、Bootstrap
ServerBootstrap、Bootstrap
这两个组件应该无需过多解说,上个表格比照我们就了解了:
比照项 | 服务端 | 客户端 |
---|---|---|
BIO |
ServerSocket |
Socket |
NIO |
ServerSocketChannel |
SocketChannel |
AIO |
AsynchronousServerSocketChannel |
AsynchronousSocketChannel |
Netty |
ServerBootstrap |
Bootstrap |
从上表中能显着感觉出它俩在Netty
中的作用,无非便是服务端与客户端换了个叫法罢了。
2.2、事情组-EventLoopGroup、EventLoop
这两个东西比较重要,但一起也比较笼统,EventLoop
这东西翻译过来便是事情循环的意思,你能够把它了解成NIO
中的Selector
挑选器,实践它实质上便是这玩意儿,由于内部会保护一个Selector
,然后由一条线程会循环处理Channel
通道上发生的一切事情,所以每个EventLoop
目标都能够当作一个单线程履行器。
EventLoopGroup
能够将其了解成AIO
中的AsynchronousChannelGroup
或许会更适宜,在AIO
的ACG
(前面那玩意儿的缩写)中,我们需求手动指定一个线程池,然后AIO
的一切客户端作业都会运用线程池中的线程进行处理,而Netty
中的EventLoopGroup
就相似于AIO-ACG
这玩意儿,只不过不需求我们处理线程池了,而是Netty
内部保护。
对
EventLoopGroup、EventLoop
有了根本认知后,你再点进它们的源码完结,其实能够观测到:其实它们承继了两个类,一个是Netty
自己完结的有序线程池OrderedEventExecutor
类,另一个则JDK
供给的原生守时调度线程池ScheduledExecutorService
类(源码篇会详细剖析,这儿先简略了解)。
看过之前关于《JDK线程池》文章的小伙伴应该清楚,已然EventLoop/EventLoopGroup
承继自JDK
原生的守时线程池,那也就代表着:它具有JDK
线程池中一切供给的办法,一起也应该会支撑履行异步使命、守时使命的功用。那么实践情况是这样吗?答案是Yes
,如下:
public static void main(String[] args) {
EventLoopGroup threadPool = new NioEventLoopGroup();
// 递送Runnable类型的一般异步使命
threadPool.execute(()->{
System.out.println("execute()办法提交的使命....");
});
// 递送Callable类型的有回来异步使命
threadPool.submit(() -> {
System.out.println("submit()办法提交的使命....");
return "我是履行成果噢!";
});
// 递送Callable类型的延时调度使命
threadPool.schedule(()->{
System.out.println("schedule()办法提交的使命,三秒后履行....");
return "调度履行后我会回来噢!";
},3,TimeUnit.SECONDS);
// 递送Runnable类型的推迟距离调度使命
threadPool.scheduleAtFixedRate(()->{
System.out.println("scheduleAtFixedRate()办法提交的使命....");
},3,1,TimeUnit.SECONDS);
}
/* ~~~~~~~~~~~~~~~~~~我是性感的分割线~~~~~~~~~~~~~~~~~~ */
履行成果如下:
当即履行:
execute()办法提交的使命....
submit()办法提交的使命....
延时三秒后履行:
schedule()办法提交的使命....
scheduleAtFixedRate()办法提交的使命....
之后没距离一秒履行:
scheduleAtFixedRate()办法提交的使命....
scheduleAtFixedRate()办法提交的使命....
上述我们创立了一个EventLoopGroup
事情循环组,然后通过之前JDK
线程池供给的一系列的提交使命的办法,向其递送了几个异步使命,然后运转该程序,答案清楚明了,EventLoopGroup
的确能够当做JDK
原生线程池来运用。
当然,这些并非剖析的要点,要点来看看
EventLoopGroup
如安在Netty
中合理运用。
在了解它们的Netty
用法之前,先来看看除原生线程池之外所供给的办法:
-
EventLoop.inEventLoop(Thread)
:判别一个线程是否归于当时EventLoop
。 -
EventLoop.parent()
:判别当时EventLoop
归于哪一个事情循环组。 -
EventLoopGroup.next()
:获取当时事情组中的下一个EventLoop
(线程)。
这些办法我们简略了解即可,由于大多数情况下在Netty
源码中才会用到,暂时无需重视太多,我们先把目光移到前面给出的Netty
运用事例中,还记住最开端界说的两个事情组吗?
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
为什么在服务端要界说两个组呢?一个难道不行吗?其实也是能够的,但界说两个组的好处在于:能够让Group
中的每个EventLoop
分工愈加清晰,不同的Group
别离处理不同类型的事情,各司其职。
在前面事例中,为服务端绑定了两个事情循环组,也就代表着会依据
ServerSocketChannel
上触发的不同事情,将对应的作业分发到这两个Group
中处理,其间boss
首要担任客户端的衔接事情,而worker
大多数情况下担任处理客户端的IO
读写事情。
当客户端的SocketChannel
衔接到来时,首要会将这个注册事情的作业交给boss
处理,boss
会调用worker.register()
办法,将这条客户端衔接注册到worker
作业组中的一个EventLoop
上。前面提到过:EventLoop
内部会保护一个Selector
挑选器,因而实践上也便是将客户端通道注册到其内部中的挑选器上。
留意:将一个
Socket
衔接注册到一个EventLoop
上之后,这个客户端衔接则会和这个EventLoop
绑定,以后这条通道上发生的一切事情,都会交由这个EventLoop
处理。
到这儿我们应该也了解了为何要拆出两个EventLoopGroup
,首要意图就在于分工更为明细。当然,由于EventLoopGroup
实质上能够了解成一个线程池,其间存在的线程资源天然是有限的,那此刻假如到来的客户端衔接大于线程数量怎么办呢?这是不影响的,由于Netty
自身是依据Java-NIO
封装的,而NIO
底层又是依据多路复用模型完结的,天然生成就能完结一条线程处理多个衔接的功用,所以就算衔接数大于线程数,也彻底能够Hold
住。
OK~,除开能够依据事情类型区别Group
之外,也能够依据为每个处理器区别不同的事情组,如下:
// 创立EventLoopGroup和JDK原生的线程池相同,能够指定线程数量
EventLoopGroup extra = new NioEventLoopGroup(2);
sc.pipeline().addLast(extra, new xxxChannelHandler());
这样做的好处在于什么呢?由于前面提到过:一个衔接注册到EventLoop
,之后一切的作业都会由这个EventLoop
处理,而一个EventLoop
又有或许一起处理多个衔接,因而假设一条衔接上的某个处理器,履行进程非常耗时,此刻必然就会影响到这个EventLoop
处理的其他衔接,因而关于一些较为耗时的Handler
,能够专门指派给一个额定的extra
事情组处理,这样就不会影响到所处理的其他衔接。
当然,这个功用其实也略微有些鸡肋,一般多个
Handler
之间都会存在耦合联系,下一个Handler
需求依靠上一个Handler
的处理成果履行,因而也很难拆出来独自放到另一个事情组中履行。
看到这儿,信任你关于EventLoopGroup、EventLoop
这两个组件应该有了根本认知,简略来说能够EventLoop
了解成有一条线程专门保护的Selector
挑选器,而EventLoopGroup
则能够了解成一个有序的守时调度线程池,担任处理一切的EventLoop
。举个生活事例来加深形象:
现在有个工厂,其间分为了不同的片区,一个片区中有许多条流水线,由每个工人担任一部分流水线的作业。开端作业后,流水线的传输带会源源不断的将货品传递过来,这些货品终究会等候工人进行加工。
在上述这个比方中,工厂便是ServerBootstrap
服务端,而一个个片区便是不同的EventLoopGroup
事情组,一条流水线则能够了解成一个SocketChannel
客户端通道,而担任多条流水线的工人便是EventLoop
单线程履行器,加工的动作其实便是处理通道上发生的事情。
我们能够将这个比方套进去想象一下,信任这会让你形象愈加深刻。
三、Netty中的增强版通道(ChannelFuture)
关于通道这个概念,信任诸位都不生疏,这也是Java-NIO、AIO
中的中心组件之一,而在Netty
中也对其做了增强和拓宽。首要来看看通道类型,Netty
依据不同的多路复用函数,别离拓宽出了不同的通道类型:
-
NioServerSocketChannel
:通用的NIO
通道模型,也是Netty
的默许通道。 -
EpollServerSocketChannel
:对应Linux
体系下的epoll
多路复用函数。 -
KQueueServerSocketChannel
:对应Mac
体系下的kqueue
多路复用函数。 -
OioServerSocketChannel
:对应本来的BIO
模型,用的较少,一般用原生的。
当然,关于客户端的通道也能够挑选TCP、UDP...
类型的,就不再介绍了,要点来看看Netty
中是怎么关于通道类做的增强。
其实在
Netty
中,首要结合了JDK
供给的Future
接口,对通道类做了进一步增强。
增强的方面首要是支撑了异步,但并非Future
那种伪异步,而是跟之前聊到过的《CompletableFuture》有些相似,支撑异步回调处理成果。还记住之前客户端怎么衔接服务端的嘛?如下:
Bootstrap client = new Bootstrap();
client.connect("127.0.0.1", 8888);
但这个connect()
衔接办法,实质上是一个异步办法,回来的并不是Channel
目标,而是一个ChannelFuture
目标,如下:
public ChannelFuture connect(String inetHost, int inetPort);
也包含ServerBootstrap
绑定地址的bind()
也相同,回来的并非ServerChannel
,也是一个ChannelFuture
目标。这是由于在Netty
的机制中,绑定/衔接作业都是异步的,因而假如要用Netty
创立一个客户端衔接,为了保证衔接树立成功后再操作,一般情况下都会再调用.sync()
办法同步堵塞,直到衔接树立成功后再运用通道写入数据,如下:
// 与服务端树立衔接
ChannelFuture cf = client.connect("127.0.0.1", 8888);
// 同步堵塞至衔接树立成功中止
cf.sync();
// 衔接树立成功后再获取对应的Socket通道写入数据
cf.channel().writeAndFlush("...");
上述这种办法能够保证衔接树立成功后再写数据,但已然Netty
中的绑定、衔接等这些操作都是异步的,有没有办法让整个进程都是异步的呢?
答案是当然有,怎么操作呢?
我们能够向ChannelFuture
中增加回调处理器,然后异步处理,如下:
ChannelFuture cf = client.connect("127.0.0.1", 8888);
cf.addListener((ChannelFutureListener) cfl -> {
// 这儿能够用cf,也能够用cfl,回来的都是同一个channel通道
cf.channel().writeAndFlush("...");
});
当通过connect()
办法与服务端树立衔接时,Netty
会将这个使命交给当时Bootstrap
绑定的EventLoopGroup
中的线程履行,因而树立衔接的进程是异步的,所以会返还一个ChannelFuture
目标给我们,而此刻能够通过该目标的addListener()
办法编写成功回调逻辑,当衔接树立成功后,会由对应的线程来履行其间的代码,因而能够完结全进程的异步操作。
这样做,好像的确完结了整个进程的异步,甚至封闭通道的进程也能够换成异步的,如下:
// 异步封闭Channel通道
ChannelFuture closeCF = cf.channel().closeFuture();
// 通道封闭后,增加对应的回调函数
closeCF.addListener((ChannelFutureListener) cfl -> {
// 封闭前面创立的EventLoopGroup事情组,也能够在这儿做其他善后作业
worker.shutdownGracefully();
});
那Netty
中为何要将许多的操作都笼统成异步履行呢?这不是反而让逻辑愈加杂乱化吗?让建议衔接、树立衔接、发送数据、接纳数据、封闭衔接等一系列操作,悉数交由调用的那条线程履行不能够吗?答案是能够的,但异步能在必定程度上提高功用,尤其是并发越高,带来的优势更为显着。
关于这段话我们估量会有疑问,为什么能提高功用呢?下面举个比方了解。
3.1、为何Netty一切API都是异步式操作?
信任我们必定在生活中见过这样的场景:医院治病/体检、银行开户、政府就事、法院申述、保险公司买保险等等,各类处理事务的当地,都会拿号处理,然后通过一个个的窗口处理不同的事务,那为什么要这么做呢?就拿常规的医院治病来说,为什么会分为如下进程呢?
- 导诊处:先阐明大致情况,导诊人员依据你的病理,辅导你挂什么科的号。
- 挂号处:去到对应的病理科排队挂号(暂时不考虑缴费,假设网上缴挂号费)。
- 确诊室:跟着挂的号找到对应的科室,医师依据你的情况进行确诊。
- 化验处:从你身上提取一些标本,然后去到化验处等候化验成果。
- 缴费处:医师依据化验成果剖析病情,然后给出详细的医治计划,让你来缴费。
- 拿药/医治处:交完相关的费用后,依据医治计划进行拿药/医治等处理措施。
有上述这些进程实践上并不古怪,问题是在于每个进程都分为了专门的科室处理,因而以上述流程为例,至少需求有六个医师供给服务,那么为什么不专门由这六位医师专门供给全系列服务呢?如下:
我们剖析一下,假设此刻每个进程平均要五分钟,一个患者的完整流程下来就需求半小时,而下一批预约治病的其他患者,则需求等候半小时后才干被受理,而把这些进程拆开之后再来看看:
此刻有六位医师各司其职,每位医师担任单一的作业,这样做的好处在于:每个挂号的患者只需求等候五分钟,就能够被受理,通过这种办法就将之前批次式治病,转变为了流水线式治病。
而
Netty
结构中的异步处理办法,也具有异曲同工之妙,将API
的操作从批处理转变成了流式处理。套入实践的事务中,也便是主线程(调用API
的线程)无需等候操作完结后再履行,而是调用某个API
后可持续往下履行,相较而言,在并发情况下能很大程度上提高程序功用。
但上述这个比方估量有些小伙伴照旧会犯模糊,那接着再举个愈加形象化的比方,比方快递小哥送货,假如以同步办法作业,将一个货品送达指定地址后,需求等候客户签收才干去送下个货品,这无疑会让下个客户等很久很久,而且也极端影响快递小哥的作业功率。
而选用异步办法作业,快递小哥将一个货品送达指定地址后,给对应客户发个信息后,就立马赶往下个客户的货品地址,前面的客户拿到货品后,再给快递小哥回个信息即可。在这种异步作业办法中,小哥无需在原地“堵塞”等候客户签收,只需求将手中一个个货品送达指定地址就行,这在很大程度上提高了全体作业功率,每个客户之间拿到货品的时刻也大大缩短了,
Netty
结构中的异步思想也是同理。
3.2、ChannelFuture、Netty-Future、JDK-Future的联系
当我们试图翻阅ChannelFuture
的完结时,会发现该类承继了Future
接口:
public interface ChannelFuture extends Future<Void> {
// 省掉内部办法.....
}
但要留意,这个Future
接口并非是JDK
原生的Future
接口,而是Netty
结构中的Future
接口:
package io.netty.util.concurrent;
public interface Future<V> extends java.util.concurrent.Future<V> {
// 省掉内部办法.....
}
此刻会发现,Netty-Future
又承继自JDK-Future
接口,这也就意味着Netty-Future
拓宽了JDK-Future
接口的功用,在之前《并发编程-异步使命》中,我们曾详细聊到过JDK
原生的Future
类,尽管依据Future+Callable
能够完结异步回调,但这种办法完结的异步回调则是一种“伪异步”,为啥呢?先来看看JDK-Future
供给的中心办法:
办法名 | 办法作用 |
---|---|
isDone() |
判别当时异步使命是否完毕 |
cancel() |
撤销当时异步使命 |
isCancel() |
判别当时异步使命是否被撤销 |
get() |
堵塞等候当时异步使命履行完结 |
在JDK-Future
接口中,想要获取一个异步使命的履行成果,此刻只能调用get()
办法,但该办法是一个堵塞办法,调用后会堵塞主线程直到使命完毕中止,这显着仍旧会导致异步变为同步履行,所以这种办法是一种“伪异步”,此刻再来看看Netty-Future
中增强的中心办法:
办法名 | 办法作用 |
---|---|
getNow() |
非堵塞式获取使命成果,使命未履行完结时回来null
|
sync() |
堵塞等候至异步使命履行完毕,履行犯错时会抛出反常 |
await() |
堵塞等候至异步使命履行完毕,履行犯错时不会抛出反常 |
isSuccess() |
判别使命是否履行成功,假如为true 代表履行成功 |
cause() |
获取使命履行犯错时的报错信息,假如履行未犯错,则回来null
|
addLinstener() |
增加回调办法,异步使命履行完结后会主动履行回调办法中的代码 |
在原生JDK-Future
的根底上,Netty-Future
新增了一个反常检测机制,当异步使命履行犯错时,能够通过cause()
办法处理反常,一起也依据回调办法,可通过addLinstener()
办法增加异步履行后的回调逻辑,然后让主线程创立使命后永久不会堵塞,做到了真实意义上的异步履行。
当然,除开根本的Future
接口外,Netty
结构中还有一个Promise
接口,该接口承继自Netty-Future
接口:
public interface Promise<V> extends Future<V> {
// 省掉内部办法.....
}
这个接口中首要多拓宽了两个办法:
办法名 | 办法作用 |
---|---|
setSuccess() |
设置使命的履行状况为成功 |
setFailure() |
设置使命的履行状况为失利 |
这两个办法能够用来设置异步使命的履行状况,因而Promise
接口除开具有Netty-Future
的功用外,还能作为多个线程之间传递异步使命成果的容器。
3.3、不同Future的作用测验
public class FutureDemo {
// 测验JDK-Future的办法
public static void jdkFuture() throws Exception {
System.out.println("--------JDK-Future测验--------");
// 创立一个JDK线程池用于履行异步使命
ExecutorService threadPool = Executors.newSingleThreadExecutor();
System.out.println("主线程:进程①");
// 向线程池提交一个带有回来值的Callable使命
java.util.concurrent.Future<String> task =
threadPool.submit(() ->
"我是JDK-Future使命.....");
// 输出获取到的使命履行成果(堵塞式获取)
System.out.println(task.get());
System.out.println("主线程:进程②");
// 封闭线程池
threadPool.shutdownNow();
}
// 测验Netty-Future的办法
public static void nettyFuture(){
System.out.println("--------Netty-Future测验--------");
// 创立一个Netty中的事情循环组(实质是线程池)
NioEventLoopGroup group = new NioEventLoopGroup();
EventLoop eventLoop = group.next();
System.out.println("主线程:进程①");
// 向线程池中提交一个带有回来值的Callable使命
io.netty.util.concurrent.Future<String> task =
eventLoop.submit(() ->
"我是Netty-Future使命.....");
// 增加一个异步使命履行完结之后的回调办法
task.addListener(listenerTask ->
System.out.println(listenerTask.getNow()));
System.out.println("主线程:进程②");
// 封闭事情组(线程池)
group.shutdownGracefully();
}
// 测验Netty-Promise的办法
public static void nettyPromise() throws Exception {
System.out.println("--------Netty-Promise测验--------");
// 创立一个Netty中的事情循环组(实质是线程池)
NioEventLoopGroup group = new NioEventLoopGroup();
EventLoop eventLoop = group.next();
// 主动创立一个传递异步使命成果的容器
DefaultPromise<String> promise = new DefaultPromise<>(eventLoop);
// 创立一条线程履行,往成果中增加数据
new Thread(() -> {
try {
// 主动抛出一个反常
int i = 100 / 0;
// 假如异步使命履行成功,向容器中增加数据
promise.setSuccess("我是Netty-Promise容器:履行成功!");
}catch (Throwable throwable){
// 假如使命履行失利,将反常信息放入容器中
promise.setFailure(throwable);
}
}).start();
// 输出容器中的使命成果
System.out.println(promise.get());
}
public static void main(String[] args) throws Exception {
jdkFuture();
nettyFuture();
nettyPromise();
}
}
在上述的测验类中,存在三个测验办法:
-
jdkFuture()
:测验JDK-Future
的办法。 -
nettyFuture()
:测验Netty-Future
的办法。 -
nettyPromise()
:测验Netty-Promise
的办法。
接着发动对应的类,来看看操控台的输出成果:
--------JDK-Future测验--------
主线程:进程①
我是JDK-Future使命.....
主线程:进程②
--------Netty-Future测验--------
主线程:进程①
主线程:进程②
我是Netty-Future使命.....
--------Netty-Promise测验--------
Exception in thread "main" java.util.concurrent.ExecutionException:
java.lang.ArithmeticException: / by zero
........
首要来比照一下JDK-Future、Netty-Future
两者之间的差别,在运用JDK-Future
时,想要获取异步使命的履行成果,调用get()
办法后会堵塞主线程,也便是主线程的进程②,需求等到异步使命履行完结后才会持续履行,因而输出成果为:
--------JDK-Future测验--------
主线程:进程①
我是JDK-Future使命.....
主线程:进程②
但此刻再来看看Netty-Future
,由于在内部我们提交异步使命后,就当即通过addListener()
增加了一个回调,这个回调办法会在异步使命履行完毕后调用,我们将获取使命成果的作业,放入到了回调办法中完结,此刻会观测到,获取Netty-Future
的履行成果并不会堵塞主线程:
--------Netty-Future测验--------
主线程:进程①
主线程:进程②
我是Netty-Future使命.....
而关于Netty-Promise
的运用就无需过多解说,也便是能够依据异步使命的履行状况,向Promise
目标中设置不同的成果,在前面的多线程中,由于主动制造了反常,所以终究会进入catch
代码块,履行setFailure()
向容器中填充反常信息。
四、中心组件 – 通道处理器(Handler)
Handler
可谓是整个Netty
结构中最为重要的一部分,它的责任首要是用于处理Channel
通道上的各种事情,一切的处理器都可被大体分为两类:
- 入站处理器:一般都是
ChannelInboundHandlerAdapter
以及它的子类完结。 - 出站处理器:一般都是
ChannelOutboundHandlerAdapter
以及它的子类完结。
在体系中网络操作都一般会分为入站和出站两种,所谓的入站便是指接纳恳求,反之,所谓的出站则是指回来呼应,而Netty
中的入站处理器,会在客户端音讯到来时被触发,而出站处理器则会在服务端回来数据时被触发,接着来翻开聊一聊。
4.1、入站处理器与出站处理器
前面讲了解了入站、出站的根本概念,接着来简略认识一下Netty
中的入站处理器,这儿先上个事例:
// 服务端
public class HandlerServer {
public static void main(String[] args) {
// 0.预备作业:创立一个事情循环组、一个ServerBootstrap服务端
EventLoopGroup group = new NioEventLoopGroup();
ServerBootstrap server = new ServerBootstrap();
server
// 1.绑定前面创立的事情循环组
.group(group)
// 2.声明通道类型为服务端NIO通道
.channel(NioServerSocketChannel.class)
// 3.通过ChannelInitializer完结通道的初始化作业
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nsc) throws Exception {
// 4.获取通道的ChannelPipeline处理器链表
ChannelPipeline pipeline = nsc.pipeline();
// 5.依据pipeline链表向通道上增参加站处理器
pipeline.addLast("In-①",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
System.out.println("俺是第一个入站处理器...");
super.channelRead(ctx, msg);
}
});
pipeline.addLast("In-②",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
System.out.println("我是第二个入站处理器...");
super.channelRead(ctx, msg);
}
});
pipeline.addLast("In-③",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
System.out.println("朕是第三个入站处理器...");
}
});
}
})
// 为当时发动的服务端绑定IP和端口地址
.bind("127.0.0.1",8888);
}
}
// 客户端
public class HandlerClient {
public static void main(String[] args) {
// 0.预备作业:创立一个事情循环组、一个Bootstrap发动器
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap client = new Bootstrap();
try {
client
// 1.绑定事情循环组
.group(group)
// 2.声明通道类型为NIO客户端通道
.channel(NioSocketChannel.class)
// 3.初始化通道,增加一个UTF-8的编码器
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc)
throws Exception {
// 增加一个编码处理器,对数据编码为UTF-8格局
ChannelPipeline pipeline = sc.pipeline();
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
}
});
// 4.与指定的地址树立衔接
ChannelFuture cf = client.connect("127.0.0.1", 8888).sync();
// 5.树立衔接成功后,向服务端发送数据
System.out.println("正在向服务端发送信息......");
cf.channel().writeAndFlush("我是<竹子爱熊猫>!");
} catch (Exception e){
e.printStackTrace();
} finally {
// 6.终究封闭事情循环组
group.shutdownGracefully();
}
}
}
在上述事例的服务端代码中,发动服务端时为其增加了In-①、In-②、In-③
这三个入站处理器,接着编写了一个客户端,其内部首要是向服务端发送了一条数据,运转成果如下:
俺是In-①入站处理器...
我是In-②入站处理器...
朕是In-③入站处理器...
此刻我们调查成果会发现,入站处理器的履行次序,会依照增加的次序履行,两个过滤器之间,依靠super.channelRead(ctx, msg);
这行代码来完结向下调用的逻辑,这和之前Servlet
中的过滤器相差无几。
除开上述重写的channelRead()
办法外,入站处理器中还有许多其他办法能够重写,每个办法都对应着一种事情,会在不一起机下被触发,如下:
// 会在当时Channel通道注册到挑选器时触发(与EventLoop绑守时触发)
public void channelRegistered(ChannelHandlerContext ctx) ...
// 会在挑选器移除当时Channel通道时触发(与EventLoop解除绑守时触发)
public void channelUnregistered(ChannelHandlerContext ctx) ...
// 会在通道预备就绪后触发(Pipeline处理器增加完结、绑定EventLoop后触发)
public void channelActive(ChannelHandlerContext ctx) ...
// 会在通道封闭时触发
public void channelInactive(ChannelHandlerContext ctx) ...
// 会在收到客户端数据时触发(每当有数据时都会调用该办法,表明有数据可读)
public void channelRead(ChannelHandlerContext ctx, Object msg) ...
// 会在一次数据读取完结后触发
public void channelReadComplete(ChannelHandlerContext ctx) ...
// 当通道上的某个事情被触发时,这个办法会被调用
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) ...
// 当通道的可写状况发生改动时被调用(一般在发送缓冲区超出约束时调用)
public void channelWritabilityChanged(ChannelHandlerContext ctx) ...
// 当通道在读取进程中抛出反常时,当时办法会被触发调用
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) ...
接着再来看看出站处理器,这回依据上述事例做少许改造即可,也便是再通过pipeline.addLast()
办法多增加几个处理器,但处理器的类型为ChannelOutboundHandlerAdapter
,如下:
// 依据pipeline链表向通道上增加出站处理器
pipeline.addLast("Out-A",new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
System.out.println("在下是Out-A出站处理器...");
super.write(ctx, msg, promise);
}
});
pipeline.addLast("Out-B",new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
System.out.println("不才是Out-B出站处理器...");
super.write(ctx, msg, promise);
}
});
pipeline.addLast("Out-C",new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
System.out.println("寡人是Out-C出站处理器...");
super.write(ctx, msg, promise);
}
});
依据本来入站处理器的履行逻辑,是不是理论上履行次序为Out-A、Out-B、Out-C
?先看运转成果:
此刻调查成果可显着看到,在通道上增加的出站处理器压根没被触发呀,这是为何呢?这要说回前面聊到的出站概念,出站是指呼应进程,也意味着出站处理器是在服务端回来数据时被触发的,而事例中并未向客户端回来数据,显着就不会触发出站处理器,所以此刻我们在In-③
入站处理器中,多加几行代码:
// 利用通道向客户端回来数据
ByteBuf resultMsg = ctx.channel().alloc().buffer();
resultMsg.writeBytes("111".getBytes());
nsc.writeAndFlush(resultMsg);
此刻再运转事例,就会看到如下成果:
俺是In-①入站处理器...
我是In-②入站处理器...
朕是In-③入站处理器...
寡人是Out-C出站处理器...
不才是Out-B出站处理器...
在下是Out-A出站处理器...
此刻留意看,成果和意料的不同,呈现的次序并非Out-A、Out-B、Out-C
,而是Out-C、Out-B、Out-A
,这是啥原因呢?为什么与增加次序反过来了?这其实跟pipeline
处理器链表有关,等会儿再聊聊pipeline
这个概念,先来看看出站处理器中的其他办法:
// 当通道调用bind()办法时触发(当Channel绑定端口地址时被调用,一般用于客户端通道)
public void bind(...) ...
// 当通道调用connect()办法,衔接到长途节点/服务端时触发(一般也用于客户端通道)
public void connect(...) ...
// 当客户端通道调用disconnect()办法,与服务端断开衔接时触发
public void disconnect(...) ...
// 当客户端通道调用close()办法,封闭衔接时触发
public void close(...) ...
// 当通道与EventLoop解除绑守时触发
public void deregister(...) ...
// 当通道中读取屡次数据时被调用触发
public void read(...) ...
// 当通道中写入数据时触发
public void write(...) ...
// 当通道中的数据被Flush给对端节点时调用
public void flush(...) ...
关于出站/入站处理器的这些其他办法/事情,我们可依据事务的不同,挑选重写不同的办法,其间每个不同的办法,其触发时机也不同,因而能够在适当的方位重写办法,作为事务代码的切入点。
4.2、pipeline处理器链表
假如触摸Netty
结构的小伙伴应该对这玩意儿不生疏,假如没触摸过也无关紧要,其实它也并非是特别难明的概念,一个处理器被称为Handler
,而一个Handler
增加到一个通道上之后,则被称之为ChannelHandler
,而一个通道上的一切ChannelHandler
悉数衔接起来,则被称之为ChannelPipeline
处理器链表。
以上述给出的事例来说,其内部构成的ChannelPipeline
链表如下:
pipeline
实质上是一个双向链表,一起具有head、tail
头尾节点,每当调用pipeline.addLast()
办法增加一个处理器时,就会将处理器封装成一个节点,然后参加pipeline
链表中:
- 当接纳到客户端的数据时,
Netty
会从Head
节点开端顺次往后履行一切入站处理器。 - 而当服务端回来数据时,
Netty
会从Tail
节点开端顺次向前履行一切入站处理器。
了解上述进程后,我们应该就了解了之前出站处理器的履行次序,为何是Out-C、Out-B、Out-A
,由于出站处理器是以Tail
尾节点开端,向前顺次履行的原因造成的,那处理器的作用是干嘛的呢?举个比方我们就懂了。
这儿假设
Netty
的服务端是一个饲料加工厂,客户端则是质料供应商,衔接两者之间的通道就相当于一条条的流水线,而客户端发送的数据相当于质料。
在一条流水线上,玉米、豆粕、小麦….等质料不或许啥也不干,直接从头传到尾,假如质料想要加工成某款私聊,显着需求通过一道道工序,而处理器则是这一道道工序。
比方质料刚传进来时,首要要将其粉碎成颗粒,接着需求将其碾压成粉末,终究需求依照配方份额进行混合,才干构成按配方制成的饲料。在这个进程中,质料进入加工厂后,通过的一道道工序则能够被称为入站处理器。
而质料被加工成饲料后,想要对外出售,还需求先装入一个个的饲料袋,然后将饲料袋进行封口,终究印上生产日期与厂家,才干打包成终究的商用饲料对外出售。而该进程中的一道道工序,则可被了解成是一个个出站处理器。
在上述的比方中,一个加工厂的流水线上,存在着一道道工序,通过顺次处理后,能够将质料加工成终究商品。Netty
中亦是同理,关于客户端和服务端之间的数据,能够通过处理器,完结一系列中心处理,如转化编码格局、对数据进行序列化、对数据进行加/解密等操作。
4.3、自界说出/入站处理器
前面简略讲了解了一些关于Netty
处理器的常识,但实践开发进程中,为了更好的代码阅览性,以及代码的保护性,一般pipeline.addLast
并不会直接new
接口,而是自己界说处理器类,然后承继对应的父类,如下:
// 自界说的入站处理器
public class ZhuziHandler extends ChannelInboundHandlerAdapter {
public ZhuziHandler() {
super();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 在这儿面编写处理入站msg的中心代码.....
// (假如要自界说msg的处理逻辑,请记住去掉下面这行代码)
super.channelRead(ctx, msg);
}
}
关于入站处理器而言,首要重写其channelRead()
办法即可,该办法会在音讯入站时被调用,能够在其间完结对数据的杂乱处理,而自界说处理器完结后,想要让该处理器收效,请记住将其绑定到对应的通道上,如下:
pipeline.addLast("In-X", new ZhuziHandler());
与入站处理器相反的出站处理器亦是同理,只不过将父类完结换成ChannelInboundHandlerAdapter
,而且重写其write()
办法即可,这样一切音讯(数据)出站时,都会调用该办法。
终究,不只仅处理器能够独自抽出来完结,而且关于通道的初始化器,也能够独自抽出来完结,如下:
// 自界说的通道初始化器
public class ServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 设置编码器、解码器、处理器
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast("handler", new ZhuziHandler());
}
}
这样写能够让代码的整洁性更强,而且能够统一处理通道上的一切出/入站处理器,而服务端的代码改成下述办法即可:
server
.group(group)
.channel(NioServerSocketChannel.class)
.childHandler(new ServerInitializer())
.bind("127.0.0.1",8888);
五、Netty重构后的缓冲区(ByteBuf)
在之前讲《JavaIO体系-NIO》的时候曾聊到过它的三大件,其间就包含了ByteBuffer
,其作用首要是用来作为服务端和客户端之间传输数据的容器,NIO
中的ByteBuffer
支撑运用堆内存、本地(直接)内存来创立,而Netty-ByteBuf
也相同如此,如下:
-
ByteBufAllocator.DEFAULT.heapBuffer(cap)
:运用堆内存来创立ByteBuf
目标。 -
ByteBufAllocator.DEFAULT.directBuffer(cap)
:运用本地内存来创立ByteBuf
目标。
依据堆内存创立的ByteBuf
目标会遭到GC
机制处理,在发生GC
时需求来回移动Buffer
目标,一起之前在NIO
中也聊到过堆、本地内存的差异,如下:
一般本地内存的读写功率都会比堆内存高,由于OS
能够直接操作本地内存,而堆内存在读写数据时,则需求多出一步内存仿制的动作,总结如下:
- 堆内存由于直接遭到
JVM
处理,所以在Java
程序中创立时,分配功率较高,但读写功率低。 - 本地内存由于
OS
可直接操作,所以读写功率高,但由于创立时,需求向OS
额定恳求,分配功率低。
但上述聊到的这些特征,NIO
的Buffer
也具有,那Netty
关于Buffer
缓冲区终究增强了什么呢?首要是三方面:Buffer
池化技能、动态扩容机制、零仿制完结。
5.1、ByteBuf缓冲区池化技能
池化这个词汇我们应该都不生疏,Java
线程池、数据库衔接池,这些都是池化思想的产品,一般体系中较为宝贵的资源,都会选用池化技能来缓存,以便于下次需求时可直接运用,而无需通过繁琐的创立进程。
前面聊到过,
Netty
默许会选用本地内存创立ByteBuf
目标,而本地内存由于不是操作体系分配给Java
程序运用的,所以依据本地内存创立目标时,则需求额定独自向OS
恳求,这个进程天然开支较大,在高并发情况下,频频的创立、销毁ByteBuf
目标,一方面会导致功用降低,一起还有或许造成OOM
的危险(运用完没及时开释,内存未归还给OS
的情况下会呈现内存溢出)。
而运用池化技能后,一方面能有用防止OOM
问题发生,一起还能够省掉等候创立缓冲区的时刻,那Netty
中的池化技能,什么时候会敞开呢?这个要分渠道!
-
Android
体系默许会选用非池化技能,而其他体系,如Linux、Mac、Windows
等会默许启用。
但上述这条原则是Netty4.1
版别之后才参加的,由于4.1
之前的版别,其内部的池化技能还不够完善,所以4.1
之前的版别默许会禁用池化技能。当然,假如你在某些渠道下想自行决定是否敞开池化,可通过下述参数操控:
-
-Dio.netty.allocator.type=unpooled
:封闭池化技能。 -
-Dio.netty.allocator.type=pooled
:敞开池化技能。
这两个参数直接通过JVM
参数的办法,在发动Java
程序时指定即可。假如你想要检查自己创立的ByteBuf
目标,是否运用了池化技能,可直接打印目标的Class
即可,如下:
// 检查创立的缓冲区是否运用了池化技能
private static void byteBufferIsPooled(){
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16);
System.out.println(buffer.getClass());
}
public static void main(String[] args) {
byteBufferIsPooled();
}
/* *
* 输出成果:
* class io.netty.buffer.PooledUnsafeDirectByteBuf
* */
从输出的成果中的类名可看出,假如是以Pooled
开始的类名,则表明当时ByteBuf
目标运用池化技能,如若是以Unpooled
开始的类名,则表明未运用池化技能。
5.2、ByteBuf动态扩容机制
在前面聊《JavaNIO-Buffer缓冲区》的时候曾简略聊过NIO
的Buffer
源码,其内部的完结有些傻,每个Buffer
目标都具有一根limit
指针,这根指针用于操控读取/写入办法,因而在运用NIO-Buffer
时,每次写完缓冲区后,都需求调用flip()
办法来反转指针,以此来保证NIO-Buffer
的正常读写。
由于Java-NIO
中的Buffer
规划有些缺德,因而在运用NIO
的原生Buffer
目标时,就显得额定麻烦,必需求遵从如下进程:
- ①先创立对应类型的缓冲区
- ②通过
put
这类办法往缓冲区中写入数据 - ③调用
flip()
办法将缓冲区转化为读办法 - ④通过
get
这类办法从缓冲区中读取数据 - ⑤调用
clear()、compact()
办法清空缓冲区数据
而正是由于Java-NIO
原生的Buffer
规划的不合理,因而Netty
中直接重构了整个缓冲区组件,在Netty-ByteBuf
中,存在四个中心属性:
-
initialCapacity
:初始容量,创立缓冲区时指定的容量巨细,默许为256
字节。 -
maxCapacity
:最大容量,当初始容量不足以供给运用时,ByteBuf
的最大扩容约束。 -
readerIndex
:读取指针,默许为0
,当读取一部分数据时,指针会随之移动。 -
writerIndex
:写入指针,默许为0
,当写入一部分数据时,指针会随之移动。
首要来说说和NIO-Buffer
的两个首要差异:首要将本来一根指针变为了两根,别离对应读/写操作,这样就保证了运用ByteBuf
时,无需每次读写数据时手动翻转办法。一起参加了一个最大容量约束,在创立的ByteBuf
无法存下数据时,答应在最大容量的规模内,对ByteBuf
进行主动扩容,下面上个图了解:
上图中模拟了运用ByteBuf
缓冲区的进程,在创立时会先分配一个初始容量,这个容量能够自己指定,不指定默许为256
,接着会去创立出对应容量的缓冲区,开始读写指针都为0
,后续会跟着运用情况不断变化。
这儿要点调查终究一个状况,在真实运用进程中,一个ByteBuf
会被分为四个区域:
- 已抛弃区域:这是指现已被读取过的数据区域,由于其间的数据已被运用,所以归于抛弃区域。
- 可读取区域:这首要是指被写入过数据,但还未读取的区域,这块区域的数据都可被读取运用。
- 可写入区域:这首要是指写入指针和容量之间的区域,意味着这块区域是能够被写入数据的。
- 可扩容区域:这首要是指容量和最大容量之间的区域,代表当时缓冲区可扩容的规模。
ByteBuf
的首要完结位于AbstractByteBuf
这个子类中,但内部还有两根markedReaderIndex、markedWriterIndex
符号指针,这两根指针就相似于NIO-Buffer
中的mark
指针,这儿就不做重复赘述。下面上个事例简略试验一下BtyeBuf
的主动扩容特性,代码如下:
// 测验Netty-ByteBuf主动扩容机制
private static void byteBufCapacityExpansion() {
// 不指定默许容量巨细为16
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16);
System.out.println("测验前的Buffer容量:" + buffer);
// 运用StringBuffer来测验ByteBuf的主动扩容特性
StringBuffer sb = new StringBuffer();
// 往StringBuffer中刺进17个字节的数据
for (int i = 0; i < 17; i++) {
sb.append("6");
}
// 将17个字节巨细的数据写入缓冲区
buffer.writeBytes(sb.toString().getBytes());
printBuffer(buffer);
}
在这个测验主动扩容的办法中,终究用到了一个printBuffer()
办法来打印缓冲区,这是自界说的一个输出办法,也就依据Netty
自身供给的Dump
办法完结的,如下:
// 打印ByteBuf中数据的办法
private static void printBuffer(ByteBuf buffer) {
// 读取ByteBuffer已运用的字节数
int byteSize = buffer.readableBytes();
// 依据byteSize来核算显现的行数
int rows = byteSize / 16 + (byteSize % 15 == 0 ? 0 : 1) + 4;
// 创立一个StringBuilder用来显现输出
StringBuilder sb = new StringBuilder(rows * 80 * 2);
// 获取缓冲区的容量、读/写指针信息放入StringBuilder
sb.append("ByteBuf缓冲区信息:{");
sb.append("读取指针=").append(buffer.readerIndex()).append(", ");
sb.append("写入指针=").append(buffer.writerIndex()).append(", ");
sb.append("容量巨细=").append(buffer.capacity()).append("}");
// 利用Netty结构自带的格局化办法、Dump办法输出缓冲区数据
sb.append(StringUtil.NEWLINE);
ByteBufUtil.appendPrettyHexDump(sb, buffer);
System.out.println(sb.toString());
}
接着在main
办法中调用并运转,如下:
public static void main(String[] args) {
byteBufCapacityExpansion();
}
/* * 运转成果:
*
* 测验前的Buffer容量:PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 16)
* ByteBuf缓冲区信息:{读取指针=0, 写入指针=17, 容量巨细=64}
* +-------------------------------------------------+
* | 0 1 2 3 4 5 6 7 8 9 a b c d e f |
* +--------+-------------------------------------------------+----------------+
* |00000000| 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 |6666666666666666|
* |00000010| 36 |6 |
* +--------+-------------------------------------------------+----------------+
* */
先来调查开始的容量:cap=16
,由于这是我们显现指定的初始容量,接着向该ByteBuf
中刺进17
个字节数据后,会发现容量主动扩展到了64
,但假如运用NIO-Buffer
来进行这样的操作,则会抛出反常。一起终究还把缓冲区中详细的数据打印出来了,这个是利用Netty
自带的appendPrettyHexDump()
办法完结的,中间是字节值,后边是详细的值,这儿就不做过多阐述~
5.3、 Netty中的读写API
首要在叙述Netty-ByteBuf
的读写API
之前,我们再说清楚一点与NIO-Buffer
的差异,不知我们是否还记住我在之前NIO
中聊到的一点:
其实这也是NIO-Buffer
规划不合理的一个当地,当你想要向缓冲区中写入不同类型的数据,要么得自己手动转化成Byte
字节类型,要么得new
一个对应的子完结,所以整个完结就较为臃肿,我们能够点进Java.nio
包看一下,你会看到下述场景:
这儿的类联系,我们一眼看过去显着会感觉头大,根本上完结都大致相同,但针关于每个数据类型,都编写了对应的完结类,而Netty
的作者显着意识到了这点,因而并未供给多种数据类型的缓冲区,仅供给了ByteBuf
这一种缓冲区,Why
?
其实道理非常简略,由于核算机上的一切数据资源,在底层实质上都是
0、1
构成的字节数据,所以只供给Byte
类型的ByteBuf
缓冲区就够了,终究它能够存储一切类型的数据,一起为了便于写入其他类型的数据,如Int、boolean、long....
,Netty
结构中也对外供给了相关的写入API
,接着一起来看看。
// Netty-ByteBuf笼统类
public abstract class ByteBuf implements ReferenceCounted, Comparable<ByteBuf> {
// 写入boolean数据的办法,内部运用一个字节表明,0=false、1=true
public abstract ByteBuf writeBoolean(boolean var1);
// 写入字节数据的办法
public abstract ByteBuf writeByte(int var1);
// 大端写入Short数据的办法
public abstract ByteBuf writeShort(int var1);
// 小端写入Short数据的办法
public abstract ByteBuf writeShortLE(int var1);
// 下述办法和写Short类型的办法仅类型不同,都区别了巨细端,不再重复注释
public abstract ByteBuf writeMedium(int var1);
public abstract ByteBuf writeMediumLE(int var1);
public abstract ByteBuf writeInt(int var1);
public abstract ByteBuf writeIntLE(int var1);
public abstract ByteBuf writeLong(long var1);
public abstract ByteBuf writeLongLE(long var1);
public abstract ByteBuf writeChar(int var1);
public abstract ByteBuf writeFloat(float var1);
public ByteBuf writeFloatLE(float value) {
return this.writeIntLE(Float.floatToRawIntBits(value));
}
public abstract ByteBuf writeDouble(double var1);
public ByteBuf writeDoubleLE(double value) {
return this.writeLongLE(Double.doubleToRawLongBits(value));
}
// 将另一个ByteBuf目标写入到当时缓冲区
public abstract ByteBuf writeBytes(ByteBuf var1);
// 将另一个ByteBuf目标的前N个长度的数据,写入到当时缓冲区
public abstract ByteBuf writeBytes(ByteBuf var1, int var2);
// 将另一个ByteBuf目标的指定规模数据,写入到当时缓冲区
public abstract ByteBuf writeBytes(ByteBuf var1, int var2, int var3);
// 向缓冲区中写入一个字节数组
public abstract ByteBuf writeBytes(byte[] var1);
// 向缓冲区中写入一个字节数组中,指定规模的数据
public abstract ByteBuf writeBytes(byte[] var1, int var2, int var3);
// 将一个NIO的ByteBuffer数据写入到当时ByteBuf目标
public abstract ByteBuf writeBytes(ByteBuffer var1);
// 将一个输入流中的数据写入到当时缓冲区
public abstract int writeBytes(InputStream var1, int var2)
throws IOException;
// 将一个NIO的ScatteringByteChannel通道中的数据写入当时缓冲区
public abstract int writeBytes(ScatteringByteChannel var1, int var2)
throws IOException;
// 将一个NIO的文件通道中的数据写入当时缓冲区
public abstract int writeBytes(FileChannel var1, long var2, int var4)
throws IOException;
// 将一个任意字符类型的数据写入缓冲区(CharSequence是一切字符类型的老大)
public abstract int writeCharSequence(CharSequence var1, Charset var2);
// 省掉其他写入数据的API办法........
}
上面列出了Netty-ByteBuf
中常用的写入办法,其实我们在这儿就能显着调查出与NIO
的差异,NIO
是为不同数据类型供给了不同的完结类,而Netty
则仅仅只是为不同类型,供给了不同的API
办法,显着后者的做法更佳,由于全体的代码结构会更为优雅。
这儿首要说一下大端写入和小端写入的差异,早年面的
API
列表中,我们能够看到,Netty
为每种数据类型,都供给了一个结尾带LE
的写入办法,这个带LE
的办法则是小端写入办法,那么巨细端之间有何差异呢?
巨细端写入是网络编程中的通用概念,由于网络数据传输进程中,一切的数据都是以二进制的字节格局传输的,而所谓的大端(Big Endian
)写入,是指先写高位,再写低位,高低位又是什么意思呢?
- 高位写入:指早年往后写,例如
1
这个数字,比特位办法为000...001
。 - 低位写入:指从后往前写,仍旧是
1
这个数字,比特位办法为100...000
。
这儿不了解的小伙伴又会疑问:为啥高位写入时,1
在终究面呀?这是由于要先写0
,再写1
的原因导致的。而反过来。所谓的小端(Little Endian
)写入,也便是指先写低位,再写高位。默许情况下,网络通讯会选用大端写入的办法。
简略了解Netty-ByteBuf
写入数据的API
后,接着再来看一些读取数据的API
办法,如下:
// 一系列read开始的读取办法,这种办法会改动读取指针(区别巨细端)
public abstract boolean readBoolean();
public abstract byte readByte();
public abstract short readUnsignedByte();
public abstract short readShort();
public abstract short readShortLE();
public abstract int readUnsignedShort();
public abstract int readUnsignedShortLE();
public abstract int readMedium();
public abstract int readMediumLE();
public abstract int readUnsignedMedium();
public abstract int readUnsignedMediumLE();
public abstract int readInt();
public abstract int readIntLE();
public abstract long readUnsignedInt();
public abstract long readUnsignedIntLE();
public abstract long readLong();
public abstract long readLongLE();
public abstract char readChar();
public abstract float readFloat();
// 省掉其他的read办法.....
// 一系列get开始的读取办法,这种办法不会改动读取指针(区别巨细端)
public abstract boolean getBoolean(int var1);
public abstract byte getByte(int var1);
public abstract short getUnsignedByte(int var1);
public abstract short getShort(int var1);
public abstract short getShortLE(int var1);
public abstract int getUnsignedShort(int var1);
public abstract int getUnsignedShortLE(int var1);
public abstract int getMedium(int var1);
public abstract int getMediumLE(int var1);
public abstract int getUnsignedMedium(int var1);
public abstract int getUnsignedMediumLE(int var1);
public abstract int getInt(int var1);
public abstract int getIntLE(int var1);
public abstract long getUnsignedInt(int var1);
public abstract long getUnsignedIntLE(int var1);
public abstract long getLong(int var1);
public abstract long getLongLE(int var1);
public abstract char getChar(int var1);
public abstract float getFloat(int var1);
// 省掉其他的get办法.....
在上面列出的一系列读取办法中,首要可分为read、get
两大类办法:
-
readXXX()
:这种办法读取数据后,会导致ByteBuf
内部的读取指针随之移动。 -
getXXX()
:这种办法读取数据后,不会改动ByteBuf
内部的读取指针。
那么读取指针改动之后会呈现什么影响呢?我们还记住前面聊到的ByteBuf
的四部分嘛?前面讲过,读取指针之前的数据部分,都会被符号为抛弃部分,这也就意味着通过read
系列的办法读取一段数据后,会导致这些数据无法再次被读取到,这儿来做个试验:
// 测验ByteBuf的read、get、mark功用
private static void bufferReader(){
// 分配一个初始容量为10的缓冲区
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);
// 向缓冲区中写入10个字符(占位十个字节)
StringBuffer sb = new StringBuffer();
for (int i = 0; i < 10; i++) {
sb.append(i);
}
buffer.writeBytes(sb.toString().getBytes());
// 运用read办法读取前5个字节数据
printBuffer(buffer);
buffer.readBytes(5);
printBuffer(buffer);
// 再运用get办法读取后五个字节数据
buffer.getByte(5);
printBuffer(buffer);
}
public static void main(String[] args) {
bufferReader();
}
在上面的循环中,我是通过StringBuffer
来作为缓冲区的数据,但为何不直接写入int
数据呢?这是由于int
默许会占四个字节,而StringBuffer
底层是char
,一个字符只占用一个字节~,这儿是一个小细节,接着来看看运转成果:
ByteBuf缓冲区信息:{读取指针=0, 写入指针=10, 容量巨细=10}
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 30 31 32 33 34 35 36 37 38 39 |0123456789 |
+--------+-------------------------------------------------+----------------+
ByteBuf缓冲区信息:{读取指针=5, 写入指针=10, 容量巨细=10}
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 35 36 37 38 39 |56789 |
+--------+-------------------------------------------------+----------------+
ByteBuf缓冲区信息:{读取指针=5, 写入指针=10, 容量巨细=10}
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 35 36 37 38 39 |56789 |
+--------+-------------------------------------------------+----------------+
从上述成果中可看出,运用readBytes()
办法读取五个字节后,读取指针会随之移动到5
,接着看看前后的数据变化,此刻会发现数据从0123456789
变成了56789
,这是由于前面五个字节的数据,现已归于抛弃部分了,所以printBuffer()
办法无法读取显现。
接着再看看后边,通过
getByte()
读取五个字节后,此刻ByteBuf
目标的读取指针,显着不会随之移动,也便是通过get
系列办法读取缓冲区数据,并不会导致读过的数据抛弃。
那假如运用read
系列办法读取数据后,后续仍旧想要读取数据该怎么办呢?这儿能够运用ByteBuf
内部的符号指针完结,如下:
// 在上述办法的终究持续追加下述代码:
// 运用mark符号一下读取指针,然后再运用read办法读取数据
buffer.markReaderIndex();
buffer.readBytes(5);
printBuffer(buffer);
// 此刻再通过reset办法,使读取指针康复到前面的符号方位
buffer.resetReaderIndex();
printBuffer(buffer);
此刻再次查询运转成果,如下:
ByteBuf缓冲区信息:{读取指针=10, 写入指针=10, 容量巨细=10}
ByteBuf缓冲区信息:{读取指针=5, 写入指针=10, 容量巨细=10}
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 35 36 37 38 39 |56789 |
+--------+-------------------------------------------------+----------------+
从成果中能够显着看到,对读取指针做了符号后,再次运用read
系列办法读取数据,仍旧会导致读过的部分变为抛弃数据,但后续能够通过reset
办法,将读取指针康复到前面的符号方位,然后再次检查缓冲区的数据,就会发现数据又能够重复被读取啦~
其实除开能够通过
markReaderIndex()、resetReaderIndex()
办法符号、康复读取指针外,还能够通过markWriterIndex()、resetWriterIndex()
办法来符号、康复写入指针。符号读取指针后,能够让缓冲区中的一段数据被屡次read
读取,而符号写入指针后,能够让缓冲区的一段区间被重复写入,但每次后边的写入会覆盖前面写入的数据。
OK~,关于ByteBuf
的API
操作就介绍到这儿,其实内部供给了一百多个API
办法,但我就不一一去做阐明啦,我们点进源码后就能看到,感兴趣的小伙伴能够自行调试!
5.4、ByteBuf的内存收回
在前面聊到过,Netty-ByteBuf
在除安卓渠道外,都会运用池化技能来创立,那一个已创立出的ByteBuf
目标,其占用的内存在什么情况下会归还给内存池呢?想要聊了解这点,得先了解ByteBuf
的引证开释。
学习过
JVM-GC
机制的小伙伴应该知道,JVM
中运用的目标存活判定法是根可达算法,而在此之前的一种常用算法被称之为《引证计数法》,但由于该算法存在循环引证的问题,所以并不适合作为主动判定存活的算法,但Netty-ByteBuf
中恰恰运用了这种算法。
首要来看看Netty-ByteBuf
的类联系:
public abstract class ByteBuf implements ReferenceCounted, Comparable<ByteBuf>
从上面的类界说中可显着看到,ByteBuf
完结了ReferenceCounted
接口,该接口翻译过来的含义则是引证计数,该接口中供给的办法列表如下:
public interface ReferenceCounted {
// 检查一个目标的引证计数统计值
int refCnt();
// 对一个目标的引证计数+1
ReferenceCounted retain();
// 对一个目标的引证计数+n
ReferenceCounted retain(int var1);
// 记载当时目标的当时拜访方位,内存走漏时会回来该办法记载的值
ReferenceCounted touch();
ReferenceCounted touch(Object var1);
// 对一个目标的引证计数-1
boolean release();
// 对一个目标的引证计数-n
boolean release(int var1);
}
要点重视retain()、release()
办法,这两个办法别离对应加/减一个目标的引证计数,把ByteBuf
套入进来,当一个缓冲区目标的引证计数为0
时,会清空当时缓冲区中的数据,而且将占用的内存归还给内存池,一切测验再次拜访该ByteBuf
目标的操作,都会被拒绝。简略来说,一句话总结便是:当一个ByteBuf
目标的引证计数变为0
时,该缓冲区就会变为外部不行拜访的状况。
综上所述,在运用完一个ByteBuf
目标后,清晰后续不会用到该目标时,必定要记住手动调用release()
清空引证计数,不然会导致该缓冲区持久占用内存,终究引发内存走漏。
这儿拓宽一点小细节,好像在
Netty-Channel
中,都会选用ByteBuf
来发送/接纳数据,那这些通道传输数据用的ByteBuf
目标,其占用的内存会在何时收回呢?这会牵扯到前面的ChannelPipeline
链表。
还记住这幅通道处理器链表图嘛?在其间有两个特殊的处理器,即Head、Tail
处理器:
-
Head
处理器:- 假如通道上只要入站处理器,它会作为整个处理器链表的第一个处理器调用。
- 假如通道上只要出站处理器,它会作为整个处理器链表的终究一个处理器调用。
- 假如通道上入/出站处理器都有,它会作为入站的第一个处理调用,出站的终究一个处理器调用。
-
Tail
处理器:- 假如通道上只要入站处理器,
Tail
节点会作为整个链表的终究一个处理器调用。 - 假如通道上只要出站处理器,
Tail
节点会作为整个链表的第一个处理器调用。 - 假如通道上入/出站处理器都有,它会作为出站的第一个调用、入站的终究一个调用。
- 假如通道上只要入站处理器,
结合上面所说的内容,Head、Tail
处理器在任何情况下,其间至少会有一个,作为通道上的终究一个处理器调用,而在这两个头尾处理器中,会主动开释ByteBuf
的作业,先来看看Head
处理器,源码如下:
// ChannelPipeline处理器链表的默许完结类
public class DefaultChannelPipeline implements ChannelPipeline {
// Head处理器的完结类:一起完结了入站、出站处理器接口
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
// 作为入站链表第一个处理器时,会调用的办法
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 持续往下调用其他自界说的入站处理器
ctx.fireChannelRead(msg);
}
// 作为出站链表的终究一个处理器时,会调用的办法
public void write(ChannelHandlerContext ctx, Object msg,
ChannelPromise promise) {
// unsafe.write()终究会调用到AbstractUnsafe.write()办法
this.unsafe.write(msg, promise);
}
}
// 省掉其他办法....
}
public abstract class AbstractChannel extends DefaultAttributeMap
implements Channel {
protected abstract class AbstractUnsafe implements Unsafe {
public final void write(Object msg, ChannelPromise promise) {
this.assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
// 这儿先不需求了解,后续源码篇会聊
if (outboundBuffer == null) {
this.safeSetFailure(promise, this.newClosedChannelException(AbstractChannel.
this.initialCloseCause));
// 终究在这儿,仍旧调用了引证计数东西类的release办法
ReferenceCountUtil.release(msg);
} else {
int size;
try {
msg = AbstractChannel.this.filterOutboundMessage(msg);
size = AbstractChannel.this.pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable var6) {
this.safeSetFailure(promise, var6);
// 这儿也会调用了引证计数东西类的release办法
ReferenceCountUtil.release(msg);
return;
}
outboundBuffer.addMessage(msg, size, promise);
}
}
// 省掉其他办法....
}
// 省掉其他类与办法....
}
// 引证计数东西类
public final class ReferenceCountUtil {
public static boolean release(Object msg) {
// 这儿会先判别一下对应的msg目标是否完结了引证计数接口,
// 只要对应的msg完结了ReferenceCounted接口时,才会开释引证
return msg instanceof ReferenceCounted ?
((ReferenceCounted)msg).release() : false;
}
// 省掉其他办法.....
}
Head
节点会作为出站链表的终究一个处理器调用,因而在一切自界说出站处理器履行完结后,终究调用该节点的write()
办法,在这个办法内部,终究调用了AbstractUnsafe.write()
办法,对应的办法完结中,我们仅需重视ReferenceCountUtil.release(msg)
这行代码即可,终究会在该东西类中开释msg
目标的引证计数。
接着再来看看Tail
节点的完结源码:
// ChannelPipeline处理器链表的默许完结类
public class DefaultChannelPipeline implements ChannelPipeline {
// Tail处理器的完结类:完结了入站处理器接口,作为入站调用链终究的处理器
final class TailContext extends AbstractChannelHandlerContext
implements ChannelInboundHandler {
// 一切自界说的入站处理器履行完结后,会调用的办法
public void channelRead(ChannelHandlerContext ctx, Object msg) {
DefaultChannelPipeline.this.onUnhandledInboundMessage(ctx, msg);
}
// 省掉其他办法.....
}
// 前面Tail、Head调用的开释办法
protected void onUnhandledInboundMessage(ChannelHandlerContext ctx, Object msg) {
// 调用开释ByteBuf缓冲区的办法
this.onUnhandledInboundMessage(msg);
// 记载日志
if (logger.isDebugEnabled()) {
logger.debug("Discarded message pipeline :" +
"{}. Channel : {}.", ctx.pipeline().names(), ctx.channel());
}
}
protected void onUnhandledInboundMessage(Object msg) {
try {
logger.debug("Discarded inbound message {} that reached " +
"at the tail of the pipeline. Please check your pipeline" +
"configuration.", msg);
} finally {
// 终究调用了引证计数东西类的release办法
ReferenceCountUtil.release(msg);
}
}
}
Tail
节点会作为入站链表的终究一个处理器调用,所以在履行Tail
处理器时,终究会调用它的channelRead()
办法,而在相应的办法内部,调用了onUnhandledInboundMessage()
办法,跟着源码持续走,此刻也会发现,终究也调用了ReferenceCountUtil.release(msg)
办法来开释引证。
依据源码中的推断,好像
Netty
结构发送/接纳数据用的ByteBuf
,都会由头尾处理器来开释,但答案的确如此吗?NO
,为什么呢?再次将目光放到ReferenceCountUtil.release(msg)
这处代码:
// 引证计数东西类
public final class ReferenceCountUtil {
public static boolean release(Object msg) {
// 这儿会先判别一下对应的msg目标是否完结了引证计数接口,
// 只要对应的msg完结了ReferenceCounted接口时,才会开释引证
return msg instanceof ReferenceCounted ?
((ReferenceCounted)msg).release() : false;
}
// 省掉其他办法.....
}
// ByteBuf的类界说
public abstract class ByteBuf implements ReferenceCounted, Comparable<ByteBuf>
此刻我们留意看,ReferenceCountUtil.release()
在履行前,会先判别一下当时的msg
是否完结了ReferenceCounted
接口,而ByteBuf
是完结了的,因而假如履行到Head/Tail
处理器时,msg
数据仍旧为ByteBuf
类型,头尾处理器天然能够完结收回作业,但如若是下面这种情况呢?
pipeline.addLast("In-①",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
System.out.println("俺是In-①入站处理器...");
// 在第一个入站处理器中,将接纳到的ByteBuf数据转化为String向下传递
ByteBuf buffer = (ByteBuf) msg;
String message = buffer.toString(Charset.defaultCharset());
super.channelRead(ctx, message);
}
});
pipeline.addLast("In-②",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
System.out.println("我是In-②入站处理器...");
super.channelRead(ctx, msg);
}
});
在上述这个事例中,我们在第一个入站处理器中,将接纳到的ByteBuf
数据转化为String
向下传递,也就意味着从In-②
处理器开端,后边一切的处理器收到的msg
都为String
类型,当自界说的两个处理器履行完结后,终究会调用Tail
处理器完结收尾作业,但问题来了!
由于在
In-①
中msg
类型发生了改动,所以当Tail
处理器中调用ReferenceCountUtil.release()
时,由于String
并未完结ReferenceCounted
接口,所以Tail
无法对该msg
进行开释,终究就会造成内存走漏问题。
但此刻内存走漏,发生在哪个方位呢?答案是位于In-①
中,由于In-①
处理器中就现已将ByteBuf
用完了,将其间的数据转化成了String
类型,而ByteBuf
后续处理器都不会用到,因而该ByteBuf
占用的内存永久不会被开释,所以必定要留意:在运用处理器的进程中,假如清晰ByteBuf
不会持续运用,那请必定要记住手动调用release()
办法开释引证,以上述事例阐明:
ByteBuf buffer = (ByteBuf) msg;
String message = buffer.toString(Charset.defaultCharset());
buffer.release();
当清晰不运用该ByteBuf
值时,请记住调用对应的release()
办法开释引证!这样能够有用防止内存走漏的问题呈现,有人也许会说,JVM
不是有GC
机制吗?为什么会呈现内存走漏呀?
关于上述问题的道理非常简略,由于
Netty
默许选用本地内存来创立缓冲区,而且会利用池化技能处理一切缓冲区,假如一个ByteBuf
目标的引证不为0
,那么该ByteBuf
会永久的占用内存资源,Netty
无法主动将其占用的内存收回到池中。
5.5、 Netty中的零仿制技能
想要讲清楚Netty-ByteBuf
中的零仿制技能,那首要得先了解零仿制终究是个啥,因而我们先讲了解零仿制的概念,再讲清楚操作体系的零仿制技能,然后再说说Java-NIO
中的零仿制表现,终究再来聊Netty-ByteBuf
中的零仿制技能。
六、随处可见的零仿制技能
零仿制这个词,在许多当地都有呈现,例如Kafka、Nginx、Tomcat、RocketMQ...
的底层都运用了零仿制的技能,那终究什么叫做零仿制呢?其实所谓的零仿制,并不是不需求通过数据仿制,而是削减内存仿制的次数,上个比方来了解,比方Nginx
向客户端供给文件下载的功用。
客户端要下载的文件都位于Nginx
所在的服务器磁盘中,假如当一个客户端恳求下载某个资源文件时,这时需求通过的进程如下:
先来简略聊一聊文件下载时,Nginx
服务器内部的数据传输进程:
- ①客户端恳求下载服务器上的某个资源,
Nginx
解析恳求并得知客户端要下载的详细文件。 - ②
Nginx
向OS
建议体系IO
调用,调用内核read(fd)
函数,运用上下文切态至内核空间。 - ③
read()
函数通过DMA
操控器,将目标文件的数据从磁盘读取至内核缓冲区。 - ④
DMA
传输数据完结后,CPU
将数据从内核缓冲区仿制至用户缓冲区(程序的内存空间)。 - ⑤
CPU
仿制数据完结后,read()
调用完毕并回来,上下文从内核态切回用户态。 - ⑥
Nginx
再次向OS
建议内核write(fd)
函数的体系调用,运用上下文再次切到内核态。 - ⑦接着
CPU
将用户缓冲区中的数据,写入到Socket
网络套接字的缓冲区。 - ⑧数据仿制到
Socket
缓冲区后,DMA
操控器将Socket
缓冲区的数据传输到网卡设备。 - ⑨
DMA
操控器将数据仿制至网卡设备后,write()
函数调用完毕,再次切回用户态。 - ⑩文件数据抵达网卡后,
Nginx
预备向客户端呼应数据,组装报文回来数据……
从上述流程我们可得知,一次文件下载传统的IO
流程,需求通过四次切态,四次数据仿制(CPU、DMA
各两次),而所谓的零仿制,并不是指不需求通过数据仿制,而是指削减其间的数据仿制次数。
6.1、操作体系中的零仿制技能
我这儿指的操作体系默许是Linux
,由于MacOS、Windows
体系相对闭源,因而关于这两个操作体系中的零仿制技能个人并不了解。在Linux
中供给了多种零仿制的完结:
- ①
MMAP
同享内存 +write()
体系函数。 - ②
sendfile()
内核函数。 - ③结合
DMA-Scatter/Gather Copy
收集仿制功用完结的sendfile()
函数。 - ④
splice()
内核函数。
6.1.1、MMAP同享内存
先来聊聊第①种吧,MMAP
同享内存这个概念,在上篇关于《Linux-IO多路复用模型:select、poll、epoll源码剖析》的文章结尾提到过,MMAP
同享内存是指:在内核空间和用户空间之间的一块同享内存,这块内存可被用户态和内核态直接拜访,结构如下:
先看左边的图,这也是众多材料中撒播的图,同享内存位于用户态和内核态之间,这样了解其实也并无大碍,但右边的图才更为准确,由于内核态和用户态自身是两个空间,各自之间并不存在真实的同享区域,MMAP
同享内存是通过虚拟内存机制完结的,也便是通过内存映射技能完结的。
什么又叫做内存映射技能呢?这个其实很好了解,就比方
Linux
中的软链接、Windows
中的快捷办法相同,拿我们了解的Windows
体系来说,一般在装置一个程序后,为了便利后续运用,一般都会默许在桌面上生成快捷办法(图标),这个快捷办法其实并不是一个真实的程序,而是指向装置目录下xxx.exe
的链接。
在Windows
体系上装置一个程序后,我们能够通过点击桌面图标翻开,亦可双击装置目录下的xxx.exe
文件发动,而操作体系中的同享内存也是相同的思路。
在干流操作体系中都有一种名为虚拟内存的机制,这是指能够分配多个虚拟内存地址,指向同一个物理内存地址,此刻内核态程序和用户态程序,能够通过不同的虚拟地址,来操作同一块物理内存,这也便是
MMAP
同享内存技能的真实完结。
MMAP
的体系界说如下:
void *mmap(void *addr, size_t length, int prot, int flags, int fd, off_t offset);
-
addr
:指定映射的虚拟内存地址。 -
length
:映射的内存空间长度。 -
prot
:映射内存的保护办法。 -
flags
:指定映射的类型。 -
fd
:进行映射的文件句柄。 -
offset
:文件偏移量。
还是之前那幅图(不要问我为什么,由于懒的画~),要点看图中圈出来的区域:
假如内核缓冲区和用户缓冲区运用了MMAP
同享内存,那当DMA
操控器将数据仿制至内核缓冲区时,由于这儿的内核缓冲区,实质是一个虚拟内存地址指向用户缓冲区,所以DMA
会直接将磁盘数据仿制至用户缓冲区,这就削减了一次内核缓冲区到用户缓冲区的CPU
仿制进程,后续直接调用write()
函数把数据写到Socket
缓冲区即可,因而这也是一种零仿制的表现。
6.1.2、sendfile()内核函数
sendfile()
是Linux2.1
版别中推出的一个内核函数,体系调用的原型如下:
ssize_t sendfile(int fd_in, int fd_out, off_t *offset, size_t count);
-
fd_in
:待写入数据的文件描述符(一般为Socket
网络套接字的描述符)。 -
fd_out
:待读取数据的文件描述符(一般为磁盘文件的描述符)。 -
offset
:磁盘文件的文件偏移量。 -
count
:声明在fd_out
和fd_in
之间,要传输的字节数。
关于啥是文件描述符我就不重复赘述了,这仍旧在上篇的《Linux多路复用函数源码剖析-FD文件描述符》中聊到过,当调用sendfile()
函数传输数据时,将out_fd
指定为等候写入数据的网络套接字,将in_fd
指定为待读取数据的磁盘文件,就能够直接在内核缓冲区中完结传输进程,无需通过用户缓冲区,如下:
仍旧以前面Nginx
下载文件的进程为例,完整流程如下:
- ①客户端恳求下载服务器上的某个资源,
Nginx
解析恳求并得知客户端要下载的详细文件。 - ②
Nginx
向OS
建议体系IO
调用,调用内核sendfile()
函数,上下文切态至内核空间。 - ③
sendfile()
函数通过DMA
操控器,将目标文件的数据从磁盘读取至内核缓冲区。 - ④
DMA
传输数据完结后,CPU
将数据从内核缓冲区仿制至Socket
缓冲区。 - ⑤
CPU
仿制数据完结后,DMA
操控器将数据从Socket
缓冲区仿制至网卡设备。 - ⑥数据仿制到网卡后,
sendfile()
调用完毕,运用上下文切回用户态空间。 - ⑦
Nginx
预备向客户端呼应数据,组装报文回来数据……
相较于本来的MMAP+write()
的办法,运用sendfile()
函数来处理IO
恳求,这显着功用更佳,由于这儿不只仅削减了一次CPU
仿制,而且还削减了两次切态的进程。
6.1.3、DMA-Scatter/Gather Copy – sendfile()函数
前面聊了Linux2.1
版别中的sendfile()
函数,而到了Linux2.4
版别中,又对sendfile()
做了升级,引入了S/G-DMA
技能支撑,也便是在DMA
仿制阶段,假如硬件支撑的情况下,会参加Scatter/Gather
操作,这样就省去了仅有的一次CPU
仿制进程,如下:
优化后的sendfile()
函数,仿制数据时只需求奉告out_fd、in_fd、count
即可,然后DMA
操控器会直接将数据从磁盘仿制至网卡,而无需通过CPU
将数据仿制至Socket
缓冲区这一步。
6.1.4、splice()内核函数
前面聊到的sendfile()
函数只适用于将数据从磁盘文件仿制到Socket
套接字或网卡上,所以这也约束了它的运用规模,因而在Linux2.6
版别中,引入了splice()
函数,其体系调用的原型如下:
ssize_t splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out,
size_t len, unsigned int flags);
-
fd_in
:等候写入数据的文件描述符。 -
off_in
:假如fd_in
是一个管道文件(如Socket
),该值有必要为NULL
,不然为文件的偏移量。 -
fd_out
:等候读取数据的文件描述符。 -
off_out
:作用同off_in
参数。 -
len
:指定fd_in、fd_out
之间传输数据的长度。 -
flags
:操控数据传输的办法:-
SPLICE_F_MOVE
:假如数据适宜,按标准页巨细移动数据(2.6.21
版别后被抛弃)。 -
SPLICE_F_NONBLOCK
:以非堵塞式办法履行splice()
,实践仍旧会受FD
状况影响。 -
SPLICE_F_MORE
:给内核一个提示,后续splice()
还会持续传输更多的数据。 -
SPLICE_F_GIFT
:没有作用的选项。
-
运用splice
函数时,fd_in、fd_out
中有必要至少有一个是管道文件描述符,套到网络编程中的含义便是指:必需求有一个文件描述符是Socket
类型,假如两个磁盘文件进行仿制,则无法运用splice
函数。
splice()
函数的作用和DMA-Scatter/Gather
版的sendfile()
函数彻底相同,但与其不同的是:splice()
函数不只不需求硬件支撑,而且能够做到两个文件描述符之间的数据零仿制,完结的进程是依据一端的管道文件描述符,在两个FD
之间搭建pipeline
管道,然后完结两个FD
之间的数据零仿制。
6.2、特殊的零仿制技能
前面聊到了四种Linux
体系中的零仿制技能,而除开Linux
体系中的零仿制技能外,还有一些特殊的零仿制完结,先来聊一聊缓冲区同享技能,然后再聊聊运用程序中的零仿制表现。
6.2.1、缓冲区同享
缓冲区同享技能相似于Linux
中的MMAP
同享内存,但缓冲区同享则是真实意义上的内存同享技能,内核缓冲区和用户缓冲区同享同一块内存,如下:
操作体系一般为了体系的安全性,在运转期间都会分为用户态和内核态,无法直接拜访用户态程序内核态空间,所以Linux
中的MMAP
是依据虚拟内存完结的,而想要完结真实意义上的内存同享,这也就意味着需求重写内核结构,现在比较老练的只要Solaris
体系上的Fast Buffer
技能,但我们只需了解即可,由于这个也很少用到。
6.2.2、程序数据的零仿制
前面聊到的零仿制技能,都是在削减磁盘文件和网络套接字之间的数据仿制次数,而程序中也会存在许多的数据仿制进程,比方将一个大调集拆分为两个小调集、将多个小调集兼并成一个大调集等等,传统的做法如下:
List<Integer> a = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
List<Integer> b = new ArrayList<>();
List<Integer> c = new ArrayList<>();
for (Integer num : a) {
int index = a.indexOf(num);
if (index < 5){
b.add(num);
} else {
c.add(num);
}
}
而这种做法显着会牵扯到数据仿制,但上述这个做法,会从a
中将数据仿制到b、c
调集中,而所谓的零仿制,便是无需发生仿制动作,也能够将a
拆分红b、c
两个调集。
关于详细怎么完结,这点待会儿在
Netty-ByteBuf
中演示,由于Netty
中的零仿制技能,也完结了程序数据的零仿制。
6.3、Java-NIO中的零仿制表现
Java-NIO
中,首要有三个方面用到了零仿制技能:
-
MappedByteBuffer.map()
:底层调用了操作体系的mmap()
内核函数。 -
DirectByteBuffer.allocateDirect()
:能够直接创立依据本地内存的缓冲区。 -
FileChannel.transferFrom()/transferTo()
:底层调用了sendfile()
内核函数。
调查上述给出的三处方位,其实实质也便是在调用操作体系内核供给的零仿制函数,以此削减数据的仿制次数。
6.4、再聊Netty中的零仿制表现
Netty
中的零仿制与前面操作体系层面的零仿制不同,它是一种用户进程等级的零仿制表现,首要也包含三方面:
①
Netty
的发送、接纳数据的ByteBuf
缓冲区,默许会运用堆外本地内存创立,选用直接内存进行Socket
读写,数据传输时无需通过二次仿制。假如运用传统的堆内存进行Socket
网络数据读写,JVM
需求先将堆内存中的数据仿制一份到直接内存,然后才写入Socket
缓冲区中,相较于堆外直接内存,音讯在发送进程中多了一次缓冲区的内存仿制。
②
Netty
的文件传输选用了transferTo()/transferFrom()
办法,它能够直接将文件缓冲区的数据发送到目标Channel(Socket)
,底层便是调用了sendfile()
内核函数,防止了文件数据的CPU
仿制进程。
③
Netty
供给了组合、拆解ByteBuf
目标的API
,我们能够依据一个ByteBuf
目标,对数据进行拆解,也能够依据多个ByteBuf
目标进行数据兼并,这个进程中不会呈现数据仿制,下面要点聊一聊这个!
其间前两条就不过多赘述了,终究前面都唠叨过好几回,要点说说第三种零仿制技能,这是一种Java
等级的零仿制技能,ByteBuf
中首要有slice()、composite()
这两个办法,用于拆分、兼并缓冲区,先来聊聊拆分缓冲区的办法,事例如下:
// 测验Netty-ByteBuf的slice零仿制办法
private static void sliceZeroCopy(){
// 分配一个初始容量为10的缓冲区
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);
// 写入0~9十个字节数据
byte[] numData = {'0','1','2','3','4','5','6','7','8','9'};
buffer.writeBytes(numData);
printBuffer(buffer);
// 从下标0开端,向后截取五个字节,拆分红一个新ByteBuf目标
ByteBuf b1 = buffer.slice(0, 5);
printBuffer(b1);
// 从下标5开端,向后截取五个字节,拆分红一个新ByteBuf目标
ByteBuf b2 = buffer.slice(5, 5);
printBuffer(b2);
// 证明切开出的两个ByteBuf目标,是同享第一个ByteBuf目标数据的
// 这儿修正截取后的b1目标,然后检查开始的buffer目标
b1.setByte(0,'a');
printBuffer(buffer);
}
public static void main(String[] args) {
sliceZeroCopy();
}
在上述办法中,首要创立了一个buffer
目标,往其间写入了0~9
这十个字符,接着将其拆分红了b1、b2
这两个ByteBuf
目标,b1、b2
都具有独立的读写指针,但却并未真实的从buffer
中仿制新的数据出来,而是依据buffer
这个目标,进行了数据截取,运转成果如下:
ByteBuf缓冲区信息:{读取指针=0, 写入指针=10, 容量巨细=10}
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 30 31 32 33 34 35 36 37 38 39 |0123456789 |
+--------+-------------------------------------------------+----------------+
ByteBuf缓冲区信息:{读取指针=0, 写入指针=5, 容量巨细=5}
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 30 31 32 33 34 |01234 |
+--------+-------------------------------------------------+----------------+
ByteBuf缓冲区信息:{读取指针=0, 写入指针=5, 容量巨细=5}
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 35 36 37 38 39 |56789 |
+--------+-------------------------------------------------+----------------+
ByteBuf缓冲区信息:{读取指针=0, 写入指针=10, 容量巨细=10}
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 31 32 33 34 35 36 37 38 39 |a123456789 |
+--------+-------------------------------------------------+----------------+
调查上述第二、三个ByteBuf
缓冲区信息,与前面说的毫无差异,显着都具有独立的读写指针,但我为什么说:b1、b2
没有仿制数据呢?接着看办法中的终究一步,我对b1
的第一个元素做了修正,然后输出了buffer
目标,看上述成果中的第四个ByteBuf
缓冲区信息,其实会发现:buffer
目标中下标为0
的数据,也被改成了a
!由此即可证明前面的观念。
不过这种零仿制办法,尽管削减了数据仿制次数,但也会有必定的局限性:
①运用slice()
办法拆分出的ByteBuf
目标,不支撑扩容,也便是切开的长度为5
,最大长度也只能是5
,超出长度时会抛出下标越界反常。
②由于拆分出的ByteBuf
目标,其数据依靠于原ByteBuf
目标,因而当原始ByteBuf
目标被开释时,拆分出的缓冲区也会不行用,所以在运用slice()
办法时,要手动调用retain()/release()
来增加引证计数(这个后边细聊)。
除开上述的slice()
办法外,还有其他一个叫做duplicate()
的零仿制办法,它的作用是彻底克隆原有ByteBuf
目标,但读写指针都是独立的,而且支撑主动扩容,我们感兴趣能够自行试验。
接着聊一聊兼并ByteBuf
缓冲区的零仿制办法,该办法的运用办法与前面的办法并不同,如下:
// 测验Netty-ByteBuf的composite零仿制办法
private static void compositeZeroCopy(){
// 创立两个小的ByteBuf缓冲区,并往两个缓冲区中刺进数据
ByteBuf b1 = ByteBufAllocator.DEFAULT.buffer(5);
ByteBuf b2 = ByteBufAllocator.DEFAULT.buffer(5);
byte[] data1 = {'a','b','c','d','e'};
byte[] data2 = {'n','m','x','y','z'};
b1.writeBytes(data1);
b2.writeBytes(data2);
// 创立一个兼并缓冲区的CompositeByteBuf目标
CompositeByteBuf buffer = ByteBufAllocator.DEFAULT.compositeBuffer();
// 将前面两个小的缓冲区,兼并成一个大的缓冲区
buffer.addComponents(true,b1,b2);
printBuffer(buffer);
}
public static void main(String[] args) {
compositeZeroCopy();
}
/* * 运转成果:
ByteBuf缓冲区信息:{读取指针=0, 写入指针=10, 容量巨细=10}
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 62 63 64 65 6e 6d 78 79 7a |abcdenmxyz |
+--------+-------------------------------------------------+----------------+
* */
事例中,想要将多个缓冲区兼并成一个大的缓冲区,需求先创立一个CompositeByteBuf
目标,接着调用它的addComponent()/addComponents()
办法,将小的缓冲区增加进去即可。但在兼并多个缓冲区时,addComponents()
办法中的第一个参数有必要为true
,不然不会主动增长读写指针。
其实说终究,
Netty-ByteBuf
缓冲区的零仿制办法,实践上也能够被称之为“一种特殊的浅仿制”,与之对应的是“深仿制”,而ByteBuf
中的“深仿制”,则是一系列以Copy
开始的办法,通过这类办法仿制缓冲区,会彻底分配新的内存地址、读写指针。
终究,在Netty
内部还供给了一个名为Unpooled
的东西类,这首要是针关于非池化缓冲区的东西类,内部也供给了一系列wrappend
开始的办法,能够用来组合、包装多个ByteBuf
目标或字节数组,调用对应办法时,内部也不会发生仿制动作,这也是一类零仿制的办法。
七、Netty入门篇小结
通过上述一系列的叨叨絮絮后,关于Netty
结构的根本概念,以及Netty
结构中大多数中心组件做了介绍,但关于一些粘包、半包、解码器、长衔接、心跳机制等内容未阐述,本来计划将这些内容一篇写完,但本章的字数实在太多,严重超出单章约束:
因而关于后续一些进阶的常识,会再开设一篇叙述,预计Netty
的文章会有4~5
篇左右,大体次序为《Netty
入门篇》、《Netty
进阶篇》、《Netty
实战篇》、《Netty
运用篇》、《Netty
源码篇》,但详细的篇幅会在后续适当调整。
本篇的内容就到这儿啦,如若对你有协助,请记住点个小赞支撑一下~