引言
关于Netty
网络结构的内容,前面现已讲了两个章节,但总归来说难以真实把握,究竟只是对其间一个个组件进行解说,很难让诸位将其串起来构成一条线,所以本章中则会结合实战事例,对Netty
进行更深层次的学习与把握,实战事例也并不难,一个非常朴素的IM
谈天程序。
本来计划做个多人斗地主,但斗地主需求织入过多的事务逻辑,因而一方面会带来不必要的了解难度,让事例更为杂乱化,另一方面代码量也会偏多,所以终究仍旧挑选完结根本的谈天程序,既简略,又能加深对
Netty
的了解。
一、依据Netty规划通讯协议
协议,这玩意儿相信我们肯定不陌生了,究竟在《网络编程》系列的前两章,都在围绕着网络协议打开叙述,再来简略回忆一下协议的概念:网络协议是指一种通讯两边都必须遵守的约定,两个不同的端,依照必定的格局对数据进行“编码”,一起依照相同的规矩进行“解码”,然后完结两者之间的数据传输与通讯。
当自己想要打造一款IM
通讯程序时,关于音讯的封装、拆分也相同需求规划一个协议,通讯的两端都必须遵守该协议作业,这也是完结通讯程序的条件,但为什么需求通讯协议呢?因为TCP/IP
中是依据流的办法传输音讯,音讯与音讯之间没有鸿沟,而协议的意图则在于约定音讯的样式、鸿沟等。
1.1、Redis通讯的RESP协议
不知我们是否还记得之前在《Redis总述篇》中聊到的RESP
客户端协议,这是Redis
供给的一种客户端通讯协议,假如想要操作Redis
,就必须遵守该协议的格局发送数据,但这个协议特别简略,如下:
- 首要要求一切指令,都以
*
最初,后面跟着详细的子指令数量,接着用换行符切割。 - 接着需求先用
$
符号声明每个子指令的长度,然后再用换行符切割。 - 终究再拼接上详细的子指令,相同用换行符切割。
这样描绘有些令人难懂,那就直接看个事例,例如一条简略set
指令,如下:
客户端指令:
set name ZhuZi
转变为RESP指令:
*3
$3
set
$4
name
$5
ZhuZi
依照Redis
的规矩,但凡满意RESP
协议的客户端,都能够直接衔接并操作Redis
服务端,这也就意味着我们能够直接经过Netty
来手写一个Redis
客户端,代码如下:
// 依据Netty、RESP协议完结的Redis客户端
public class RedisClient {
// 换行符的ASCII码
static final byte[] LINE = {13, 10};
public static void main(String[] args) {
EventLoopGroup worker = new NioEventLoopGroup();
Bootstrap client = new Bootstrap();
try {
client.group(worker);
client.channel(NioSocketChannel.class);
client.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel)
throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new ChannelInboundHandlerAdapter(){
// 通道树立成功后调用:向Redis发送一条set指令
@Override
public void channelActive(ChannelHandlerContext ctx)
throws Exception {
String command = "set name ZhuZi";
ByteBuf buffer = respCommand(command);
ctx.channel().writeAndFlush(buffer);
}
// Redis呼应数据时触发:打印Redis的呼应成果
@Override
public void channelRead(ChannelHandlerContext ctx,
Object msg) throws Exception {
// 接受Redis服务端履行指令后的成果
ByteBuf buffer = (ByteBuf) msg;
System.out.println(buffer.toString(CharsetUtil.UTF_8));
}
});
}
});
// 依据IP、端口衔接Redis服务端
client.connect("192.168.12.129", 6379).sync();
} catch (Exception e){
e.printStackTrace();
}
}
private static ByteBuf respCommand(String command){
// 先对传入的指令以空格进行切割
String[] commands = command.split(" ");
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
// 遵循RESP协议:先写入指令的个数
buffer.writeBytes(("*" + commands.length).getBytes());
buffer.writeBytes(LINE);
// 接着别离写入每个指令的长度以及详细值
for (String s : commands) {
buffer.writeBytes(("$" + s.length()).getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes(s.getBytes());
buffer.writeBytes(LINE);
}
// 把转换成RESP格局的指令回来
return buffer;
}
}
在上述这个事例中,也只是只是经过respCommand()
这个办法,对用户输入的指令进行了转换,一起在上面经过Netty
,与Redis
的地址、端口树立了衔接,在衔接树立成功后,就会向Redis
发送一条转换成RESP
指令的set
指令,接着等候Redis
的呼应成果并输出,如下:
+OK
因为这是一条写指令,所以当Redis
收到履行完结后,终究就会回来一个OK
,我们也可直接去Redis
中查询,也仍旧能够查询到刚刚写入的name
这个键值。
1.2、HTTP超文本传输协议
前面我们自己针关于Redis
的RESP
协议,对用户指令进行了封装,然后发往Redis
履行,但关于这些常用的协议,Netty
早已供给好了现成的处理器,想要运用时无需从头开发,能够直接运用现成的处理器来完结,比方现在我们能够依据Netty
供给的处理器,完结一个简略的HTTP
服务器,代码如下:
// 依据Netty供给的处理器完结HTTP服务器
public class HttpServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
ServerBootstrap server = new ServerBootstrap();
server
.group(boss,worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
// 增加一个Netty供给的HTTP处理器
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx,
Object msg) throws Exception {
// 在这儿输出一下音讯的类型
System.out.println("音讯类型:" + msg.getClass());
super.channelRead(ctx, msg);
}
});
pipeline.addLast(new SimpleChannelInboundHandler<HttpRequest>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx,
HttpRequest msg) throws Exception {
System.out.println("客户端的恳求路径:" + msg.uri());
// 创立一个呼应方针,版别号与客户端保持一致,状况码为OK/200
DefaultFullHttpResponse response =
new DefaultFullHttpResponse(
msg.protocolVersion(),
HttpResponseStatus.OK);
// 结构呼应内容
byte[] content = "<h1>Hi, ZhuZi!</h1>".getBytes();
// 设置呼应头:告诉客户端本次呼应的数据长度
response.headers().setInt(
HttpHeaderNames.CONTENT_LENGTH,content.length);
// 设置呼应主体
response.content().writeBytes(content);
// 向客户端写入呼应数据
ctx.writeAndFlush(response);
}
});
}
})
.bind("127.0.0.1",8888)
.sync();
}
}
在该事例中,我们就未曾手动对HTTP
的数据包进行拆包处理了,而是在服务端的pipeline
上增加了一个HttpServerCodec
处理器,这个处理器是Netty
官方供给的,其类承继联系如下:
public final class HttpServerCodec
extends CombinedChannelDuplexHandler<HttpRequestDecoder, HttpResponseEncoder>
implements SourceCodec {
// ......
}
调查会发现,该类承继自CombinedChannelDuplexHandler
这个组合类,它组合了编码器、解码器,这也就意味着HttpServerCodec
即能够对客户端的数据做解码,也能够对服务端呼应的数据做编码,一起除开增加了这个处理器外,在第二个处理器中打印了一下客户端的音讯类型,终究一个处理器中,对客户端的恳求做出了呼应,其实也便是回来了一句话算了。
此刻在浏览器输入http://127.0.0.1:8888/index.html
,成果如下:
音讯类型:class io.netty.handler.codec.http.DefaultHttpRequest
音讯类型:class io.netty.handler.codec.http.LastHttpContent$1
客户端的恳求路径:/index.html
此刻来看成果,客户端的恳求会被解析成两个部分,榜首个是恳求信息,第二个是主体信息,但按理来说浏览器发出的恳求,归于GET
类型的恳求,GET
恳求是没有恳求体信息的,但Netty
仍旧会解析成两部分~,只不过GET
恳求的第二部分是空的。
在第三个处理器中,我们直接向客户端回来了一个h1
标签,一起也要记得在呼应头里边,加上呼应内容的长度信息,不然浏览器的加载圈,会一向不同的转动,究竟浏览器也不知道内容有多长,就会一向反复加载,尝试等候更多的数据。
1.3、自界说音讯传输协议
Netty
除开供给了HTTP
协议的处理器外,还供给了DNS、HaProxy、MemCache、MQTT、Protobuf、Redis、SCTP、RTSP.....
一系列协议的完结,详细界说位于io.netty.handler.codec
这个包下,当然,我们也能够自己完结自界说协议,依照自己的逻辑对数据进行编解码处理。
很多依据Netty
开发的中间件/组件,其内部根本上都开发了专属的通讯协议,以此来作为不同节点间通讯的根底,所以解下来我们依据Netty
也来自己规划一款通讯协议,这也会作为后续完结谈天程序时的根底。
但所谓的协议规划,其实只是只需求依照必定束缚,完结编码器与解码器即可,发送方在发出数据之前,会经过编码器对数据进行处理,而接纳方在收到数据之前,则会由解码器对数据进行处理。
1.3.1、自界说协议的要素
在自界说传输协议时,我们必定需求考虑几个因素,如下:
- 魔数:用来榜首时间判别是否为自己需求的数据包。
- 版别号:提高协议的拓宽性,便利后续对协议进行晋级。
- 序列化算法:音讯正文详细该运用哪种办法进行序列化传输,例如
Json、ProtoBuf、JDK...
。 - 音讯类型:榜首时间判别出当时音讯的类型。
- 音讯序号:为了完结双工通讯,客户端和服务端之间收/发音讯不会彼此阻塞。
- 正文长度:供给给
LTC
解码器运用,防止解码时呈现粘包、半包的现象。 - 音讯正文:本次音讯要传输的详细数据。
在规划协议时,一个完好的协议应该包含上述所说的几方面,这样才干供给两边通讯时的根底,依据上述几个字段,能够在榜首时间内判别出:音讯是否可用、当时协议版别、音讯的详细类型、音讯的长度等各类信息,然后给后续处理器运用(自界说的协议规矩本身便是一个编解码处理器算了)。
1.3.2、自界说协议实战
前面简略聊到过,所谓的自界说协议便是自己规矩音讯格局,以及自己完结编/解码器对音讯完结封装/拆解,所以这儿想要自界说一个音讯协议,就只需求满意前面两个条件即可,因而完结如下:
@ChannelHandler.Sharable
public class ChatMessageCodec extends MessageToMessageCodec<ByteBuf, Message> {
// 音讯出站时会经过的编码办法(将原生音讯方针封装成自界说协议的音讯格局)
@Override
protected void encode(ChannelHandlerContext ctx, Message msg,
List<Object> list) throws Exception {
ByteBuf outMsg = ctx.alloc().buffer();
// 前五个字节作为魔数
byte[] magicNumber = new byte[]{'Z','h','u','Z','i'};
outMsg.writeBytes(magicNumber);
// 一个字节作为版别号
outMsg.writeByte(1);
// 一个字节表明序列化办法 0:JDK、1:Json、2:ProtoBuf.....
outMsg.writeByte(0);
// 一个字节用于表明音讯类型
outMsg.writeByte(msg.getMessageType());
// 四个字节表明音讯序号
outMsg.writeInt(msg.getSequenceId());
// 运用Java-Serializable的办法对音讯方针进行序列化
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(msg);
byte[] msgBytes = bos.toByteArray();
// 运用四个字节描绘音讯正文的长度
outMsg.writeInt(msgBytes.length);
// 将序列化后的音讯方针作为音讯正文
outMsg.writeBytes(msgBytes);
// 将封装好的数据传递给下一个处理器
list.add(outMsg);
}
// 音讯入站时会经过的解码办法(将自界说格局的音讯转变为详细的音讯方针)
@Override
protected void decode(ChannelHandlerContext ctx,
ByteBuf inMsg, List<Object> list) throws Exception {
// 读取前五个字节得到魔数
byte[] magicNumber = new byte[5];
inMsg.readBytes(magicNumber,0,5);
// 再读取一个字节得到版别号
byte version = inMsg.readByte();
// 再读取一个字节得到序列化办法
byte serializableType = inMsg.readByte();
// 再读取一个字节得到音讯类型
byte messageType = inMsg.readByte();
// 再读取四个字节得到音讯序号
int sequenceId = inMsg.readInt();
// 再读取四个字节得到音讯正文长度
int messageLength = inMsg.readInt();
// 再依据正文长度读取序列化后的字节正文数据
byte[] msgBytes = new byte[messageLength];
inMsg.readBytes(msgBytes,0,messageLength);
// 关于读取到的音讯正文进行反序列化,终究得到详细的音讯方针
ByteArrayInputStream bis = new ByteArrayInputStream(msgBytes);
ObjectInputStream ois = new ObjectInputStream(bis);
Message message = (Message) ois.readObject();
// 终究把反序列化得到的音讯方针传递给后续的处理器
list.add(message);
}
}
上面自界说的处理器中,承继了MessageToMessageCodec
类,首要担任将数据在原生ByteBuf
与Message
之间进行彼此转换,而Message
方针是自界说的音讯方针,这儿暂且无需过多关怀。其间首要完结了两个办法:
-
encode()
:出站时会经过的编码办法,会将原生音讯方针按自界说的协议封装成对应的字节数据。 -
decode()
:入站时会经过的解码办法,会将协议格局的字节数据,转变为详细的音讯方针。
上述自界说的协议,也便是必定规矩的字节数据,每条音讯数据的组成如下:
- 魔数:运用第
1~5
个字节来描绘,这个魔数值能够按自己的主意自界说。 - 版别号:运用第
6
个字节来描绘,不同数字表明不同版别。 - 序列化算法:运用第
7
个字节来描绘,不同数字表明不同序列化办法。 - 音讯类型:运用第
8
个字节来描绘,不同的音讯类型运用不同数字表明。 - 音讯序号:运用第
9~12
个字节来描绘,其实便是一个四字节的整数。 - 正文长度:运用第
13~16
个字节来描绘,也是一个四字节的整数。 - 音讯正文:长度不固定,依据每次详细发送的数据来决定。
在其间,为了完结简略,这儿的序列化办法,则选用的是JDK
默认的Serializable
接口办法,但这种办法生成的方针字节较大,实践情况中最好仍是挑选谷歌的ProtoBuf
办法,这种算法归于序列化算法中,功用最佳的一种落地完结。
当然,这个自界说的协议是供给给后续的谈天事务运用的,但这种实战型的内容分享,根本上代码量较高,所以我们看起来会有些单调,而本文所运用的谈天室事例,是依据《B站-黑马Netty视频教程》二次改良的,因而如若感觉文字描绘较为单调,可直接点击前面给出的链接,观看
P101~P121
视频进行学习。
终究来调查一下,我们会发现,在我们界说的这个协议编解码处理器上,存在着一个@ChannelHandler.Sharable
注解,这个注解的作用是干吗的呢?其实很简略,用来标识当时处理器是否可在多线程环境下运用,假如带有该注解的处理器,则表明能够在多个通道间共用,因而只需求创立一个即可,反之同理,假如不带有该注解的处理器,则每个通道需求单独创立运用。
二、依据Netty打造IM谈天程序
前面简略过了一下自界说协议后,接着来依据Netty
结构上手一个真实的实战项目,那也便是依据Netty
打造一款IM
即时通讯的谈天程序,这儿在完结过程中,只是只会给出中心完结,但终究会供给完好代码的Github
链接,因而我们要点了解中心即可。
2.1、IM程序的用户模块
谈天、谈天,天然是得先有人,然后才干进行谈天沟通,与QQ、微信类似,假如你想要运用某款谈天程序时,条件都得是先具有一个对应的账户才行,因而在我们规划IM
体系之处,那也需求对应的用户功用完结,但这儿为了简略,相同不再结合数据库完结完好的用户模块了,而是依据内存完结用户的办理,如下:
public interface UserService {
boolean login(String username, String password);
}
这是用户模块的顶层接口,只是只供给了一个登录接口,关于注册、鉴权、等级…..等一系列功用,我们感爱好的可在后续进行拓宽完结,接着来看看该接口的完结类,如下:
public class UserServiceMemoryImpl implements UserService {
private Map<String, String> allUserMap = new ConcurrentHashMap<>();
{
// 在代码块中对用户列表进行初始化,向其间增加了两个用户信息
allUserMap.put("ZhuZi", "123");
allUserMap.put("XiongMao", "123");
}
@Override
public boolean login(String username, String password) {
String pass = allUserMap.get(username);
if (pass == null) {
return false;
}
return pass.equals(password);
}
}
这个完结类并未结合数据库来完结,而是只是在程序发动时,经过代码块的办法,加载了ZhuZi、XiongMao
两个用户信息并放入内存的Map
容器中,这儿有爱好的小伙伴,可自即将Map
容器换成数据库的表即可。
其间完结的login()
登录接口尤为简略,只是只是判别了一下有没有对应用户,假如有的话则看看暗码是否正确,正确回来true
,暗码过错则回来false
,是的,我所写的登录功用便是这么简略,走个简略的过场,哈哈哈~
2.1.1、服务端、客户端的根底架构
根本的用户模块有了,但这儿还未曾套入详细完结,因而先简略的建立出服务端、客户端的架构,然后再依据构建好的架构完结根底的用户登录功用,服务端的根底建立如下:
public class ChatServer {
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
ChatMessageCodec MESSAGE_CODEC = new ChatMessageCodec();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(MESSAGE_CODEC);
}
});
Channel channel = serverBootstrap.bind(8888).sync().channel();
channel.closeFuture().sync();
} catch (InterruptedException e) {
System.out.println("服务端呈现过错:" + e);
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
服务端的代码现在很简略,只是只是装载了一个自己的协议编/解码处理器,然后便是一些老过程,不再过多的重复赘述,接着再来建立一个简略的客户端,代码完结如下:
public class ChatClient {
public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup();
ChatMessageCodec MESSAGE_CODEC = new ChatMessageCodec();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(group);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(MESSAGE_CODEC);
}
});
Channel channel = bootstrap.connect("localhost", 8888).sync().channel();
channel.closeFuture().sync();
} catch (Exception e) {
System.out.println("客户端呈现过错:" + e);
} finally {
group.shutdownGracefully();
}
}
}
现在只是只是与服务端树立了衔接,然后装载了一个自界说的编解码器,到这儿就建立了最根本的服务端、客户端的根底架构,接着来依据它完结简略的登录功用。
2.1.2、用户登录功用的完结
关于登录功用,因为需求在服务端与客户端之间传输数据,因而我们能够规划一个音讯方针,但因为后续单聊、群聊都需求发送不同的音讯格局,因而先规划出一个父类,如下:
public abstract class Message implements Serializable {
private int sequenceId;
private int messageType;
@Override
public String toString() {
return "Message{" +
"sequenceId=" + sequenceId +
", messageType=" + messageType +
'}';
}
public int getSequenceId() {
return sequenceId;
}
public void setSequenceId(int sequenceId) {
this.sequenceId = sequenceId;
}
public void setMessageType(int messageType) {
this.messageType = messageType;
}
public abstract int getMessageType();
public static final int LoginRequestMessage = 0;
public static final int LoginResponseMessage = 1;
public static final int ChatRequestMessage = 2;
public static final int ChatResponseMessage = 3;
public static final int GroupCreateRequestMessage = 4;
public static final int GroupCreateResponseMessage = 5;
public static final int GroupJoinRequestMessage = 6;
public static final int GroupJoinResponseMessage = 7;
public static final int GroupQuitRequestMessage = 8;
public static final int GroupQuitResponseMessage = 9;
public static final int GroupChatRequestMessage = 10;
public static final int GroupChatResponseMessage = 11;
public static final int GroupMembersRequestMessage = 12;
public static final int GroupMembersResponseMessage = 13;
public static final int PingMessage = 14;
public static final int PongMessage = 15;
}
在这个音讯父类中,界说了多种音讯类型的状况码,不同的音讯类型对应不同数字,一起其间还规划了一个笼统办法,即getMessageType()
,该办法交给详细的子类完结,每个子类回来各自的音讯类型,为了便利后续拓宽,这儿又创立了一个笼统类作为中间类,如下:
public abstract class AbstractResponseMessage extends Message {
private boolean success;
private String reason;
public AbstractResponseMessage() {
}
public AbstractResponseMessage(boolean success, String reason) {
this.success = success;
this.reason = reason;
}
@Override
public String toString() {
return "AbstractResponseMessage{" +
"success=" + success +
", reason='" + reason + '\'' +
'}';
}
public boolean isSuccess() {
return success;
}
public void setSuccess(boolean success) {
this.success = success;
}
public String getReason() {
return reason;
}
public void setReason(String reason) {
this.reason = reason;
}
}
这个类首要是供给给呼应时运用的,其间包含了呼应状况以及呼应信息,接着再规划两个登录时会用到的音讯方针,如下:
public class LoginRequestMessage extends Message {
private String username;
private String password;
public LoginRequestMessage() {
}
@Override
public String toString() {
return "LoginRequestMessage{" +
"username='" + username + '\'' +
", password='" + password + '\'' +
'}';
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public LoginRequestMessage(String username, String password) {
this.username = username;
this.password = password;
}
@Override
public int getMessageType() {
return LoginRequestMessage;
}
}
上述这个音讯类,首要是供给给客户端登录时运用,本质上也便是一个包含用户名、用户暗码的方针算了,一起还有一个用来给服务端呼应时的呼应类,如下:
public class LoginResponseMessage extends AbstractResponseMessage {
public LoginResponseMessage(boolean success, String reason) {
super(success, reason);
}
@Override
public int getMessageType() {
return LoginResponseMessage;
}
}
登录呼应类的完结非常简略,由登录状况和登录音讯组成,OK,接着来看看登录的详细完结。
首要在客户端中,再经过pipeline
增加一个处理器,如下:
CountDownLatch WAIT_FOR_LOGIN = new CountDownLatch(1);
AtomicBoolean LOGIN = new AtomicBoolean(false);
AtomicBoolean EXIT = new AtomicBoolean(false);
Scanner scanner = new Scanner(System.in);
ch.pipeline().addLast("client handler", new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 担任接纳用户在控制台的输入,担任向服务器发送各种音讯
new Thread(() -> {
System.out.println("请输入用户名:");
String username = scanner.nextLine();
if(EXIT.get()){
return;
}
System.out.println("请输入暗码:");
String password = scanner.nextLine();
if(EXIT.get()){
return;
}
// 结构音讯方针
LoginRequestMessage message = new LoginRequestMessage(username, password);
System.out.println(message);
// 发送音讯
ctx.writeAndFlush(message);
System.out.println("等候后续操作...");
try {
WAIT_FOR_LOGIN.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
// 假如登录失利
if (!LOGIN.get()) {
ctx.channel().close();
return;
}
}).start();
}
在与服务端树立衔接成功之后,就提示用户需求登录,接着接纳用户输入的用户名、暗码,然后构建出一个LoginRequestMessage
音讯方针,接着将其发送给服务端,因为前面装载了自界说的协议编解码器,所以音讯在出站时,这个Message
方针会被序列化成字节码,接着再服务端入站时,又会被反序列化成音讯方针,接着来看看服务端的完结,如下:
@ChannelHandler.Sharable
public class LoginRequestMessageHandler
extends SimpleChannelInboundHandler<LoginRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx,
LoginRequestMessage msg) throws Exception {
String username = msg.getUsername();
String password = msg.getPassword();
boolean login = UserServiceFactory.getUserService().login(username, password);
LoginResponseMessage message;
if (login) {
SessionFactory.getSession().bind(ctx.channel(), username);
message = new LoginResponseMessage(true, "登录成功");
} else {
message = new LoginResponseMessage(false, "用户名或暗码不正确");
}
ctx.writeAndFlush(message);
}
}
在服务端中,新增了一个处理器类,承继自SimpleChannelInboundHandler
这个处理器,其间指定的泛型为LoginRequestMessage
,这表明当时处理器只重视这个类型的音讯,当呈现登录类型的音讯时,会进入该处理器并触发内部的channelRead0()
办法。
在该办法中,获取了登录音讯中的用户名、暗码,接着对其做了根本的登录效验,假如用户名存在并且暗码正确,就会回来登录成功,不然会回来登录失利,终究登录后的状况会被封装成一个LoginResponseMessage
方针,然后写回客户端的通道中。
当然,为了该处理器能够成功收效,这儿需求将其装载到服务端的pipeline
上,如下:
LoginRequestMessageHandler LOGIN_HANDLER = new LoginRequestMessageHandler();
ch.pipeline().addLast(LOGIN_HANDLER);
装载好登录处理器后,接着别离发动服务端、客户端,测验成果如下:
从图中的作用来看,这儿完结了最根本的登录功用,估量有些小伙伴看到这儿就有些晕了,但其实非常简略,只是只是经过Netty
在做数据交互算了,客户端则供给输入用户名、暗码的功用,然后将用户输入的称号、暗码发送给服务端,服务端供给登录判别的功用,终究依据判别成果再向客户端回来数据算了。
2.2、依据Netty完结点对点单聊
有了根本的用户登录功用后,接着来看看怎样完结点对点的单聊功用呢?首要我界说了一个会话接口,如下:
public interface Session {
void bind(Channel channel, String username);
void unbind(Channel channel);
Channel getChannel(String username);
}
这个接口中仍旧只要三个办法,释义如下:
-
bind()
:传入一个用户名和Socket
通道,让两者之间的产生绑定联系。 -
unbind()
:撤销一个用户与某个Socket
通道的绑定联系。 -
getChannel()
:依据一个用户名,获取与其存在绑定联系的通道。
该接口的完结类如下:
public class SessionMemoryImpl implements Session {
private final Map<String, Channel> usernameChannelMap = new ConcurrentHashMap<>();
private final Map<Channel, String> channelUsernameMap = new ConcurrentHashMap<>();
@Override
public void bind(Channel channel, String username) {
usernameChannelMap.put(username, channel);
channelUsernameMap.put(channel, username);
channelAttributesMap.put(channel, new ConcurrentHashMap<>());
}
@Override
public void unbind(Channel channel) {
String username = channelUsernameMap.remove(channel);
usernameChannelMap.remove(username);
channelAttributesMap.remove(channel);
}
@Override
public Channel getChannel(String username) {
return usernameChannelMap.get(username);
}
@Override
public String toString() {
return usernameChannelMap.toString();
}
}
该完结类最要害的是其间的两个Map
容器,usernameChannelMap
用来存储一切用户名与Socket
通道的绑定联系,而channelUsernameMap
则是反过来的顺序,这首要是为了便利,即能够经过用户名获得对应通道,也能够经过通道判别出用户名,实践上一个Map
也能搞定,但仍是那句话,首要为了简略嘛~
有了上述这个最简略的会话办理功用后,就要着手完结详细的功用了,其实在前面完结登录功用的时候,就用过这其间的bind()
办法,也便是当登录成功之后,就会将当时发送登录音讯的通道,与正在登录的用户名产生绑定联系,这样就便利后续完结单聊、群聊的功用。
2.2.1、界说单聊的音讯方针
与登录时相同,因为需求在服务端和客户端之间完结数据的转发,因而这儿也需求两个音讯方针,用来作为数据交互的音讯格局,如下:
public class ChatRequestMessage extends Message {
private String content;
private String to;
private String from;
public ChatRequestMessage() {
}
public ChatRequestMessage(String from, String to, String content) {
this.from = from;
this.to = to;
this.content = content;
}
// 省掉Get/Setting、toString()办法.....
}
上述这个类,是供给给客户端用来发送音讯数据的,其间首要包含了三个值,谈天的音讯内容、发送人与接纳人,因为这儿是需求完结一个IM
谈天程序,所以并不是客户端与服务端进行数据交互,而是客户端与客户端之间进行数据交互,服务端只是只供给音讯转发的功用,接着再构建一个音讯类,如下:
public class ChatResponseMessage extends AbstractResponseMessage {
private String from;
private String content;
@Override
public String toString() {
return "ChatResponseMessage{" +
"from='" + from + '\'' +
", content='" + content + '\'' +
'}';
}
public ChatResponseMessage(boolean success, String reason) {
super(success, reason);
}
public ChatResponseMessage(String from, String content) {
this.from = from;
this.content = content;
}
@Override
public int getMessageType() {
return ChatResponseMessage;
}
// 省掉Get/Setting、toString()办法.....
}
这个类是供给给服务端用来转发的,当服务端收到一个谈天音讯后,因为谈天音讯中包含了接纳人,所以能够先依据接纳人的用户名,找到对应的客户端通道,然后再封装成一个呼应音讯,转发给对应的客户端即可,下面来做详细完结。
2.2.2、完结点对点单聊功用
因为谈天功用是供给给客户端运用的,所以当一个客户端登录成功之后,应该露出给用户一个操作菜单,所以直接在本来客户端的channelActive()
办法中,登录成功之后持续加代码即可,代码如下:
while (true) {
System.out.println("==================================");
System.out.println("\t1、发送单聊音讯");
System.out.println("\t2、发送群聊音讯");
System.out.println("\t3、创立一个群聊");
System.out.println("\t4、获取群聊成员");
System.out.println("\t5、参加一个群聊");
System.out.println("\t6、退出一个群聊");
System.out.println("\t7、退出谈天体系");
System.out.println("==================================");
String command = scanner.nextLine();
}
首要会开启一个死循环,然后不断接纳用户的操作,接着运用switch
语法来对详细的菜单功用进行完结,先完结单聊功用,如下:
switch (command){
case "1":
System.out.print("请挑选你要发送音讯给谁:");
String toUserName = scanner.nextLine();
System.out.print("请输入你要发送的音讯内容:");
String content = scanner.nextLine();
ctx.writeAndFlush(new ChatRequestMessage(username, toUserName, content));
break;
}
假如用户挑选了单聊,接着会提示用户挑选要发送音讯给谁,这儿也便是让用户输入对方的用户名,实践上假如有界面的话,这一步是并不需求用户自己输入的,而是供给窗口让用户点击,比方QQ、微信一样,想要给某个人发送音讯时,只需求点击“他”的头像私聊即可。
等用户挑选了谈天方针,并且输入了音讯内容后,接着会构建一个ChatRequestMessage
音讯方针,然后会发送给服务端,但这儿先不看服务端的完结,客户端这边还需求重写一个办法,如下:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("收到音讯:" + msg);
if ((msg instanceof LoginResponseMessage)) {
LoginResponseMessage response = (LoginResponseMessage) msg;
if (response.isSuccess()) {
// 假如登录成功
LOGIN.set(true);
}
// 唤醒 system in 线程
WAIT_FOR_LOGIN.countDown();
}
}
前面的逻辑是在channelActive()
办法中完结的,也便是衔接树立成功后,就会让用户登录,接着登录成功之后会给用户一个菜单栏,供给给用户进行操作,但前面的逻辑中一向没有对服务端呼应的音讯进行处理,因而channelRead()
办法中会对服务端呼应的数据进行处理。
channelRead()
办法会在有数据可读时被触发,所以当服务端呼应数据时,首要会判别一下:现在服务端呼应的是不是登录音讯,假如是的话,则需求依据登录的成果来唤醒前面channelActive()
办法中的线程。假如现在服务端呼应的不是登录音讯,这也就意味着客户端前面现已登录成功了,所以接着会直接打印一下收到的数据。
OK,有了上述客户端的代码完结后,接着再来服务端多创立一个处理器,如下:
@ChannelHandler.Sharable
public class ChatRequestMessageHandler
extends SimpleChannelInboundHandler<ChatRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx,
ChatRequestMessage msg) throws Exception {
String to = msg.getTo();
Channel channel = SessionFactory.getSession().getChannel(to);
// 在线
if (channel != null) {
channel.writeAndFlush(new ChatResponseMessage(
msg.getFrom(), msg.getContent()));
}
// 不在线
else {
ctx.writeAndFlush(new ChatResponseMessage(
false, "对方用户不存在或许不在线"));
}
}
}
这儿仍旧经过承继SimpleChannelInboundHandler
类的形式,来特别重视ChatRequestMessage
单聊类型的音讯,假如现在服务端收到的是单聊音讯,则会进入触发该处理器的channelRead0()
办法,该处理器内部的逻辑也并不杂乱,首要依据单聊音讯的接纳人,去找一下与之对应的通道:
- 假如依据用户名查到了通道,表明接纳人现在是登录在线状况。
- 反之,假如无法依据用户名找到通道,表明对应的用户不存在或许没有登录。
接着会依据上面的查询成果,进行对应的成果回来:
- 假如在线:把要发送的单聊音讯,直接写入至找到的通道中。
- 假如不在线:向发送单聊音讯的客户端,回来用户不存在或用户不在线。
有了这个处理器之后,接着还需求把该处理器装载到服务端上,如下:
ChatRequestMessageHandler CHAT_HANDLER = new ChatRequestMessageHandler();
ch.pipeline().addLast(CHAT_HANDLER);
装载好单聊处理器后,接着别离发动一个服务端、两个客户端,测验成果如下:
从测验成果中能够显着看出作用,其间的单聊功用确完成已完结,能够完结A→B
用户之间的单聊功用,两者之间借助服务器转发,能够完结两人私聊的功用。
2.3、依据Netty打造多人谈天室
前面完结了两个用户之间的私聊功用,接着再来完结一个多人谈天室的功用,究竟像QQ、微信、钉钉….等任何通讯软件,都支撑多人树立群聊的功用,但多人谈天室的功用,完结之前还需求先完结建群的功用,究竟假如群都没树立,天然无法向某个群内发送数据。
完结拉群也好,群聊也罢,其完结过程仍旧和前面相同,如下:
- ①先界说对应的音讯方针。
- ②完结客户端发送对应音讯数据的功用。
- ③再写一个服务端的群聊处理器,然后装载到服务端上。
2.3.1、界说拉群的音讯体
首要来界说两个拉群时用的音讯体,如下:
public class GroupCreateRequestMessage extends Message {
private String groupName;
private Set<String> members;
public GroupCreateRequestMessage(String groupName, Set<String> members) {
this.groupName = groupName;
this.members = members;
}
@Override
public int getMessageType() {
return GroupCreateRequestMessage;
}
// 省掉其他Get/Settings、toString()办法.....
}
上述这个音讯体是供给给客户端运用的,其间首要存在两个成员,也便是群称号与群成员列表,寄存一切群成员的容器选用了Set
调集,因为Set
调集具有不行重复性,因而能够有效的避免同一用户屡次进群,接着再来看看服务端呼应时用的音讯体,如下:
public class GroupCreateResponseMessage extends AbstractResponseMessage {
public GroupCreateResponseMessage(boolean success, String reason) {
super(success, reason);
}
@Override
public int getMessageType() {
return GroupCreateResponseMessage;
}
}
这个音讯体的完结尤为简略,只是只是给客户端回来了拉群状况以及拉群的附加信息。
2.3.2、界说群聊会话办理
前面单聊有单聊的会话办理机制,而完结多人群聊时,仍旧需求有群聊的会话办理机制,首要封装了一个群聊实体类,如下:
public class Group {
// 谈天室称号
private String name;
// 谈天室成员
private Set<String> members;
public static final Group EMPTY_GROUP = new Group("empty", Collections.emptySet());
public Group(String name, Set<String> members) {
this.name = name;
this.members = members;
}
// 省掉其他Get/Settings、toString()办法.....
}
接着界说了一个群聊会话的顶级接口,如下:
public interface GroupSession {
// 创立一个群聊
Group createGroup(String name, Set<String> members);
// 参加某个群聊
Group joinMember(String name, String member);
// 移除群聊中的某个成员
Group removeMember(String name, String member);
// 闭幕一个群聊
Group removeGroup(String name);
// 获取一个群聊的成员列表
Set<String> getMembers(String name);
// 获取一个群聊一切在线用户的Channel通道
List<Channel> getMembersChannel(String name);
}
上述接口中,供给了几个接口办法,其实也首要是群聊体系中的一些日常操作,如创群、加群、踢人、闭幕群、查看群成员….等功用,接着来看看该接口的完结者,如下:
public class GroupSessionMemoryImpl implements GroupSession {
private final Map<String, Group> groupMap = new ConcurrentHashMap<>();
@Override
public Group createGroup(String name, Set<String> members) {
Group group = new Group(name, members);
return groupMap.putIfAbsent(name, group);
}
@Override
public Group joinMember(String name, String member) {
return groupMap.computeIfPresent(name, (key, value) -> {
value.getMembers().add(member);
return value;
});
}
@Override
public Group removeMember(String name, String member) {
return groupMap.computeIfPresent(name, (key, value) -> {
value.getMembers().remove(member);
return value;
});
}
@Override
public Group removeGroup(String name) {
return groupMap.remove(name);
}
@Override
public Set<String> getMembers(String name) {
return groupMap.getOrDefault(name, Group.EMPTY_GROUP).getMembers();
}
@Override
public List<Channel> getMembersChannel(String name) {
return getMembers(name).stream()
.map(member -> SessionFactory.getSession().getChannel(member))
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
}
这个完结类没啥好说的,要点记住里边有个Map
容器即可,这个容器首要担任存储一切群称号与Group
群聊方针的联系,后续能够经过群聊称号,在这个容器中找到一个对应群聊方针。一起为了便利后续调用这些接口,还供给了一个东西类,如下:
public abstract class GroupSessionFactory {
private static GroupSession session = new GroupSessionMemoryImpl();
public static GroupSession getGroupSession() {
return session;
}
}
很简略,只是只实例化了一个群聊会话办理的完结类,因为这儿没有结合Spring
来完结,所以并不能依托IOC
技术来自动办理Bean
,因而我们需求手动创立出一个实例,以供于后续运用。
2.3.3、完结拉群功用
前面客户端的功用菜单中,3
对应着拉群功用,所以我们需求对3
做详细的功用完结,逻辑如下:
case "3":
System.out.print("请输入你要创立的群聊昵称:");
String newGroupName = scanner.nextLine();
System.out.print("请挑选你要邀请的群成员(不同成员用、切割):");
String members = scanner.nextLine();
Set<String> memberSet = new HashSet<>(Arrays.asList(members.split("、")));
memberSet.add(username); // 参加自己
ctx.writeAndFlush(new GroupCreateRequestMessage(newGroupName, memberSet));
break;
在该分支完结中,首要会要求用户输入一个群聊昵称,接着需求输入需求拉入群聊的用户称号,多个用户之间运用、
切割,接着会把用户输入的群成员以及自己,悉数放入到一个Set
调集中,终究组装成一个拉群音讯体,发送给服务端处理,服务端的处理器如下:
@ChannelHandler.Sharable
public class GroupCreateRequestMessageHandler
extends SimpleChannelInboundHandler<GroupCreateRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx,
GroupCreateRequestMessage msg) throws Exception {
String groupName = msg.getGroupName();
Set<String> members = msg.getMembers();
// 群办理器
GroupSession groupSession = GroupSessionFactory.getGroupSession();
Group group = groupSession.createGroup(groupName, members);
if (group == null) {
// 发生成功音讯
ctx.writeAndFlush(new GroupCreateResponseMessage(true,
groupName + "创立成功"));
// 发送拉群音讯
List<Channel> channels = groupSession.getMembersChannel(groupName);
for (Channel channel : channels) {
channel.writeAndFlush(new GroupCreateResponseMessage(
true, "您已被拉入" + groupName));
}
} else {
ctx.writeAndFlush(new GroupCreateResponseMessage(
false, groupName + "现已存在"));
}
}
}
这儿仍旧承继了SimpleChannelInboundHandler
类,只关怀拉群的音讯,当客户端呈现拉群音讯时,首要会获取用户输入的群昵称和群成员,接着经过前面供给的创群接口,尝试创立一个群聊,假如群聊现已存在,则会创立失利,反之则会创立成功,在创立群聊成功的情况下,会给一切的群成员发送一条“你已被拉入[XXX]”的音讯。
终究,相同需求将该处理器装载到服务端上,如下:
GroupCreateRequestMessageHandler GROUP_CREATE_HANDLER =
new GroupCreateRequestMessageHandler();
ch.pipeline().addLast(GROUP_CREATE_HANDLER);
终究别离发动一个服务端、两个客户端进行作用测验,如下:
从上图的测验成果来看,确实完结了我们的拉群作用,一个用户拉群之后,被邀请的成员都会收到来自于服务端的拉群提醒,这也就为后续群聊功用奠定了根底。
2.3.4、界说群聊的音讯体
这儿就不重复赘述了,仍是之前的套路,界说一个客户端用的音讯体,如下:
public class GroupChatRequestMessage extends Message {
private String content;
private String groupName;
private String from;
public GroupChatRequestMessage(String from, String groupName, String content) {
this.content = content;
this.groupName = groupName;
this.from = from;
}
@Override
public int getMessageType() {
return GroupChatRequestMessage;
}
// 省掉其他Get/Settings、toString()办法.....
}
这个是客户端用来发送群聊音讯的音讯体,其间存在三个成员,发送人、群聊昵称、音讯内容,经过这三个成员,能够描绘清楚任何一条群聊记载,接着来看看服务端呼应时用的音讯体,如下:
public class GroupChatResponseMessage extends AbstractResponseMessage {
private String from;
private String content;
public GroupChatResponseMessage(boolean success, String reason) {
super(success, reason);
}
public GroupChatResponseMessage(String from, String content) {
this.from = from;
this.content = content;
}
@Override
public int getMessageType() {
return GroupChatResponseMessage;
}
// 省掉其他Get/Settings、toString()办法.....
}
在这个音讯体中,就省去了群聊昵称这个成员,因为这个音讯体的用途,首要是给服务端转发给客户端时运用的,因而不需求群聊昵称,当然,要也能够,我这儿就直接省去了。
2.3.5、完结群聊功用
仍旧先来做客户端的完结,完结了客户端之后再去完结服务端的完结,客户端完结如下:
case "2":
System.out.print("请挑选你要发送音讯的群聊:");
String groupName = scanner.nextLine();
System.out.print("请输入你要发送的音讯内容:");
String groupContent = scanner.nextLine();
ctx.writeAndFlush(new GroupChatRequestMessage(username, groupName, groupContent));
break;
因为发送群聊音讯对应着之前菜单中的2
,所以这儿对该分支进行完结,当用户挑选发送群聊音讯时,首要会让用户自己先挑选一个群聊,接着输入要发送的音讯内容,接着组装成一个群聊音讯方针,发送给服务端处理,服务端的完结如下:
@ChannelHandler.Sharable
public class GroupChatRequestMessageHandler
extends SimpleChannelInboundHandler<GroupChatRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx,
GroupChatRequestMessage msg) throws Exception {
List<Channel> channels = GroupSessionFactory.getGroupSession()
.getMembersChannel(msg.getGroupName());
for (Channel channel : channels) {
channel.writeAndFlush(new GroupChatResponseMessage(
msg.getFrom(), msg.getContent()));
}
}
}
这儿仍旧界说了一个处理器,关于原因就不再重复烦琐了,服务端关于群聊音讯的完结额定简略,也便是先依据用户挑选的群昵称,找到该群一切的群成员,然后依次遍历成员列表,获取对应的Socket
通道,转发音讯即可。
接着将该处理器装载到服务端pipeline
上,然后别离发动一个服务端、两个客户端,进行作用测验,如下:
作用如上图的注释,依据上述的代码测验,作用确实达到了我们需求的群聊作用~
2.3.6、谈天室的其他功用完结
到这儿为止,完结了最根本的建群、群聊的功用,但关于踢人、加群、闭幕群….等一系列群聊功用还未曾完结,但我这儿就不持续重复了,究竟仍是那个套路:
- ①界说对应功用的音讯体。
- ②客户端向服务端发送对应格局的音讯。
- ③服务端编写处理器,对特定的音讯进行处理。
所以我们感爱好的情况下,能够依据上述过程持续进行完结,完结的过程没有任何难度,要点便是时间问题算了。
三、Netty实战篇总结
看到这儿,其实Netty
实战篇的内容也就大致结束了,个人关于实战篇的内容并不怎样满意,因为与最初设想的完结存在很大偏差,这是因为近期作业、日子状况不对,所以内容输出也没那么夯实,关于这篇中的完好代码完结,也包含前面两篇中的一些代码完结,这儿给出完好的GitHub
链接:>>>戳我访问<<<,我们感爱好能够自行Down
下去玩玩。
在我所编撰的事例中,自界说协议能够持续优化,挑选功用更强的序列化办法,而谈天室也能够进一步拓宽,比方将用户信息、群聊信息、联系人信息都结合数据库完结,进一步完结离线音讯功用,但因为该事例的规划之初就有问题,所以是存在功用问题的,想要打造一款真实高功用的IM
程序,那诸位可参阅《计算机网络总述-腾讯QQ原理》其间的内容。