全体架构
RocketMQ 是一个典型的发布订阅体系,经过 Broker 节点中转和耐久化数据、解耦上下游。Broker 是真实存储数据的节点,由多个水平布置但不必定完全对等的副本组构成,单个副本组的不同节点的数据会到达最终共同。RocketMQ 优异的功能体现,绕不开其优秀的存储模型。
存储机制设计
在存储方法上,RocketMQ/Kafka/RabbitMQ 均选用的是音讯刷盘至所布置虚拟机/物理机的文件体系做耐久化。ActiveMQ(默许选用的 KahaDB 做音讯存储)可选用 JDBC 做音讯耐久化,经过简略的 xml 装备信息即可完成 JDBC 音讯存储。运用文件体系做耐久化的状况下,可获得更高效的 I/O 读写。
- Broker Store 目录结构
storePathRootDir=/cache1/rocketmq/broker/data
├── abort // 该文件在 Broker 发动后会主动创建,正常封闭 Broker,该文件会主动消失。若在没有发动 Broker 的状况下,发现这个文件是存在的,则说明之前 Broker 的封闭对错正常封闭
├── checkpoint // 其间存储着 commitlog、consumequeue、index 文件的最终刷盘时刻戳
├── commitlog // 其间寄存着 commitlog 文件,而音讯是写在 commitlog 文件中的
│ ├── 00000000000000000000
│ ├── 00000000001073741824
│ └── 00000000002147483648
├── config // 寄存着 Broker 运行期间的一些装备数据
│ ├── consumerFilter.json // 消费者的过滤器
│ ├── consumerFilter.json.bak
│ ├── consumerOffset.json // offsetTable记载消费进展偏移量
│ ├── consumerOffset.json.bak
│ ├── delayOffset.json
│ ├── delayOffset.json.bak
│ ├── subscriptionGroup.json // 消费者订阅联系
│ ├── subscriptionGroup.json.bak
│ ├── topics.json // topic装备
│ └── topics.json.bak
├── consumequeue // 其间寄存着 consumequeue 文件,行列就寄存在这个目录中
│ ├── TopicTest1
│ ├── 0
│ └── 00000000000000000000
│ └── 1
│ └── 00000000000000000000
│ └── TopicTest2
├── index // 音讯索引文件 indexFile,加快音讯查询速度
│ └── 20230902163452641 //文件名以创建时刻戳命名
└── lock // 运行期间运用到的全局资源锁
CommitLog
RocketMQ Broker 单个实例下所有的 Topic 都运用同一个 CommitLog 来存储,即单个实例音讯全体有序。CommitLog 单个文件巨细默许 1G,文件文件名是起始偏移量,一共 20 位,左边补零,起始偏移量是 0。假定文件依照默许巨细 1G 来算:
- 第一个文件的文件名为 00000000000000000000 ,当第一个文件被写满之后,开端写入第二个文件;
- 第二个文件的文件名为 00000000001073741824 ,1G=1073741824=102410241024;
- 第三个文件的文件名是 00000000002147483648,(文件名相差1G=1073741824=102410241024)。
CommitLog 依照上述命名的优点是给出任意一个音讯的物理偏移量,能够经过二分法进行查找,快速定位这个文件的方位,然后用音讯物理偏移量减去地点文件的称号,得到的差值就是在该文件中的绝对地址。
音讯存储格式
- MagicCode:MagicCode 是一个特殊的字段,它能够标志 Buffer 中的某个 CommitLog 是一个正常的CommitLog,仍是由于 Buffer 没有剩余的空间寄存该 CommitLog,导致该 CommitLog 是一个空的 CommitLog。MagicCode 有两个值,如下所示:
// Message's MAGIC CODE daa320a7
public final static int MESSAGE_MAGIC_CODE = 0xAABBCCDD ^ 1880681586 8;
// End of file empty MAGIC CODE cbd43194
private final static int BLANK_MAGIC_CODE = 0xBBCCDDEE ^ 1880681586 8;
- BodyCRC:CRC 即循环冗余校验码,是数据通信范畴中最常用的一种查错校验码,经过 CRC 就能够知道数据的正确性和完整性。RocketMQ 经过 CRC 来校验音讯部分:
if (checkCRC) {
int crc = UtilAll.crc32(bytesContent, 0, bodyLen);
if (crc != bodyCRC) {
log.warn("CRC check failed. bodyCRC={}, currentCRC={}", crc, bodyCRC);
return new DispatchRequest(-1, false/* success */);
}
}
- QueueId:音讯发往哪个行列,QueueId 在 Producer 发送音讯时会挑选出来。
- QueueOffset:寄存了音讯记载应该在 ConsumerQueue 中的方位,这样构建 ConsumerQueue 的时分,就知道该条记载在 ConsummerQueue 的方位次序,在消费音讯的时分很有用途。
- PhysicalOffset:音讯在 CommitLog 中的物理方位。需求注意的是,我们 CommitLog 对应着磁盘上的多个文件,这里的偏移量不是从某个文件开端算的,而是从第一个文件偏移开端算起的。
- SysFlag:是 RocketMQ 内部运用的符号位,经过位运算进行符号。例如是否对音讯进行了紧缩、是否属于业务音讯。SysFlag 初始值为 0,可与下面的符号进行位运算。
- BornTimestamp:Producer 发送音讯的时刻。
- BornHost:Producer 发送音讯运用的套接字地址。
- StoreTimestamp:音讯在 Broker 上存储时刻。
- StoreHostAddress:Broker 的套接字地址,存储方法同 BornHost。
- ReconsumeTimes:重复消费次数,初始为 0。Broker 重试的时分,这个 ReconsumeTimes 就会 1,默许最大重试次数是 16 次。
- PreparedTransactionOffset:业务音讯相关的一个特点(RocketMQ 业务音讯依据两阶段提交)。
- Properties:寄存了 RocketMQ 内部用到的一些特点,也寄存了用户的一些特点。
次序写
RocketMQ 的 Commitlog 文件、Consumequeue 文件都是次序写入的。磁盘次序写入速度能够到达几百兆/s,而随机写入速度只有几百 KB /s,相差上千倍。
PageCache 机制
Broker 在将音讯次序写入 CommitLog,大大提升功能。但还不够,毕竟仍是磁盘 I/O 操作,要想进一步提升功能,须利用内存。所以 Broker 将数据写入 CommitLog 文件时,不是直接写入物理磁盘文件,而是先进入 OS 的 PageCache 内存缓存,后续由 OS 后台线程异步将 PageCache 数据刷入底层磁盘文件。消费音讯时,选用随机读的方法,由于 PageCache 局部性热门原理且全体状况下仍是从旧到新的有序读,大部分 Case 音讯还能直接从 Page Cache 读,不会产生太多缺页(Page Fault)中断而从磁盘读取。
- 异步刷盘若 Broker 将音讯写入 PageCahe 并响应给生产者后突然宕机,此刻音讯在缓存中没有写入底层磁盘文件,就会形成音讯丢掉:生产者认为发送成功,实际上音讯写入失利。
- 遇到 OS 进行脏页回写,内存收回,内存 Swap 等状况时,或许引起较大的音讯读写延迟。
扩展: Java NIO 依据零拷贝的完成
mmap:
-
FileChannel#map():把文件对象映射到虚拟内存。
-
MappedByteBuffer/DirectByteBuffer.get(): 获取内存数据。
- 因其占用虚拟内存(非 JVM 的堆内存),不受 JVM -Xmx 参数约束,但其巨细也遭到 OS 虚拟内存巨细约束。一般一次只能映射 1.5~2G 的文件至用户态的虚拟内存空间,这也是为何 RocketMQ 默许设置单 CommitLog 日志数据文件为 1G。
sendfile:
- FileChannel.transferFrom()/transferTo():底层调用了sendfile()内核函数。
RocketMQ 挑选 mmap 原因:
(1) sendfile 在用户态不行见,而当时场景下有读有写。
(2) 在 Linux 体系中关于 1G 的文件,mmap 处理的功能要优于 sendfile。
ConsumeQueue
ConsumeQueue 不担任存储音讯,只担任记载它所属 Topic 的音讯在 CommitLog 中的偏移量,这样当消费者从 Broker 拉取音讯的时分,就能够快速依据偏移量定位到音讯。
ConsumeQueue 存储的格式如下,包含起始物理方位偏移量
,音讯长度
,音讯Tag的哈希值
,一共 20B:
每个 ConsumeQueue 都有一个 QueueId,QueueId 的值为 0 到 TopicConfig 装备的行列数量。比如某个 Topic 的消费行列数量为 4,那么四个 ConsumeQueue 的 QueueId 就分别为 0、1、2、3。
消费者消费时会先从 ConsumeQueue 中查找音讯在 CommitLog 中的 Offset,再去 CommitLog 中找原始音讯数据。假如某个音讯只在 CommitLog 中有数据而没在 ConsumeQueue 中则消费者无法进行消费。
ConsumeQueue 类对应的是每个topic和queuId下面的所有文件。默许存储路径是$HOME/store/consumequeue/{topic}/{queueId}/{fileName},每个文件由 30w 条数据组成,单个文件的巨细是 30w x 20Byte,即每个文件为 600w 字节,单个消费行列的文件巨细约为 5.722M=(600w/(1024*1024))。
ConsumeQueue 构建流程:
IndexFile
Broker 除了经过 ConsumeQueue 提供给 Consumer 消费之外,还支持经过 MsgID 或者 MessageKey 来查询音讯;运用 ID 查询时,由于 ID 就是用 broker offset 生成的(这里 MsgID 指的是服务端的),所以很容易就找到对应的 CommitLog 文件来读取音讯。关于用 MessageKey 来查询音讯,MessageStore 经过构建一个 Index 来提高读取速度。IndexFile 结构如下图:
Checkpoint
Checkpoint 主要记载 CommitLog、ConsumeQueue、Index 文件的刷盘时刻点,假如在上一次 Broker 反常结束时,会依据 StoreCheckpoint 的数据进行康复。
火山引擎依据字节跳动内部的大规模实践,推出的音讯行列产品包括音讯行列 Kafka / RabbitMQ / RocketMQ 版及云原生音讯引擎 BMQ,欢迎咨询了解!
参考资料
zhuanlan.zhihu.com/p/574215600…
来源团队|字节跳动 IBF 业财研发部 伍楼华