作者:张森泽

随着 RocketMQ 5.1.0 的正式发布,多级存储作为 RocketMQ 一个新的独立模块抵达了 Technical Preview 里程碑:答应用户将音讯从本地磁盘卸载到其他更便宜的存储介质,能够用较低的本钱延长音讯保存时刻。本文详细介绍 RocketMQ 多级存储规划与完成。

规划总览

RocketMQ 多级存储旨在不影响热数据读写的前提下将数据卸载到其他存储介质中,适用于两种场景:

  1. 冷热数据分离:RocketMQ 新近产生的音讯会缓存在 page cache 中,咱们称之为热数据;当缓存超过了内存的容量就会有热数据被换出成为冷数据。假如有少许顾客测验消费冷数据就会从硬盘中从头加载冷数据到 page cache,这会导致读写 IO 竞争并揉捏 page cache 的空间。而将冷数据的读取链路切换为多级存储就能够防止这个问题;
  2. 延长音讯保存时刻:将音讯卸载到更大更便宜的存储介质中,能够用较低的本钱完成更长的音讯保存时刻。一起多级存储支撑为 topic 指定不同的音讯保存时刻,能够依据事务需求灵敏装备音讯 TTL。

RocketMQ 多级存储对比 Kafka 和 Pulsar 的完成最大的不同是咱们运用准实时的方法上传音讯,而不是等一个 CommitLog 写满后再上传,首要基于以下几点考虑:

  1. 均摊本钱:RocketMQ 多级存储需求将大局 CommitLog 转换为 topic 维度并从头构建音讯索引,一次性处理整个 CommitLog 文件会带来性能毛刺;
  2. 对小标准实例更友爱:小标准实例往往装备较小的内存,这意味着热数据会更快换出成为冷数据,等候 CommitLog 写满再上传本身就有冷读危险。采纳准实时上传的方法既能规避音讯上传时的冷读危险,又能赶快使得冷数据能够从多级存储读取。

Quick Start

多级存储在规划上期望下降用户心智担负:用户无需改变客户端就能完成无感切换冷热数据读写链路,经过简略的修正服务端装备即可具有多级存储的才能,只需以下两步:

  1. 修正 Broker 装备,指定运用 org.apache.rocketmq.tieredstore.TieredMessageStore 作为 messageStorePlugIn
  2. 装备你想运用的贮存介质,以卸载音讯到其他硬盘为例:装备 tieredBackendServiceProvider 为 org.apache.rocketmq.tieredstore.provider.posix.PosixFileSegment,一起指定新贮存的文件路径:tieredStoreFilepath

可选项:支撑修正 tieredMetadataServiceProvider 切换元数据存储的完成,默认是基于 json 的文件存储

更多运用说明和装备项能够在 GitHub 上检查多级存储的README [ 1]

技能架构

RocketMQ 多级存储设计与实现

architecture

接入层:TieredMessageStore/TieredDispatcher/TieredMessageFetcher

接入层完成 MessageStore 中的部分读写接口,并为他们添加了异步语意。TieredDispatcher 和 TieredMessageFetcher 分别完成了多级存储的上传/下载逻辑,相比于底层接口这儿做了较多的性能优化:包括运用独立的线程池,防止慢 IO 堵塞访问热数据;运用预读缓存优化性能等。

容器层:TieredCommitLog/TieredConsumeQueue/TieredIndexFile/TieredFileQueue

容器层完成了和 DefaultMessageStore 类似的逻辑文件抽象,相同将文件划分为 CommitLog、ConsumeQueue、IndexFile,并且每种逻辑文件类型都经过 FileQueue 持有底层物理文件的引用。有所不同的是多级存储的 CommitLog 改为 queue 维度。

驱动层:TieredFileSegment

驱动层负责维护逻辑文件到物理文件的映射,经过完成 TieredStoreProvider 对接底层文件体系读写接口(Posix、S3、OSS、MinIO 等)。目前提供了 PosixFileSegment 的完成,能够将数据转移到其他硬盘或经过 fuse 挂载的目标存储上。

音讯上传

RocketMQ 多级存储的音讯上传是由 dispatch 机制触发的:初始化多级存储时会将 TieredDispatcher 注册为 CommitLog 的 dispacher。这样每逢有音讯发送到 Broker 会调用 TieredDispatcher 进行音讯分发,TieredDispatcher 将该音讯写入到 upload buffer 后当即返回成功。整个 dispatch 流程中不会有任何堵塞逻辑,保证不会影响本地 ConsumeQueue 的构建。

RocketMQ 多级存储设计与实现

TieredDispatcher

TieredDispatcher 写入 upload buffer 的内容仅为音讯的引用,不会将音讯的 body 读入内存。因为多级贮存以 queue 维度构建 CommitLog,此时需求从头生成 commitLog offset 字段。

RocketMQ 多级存储设计与实现

upload buffer

触发 upload buffer 上传时读取到每条音讯的 commitLog offset 字段时选用拼接的方法将新的 offset 嵌入到原音讯中。

上传进展操控

每个行列都会有两个关键位点操控上传进展:

  1. dispatch offset:已经写入缓存可是未上传的音讯位点
  2. commit offset:已上传的音讯位点

RocketMQ 多级存储设计与实现

upload progress

类比顾客,dispatch offset 相当于拉取音讯的位点,commit offset 相当于承认消费的位点。commit offset 到 dispatch offset 之间的部分相当于已拉取未消费的音讯。

音讯读取

TieredMessageStore 完成了 MessageStore 中的音讯读取相关接口,经过恳求中的逻辑位点(queue offset)判别是否从多级存储中读取音讯,依据装备(tieredStorageLevel)有四种战略:

  • DISABLE:制止从多级存储中读取音讯;
  • NOT_IN_DISK:不在 DefaultMessageStore 中的音讯从多级存储中读取;
  • NOT_IN_MEM:不在 page cache 中的音讯即冷数据从多级存储读取;
  • FORCE:强制一切音讯从多级存储中读取,目前仅供测试运用。
/**
  * Asynchronous get message
  * @see #getMessage(String, String, int, long, int, MessageFilter) 
  getMessage
  *
  * @param group Consumer group that launches this query.
  * @param topic Topic to query.
  * @param queueId Queue ID to query.
  * @param offset Logical offset to start from.
  * @param maxMsgNums Maximum count of messages to query.
  * @param messageFilter Message filter used to screen desired 
  messages.
  * @return Matched messages.
  */
CompletableFuture<GetMessageResult> getMessageAsync(final String group, final String topic, final int queueId,
    final long offset, final int maxMsgNums, final MessageFilter 
messageFilter);

需求从多级存储中读取的音讯会交由 TieredMessageFetcher 处理:首先校验参数是否合法,然后按照逻辑位点(queue offset)建议拉取恳求。TieredConsumeQueue/TieredCommitLog 将逻辑位点换算为对应文件的物理位点从 TieredFileSegment 读取音讯。

// TieredMessageFetcher#getMessageAsync similar with 
TieredMessageStore#getMessageAsync
public CompletableFuture<GetMessageResult> getMessageAsync(String 
group, String topic, int queueId,
        long queueOffset, int maxMsgNums, final MessageFilter 
messageFilter)

TieredFileSegment 维护每个贮存在文件体系中的物理文件位点,并经过为不同存储介质完成的接口从中读取所需的数据。

/**
  * Get data from backend file system
  *
  * @param position the index from where the file will be read
  * @param length the data size will be read
  * @return data to be read
  */
CompletableFuture<ByteBuffer> read0(long position, int length);

预读缓存

TieredMessageFetcher 读取音讯时会预读一部分音讯供下次运用,这些音讯暂存在预读缓存中。

protected final Cache<MessageCacheKey /* topic, queue id and queue
offset */,
SelectMappedBufferResultWrapper /* message data */> readAheadCache;

预读缓存的规划参阅了 TCP Tahoe 拥塞操控算法,每次预读的音讯量类似拥塞窗口选用加法增、乘法减的机制操控:

  • 加法增:从最小窗口开端,每次添加等同于客户端 batchSize 的音讯量。
  • 乘法减:当缓存的音讯超过了缓存过期时刻仍未被悉数拉取,在整理缓存的一起会将下次预读音讯量减半。

预读缓存支撑在读取音讯量较大时分片并发恳求,以取得更大带宽和更小的推迟。

某个 topic 音讯的预读缓存由消费这个 topic 的一切 group 同享,缓存失效战略为:

  1. 一切订阅这个 topic 的 group 都访问了缓存
  2. 抵达缓存过期时刻

故障康复

上文中咱们介绍上传进展由 commit offset 和 dispatch offset 操控。多级存储会为每个 topic、queue、fileSegment 创建元数据并持久化这两种位点。当 Broker 重启后会从元数据中康复,持续从 commit offset 开端上传音讯,之前缓存的音讯会从头上传并不会丢失。

RocketMQ 多级存储设计与实现

开发计划

面向云原生的存储体系要最大化使用云上存储的价值,而目标存储正是云核算盈利的表现。RocketMQ 多级存储期望一方面使用目标存储低本钱的优势延长音讯存储时刻、拓展数据的价值;另一方面使用其同享存储的特性在多副本架构中兼得本钱和数据牢靠性,以及未来向 Serverless 架构演进。

tag 过滤

多级存储拉取音讯时没有核算音讯的 tag 是否匹配,tag 过滤交给客户端处理。这样会带来额外的网络开销,计划后续在服务端添加 tag 过滤才能。

广播消费以及多个消费进展不同的顾客

预读缓存失效需求一切订阅这个 topic 的 group 都访问了缓存,这在多个 group 消费进展不一致的情况下很难触发,导致无用的音讯在缓存中堆积。

需求核算出每个 group 的消费 qps 来预算某个 group 能否在缓存失效前用上缓存的音讯。假如缓存的音讯预期在失效前都不会被再次访问,那么它应该被当即过期。相应的关于广播消费,音讯的过期战略应被优化为一切 Client 都读取这条音讯后才失效。

和高可用架构的交融

目前首要面临以下三个问题:

  1. 元数据同步:怎么牢靠的在多个节点间同步元数据,slave 提升时怎么校准和补全缺失的元数据;
  2. 制止上传超过 confirm offset 的音讯:为了防止音讯回退,上传的最大 offset 不能超过 confirm offset;
  3. slave 提升时快速启动多级存储:只要 master 节点具有写权限,在 slave 节点提升后需求快速拉起多级存储断点续传。

相关链接:

[1]README

github.com/apache/rock…

点击此处检查音讯行列 RocketMQ 产品详情