一、简介

RocketMQ 是阿里巴巴开源的分布式音讯中间件,它学习了 Kafka 完结,支撑音讯订阅与发布、次序音讯、事务音讯、定时音讯、音讯回溯、死信行列等功能。RocketMQ 架构上首要分为四部分,如下图所示:

深入剖析 RocketMQ 源码 - 消息存储模块

  • Producer:音讯出产者,支撑分布式集群办法部署。

  • Consumer:音讯顾客,支撑分布式集群办法部署。

  • NameServer:姓名服务,是一个非常简略的 Topic 路由注册中心,支撑 Broker 的动态注册与发现,Producer 和 Consumer 经过 NameServer 动态感知 Broker 的路由信息。

  • Broker:Broker 首要担任音讯的存储、转发和查询。

本文依据 Apache RocketMQ 4.9.1 版本剖析 Broker 中的音讯存储模块是怎样规划的。

二、存储架构

RocketMQ 的音讯文件途径如图所示。

深入剖析 RocketMQ 源码 - 消息存储模块

CommitLog

音讯主体以及元数据的存储主体,存储 Producer 端写入的音讯主体内容,音讯内容不是定长的。单个文件巨细默许1G, 文件名长度为 20 位,左面补零,剩下为开始偏移量,比如 00000000000000000000 代表了第一个文件,开始偏移量为 0,文件巨细为 1G=1073741824;当第一个文件写满了,第二个文件为 00000000001073741824,开始偏移量为 1073741824,以此类推。

ConsumeQueue

音讯消费行列,Consumequeue 文件能够看成是依据 CommitLog 的索引文件。ConsumeQueue 文件采取定长规划,每一个条目共 20 个字节,分别为 8 字节的 CommitLog 物理偏移量、4 字节的音讯长度、8 字节 tag hashcode,单个文件由 30W 个条目组成,能够像数组一样随机拜访每一个条目,每个 ConsumeQueue 文件巨细约 5.72M。

IndexFile

索引文件,供给了一种能够经过 key 或时刻区间来查询音讯的办法。单个 IndexFile 文件巨细约为 400M,一个 IndexFile 能够保存 2000W 个索引,IndexFile 的底层存储规划类似 JDK 的 HashMap 数据结构。

其他文件:包含 config 文件夹,寄存运行时装备信息;abort 文件,阐明 Broker 是否正常封闭;checkpoint 文件,存储 Commitlog、ConsumeQueue、Index 文件最后一次刷盘时刻戳。这些不在本文评论的规模。

同 Kafka 比较,Kafka 每个 Topic 的每个 partition 对应一个文件,次序写入,定时刷盘。但一旦单个 Broker 的 Topic 过多,次序写将退化为随机写。而 RocketMQ 单个 Broker 所有 Topic 在同一个 CommitLog 中次序写,是能够保证严厉次序写。RocketMQ 读取音讯需求从 ConsumeQueue 中拿到音讯实践物理偏移再去 CommitLog 读取音讯内容,会形成随机读取。

2.1 Page Cache 和 mmap

在正式介绍 Broker 音讯存储模块完结前,先阐明下 Page Cache 和 mmap 这两个概念。

Page Cache 是 OS 对文件的缓存,用于加快对文件的读写。一般来说,程序对文件进行次序读写的速度几乎接近于内存的读写速度,首要原因便是由于 OS 运用 Page Cache 机制对读写拜访操作进行了功能优化,将一部分的内存用作 Page Cache。关于数据的写入,OS 会先写入至 Cache 内,随后经过异步的办法由 pdflush 内核线程将 Cache 内的数据刷盘至物理磁盘上。关于数据的读取,假如一次读取文件时呈现未射中 Page Cache 的状况,OS 从物理磁盘上拜访读取文件的一起,会次序对其他相邻块的数据文件进行预读取。

mmap 是将磁盘上的物理文件直接映射到用户态的内存地址中,削减了传统 IO 将磁盘文件数据在操作体系内核地址空间的缓冲区和用户应用程序地址空间的缓冲区之间来回进行复制的功能开销。Java NIO 中的 FileChannel 供给了 map() 办法能够完结 mmap。FileChannel (文件通道)和 mmap (内存映射) 读写功能比较能够参照这篇文章。

2.2 Broker 模块

下图是 Broker 存储架构图,展现了 Broker 模块从收到音讯到回来呼应事务流经进程。

深入剖析 RocketMQ 源码 - 消息存储模块

事务接入层:RocketMQ 依据 Netty 的 Reactor 多线程模型完结了底层通讯。Reactor 主线程池 eventLoopGroupBoss 担任创立 TCP 衔接,默许只要一个线程。衔接建立后,再丢给 Reactor 子线程池 eventLoopGroupSelector 进行读写事情的处理。

defaultEventExecutorGroup 担任 SSL 验证、编解码、闲暇检查、网络衔接办理。然后依据 RomotingCommand 的事务恳求码 code 去 processorTable 这个本地缓存变量中找到对应的 processor,封装成 task 任务后,提交给对应的事务 processor 处理线程池来履行。Broker 模块经过这四级线程池提升体系吞吐量。

事务处理层:处理各种经过 RPC 调用过来的事务恳求,其间:

  • SendMessageProcessor 担任处理 Producer 发送音讯的恳求;

  • PullMessageProcessor 担任处理 Consumer 消费音讯的恳求;

  • QueryMessageProcessor 担任处理依照音讯 Key 等查询音讯的恳求。

存储逻辑层:DefaultMessageStore 是 RocketMQ 的存储逻辑中心类,供给音讯存储、读取、删除等才能。

文件映射层:把 Commitlog、ConsumeQueue、IndexFile 文件映射为存储目标 MappedFile。

数据传输层:支撑依据 mmap 内存映射进行读写音讯,一起也支撑依据 mmap 进行读取音讯、堆外内存写入音讯的办法进行读写音讯。

下面章节将从源码角度来剖析 RocketMQ 是怎样完结高功能存储。

三、音讯写入

以单个音讯出产为例,音讯写入时序逻辑如下图,事务逻辑如上文 Broker 存储架构所示在各层之间进行流通。

深入剖析 RocketMQ 源码 - 消息存储模块

最底层音讯写入中心代码在 CommitLog 的 asyncPutMessage 办法中,首要分为获取 MappedFile、往缓冲区写音讯、提交刷盘恳求三步。需求留意的是在这三步前后有自旋锁或 ReentrantLock 的加锁、开释锁,保证单个 Broker 写音讯是串行的。

//org.apache.rocketmq.store.CommitLog::asyncPutMessage
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
        ...
        putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
        try {
            //获取最新的 MappedFile
            MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
            ...
            //向缓冲区写音讯
            result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
            ...
            //提交刷盘恳求
            CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
            ...
        } finally {
            putMessageLock.unlock();
        }
        ...
    }

下面介绍这三步详细做了什么事情。

3.1 MappedFile 初始化

在 Broker 初始化时会发动办理 MappedFile 创立的 AllocateMappedFileService 异步线程。音讯处理线程 和 AllocateMappedFileService 线程经过行列 requestQueue 关联。

音讯写入时调用 AllocateMappedFileService 的 putRequestAndReturnMappedFile 办法往 requestQueue 放入提交创立 MappedFile 恳求,这边会一起构建两个 AllocateRequest 放入行列。

AllocateMappedFileService 线程循环从 requestQueue 获取 AllocateRequest 来创立 MappedFile。音讯处理线程经过 CountDownLatch 等候获取第一个 MappedFile 创立成功就回来。

当音讯处理线程需求再次创立 MappedFile 时,此时能够直接获取之前已预创立的 MappedFile。这样经过预创立 MappedFile ,削减文件创立等候时刻。

//org.apache.rocketmq.store.AllocateMappedFileService::putRequestAndReturnMappedFile
public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
    //恳求创立 MappedFile
    AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
    boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;
    ...
    //恳求预先创立下一个 MappedFile
    AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);
    boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
    ...
    //获取本次创立 MappedFile
    AllocateRequest result = this.requestTable.get(nextFilePath);
    ...
}
//org.apache.rocketmq.store.AllocateMappedFileService::run
public void run() {
    ..
    while (!this.isStopped() && this.mmapOperation()) {
    }
    ...
}
//org.apache.rocketmq.store.AllocateMappedFileService::mmapOperation
private boolean mmapOperation() {
    ...
    //从行列获取 AllocateRequest
    req = this.requestQueue.take();
    ...
    //判别是否敞开堆外内存池
    if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
        //敞开堆外内存的 MappedFile
        mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
        mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
    } else {
        //一般 MappedFile
        mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
    }
    ...
    //MappedFile 预热
    if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig()
        .getMappedFileSizeCommitLog()
        &&
        this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
        mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),
            this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());
    }
    req.setMappedFile(mappedFile);
    ...
}

每次新建一般 MappedFile 恳求,都会创立 mappedByteBuffer,下面代码展现了 Java mmap 是怎样完结的。

//org.apache.rocketmq.store.MappedFile::init
private void init(final String fileName, final int fileSize) throws IOException {
    ...
    this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
    this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
    ...
}

假如敞开堆外内存,即 transientStorePoolEnable = true 时,mappedByteBuffer 仅仅用来读音讯,堆外内存用来写音讯,然后完结关于音讯的读写别离。堆外内存目标不是每次新建 MappedFile 都需求创立,而是体系发动时依据堆外内存池巨细就初始化好了。每个堆外内存 DirectByteBuffer 都与 CommitLog 文件巨细相同,经过确定住该堆外内存,保证不会被置换到虚拟内存中去。

//org.apache.rocketmq.store.TransientStorePool
public void init() {
    for (int i = 0; i < poolSize; i++) {
        //分配与 CommitLog 文件巨细相同的堆外内存
        ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);
        final long address = ((DirectBuffer) byteBuffer).address();
        Pointer pointer = new Pointer(address);
        //确定堆外内存,保证不会被置换到虚拟内存中去
        LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize));
        availableBuffers.offer(byteBuffer);
    }
}

上面的 mmapOperation 办法中有段 MappedFile 预热逻辑。为什么需求文件预热呢?文件预热怎样做的呢?

因为经过 mmap 映射,仅仅建立了进程虚拟内存地址与物理内存地址之间的映射联系,并没有将 Page Cache 加载至内存。读写数据时假如没有射中写 Page Cache 则产生缺页中断,从磁盘从头加载数据至内存,这样会影响读写功能。为了避免缺页反常,阻止操作体系将相关的内存页调度到交换空间(swap space),RocketMQ 经过对文件预热,文件预热完结如下。

//org.apache.rocketmq.store.MappedFile::warmMappedFile
public void warmMappedFile(FlushDiskType type, int pages) {
        ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
        int flush = 0;
        //经过写入 1G 的字节 0 来让操作体系分配物理内存空间,假如没有填充值,操作体系不会实践分配物理内存,避免在写入音讯时产生缺页反常
        for (int i = 0, j = 0; i < this.fileSize; i += MappedFile.OS_PAGE_SIZE, j++) {
            byteBuffer.put(i, (byte) 0);
            // force flush when flush disk type is sync
            if (type == FlushDiskType.SYNC_FLUSH) {
                if ((i / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= pages) {
                    flush = i;
                    mappedByteBuffer.force();
                }
            }
            //prevent gc
            if (j % 1000 == 0) {
                Thread.sleep(0);
            }
        }
        //force flush when prepare load finished
        if (type == FlushDiskType.SYNC_FLUSH) {
            mappedByteBuffer.force();
        }
        ...
        this.mlock();
}
//org.apache.rocketmq.store.MappedFile::mlock
public void mlock() {
    final long beginTime = System.currentTimeMillis();
    final long address = ((DirectBuffer) (this.mappedByteBuffer)).address();
    Pointer pointer = new Pointer(address);
    //经过体系调用 mlock 确定该文件的 Page Cache,避免其被交换到 swap 空间
    int ret = LibC.INSTANCE.mlock(pointer, new NativeLong(this.fileSize));
    //经过体系调用 madvise 给操作体系主张,阐明该文件在不久的将来要被拜访
    int ret = LibC.INSTANCE.madvise(pointer, new NativeLong(this.fileSize), LibC.MADV_WILLNEED);
}

综上所述,RocketMQ 每次都预创立一个文件来削减文件创立延迟,经过文件预热避免了读写时缺页反常。

3.2 音讯写入

3.2.1 写入 CommitLog

CommitLog 中每条音讯存储的逻辑视图如下图所示, TOTALSIZE 是整个音讯占用存储空间巨细。

深入剖析 RocketMQ 源码 - 消息存储模块

下面表格阐明下每条音讯包含哪些字段,以及这些字段占用空间巨细和字段简介。

深入剖析 RocketMQ 源码 - 消息存储模块

音讯的写入是调用MappedFile 的 appendMessagesInner办法。

//org.apache.rocketmq.store.MappedFile::appendMessagesInner
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,
        PutMessageContext putMessageContext) {
    //判别运用 DirectBuffer 仍是 MappedByteBuffer 进行写操作
    ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
    ..
    byteBuffer.position(currentPos);
    AppendMessageResult result  = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
                    (MessageExtBrokerInner) messageExt, putMessageContext);
    ..
    return result;
}
//org.apache.rocketmq.store.CommitLog::doAppend
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
    final MessageExtBrokerInner msgInner, PutMessageContext putMessageContext) {
    ...
    ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff();
    ...
    //这边仅仅将音讯写入缓冲区,还未实践刷盘
    byteBuffer.put(preEncodeBuffer);
    msgInner.setEncodedBuff(null);
    ...
    return result;
}

至此,音讯终究写入 ByteBuffer,还没有耐久到磁盘,详细何时耐久化,下一小节会详细讲刷盘机制。这边有个疑问 ConsumeQueue 和 IndexFile 是怎样写入的?

答案是在存储架构图中存储逻辑层的 ReputMessageService。MessageStore 在初始化的时分,会发动一个 ReputMessageService 异步线程,它发动后便会在循环中不断调用 doReput 办法,用来通知 ConsumeQueue 和 IndexFile 进行更新。ConsumeQueue 和 IndexFile 之所以能够异步更新是因为 CommitLog 中保存了康复 ConsumeQueue 和 IndexFile 所需行列和 Topic 等信息,即便 Broker 服务反常宕机,Broker 重启后能够依据 CommitLog 康复 ConsumeQueue 和IndexFile。

//org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService::run
public void run() {
    ...
    while (!this.isStopped()) {
        Thread.sleep(1);
         this.doReput();
    }
    ...
}
//org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService::doReput
private void doReput() {
    ...
    //获取CommitLog中存储的新音讯
    DispatchRequest dispatchRequest =
        DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
    int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();
    if (dispatchRequest.isSuccess()) {
        if (size > 0) {
            //假如有新音讯,则分别调用 CommitLogDispatcherBuildConsumeQueue、CommitLogDispatcherBuildIndex 进行构建 ConsumeQueue 和 IndexFile
            DefaultMessageStore.this.doDispatch(dispatchRequest);
    }
    ...
}

3.2.2 写入 ConsumeQueue

如下图所示,ConsumeQueue 每一条记载共 20 个字节,分别为 8 字节的 CommitLog 物理偏移量、4 字节的音讯长度、8字节 tag hashcode。

深入剖析 RocketMQ 源码 - 消息存储模块

ConsumeQueue 记载耐久化逻辑如下。

//org.apache.rocketmq.store.ConsumeQueue::putMessagePositionInfo
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
    final long cqOffset) {
    ...
    this.byteBufferIndex.flip();
    this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
    this.byteBufferIndex.putLong(offset);
    this.byteBufferIndex.putInt(size);
    this.byteBufferIndex.putLong(tagsCode);
    final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
    if (mappedFile != null) {
        ...
        return mappedFile.appendMessage(this.byteBufferIndex.array());
    }
}

3.2.3 写入 IndexFile

IndexFile 的文件逻辑结构如下图所示,类似于 JDK 的 HashMap 的数组加链表结构。首要由 Header、Slot Table、Index Linked List 三部分组成。

深入剖析 RocketMQ 源码 - 消息存储模块

Header:IndexFile 的头部,占 40 个字节。首要包含以下字段:

  • beginTimestamp:该 IndexFile 文件中包含音讯的最小存储时刻。

  • endTimestamp:该 IndexFile 文件中包含音讯的最大存储时刻。

  • beginPhyoffset:该 IndexFile 文件中包含音讯的最小 CommitLog 文件偏移量。

  • endPhyoffset:该 IndexFile 文件中包含音讯的最大 CommitLog 文件偏移量。

  • hashSlotcount:该 IndexFile 文件中包含的 hashSlot 的总数。

  • indexCount:该 IndexFile 文件中已运用的 Index 条目个数。

Slot Table:默许包含 500w 个 Hash 槽,每个 Hash 槽存储的是相同 hash 值的第一个 IndexItem 存储方位 。

Index Linked List:默许最多包含 2000w 个 IndexItem。其组成如下所示:

  • Key Hash:音讯 key 的 hash,当依据 key 查找时比较的是其 hash,在之后会比较 key 本身。

  • CommitLog Offset:音讯的物理位移。

  • Timestamp:该音讯存储时刻与第一条音讯的时刻戳的差值。

  • Next Index Offset:产生 hash 冲突后保存的下一个 IndexItem 的方位。

Slot Table 中每个 hash 槽寄存的是 IndexItem 在 Index Linked List 的方位,假如 hash 冲突时,新的 IndexItem 插入链表头, 它的 Next Index Offset 中寄存之前链表头 IndexItem 方位,一起覆盖 Slot Table 中的 hash 槽为最新 IndexItem 方位。代码如下:

//org.apache.rocketmq.store.index.IndexFile::putKey
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
    int keyHash = indexKeyHashMethod(key);
    int slotPos = keyHash % this.hashSlotNum;
    int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
    ...
    //从 Slot Table 获取当前最新音讯方位
    int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
    ...
    int absIndexPos =
        IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
            + this.indexHeader.getIndexCount() * indexSize;
    this.mappedByteBuffer.putInt(absIndexPos, keyHash);
    this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
    this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
    //寄存之前链表头 IndexItem 方位
    this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
    //更新 Slot Table 中 hash 槽的值为最新音讯方位
    this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
    if (this.indexHeader.getIndexCount() <= 1) {
        this.indexHeader.setBeginPhyOffset(phyOffset);
        this.indexHeader.setBeginTimestamp(storeTimestamp);
    }
    if (invalidIndex == slotValue) {
        this.indexHeader.incHashSlotCount();
    }
    this.indexHeader.incIndexCount();
    this.indexHeader.setEndPhyOffset(phyOffset);
    this.indexHeader.setEndTimestamp(storeTimestamp);
    return true;
    ...
}

综上所述一个完好的音讯写入流程包含:同步写入 Commitlog 文件缓存区,异步构建 ConsumeQueue、IndexFile 文件。

3.3 音讯刷盘

RocketMQ 音讯刷盘首要分为同步刷盘和异步刷盘。

(1) 同步刷盘:只要在音讯真实耐久化至磁盘后 RocketMQ 的 Broker 端才会真实回来给 Producer 端一个成功的 ACK 呼应。同步刷盘对 MQ 音讯可靠性来说是一种不错的保障,可是功能上会有较大影响,一般金融事务运用该模式较多。

(2) 异步刷盘:能够充分利用 OS 的 Page Cache 的优势,只要音讯写入 Page Cache 即可将成功的 ACK 回来给 Producer 端。音讯刷盘选用后台异步线程提交的办法进行,降低了读写延迟,提高了 MQ 的功能和吞吐量。异步刷盘包含敞开堆外内存和未敞开堆外内存两种办法。

在 CommitLog 中提交刷盘恳求时,会依据当前 Broker 相关装备决定是同步刷盘仍是异步刷盘。

//org.apache.rocketmq.store.CommitLog::submitFlushRequest
public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {
    //同步刷盘
    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
        if (messageExt.isWaitStoreMsgOK()) {
            GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
                    this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
            service.putRequest(request);
            return request.future();
        } else {
            service.wakeup();
            return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
        }
    }
    //异步刷盘
    else {
        if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            flushCommitLogService.wakeup();
        } else  {
            //敞开堆外内存的异步刷盘
            commitLogService.wakeup();
        }
        return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
    }
}

GroupCommitService、FlushRealTimeService、CommitRealTimeService 三者承继联系如图;

深入剖析 RocketMQ 源码 - 消息存储模块

GroupCommitService:同步刷盘线程。如下图所示,音讯写入到 Page Cache 后经过 GroupCommitService 同步刷盘,音讯处理线程堵塞等候刷盘结果。

深入剖析 RocketMQ 源码 - 消息存储模块

//org.apache.rocketmq.store.CommitLog.GroupCommitService::run
public void run() {
    ...
    while (!this.isStopped()) {
        this.waitForRunning(10);
        this.doCommit();
    }
    ...
}
//org.apache.rocketmq.store.CommitLog.GroupCommitService::doCommit
private void doCommit() {
    ...
    for (GroupCommitRequest req : this.requestsRead) {
        boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
        for (int i = 0; i < 2 && !flushOK; i++) {
            CommitLog.this.mappedFileQueue.flush(0);
            flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
        }
        //唤醒等候刷盘完结的音讯处理线程
        req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT);
    }
    ...
}
//org.apache.rocketmq.store.MappedFile::flush
public int flush(final int flushLeastPages) {
    if (this.isAbleToFlush(flushLeastPages)) {
        ...
        //运用到了 writeBuffer 或者 fileChannel 的 position 不为 0 时用 fileChannel 进行强制刷盘
        if (writeBuffer != null || this.fileChannel.position() != 0) {
            this.fileChannel.force(false);
        } else {
            //运用 MappedByteBuffer 进行强制刷盘
            this.mappedByteBuffer.force();
        }
        ...
    }
}

FlushRealTimeService:未敞开堆外内存的异步刷盘线程。如下图所示,音讯写入到 Page Cache 后,音讯处理线程当即回来,经过 FlushRealTimeService 异步刷盘。

深入剖析 RocketMQ 源码 - 消息存储模块

//org.apache.rocketmq.store.CommitLog.FlushRealTimeService
public void run() {
    ...
    //判别是否需求周期性进行刷盘
    if (flushCommitLogTimed) {
        //固定休眠 interval 时刻距离
        Thread.sleep(interval);
    } else {
        // 假如被唤醒就刷盘,非周期性刷盘
        this.waitForRunning(interval);
    }
    ...
    // 这边和 GroupCommitService 用的是同一个强制刷盘办法
    CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
    ...
}

CommitRealTimeService:敞开堆外内存的异步刷盘线程。如下图所示,音讯处理线程把音讯写入到堆外内存后当即回来。后续先经过 CommitRealTimeService 把音讯由堆外内存异步提交至 Page Cache,再由 FlushRealTimeService 线程异步刷盘。

留意:在音讯异步提交至 Page Cache 后,事务就能够从 MappedByteBuffer 读取到该音讯。

深入剖析 RocketMQ 源码 - 消息存储模块

音讯写入到堆外内存 writeBuffer 后,会经过 isAbleToCommit 办法判别是否积累到至少提交页数(默许4页)。假如页数到达最小提交页数,则批量提交;否则仍是驻留在堆外内存,这边有丢失音讯危险。经过这种批量操作,读和写的 Page Cahe 会距离数页,降低了 Page Cahe 读写冲突的概率,完结了读写别离。详细完结逻辑如下:

//org.apache.rocketmq.store.CommitLog.CommitRealTimeService
class CommitRealTimeService extends FlushCommitLogService {
    @Override
    public void run() {
        while (!this.isStopped()) {
            ...
            int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();
            ...
            //把音讯 commit 到内存缓冲区,终究调用的是 MappedFile::commit0 办法,只要到达最少提交页数才能提交成功,否则还在堆外内存中
            boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
            if (!result) {
                //唤醒 flushCommitLogService,进行强制刷盘
                flushCommitLogService.wakeup();
            }
            ...
            this.waitForRunning(interval);
        }
    }
}
//org.apache.rocketmq.store.MappedFile::commit0
protected void commit0() {
    int writePos = this.wrotePosition.get();
    int lastCommittedPosition = this.committedPosition.get();
    //音讯提交至 Page Cache,并未实践刷盘
    if (writePos - lastCommittedPosition > 0) {
        ByteBuffer byteBuffer = writeBuffer.slice();
        byteBuffer.position(lastCommittedPosition);
        byteBuffer.limit(writePos);
        this.fileChannel.position(lastCommittedPosition);
        this.fileChannel.write(byteBuffer);
        this.committedPosition.set(writePos);
    }
}

下面总结一下三种刷盘机制的运用场景及优缺点。

深入剖析 RocketMQ 源码 - 消息存储模块

四、音讯读取

音讯读取逻辑比较写入逻辑简略许多,下面着重剖析下依据 offset 查询音讯和依据 key 查询音讯是怎样完结的。

4.1 依据 offset 查询

读取音讯的进程便是先从 ConsumeQueue 中找到音讯在 CommitLog 的物理偏移地址,然后再从 CommitLog 文件中读取音讯的实体内容。

//org.apache.rocketmq.store.DefaultMessageStore::getMessage
public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
    final int maxMsgNums,
    final MessageFilter messageFilter) {
    long nextBeginOffset = offset;
    GetMessageResult getResult = new GetMessageResult();
    final long maxOffsetPy = this.commitLog.getMaxOffset();
    //找到对应的 ConsumeQueue
    ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
    ...
    //依据 offset 找到对应的 ConsumeQueue 的 MappedFile
    SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
    status = GetMessageStatus.NO_MATCHED_MESSAGE;
    long maxPhyOffsetPulling = 0;
    int i = 0;
    //能回来的最大信息巨细,不能大于 16M
    final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE);
    for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
        //CommitLog 物理地址
        long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
        int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
        maxPhyOffsetPulling = offsetPy;
        ...
        //依据 offset 和 size 从 CommitLog 拿到详细的 Message
        SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
        ...
        //将 Message 放入结果集
        getResult.addMessage(selectResult);
        status = GetMessageStatus.FOUND;
    }
    //更新 offset
    nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
    long diff = maxOffsetPy - maxPhyOffsetPulling;
    long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
        * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
    getResult.setSuggestPullingFromSlave(diff > memory);
    ...
    getResult.setStatus(status);
    getResult.setNextBeginOffset(nextBeginOffset);
    return getResult;
}

4.2 依据 key 查询

读取音讯的进程便是用 topic 和 key 找到 IndexFile 索引文件中的一条记载,依据记载中的 CommitLog 的 offset 从 CommitLog 文件中读取音讯的实体内容。

//org.apache.rocketmq.store.DefaultMessageStore::queryMessage
public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin, long end) {
    QueryMessageResult queryMessageResult = new QueryMessageResult();
    long lastQueryMsgTime = end;
    for (int i = 0; i < 3; i++) {
        //获取 IndexFile 索引文件中记载的音讯在 CommitLog 文件物理偏移地址
        QueryOffsetResult queryOffsetResult = this.indexService.queryOffset(topic, key, maxNum, begin, lastQueryMsgTime);
        ...
        for (int m = 0; m < queryOffsetResult.getPhyOffsets().size(); m++) {
            long offset = queryOffsetResult.getPhyOffsets().get(m);
            ...
            MessageExt msg = this.lookMessageByOffset(offset);
            if (0 == m) {
                lastQueryMsgTime = msg.getStoreTimestamp();
            }
            ...
            //在 CommitLog 文件获取音讯内容
            SelectMappedBufferResult result = this.commitLog.getData(offset, false);
            ...
            queryMessageResult.addMessage(result);
            ...
        }
    }
    return queryMessageResult;
}

在 IndexFile 索引文件,查找 CommitLog 文件物理偏移地址完结如下:

//org.apache.rocketmq.store.index.IndexFile::selectPhyOffset
public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,
final long begin, final long end, boolean lock) {
    int keyHash = indexKeyHashMethod(key);
    int slotPos = keyHash % this.hashSlotNum;
    int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
    //获取相同 hash 值 key 的第一个 IndexItme 存储方位,即链表的首节点
    int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
    //遍历链表节点
    for (int nextIndexToRead = slotValue; ; ) {
        if (phyOffsets.size() >= maxNum) {
            break;
        }
        int absIndexPos =
            IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                + nextIndexToRead * indexSize;
        int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
        long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);
        long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
        int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);
        if (timeDiff < 0) {
            break;
        }
        timeDiff *= 1000L;
        long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;
        boolean timeMatched = (timeRead >= begin) && (timeRead <= end);
        //符合条件的结果参加 phyOffsets
        if (keyHash == keyHashRead && timeMatched) {
            phyOffsets.add(phyOffsetRead);
        }
        if (prevIndexRead <= invalidIndex
            || prevIndexRead > this.indexHeader.getIndexCount()
            || prevIndexRead == nextIndexToRead || timeRead < begin) {
            break;
        }
        //继续遍历链表
        nextIndexToRead = prevIndexRead;
    }
    ...
}

五、总结

本文从源码的角度介绍了 RocketMQ 存储体系的中心模块完结,包含存储架构、音讯写入和音讯读取。

RocketMQ 把所有 Topic 下的音讯都写入到 CommitLog 里边,完结了严厉的次序写。经过文件预热避免 Page Cache 被交换到 swap 空间,削减读写文件时缺页中断。运用 mmap 对 CommitLog 文件进行读写,将对文件的操作转化为直接对内存地址进行操作,然后极大地提高了文件的读写效率。

关于功能要求高、数据一致性要求不高的场景下,能够经过敞开堆外内存,完结读写别离,提升磁盘的吞吐量。总之,存储模块的学习需求对操作体系原理有一定了解。作者选用的功能极致优化计划值得我们好好学习。

六、参考文献

1.RocketMQ 官方文档

作者:vivo互联网服务器团队-Zhang Zhenglin