注:

  • 本文绝非对零复制机制的否定
  • 笔者才能有限,了解误差请大家多多指正

不行否认零复制关于Rocket MQ的高功能表现有着活跃正面的作用,但是笔者以为仅仅如虎添翼,并非决议性因素。Rocket MQ功能杰出的原因绝非零复制就能够一言以蔽之。本文侧重点也不在这儿,关于零复制已经有许多帖子:

  • 深化剖析Linux IO原理和几种零复制机制的完成
  • 看一遍就了解:零复制原理详解

笔者妄图从源码以及Linux内核背面讨论一下其他或许的原因。

预热机制

Rocket MQ选用内存映射来进步文件I/O拜访功能,MappedFile、MappedFileQueue办理存储文件。MappedFileQueue对存储文件进行封装能够了解为MappedFile的办理容器。譬如CommitLog文件存储位置:${ROCKE_HOME}/store/commitlog/该目录下存在多个MappedFile文件。

MappedFile是内存映射的具体完成:结构办法包括文件名称、文件巨细、以及一个transientStorePool标识位,假如敞开transientStorePoolEnable机制则表示内容先存储在堆外内存,然后经过Commit线程将数据提交到FileChannel,然后Flush线程担任耐久化。

public MappedFile(String fileName, int fileSize) throws IOException {
    init(fileName, fileSize);
}
public MappedFile(String fileName, int fileSize,
    TransientStorePool transientStorePool) throws IOException {
    init(fileName, fileSize, transientStorePool);
}

两个结构办法不谋而合的都会走init(),咱们看两参数的那个即可。

private void init(String fileName, int fileSize) throws IOException {
    this.fileName = fileName;
    this.fileSize = fileSize;
    this.file = new File(fileName);
    this.fileFromOffset = Long.parseLong(this.file.getName());
    boolean ok = false;
    ensureDirOK(this.file.getParent());
    try {
        this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
        /**
         * fileChannel.map(MapMode mode, long position, long size)
         * 将此 fileChannel 对应的一个区域直接映射到内存中
         */
        this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
        /* 映射内存巨细累加 */
        TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
        /* 映射文件个数累加 */
        TOTAL_MAPPED_FILES.incrementAndGet();
        ok = true;
    } catch (FileNotFoundException e) {
        log.error("Failed to create file " + this.fileName, e);
        throw e;
    } catch (IOException e) {
        log.error("Failed to map file " + this.fileName, e);
        throw e;
    } finally {
        if (!ok && this.fileChannel != null) {
            this.fileChannel.close();
        }
    }
}

Java中在测验文件映射的时分供给三种模式:

  • MapMode.READ_ONLY: 任何修正缓冲区的测验将导致抛出ReadOnlyBufferException
  • MapMode.READ_WRITE:对结果缓冲区所做的更改将终究被传播到文件中
  • MapMode.PRIVATE: 对结果缓冲区所做的更改不会传播到文件中,其他程序不行见

值得注意的是映射一旦建立成功,就不再依靠fileChannel,即使此刻封闭通道也不会影响映射的有效性,因此能够依据实际状况决议要不要close。

假如了解Linux内核的话,请您必定要注意直到此刻为该文件分配的映射空间都是虚拟内存,并没有真的关联物理内存,当程序需求而物理内存又没有分配的时分则会触发一个Page Fault交由内核处理:

Rocket MQ : 拒绝神化零拷贝
上图展示的仅仅一个大概进程,实际状况复杂许多,由于缺页处理程序有必要应对多种细分的特殊状况,(参见《深化了解LINUX内核》378页),CommitLog文件巨细固定为1G,如此大内存空间读写操作势必形成很多的缺页中止,明显这儿肯定存在很多优化空间的。咱们看看Rocket MQ作者怎么优化。

无妨跟从笔者视角一探CommitLog怎么获取MappedFile文件。

public MappedFile getLastMappedFile(long startOffset) {
    return getLastMappedFile(startOffset, true);
}

getLastMappedFile办法会往AllocateMappedFileService#requestQueue堵塞队列提交AllocateRequest使命。AllocateMappedFileService服务线程此刻会被唤醒执行mmapOperation办法。大致流程:

  • 堵塞队列requestQueue.take()出来一个使命对象,服务线程被唤醒,拿到AllocateRequest对象
  • 判断是否敞开内存读写别离机制,决议选择怎么结构MappedFile。
/* 是否敞开内存读写别离 */
if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
    try {
        /* Rocket答应自己定制完成细节 */
        mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
        mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
    } catch (RuntimeException e) {
        /* 没有自定义完成,运用体系默许完成 */
        log.warn("Use default implementation.");
        /* 注意这儿三参结构 */
        mappedFile = new MappedFile(
            req.getFilePath(),
            req.getFileSize(), 
            messageStore.getTransientStorePool()
        );
    }
}
else {
    /* 注意这儿两参结构 */
    mappedFile = new MappedFile(
        req.getFilePath(),
        req.getFileSize()
    );
}
  • 源码这儿将文件预热叫Pre write mappedFile,warmMappedFile办法担任具体的预热行为。这儿这么做的原因是直接将缺页中止提早至初始化阶段,后续就不会由于频频中止导致功能下降
public void warmMappedFile(FlushDiskType type, int pages) {
    long beginTime = System.currentTimeMillis();
    ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
    int flush = 0;
    long time = System.currentTimeMillis();
    for (int i = 0, j = 0; i < this.fileSize; i += MappedFile.OS_PAGE_SIZE, j++) {
        byteBuffer.put(i, (byte) 0);
        /* force flush when flush disk type is sync */
        if (type == FlushDiskType.SYNC_FLUSH) {
            /* 每写入 pages 个内存页时刷盘一次 */
            if ((i / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= pages) {
                flush = i;
                mappedByteBuffer.force();
            }
        }
        /* prevent gc */
        if (j % 1000 == 0) {
            log.info("j={}, costTime={}", j, System.currentTimeMillis() - time);
            time = System.currentTimeMillis();
            try {
                Thread.sleep(0);
            } catch (InterruptedException e) {
                log.error("Interrupted", e);
            }
        }
    }
    /* force flush when prepare load finished */
    if (type == FlushDiskType.SYNC_FLUSH) {
        log.info("mapped file warm-up done, force to disk, mappedFile={}, costTime={}",
            this.getFileName(), System.currentTimeMillis() - beginTime
        );
        mappedByteBuffer.force();
    }
    log.info("mapped file warm-up done. mappedFile={}, costTime={}",
        this.getFileName(), System.currentTimeMillis() - beginTime
    );
    /* !!! 这一行超级重要 !!! */
    this.mlock();
}

mlock

  • 真香!Linux 原来是这么办理内存的
  • 一步一图带你深化了解 Linux 虚拟内存办理
  • 一步一图带你深化了解 Linux 物理内存办理

这儿需求一点点Linux内核办理内存的前置常识:不了解的朋友能够稍微了解一下swap的概念。内存能够说是计算机体系中最为宝贵的资源了,再怎么多也不够用,当体系运行时间长了之后,难免会遇到内存严重的时分,这时分就需求内核将那些不常常运用的内存页面回收起来,或许将那些能够迁移的页面进行内存规整,然后能够腾出连续的物理内存页面供内核分配。

简而言之便是当物理内存严重的时分Linux内核会将其他进程的占用的物理内存swap到交流区(现在个人了解大部分都是磁盘)。

如此一来同一物理机假如有愈加需求内存资源的进程,Linux内核彻底有或许将咱们经过预热机制十分困难全部都分配好的内存全部交流出去,这样Rocket MQ的功能必定呈现断崖式的下跌。

有没有一种机制使得进程能够独占一部分物理内存,不答应内核交流呢?神说要有光,于是Linux就暴露了mlock system call,而且Rocket MQ便是这么做的,上文说到的warmMappedFile办法的最后一行this.mlock便是用来lock memory的。

查阅一下手册就知道Linux供给了mlock, mlock2, munlock, mlockall, munlockall用来locK和unlock内存。

#include <sys/mman.h>
int mlock(const void *addr, size_t len);
int mlock2(const void *addr, size_t len, int flags);
int munlock(const void *addr, size_t len);
int mlockall(int flags);
int munlockall(void);

总结一下便是Rocket MQ为了本身的高功能拒绝内存被操作体系交流

madvise

为了避免剧透,刚刚一直没有带大家看看MappedFile#mlock其实该办法还有其他妙处。

public void mlock() {
    long beginTime = System.currentTimeMillis();
    long address = ((DirectBuffer) (this.mappedByteBuffer)).address();
    Pointer pointer = new Pointer(address);
    {
        int ret = LibC.INSTANCE.mlock(pointer, new NativeLong(this.fileSize));
        log.info("mlock {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);
    }
    {
        /**
         * MADV_WILLNEED 表示应用程序希望很快拜访此地址规模
         */
        int ret = LibC.INSTANCE.madvise(pointer, new NativeLong(this.fileSize), LibC.MADV_WILLNEED);
        log.info("madvise {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);
    }
}

为了愈加极致的功能体会,Linux操作体系暴露了madvise sysytem call ,madvise()体系调用,用于向内核供给关于开始地址为addr,长度为length内存空间的操作主张或许指示。在大多数状况下,此类主张的目标是进步体系或许应用程序的功能。

#include <sys/mman.h>
int madvise(void *addr, size_t length, int advice);

最初,此体系调用,仅仅支撑一组惯例的(conventional)主张值,这些主张值在各种体系中也有完成,(但是请注意,POSIX中并没有指定madvise()),后来又添加了许多特定于Linux的主张值。第三个参数advice其实便是一个标识,依据标识不同Linux内核采取的策略也有所区别。

  • Conventional advice values
    • MADV_NORMAL:不做任何特殊处理,这是默许操作
    • MADV_RANDOM:期望以随机的次序拜访page,这等价于告诉内核,随机性强,局部性弱,预读机制含义不大
    • MADV_SEQUENTIAL:与MADV_RANDOM相反,期望次序的拜访page,因此内核应该活跃的预读给定规模内的page,并在拜访过后快速开释
    • MADV_WILLNEED:估计不久将会被拜访,因此提早预读几页是个不错的主意
    • MADV_DONTNEED:与MADV_WILLNEED相反,估计未来长期不会被拜访,能够以为应用程序完成了对这部分内容的拜访,因此内核能够开释与之相关的资源
  • Linux-specific advice values:Rocket MQ用的便是惯例值,然后Linux特定值又特别多,所以这儿挑选几个讲一下
    • MADV_DONTFORK:在执行fork(2)后,子进程不答应运用此规模的页面。这样是为了避免COW机制导致父进程在写入页面时更改页面的物理位置
    • MADV_DOFORK:吊销MADV_DONTFORK的作用,恢复默许行为
    • MADV_NOHUGEPAGE:保证指定规模内的页面不会运用透明大页。

Rocket MQ运用的是MADV_WILLNEED主张值,每次会预取进步功能。

文件体系设计

针对Producer和Consumer别离选用了数据和索引部分相别离的存储结构,Producer发送音讯至Broker端,然后Broker端运用同步或许异步的方法对音讯刷盘耐久化,保存至CommitLog文件。

Rocket MQ选用混合型存储结构,多个Topic的音讯实体内容都存储于一个CommitLog(不包括由于文件写满,更换下一个文件的状况),这使得所有的音讯数据全部都是次序写入该文件。然后RocketMQ运用Broker端的后台服务线程—ReputMessageService不停地分发恳求并异步构建ConsumeQueue(逻辑消费队列)和IndexFile(索引文件)数据。虽然说由于还要落盘另外两种索引文件导致Rocket MQ其实没有办法保证大局的次序写,但这两种文件其实足够小,况且索引文件本身也是次序写,一起由于索引文件的特殊作用,也不行能将他们与数据文件相兼并,能够说Rocket MQ已经尽最大努力保证大局次序写了。

硬件加持

页缓存(PageCache)是OS对文件的缓存,用于加速对文件的读写。一般来说,程序对文件进行次序读写的速度简直挨近于内存的读写速度,主要原因便是由于OS运用PageCache机制对读写拜访操作进行了功能优化,将一部分的内存用作PageCache。关于数据的写入,OS会先写入至Cache内,随后经过异步的方法由pdflush内核线程将Cache内的数据刷盘至物理磁盘上。关于数据的读取,假如一次读取文件时出现未命中PageCache的状况,OS从物理磁盘上拜访读取文件的一起,会次序对其他相邻块的数据文件进行预读取。

在RocketMQ中,ConsumeQueue逻辑消费队列存储的数据较少,并且是次序读取,在page cache机制的预读取作用下,Consume Queue文件的读功能简直挨近读内存,即使在有音讯堆积状况下也不会影响功能。而关于CommitLog音讯存储的日志数据文件来说,读取音讯内容时分会发生较多的随机拜访读取,严重影响功能。假如选择合适的体系IO调度算法,比方设置调度算法为“Deadline”(此刻块存储选用SSD的话),随机读的功能也会有所提高。

另外,RocketMQ主要经过MappedByteBuffer对文件进行读写操作。其间,利用了NIO中的FileChannel模型将磁盘上的物理文件直接映射到用户态的内存地址中,将对文件的操作转化为直接对内存地址进行操作,然后极大地进步了文件的读写功率(正由于需求运用内存映射机制,故RocketMQ的文件存储都运用定长结构来存储,便利一次将整个文件映射至内存)。

上面说到的相邻文件预读、Mmap内存映射本质原因都是由于能够向内存借力,没有更强壮的内存硬件一切都是空谈。其实软件工程师所能做的相对有限,咱们仅仅在最大限度的发挥硬件的才能。

总结

笔者以为Rocket MQ高功能的关键是:

  • 内存加持,充分发挥硬件才能
  • 文件预热,将中止响应提早到初始化阶段
  • mlock禁止Linux交流内存
  • madvise向操作体系提出内存空间的操作主张或许指示
  • 优秀的文件体系设计,尽最大或许保证次序写
  • 将磁盘上的物理文件直接映射到用户态的内存地址中(这种Mmap的方法减少了传统IO将磁盘文件数据在操作体系内核地址空间的缓冲区和用户应用程序地址空间的缓冲区之间来回进行复制的功能开支)

引证

  • 《深化了解LINUX内核》
  • 《linux手册翻译——madvise(2)》
  • 《官方文档》