为什么要自定义协议
Netty自带了一些编解码器没,比方 StringDecode,StringEncoder,在实践事务中,协议往往需求带着一些咱们自定义的特点,比方版别号,imei号,appId等,这时分Netty供给的编解码器就无法满足咱们的需求,所以咱们需求自定义协议和自定义的编解码器
自定义协议规划
咱们能够仿制HTTP协议,比方 恳求头 + 恳求体 的格局
恳求头
HTTP协议的恳求头有 恳求办法(GET,POST),版别号等,既然是自定义协议,那么肯定是要满足自己实践事务需求的,所以咱们的恳求头包含以下信息,也能够依据自己的事务去添加一些自定义的特点
commond: 指令,比方说你发送给Netty的音讯是【登录】仍是【单聊音讯】
或许是【群发音讯】又或许是【踢人下线】的恳求.
version:版别号,在后期假如晋级版别的话,要兼容老版别,咱们能够做判别,假如是老版别的就走A逻辑分支,新版别就走B逻辑分支
clientType:客户端拜访咱们的IM系统是经过WEb端,仍是IOS,或许是Android端
messageType:将客户端发送的数据解析成哪种格局,比方JSON,Protobuf,仍是Xml格局
imeiLen:imei号的长度(imei号在恳求体中)
appId:咱们的IM是以服务的方式供给出去的,咱们需求知道这个恳求是从哪个服务进来的,每个服务都有一个自定义仅有的appId
bodyLen:咱们的数据长度
恳求体
imei号:登录设备的仅有标识,虽然有了clientType来判别是从WEB端仍是IOS端拜访的,
可是并不知道是从哪台设备登录的,后期咱们要做踢人下线,比方一个账号只能一台设备登录,
或许是一个账号能一同登录WEB,或许是IOS端或许是Android端,咱们就需求跟clientType一同判别
data:咱们要发送的数据
自定义协议完结
1:创立一个Maven项目,引进Netty依靠,完好的依靠如下
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.69.Final</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.24</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.51</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.32</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.0.M2</version>
</dependency>
</dependencies>
2:完结咱们的协议恳求头
package com.chat.model;
import lombok.Data;
import java.io.Serializable;
@Data
public class MessageHead implements Serializable {
/**
* 指令
*/
private Integer commond;
/**
* 版别号
*/
private Integer version;
/**
* clientType(WEB,IOS,Android)
*/
private Integer clientType;
/**
* 数据解析类型 和具体事务无关,后续依据解析类型解析data数据 0x0:Json,0x1:ProtoBuf,0x2:Xml,默许:0x0
*/
private Integer messageType = 0x0;
/**
* imei号长度
*/
private Integer imeiLen;
/**
* appId
*/
private Integer appId;
/**
* bodyLen,数据长度
*/
private Integer bodyLen;
}
3:完结咱们的协议恳求体
package com.chat.model;
import lombok.Data;
import java.io.Serializable;
@Data
public class MessageBody implements Serializable {
/**
* imei号
*/
private String imei;
/**
* 数据
*/
private Object data;
}
4:完结咱们的协议恳求类
package com.chat.model;
import lombok.Data;
import java.io.Serializable;
// Message便是咱们Netty服务接收到的完好的(恳求头+恳求体)数据包
@Data
public class Message implements Serializable {
private MessageHead messageHead;
private MessageBody messageBody;
}
5:完结自定义编码器
package com.chat.codec;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.chat.model.Message;
import com.chat.model.MessageBody;
import com.chat.model.MessageHead;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
/**
* 自定义编码器
*/
public class MessageDecoder extends ByteToMessageDecoder {
/**
* 协议格局:恳求头 +imei号 + 恳求体
* 恳求头: 指令(commond) + 版别号 + clientType + 音讯解析类型 + imei长度 + appId + bodyLen
* 指令:这条音讯是做什么的,比方是登录,仍是群发音讯,仍是单聊音讯,仍是踢人下线....
* 版别号:协议的版别号,对于版别晋级有帮助,比方A版别的走A逻辑,B版别的走B逻辑
* clientType:web端,IOS,Android
* 音讯解析类型:把这条音讯解析成什么样的类型,有JSON,仍是String等
* imei:虽然有clientType来标识出该用户是从WEB拜访的仍是IOS或许Android端登录的,可是这时分有二台IOS手机登录你就分辨不了了
* 所以imei号是设备的仅有标识,这样能够在用户多端登录的时分踢人下线,来完结一个账号只能一台设备登录
* appId:假如咱们的IM系统是以服务方式供给的,appId表明的是哪个服务来拜访的
* bodyLen:数据长度
* 所以恳求头的长度是:7 * 4 = 28字节
*/
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> out) throws Exception {
//咱们的恳求头有7个特点,每个特点都是int型,所以占4个字节,假如小于28个字节阐明这个恳求数据是有问题的,
if(in.readableBytes() < 28) {
return;
}
//拿到指令
int command = in.readInt();
//拿到版别号
int version = in.readInt();
//拿到clientType
int clientType = in.readInt();
//拿到音讯解析类型
int messageType = in.readInt();
//拿到imei号的长度
int imeiLen = in.readInt();
//拿到appId
int appId = in.readInt();
//拿到数据内容长度
int bodyLen = in.readInt();
//咱们的数据是以流的形式读取的,当读取到的数据长度小于 imei号长度+data长度,阐明还没有获取到完好的恳求数据,需求从头再次读取接下来TCP发送过来的数据,直到等于了就代表
//咱们现已读取到一条完好的数据了,其实这也是一种解决TCP粘包和拆包的问题
if(in.readableBytes() < (bodyLen + imeiLen)) {
//表明读取的数据还不行
in.resetReaderIndex();
return;
}
//经过imei号长度读取imei号
byte[] imeiData = new byte[imeiLen];
in.readBytes(imeiData);
String imei = new String(imeiData);
//经过bodyLen读取数据内容
byte[] bodyData = new byte[bodyLen];
in.readBytes(bodyData);
/**
* 设置恳求头
*/
MessageHead messageHead = new MessageHead();
messageHead.setCommond(command);
messageHead.setAppId(appId);
messageHead.setBodyLen(bodyData.length);
messageHead.setImeiLen(imeiData.length);
messageHead.setVersion(version);
messageHead.setClientType(clientType);
messageHead.setMessageType(messageType);
/**
* 设置恳求体
*/
MessageBody messageBody = new MessageBody();
messageBody.setImei(imei);
Message message = new Message();
message.setMessageHead(messageHead);
/**
* 依据messageType来封装恳求数据
*/
if(messageType == 0x0) {
//解析成JSON格局
String body = new String(bodyData);
com.alibaba.fastjson.JSONObject jsonObject = new com.alibaba.fastjson.JSONObject();
jsonObject.put("body",body);
messageBody.setData(jsonObject);
}else if(messageType == 0x1) {
//解析成Protobuf
}else if(messageType == 0x2) {
//解析成Xml
}
message.setMessageBody(messageBody);
//更新读索引
in.markReaderIndex();
//最后经过管道写出去
out.add(message);
}
}
6:完结自定义解码器
package com.chat.codec;
import com.chat.model.Message;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import java.nio.charset.Charset;
public class MessageEncoder extends MessageToByteEncoder<Message> {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf out) throws Exception {
out.writeInt(message.getMessageHead().getCommond());
out.writeInt(message.getMessageHead().getVersion());
out.writeInt(message.getMessageHead().getClientType());
out.writeInt(message.getMessageHead().getMessageType());
out.writeInt(message.getMessageBody().getImei().getBytes(Charset.forName("utf-8")).length);
out.writeInt(message.getMessageHead().getAppId());
out.writeInt(message.getMessageBody().getData().toString().getBytes(Charset.forName("utf-8")).length);
out.writeBytes(message.getMessageBody().getImei().getBytes(Charset.forName("utf-8")));
out.writeBytes(message.getMessageBody().getData().toString().getBytes(Charset.forName("utf-8")));
}
}
7:Netty Server端
package com.chat.server;
import com.chat.codec.MessageDecoder;
import com.chat.codec.MessageEncoder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class Server {
public static void main(String[] args) throws Exception{
// 创立两个线程组bossGroup和workerGroup, 含有的子线程NioEventLoop的个数默许为cpu核数的两倍
// bossGroup只是处理衔接恳求 ,真正的和客户端事务处理,会交给workerGroup完结
EventLoopGroup bossGroup = new NioEventLoopGroup(3);
EventLoopGroup workerGroup = new NioEventLoopGroup(8);
try {
// 创立服务器端的发动对象
ServerBootstrap bootstrap = new ServerBootstrap();
// 运用链式编程来装备参数
bootstrap.group(bossGroup, workerGroup) //设置两个线程组
// 运用NioServerSocketChannel作为服务器的通道完结
.channel(NioServerSocketChannel.class)
// 初始化服务器衔接行列巨细,服务端处理客户端衔接恳求是次序处理的,所以同一时间只能处理一个客户端衔接。
// 多个客户端一同来的时分,服务端将不能处理的客户端衔接恳求放在行列中等候处理
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChannelInitializer<SocketChannel>() {//创立通道初始化对象,设置初始化参数,在 SocketChannel 建立起来之前执行
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new MessageDecoder());
ch.pipeline().addLast(new MessageEncoder());
ch.pipeline().addLast(new MyServerHandler());
//ch.pipeline().addLast(new ServerHandler());
}
});
System.out.println("netty server start。。");
// 绑定一个端口并且同步, 生成了一个ChannelFuture异步对象,经过isDone()等办法能够判别异步事情的执行情况
// 发动服务器(并绑定端口),bind是异步操作,sync办法是等候异步操作执行完毕
ChannelFuture cf = bootstrap.bind(9000).sync();
// 给cf注册监听器,监听咱们关心的事情
/*cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (cf.isSuccess()) {
System.out.println("监听端口9000成功");
} else {
System.out.println("监听端口9000失利");
}
}
});*/
// 等候服务端监听端口关闭,closeFuture是异步操作
// 经过sync办法同步等候通道关闭处理完毕,这里会堵塞等候通道关闭完结,内部调用的是Object的wait()办法
cf.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
8:Netty Server端处理器
package com.chat.server;
import cn.hutool.json.JSONUtil;
import com.chat.model.Message;
import com.chat.model.MessageBody;
import com.chat.model.MessageHead;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MyServerHandler extends SimpleChannelInboundHandler<Message> {
private final static Logger logger = LoggerFactory.getLogger(MyServerHandler.class);
@Override
protected void channelRead0(ChannelHandlerContext ctx, Message message) throws Exception {
System.out.println("这是客户端发送的音讯" + JSONUtil.toJsonPrettyStr(message));
Message messageResponse = new Message();
MessageHead messageHead = new MessageHead();
messageHead.setCommond(9988);
messageHead.setMessageType(0x0);
messageHead.setClientType(1);
messageHead.setVersion(2);
messageHead.setAppId(3);
String msg = "这是服务端发送给你的音讯";
messageHead.setBodyLen(msg.getBytes().length);
String imei = "12-euri-1234";
messageHead.setImeiLen(imei.getBytes().length);
MessageBody messageBody = new MessageBody();
messageBody.setImei(imei);
messageBody.setData(msg);
messageResponse.setMessageHead(messageHead);
messageResponse.setMessageBody(messageBody);
ctx.writeAndFlush(messageResponse);
}
}
9:Netty Client端处理器
package com.chat.client;
import cn.hutool.json.JSONUtil;
import com.chat.model.Message;
import com.chat.model.MessageBody;
import com.chat.model.MessageHead;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class ClientHandler extends SimpleChannelInboundHandler<Message> {
/**
* 当客户端衔接服务器完结就会触发该办法
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) {
for(int i = 0; i < 20; i ++) {
Message message = new Message();
MessageHead messageHead = new MessageHead();
messageHead.setCommond(9988);
messageHead.setMessageType(0x0);
messageHead.setClientType(1);
messageHead.setVersion(2);
messageHead.setAppId(3);
String msg = "hello-" + i;
messageHead.setBodyLen(msg.getBytes().length);
String imei = "12-euri";
messageHead.setImeiLen(imei.getBytes().length);
MessageBody messageBody = new MessageBody();
messageBody.setImei(imei);
messageBody.setData(msg);
message.setMessageHead(messageHead);
message.setMessageBody(messageBody);
ctx.writeAndFlush(message);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
//当通道有读取事情时会触发,即服务端发送数据给客户端
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Message message) throws Exception {
System.out.println(JSONUtil.toJsonPrettyStr(message));
}
}
10:Netty Client端
package com.chat.client;
import com.chat.codec.MessageDecoder;
import com.chat.codec.MessageEncoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
public class Client {
public static void main(String[] args) throws Exception{
//客户端需求一个事情循环组
EventLoopGroup group = new NioEventLoopGroup();
try {
//创立客户端发动对象
//留意客户端运用的不是ServerBootstrap而是Bootstrap
Bootstrap bootstrap = new Bootstrap();
//设置相关参数
bootstrap.group(group) //设置线程组
.channel(NioSocketChannel.class) // 运用NioSocketChannel作为客户端的通道完结
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//加入处理器
ch.pipeline().addLast(new MessageDecoder());
ch.pipeline().addLast(new MessageEncoder());
ch.pipeline().addLast(new ClientHandler());
}
});
System.out.println("netty client start。。");
//发动客户端去衔接服务器端
ChannelFuture cf = bootstrap.connect("127.0.0.1", 9000).sync();
//对通道关闭进行监听
cf.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}
11:测试
1: 先发动Server端的main办法
2:再发动Client端的main办法
3:检查控制台
服务端控制台:
客户端控制台:
完好代码
悉数代码便是下图这几个类,上面现已贴出每个类的悉数代码,直接复制就行了