截止目前,Rocket MQ的官方文档尚未明确表示自身具备的Crash-Safe的能力。所以这个概念是笔者(-颜如玉)根据自身理解提出的。指明这个事实是因为想象到了有的读者如若遇到“咬文嚼字”老学究式的交流对象可能不太愿意认可官方之外的内容。那我们只需要清楚背后的实现原理,设计思想即可,不必过分较真概念与名词。

Crash-Safe

MySQL中Crash-Safe能力是指宕机、异常重启之后提供的强数据一致性保证:

  • 所有已经提交的事务的数据仍然存在
  • 所有没有提交的事务的数据能够回滚

Rocket MQ中也有类似的机制保证数据的一致性:

  • CommitLog已持久化的数据ConsumeQueue、IndexFile中的索引数据仍然有效
  • CommitLog未持久化的数据ConsumeQueue、IndexFile中的索引数据必须销毁

其实这个很好理解,ConsumeQueue、IndexFile相当于都是索引文件,并不负责存储任何消息的具体数据。所以即使索引文件已经构建完备,但是如果没有CommitLog的加持也将一文不值。

Crash-Safe与存储息息相关,Rocket MQ中涉及到持久化的功能绝大部分集中在Broker节点(鲜有例外,但是客观上确实存在,比如广播模式下的消费进度管理)。况且上文提到的三个文件全部都是Broker节点负责管理,因此本文只关注ta。

正常退出流程

Broker启动之初已经注册了JVM ShutdownHook,虚拟机停止运行之前会尽可能的走完漫长的BrokerController#shutdown过程。

public class BrokerStartup {
    /* 启动入口 */
    public static void main(String[] args) {
        start(createBrokerController(args));
    }
    /* 创建BrokerController对象 */
    public static BrokerController createBrokerController(String[] args) {
        ......
        BrokerController controller = new BrokerController(
            brokerConfig, nettyServerConfig,
            nettyClientConfig, messageStoreConfig
        );
        ......
        /* 注册shutdown函数 */
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() 
            private volatile boolean hasShutdown = false;
            private final AtomicInteger shutdownTimes = new AtomicInteger(0);
            @Override
            public void run() {
                synchronized (this) {
                    if (!this.hasShutdown) {
                        this.hasShutdown = true;
                        controller.shutdown();
                    }
                }
            }
            }, "ShutdownHook")
        );
        return controller;
     }
}

Broker shutdown时需要关闭的资源非常多,包含但不仅限于多维度数据统计结束、主从高可用数据同步结束、多个文件进行刷盘、持久化消费进度、关闭网络连接、关闭线程资源……笔者只分析跟本文相关的部分。对CommitLog、ConsumeQueue、IndexFile三个文件的善后处理,全部集中在DefaultMessageStore#shutdown,同样该方法也很繁琐,笔者截取部分关键代码。

public void shutdown() {
    ......
    if (!this.shutdown) {
        this.indexService.shutdown();
        this.commitLog.shutdown();
        this.reputMessageService.shutdown();
        this.flushConsumeQueueService.shutdown();
    }
    ......
}

退出之前Rocket MQ会将三个文件,尽可能的全部刷盘持久化。笔者认为除非存在代码缺陷,否则可以认为这三个关键文件是完整的、可以信任的。日后重启,获得FileChannel而后map进内存即可,不需要复杂的恢复逻辑。但Rocket MQ面对正常退出的情景,依然会有check过程。

abort

虽然都会进行文件修复工作,但是两者逻辑有较大区别,随之而来的问题是我们如何来断定当时服务所处的情形呢?总不会每次重启都要人工判断吧,既不快捷,也不可靠。

作为一个高效、自洽的中间件,Rocket MQ坚持最小化外部依赖,所以当时的状态绝对不可以保存在第三方应用中。假如你是Rocket MQ的作者你会怎么设计呢?本来Broker就可以看作是一个优秀的文件管理系统,难道不能设计一个文件来保存当时的状态吗。 实不相瞒Rocket MQ真的就是这么做的。只不过他不在文件中记录信息,而是以abort文件是否存在来辨别。

保存日志数据的根目录:

public class MessageStoreConfig {
    /* The root directory in which the log data is kept */
    @ImportantField
    private String storePathRootDir = 
        System.getProperty("user.home") 
            + File.separator
            + "store";
}

Rocket MQ  Crash-Safe机制浅析
根据Rocket MQ自身约定,storePathRootDir下入如果存在名为”abort”的文件,则说明异常退出,该判断逻辑会在Broker服务初始化DefaultMessageStore#load这一环节触发。

public class DefaultMessageStore implements MessageStore {
    /* 检查abort文件是否存在 */
    private boolean isTempFileExist() {
        String fileName = StorePathConfigHelper
            .getAbortFile(
                this.messageStoreConfig.getStorePathRootDir()
            );
        File file = new File(fileName);
        return file.exists();
    }
}
public class StorePathConfigHelper {
    /* 获取abort文件路径,通过根目录与文件名拼接而成 */
    public static String getAbortFile(String rootDir) {
        return rootDir + File.separator + "abort";
    }
}

那么abort文件是什么时候删除的呢?其实DefaultMessageStore#shutdown中如果符合条件,会删除这个文件,只不过刚刚害怕剧透,笔者没有展示。

public void shutdown() {
    ......
    if (this.runningFlags.isWriteable() && dispatchBehindBytes() == 0) {
        this.deleteFile(
            StorePathConfigHelper.getAbortFile(
                this.messageStoreConfig.getStorePathRootDir()
            )
        );
        shutDownNormal = true;
    } else {
        log.warn("the store may be wrong, so shutdown abnormally, and keep abort file.");
    }
    ......
}

经过上面isTempFileExist方法的检查,已经可以明确得知上次服务退出的状况。如果上一次是正常退出,意味着此时abort文件是不存在的,下一次重启的时候就没有办法确定是否正常退出了,因此启动之初我们肯定要创建这个文件,确实如我们所料,在Broker启动之时会触发此逻辑。

private void createTempFile() throws IOException {
    String fileName = StorePathConfigHelper.getAbortFile(
        this.messageStoreConfig.getStorePathRootDir()
    );
    File file = new File(fileName);
    MappedFile.ensureDirOK(file.getParent());
    /* 上一次是异常退出,则文件已经存在,不用重新创建 */
    boolean result = file.createNewFile();
    log.info(fileName + (result ? " create OK" : " already exists"));
}

CheckPoint

前面提到的一切都无法解决异常退出时候的问题,完全有可能三个重要文件的刷盘进度都不一致,那我应该怎么确定可信范围呢?不可能我将文件全部丢弃,那样损失的消息实在太多。任何业务系统都不允许这么沉重的代价。所以Rocket MQ也引入了CheckPoint机制,尽可能的减少消息丢失。

不同之处在于Rocket MQ并没有MySQL那么复杂的机制,而且CheckPoint是以文件的形式存在,里面包含多种文件的最后刷盘点。但是有一点是高度相似的,即check point之前的数据是一定可信的。

public class StorePathConfigHelper {
    /**
     * 获取checkpoint的文件路径
     *
     * 存储内容:
     *    CommitLog文件最后一次刷盘时间戳、
     *    ConsumerQueue最后一次刷盘时间戳、
     *    IndexFile索引最后一次刷盘时间戳
     *
     * @see DefaultMessageStore#load
     * @see DefaultMessageStore#destroy
     *
     * @param rootDir ${ROCKET_HOME}/store
     */
    public static String getStoreCheckpoint(String rootDir) {
        return rootDir + File.separator + "checkpoint";
    }
}

与abort文件相同,checkpoint文件也是DefaultMessageStore#load之时初始化,二者所在的目录也相同,不同的是abort的价值仅仅在于存在与否,checkpoint则需要时时刻刻准备同步刷盘点。为了性能考虑,该文件也使用了内存映射技术。

public static final int OS_PAGE_SIZE = 1024 * 4;
public StoreCheckpoint(String scpPath) throws IOException {
    File file = new File(scpPath);
    MappedFile.ensureDirOK(file.getParent());
    boolean fileExists = file.exists();
    /* "rw"标记,表示文件权限可读、可写 */
    this.randomAccessFile = new RandomAccessFile(file, "rw");
    this.fileChannel = this.randomAccessFile.getChannel();
    /* 内存映射 */
    this.mappedByteBuffer = fileChannel.map(
        MapMode.READ_WRITE,
        0, 
        MappedFile.OS_PAGE_SIZE
    );
    if (fileExists) {
        log.info("store checkpoint file exists, " + scpPath);
        /* !!!!!! */
        this.physicMsgTimestamp = this.mappedByteBuffer.getLong(0);
        this.logicsMsgTimestamp = this.mappedByteBuffer.getLong(8);
        this.indexMsgTimestamp = this.mappedByteBuffer.getLong(16);
        /* !!!!!! */
    } else {
        log.info("store checkpoint file not exists, " + scpPath);
    }
}

着重标出三行代码,每行代码都表示获取一个long型整数,分别取自文件中的0-7、8-15、16-23字节。也就是说虽然消耗了4kb的硬盘空间,4kb的内存空间,但是该文件发挥作用的只有前24byte。

  • physicMsgTimestamp: CommitLog文件最后刷盘时间点
  • logicsMsgTimestamp: ConsumeQueue文件最后刷盘时间点
  • indexMsgTimestamp : IndexFile文件最后刷盘时间点

Rocket MQ  Crash-Safe机制浅析
每当单个文件刷盘之后就会更新该文件,但是此时只是在内存中维护进度,并没有持久化。
Rocket MQ  Crash-Safe机制浅析
Rocket MQ  Crash-Safe机制浅析
Rocket MQ  Crash-Safe机制浅析
只有调用StoreCheckpoint#flush方法才会持久化到磁盘。IndexFile、ConsumeQueue刷盘时都会触发。

public void flush() {
    this.mappedByteBuffer.putLong(0, this.physicMsgTimestamp);
    this.mappedByteBuffer.putLong(8, this.logicsMsgTimestamp);
    this.mappedByteBuffer.putLong(16, this.indexMsgTimestamp);
    this.mappedByteBuffer.force();
}

文件恢复

是否正常退出将直接决定文件恢复的方式。

private void recover(boolean lastExitOK) {
    long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();
    /* abort文件已经不存在 */
    if (lastExitOK) {
        /* 正常终止 */
        this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);
    } else {
        /* 异常终止 */
        this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
    }
    this.recoverTopicQueueTable();
}

但是无论何种方式,都会使用maxPhyOffsetOfConsumeQueue这个关键信息。

private long recoverConsumeQueue() {
    long maxPhysicOffset = -1;
    for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
        for (ConsumeQueue logic : maps.values()) {
            logic.recover();
            if (logic.getMaxPhysicOffset() > maxPhysicOffset) {
                maxPhysicOffset = logic.getMaxPhysicOffset();
            }
        }
    }
    return maxPhysicOffset;
}

consumeQueueTable是一个二维的Map结构保留了当前Broker下所有的Queue信息。ConcurrentMap<Topic, ConcurrentMap<QueueId, ConsumeQueue>>这么一表示应该就很好理解了。这个方法就是遍历所有的ConsumeQueue文件,拿到最大的偏移量。也就是说maxPhysicOffset偏移量之前的消息都已被构建索引。

正常退出

正常退出时的恢复过程

  • 获取checkCRCOnRecover配置决定消息要不要进行CRC32检查,开启的话会有部分性能损失,建议开启
  • 判断映射文件的数量,如果 > 3,则从倒数第三个文件开始恢复,如果不足三个则从正数第一个开始
  • 循环处理文件中的每一条消息,如果消息验证通过则将验证的偏移量往上累加,同时该文件的读指针也往后推动
  • 处理完文件如果发现CommitLog实际落盘的偏移量 < maxPhysicOffset,则将索引文件中多余的信息清除掉
public void recoverNormally(long maxPhyOffsetOfConsumeQueue) {
    boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
    List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
    if (!mappedFiles.isEmpty()) {
        /* 重启时从倒数第三个文件开始恢复 */
        int index = mappedFiles.size() - 3;
        /* 不足三个则从第一个开始 */
        if (index < 0) index = 0;
        MappedFile mappedFile = mappedFiles.get(index);
        /* byteBuffer与mappedFile内部的mappedByteBuffer是同一份内存,只是独立维护了读指针 */
        ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
        /* 已确认的物理偏移量,即当前文件起始偏移量 */
        long processOffset = mappedFile.getFileFromOffset();
        /* 当前文件已经校验通过的offset */
        long mappedFileOffset = 0;
        while (true) {
            DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
            int size = dispatchRequest.getMsgSize();
            /* Normal data */
            if (dispatchRequest.isSuccess() && size > 0) {
                mappedFileOffset += size;
            } else if (dispatchRequest.isSuccess() && size == 0) {
                /* 说明该文件已经检验完毕 */
                index++;
                if (index >= mappedFiles.size()) {
                    /* Current branch can not happen */
                    break;
                } else {
                    /* 还有其余的文件需要校验 */
                    mappedFile = mappedFiles.get(index);
                    byteBuffer = mappedFile.sliceByteBuffer();
                    processOffset = mappedFile.getFileFromOffset();
                    mappedFileOffset = 0;
                }
            }
            /* Intermediate file read error */
            else if (!dispatchRequest.isSuccess()) {
                log.info("recover physics file end, " + mappedFile.getFileName());
                break;
            }
        }
        processOffset += mappedFileOffset;
        /* 更新当前刷盘指针,表示该指针之前的所有数据已经持久化到磁盘 */
        this.mappedFileQueue.setFlushedWhere(processOffset);
        /* 当前数据提交指针,内存中ByteBuffer当前的写指针 */
        this.mappedFileQueue.setCommittedWhere(processOffset);
        /* 截断CommitLog脏文件 */
        this.mappedFileQueue.truncateDirtyFiles(processOffset);
        /* Clear ConsumeQueue redundant data */
        if (maxPhyOffsetOfConsumeQueue >= processOffset) {
            /* 截断ConsumeQueue脏文件 */
            this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
        }
    } else {
        /* CommitLog case files are deleted */
        log.warn("The commitlog files are deleted, and delete the consume queue files");
        this.mappedFileQueue.setFlushedWhere(0);
        this.mappedFileQueue.setCommittedWhere(0);
        this.defaultMessageStore.destroyLogics();
    }
}

其实根据我对Rocket MQ的理解,因为ConsumeQueue、IndexFile信息滞后于CommitLog,本质是ReputMessageService线程异步构建的,故而这两者中的索引信息记录的偏移量小于processOffset是可以理解的,但是我不太理解为什么会出现processOffset < maxPhyOffsetOfConsumeQueue的情况,就是索引信息反而跑到了元信息之前。

异常宕机

异常宕机时的恢复过程有一些不同

  • 获取checkCRCOnRecover配置决定消息要不要进行CRC32检查,开启的话会有部分性能损失,建议开启
  • 从最后一个文件开始扫描,直到找到合适的文件,如果没有一个符合条件则选中正数第一个,参见CommitLog#isMappedFileMatchedRecover方法
    • 校验文件的正数第4-7字节是否等于CommitLog#MESSAGE_MAGIC_CODE
    • 根据Flag判断读取第48 + 8或者48 + 20往后的两个字节获得该消息的存储时间戳是否为零
    • 检查messageIndexEnable、messageIndexSafe配置项如果都为true,则取三个文件中最早的刷盘时间点,默认只选取CommitLog、ConsumeQueue两个文件参与计算(checkpoint文件中的时间)
    • 比较当前文件第一条消息的落盘时间如果小于上述时间,则说明可以从该文件开始修复数据。如果大于则选择上一个文件。
  • 经历上述步骤,如果能找到符合要求的文件,则遍历该文件,检查消息是否合法,合法的消息将重新分发给ConsumeQueue、IndexFile文件建立索引。
  • 如果没有找到则设置MappedFileQueue的flushedWhere、committedWhere指针都为零,删除ConsumeQueue文件

在这期间同一个消息完全可能被构造了两次索引,但是Rocket MQ本来就不保证只投递一次,瑕不掩瑜。

总结

如果你愿意牺牲部分性能选择同步发送,同步刷盘,那么只要返回成功,那消息一定是已经持久化,这个无论是否正常退出,你都不用担心消息丢失。

如果不是上述配置,Rocket MQ本来就不保证不丢失消息,在这基础之上根据checkpoint尽可能的帮助你挽回损失,能保证checkpoint之前的消息一定不会丢失可以说是具备Crash-Safe能力。