敞开生长之旅!这是我参与「日新方案 12 月更文挑战」的第3天,点击检查活动详情
写在前面
这是 MQTT 协议学习的第二篇,上一篇通过 MQTT 学习协议制定与完成(一)介绍了 MQTT 协议的报文格局。
Fixed header 固定报头,一切操控报文都包括 |
---|
Variable header 可变报头,部分操控报文包括 |
Payload 有效载荷,部分操控报文包括 |
本篇通过 Netty 简单完成一个 MQTT Server,运用 MQTTX 客户端发送操控报文,通过 Wiresharks 抓包来剖析不同类型的操控报文,固定报头、可变报头、有效载荷部分。
简单 MQTT Server 完成
Netty MQTT Server 界说
界说一个 Netty MQTT Server,通过 ServerBootstrap 完成,关于 Netty 我想后面也系统性的写文章介绍,对于不了解 Netty 的同学能够将 Netty 先看成黑盒,就简单的了解成起了一个 Server 绑定 60000端口。
这儿要点关注初始化 channel 在 channel pipeline 中添加 MqttEncoder.INSTANCE
、MqttDecoder
、mqttHander
,对网络通道中传输的 MQTT 字节流进行编码、解码以及业务逻辑处理。
@Component
@RequiredArgsConstructor
public class MqttAgent {
private final MqttHandler mqttHandler;
private EventLoopGroup _bossGroup;
private EventLoopGroup _workGroup;
@PostConstruct
public void start() throws Exception {
try {
_bossGroup = new NioEventLoopGroup(1);
_workGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(_bossGroup, _workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast("mqttEncoder", MqttEncoder.INSTANCE);
ch.pipeline().addLast("mqttDecoder", new MqttDecoder());
ch.pipeline().addLast("mqttHandler", mqttHandler);
}
});
serverBootstrap.bind(
"0.0.0.0",
60000
).sync();
} catch (Exception e) {
try {
_bossGroup.shutdownGracefully();
_workGroup.shutdownGracefully();
} catch (Exception shutdownException) {
// ignore
}
// rethrow exception while agent does not start up normally.
throw e;
}
}
}
MQTT Handler 完成
MqttHandler 主要处理 Mqtt 报文的处理逻辑,如衔接建立时对用户账号密码等权限检查,这儿做最简单的处理,打印出 Mqtt 报文然后回复一个 MqttConnAckMessage
,代码如下:
@Component
@Slf4j
@ChannelHandler.Sharable
public class MqttHandler extends SimpleChannelInboundHandler<MqttMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, MqttMessage mqttMessage) throws Exception {
//打印mqtt 音讯
log.info("mqtt message:{}", mqttMessage);
//回复 connect ack 操控报文
ctx.channel().writeAndFlush(connAckMessageInstance(MqttConnectReturnCode.CONNECTION_ACCEPTED));
}
//构造一个 MqttConnAckMessage
public static MqttConnAckMessage connAckMessageInstance(MqttConnectReturnCode returnCode) {
//衔接确认标志+衔接回来码(2位)
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNACK, false,
MqttQoS.AT_MOST_ONCE, false, 2);
MqttConnAckVariableHeader variableHeader = new MqttConnAckVariableHeader(returnCode, false);
return new MqttConnAckMessage(fixedHeader, variableHeader);
}
}
MqttConnAckMessage 报文只要固定报头和可变报头,没有有效载荷部分。
- 固定报头:操控报文类型值为2,标志位0,剩下长度为两个字节的可变报头
- 可变报头:2个字节,一个字节衔接确认标志,第二个字节是衔接回来码
验证程序
运用 MQTTX 客户端,编辑衔接信息,mqtt server host:127.0.0.1, port:60000,发动服务端。
- 建立衔接
- 衔接成功,发送音讯
服务端成功接纳到音讯
2022-11-24 22:39:49.304 INFO 8299 --- [ntLoopGroup-3-1] com.demo.mqttdemo.MqttHandler: mqtt message:MqttFixedHeader[messageType=PUBLISH, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=22]
Wiresharks 抓包
打开 Wiresharks 客户端,MQTTX 发送衔接恳求,检查 Wiresharks 抓包:如图所示 No61,No62,No63 则是大名鼎鼎的三次握手,No67,No69 则是 MQTT Connect 操控报文与 ConnAck 操控报文。
Connect 操控报文
由报文 DATA 部分能够看出
固定报头两字节:10 1a,10为 Connect 操控报文类型,1a则为剩下长度
CONNECT 报文的可变报头:按下列次第包括四个字段:协议名(Protocol Name),协议等级(Protocol Level),衔接标志(Connect Flags)和坚持衔接(Keep Alive)。
00 04 4d 51 54 54 这固定的6个字节就表明晰 MQTT 协议名
阐明 | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | |
---|---|---|---|---|---|---|---|---|---|
协议名 | |||||||||
byte 1 | 长度 MSB (0) | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
byte 2 | 长度 LSB (4) | 0 | 0 | 0 | 0 | 0 | 1 | 0 | 0 |
byte 3 | ‘M’ | 0 | 1 | 0 | 0 | 1 | 1 | 0 | 1 |
byte 4 | ‘Q’ | 0 | 1 | 0 | 1 | 0 | 0 | 0 | 1 |
byte 5 | ‘T’ | 0 | 1 | 0 | 1 | 0 | 1 | 0 | 0 |
Byte 6 | ‘T’ | 0 | 1 | 0 | 1 | 0 | 1 | 0 | 0 |
ConnAck 操控报文
固定报头:20 02 两个字节表示操控报文类型以及剩下长度,ConnAck 固定报头格局为:
Bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
---|---|---|---|---|---|---|---|---|
byte 1 | 0 | 0 | 1 | 0 | 0 | 0 | 0 | 0 |
byte 2 | 0 | 0 | 0 | 0 | 0 | 0 | 1 | 0 |
可变报头:00 00 两个字节表示衔接确认标志与衔接回来码,衔接已接纳回来码则是 0x00 所以两个字节为 00 00