本文紧接着算法技巧-时刻轮(一)

经典完成

Netty中时刻轮完成

dubbo中时刻轮完成参阅的便是netty中的,根本完成都差不多。这里不一一详述了。

时刻轮 时刻轮的格子 格子里的使命 时刻轮作业线程
HashedWheelTimer HashedWheelBucket HashedWheelTimeout Worker

运用场景

  1. 推迟音讯发送:当需求在必守时刻后发送音讯时,能够运用时刻轮来办理这些音讯的守时触发时刻。
  2. 心跳检测:当需求守时向客户端发送心跳包时,能够运用时刻轮来办理心跳包的发送时刻。
  3. 连接超时检测:当需求检测客户端连接是否超时时,能够运用时刻轮来办理连接的超时时刻。
  4. 断线重连:当客户端与服务器的连接断开时,能够运用时刻轮来办理重连的时刻距离。
  5. 限流操控:当需求约束某些操作的频率时,能够运用时刻轮来办理操作的触发时刻,然后完成限流操控。

KAFKA中时刻轮完成

Kafka内部有许多延时性的操作,如延时出产延时拉取延时数据删去等,这些延时功能由内部的延时操作办理器来做专门的处理,其底层是采用时刻轮完成的。

根本概念

Kafka 中的时刻轮( TimingWheel )是一个存储守时使命的环形行列, 底层采用数组完成,数组中的每个元素能够寄存一个守时使命列表( TimerTaskList )。 TimerTaskList是一个环形的双向链表,链表中的每一项便是守时使命项( TimerTaskEntry ),其中封装了真实的守时使命( TimerTask )。

算法技巧-时间轮(二)

  • tickMs:单位时刻跨度
  • wheelSize:时刻轮槽长度
  • interval:总体时刻跨度 tickMs * wheelSize

若整个时刻轮的总体时刻跨度interval = tickMs * wheelSize,比方20ms,那么关于守时为350ms 的使命该如何处理?此刻现已超出了时刻轮能表明的时刻跨度。

除了前文DUBBO中的剩下履行轮数remainingRounds优化,还能够运用层级时刻轮

算法技巧-时间轮(二)
当使命的到期时刻超越了当时时刻轮所表明的时刻规模时,就会测验增加到上层时刻轮中。比方关于20ms跨度的时刻轮,它的上级是interval = 20 * 20 = 400ms,关于400ms跨度的时刻轮,它的上级是interval = 400 * 20 = 8000ms,以此类推: 举个比方,关于450ms 的守时使命:

  1. 首要,会升级寄存到第三层时刻轮中,被刺进到第三层时刻轮的时刻格1所对应的 TimerTaskList;
  2. 随着时刻的消逝,当此 TimerTaskList 到期之时,本来守时为 450ms 的使命还剩下 50ms 的时刻,还不能履行这个使命的到期操作;
  3. 于是履行时刻轮降级,将剩下时刻为 50ms 的守时使命重新提交到第二层到期时刻为 [40ms,60ms)的时刻格中;
  4. 再经历 40ms 之后,此刻这个使命又被 “发觉 ”,不过还剩下 10ms ,所以还要降级一次,放到第一层时刻轮的[ 10ms, 11ms)的时刻格中;
  5. 最后,经历 l0ms 后,此使命真实到期,最终履行相应的到期操作。

还存在一个问题,实践场景中,并不是每个槽都有守时使命,这个假如按照DUBBO时刻轮履行方式,每个槽都需求履行一下,然后再sleep,详细代码可参阅

private long waitForNextTick() {
        long deadline = tickDuration * (tick + 1);
        for (; ; ) {
            final long currentTime = System.nanoTime() - startTime;
            long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
            if (sleepTimeMs <= 0) {
                if (currentTime == Long.MIN_VALUE) {
                    return -Long.MAX_VALUE;
                } else {
                    return currentTime;
                }
            }
            if (isWindows()) {
                sleepTimeMs = sleepTimeMs / 10 * 10;
            }
            try {
                Thread.sleep(sleepTimeMs);
            } catch (InterruptedException ignored) {
                if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
                    return Long.MIN_VALUE;
                }
            }
        }
    }

那么在KAFKA中怎样做的呢,实践是运用了JDK 中的 DelayQueue 来推动时刻轮。详细做法是将每个运用到的TimerTaskList都加入到一个DelayQueue中,DelayQueue 会依据 TimerTaskList的超时时刻来排序,最短超时时刻的TimerTaskList会被排在 DelayQueue 的队头。
详细做法:

  • 关于每个运用到的TimerTaskList 调用delayQueue.offer加入DelayQueue,超时时刻为TimerTaskList对应的expired;

  • DelayQueue会依据TimerTaskList 对应的超时时刻expiration来排序, 最短expiration 的TimerTaskList会被排在DelayQueue的队头。

  • Kafka 中会有一个线程通过调用delayQueue.take来获取DelayQueue中到期的使命列表,这个线程叫作“ExpiredOperationReaper”,能够直译为“过期操作收割机”。

  • 对获取到的使命列表,履行详细的使命

PS:KAFKA时刻轮涉及来源于网上

去哪儿QMQ完成

讲到QMQ之前,咱们先讲一下ROCKETMQ

ROCKETMQ

RocketMQ 开源版别支撑延时音讯,可是只支撑 18 个 Level 的延时,并不支撑恣意时刻。只不过这个 Level 在 RocketMQ 中能够自定义的,所幸来说对一般事务算是够用的。默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18个level。

浅显的讲,设定了延时 Level 的音讯会被暂存在名为SCHEDULE_TOPIC_XXXX的topic中,并依据 level 存入特定的queue,queueId = delayTimeLevel – 1,即一个queue只存相同延时的音讯,确保具有相同发送延时的音讯能够次序消费。 broker会调度地消费SCHEDULE_TOPIC_XXXX,将音讯写入真实的topic。

算法技巧-时间轮(二)

优点:

  • Level 数固定,每个 Level 有自己的守时器,开支不大
  • 将 Level 相同的音讯放入到同一个 Queue 中,确保了同一 Level 音讯的次序性;不同 Level 放到不同的 Queue 中,确保了投递的时刻准确性;
  • 通过只支撑固定的Level,将不同延时音讯的排序变成了固定Level Topic 的追加写操作

缺点:

  • Level 装备的修正代价太大,固定 Level 不灵活
  • CommitLog 会由于延时音讯的存在变得很大

固定延时Level完成简单,可是实践运用时不太实用。ROCKETMQ商业版支撑恣意时刻消费延时音讯,不过需求运用阿里云中间件,付费。

QMQ概述

QMQ是去哪儿网内部广泛运用的音讯中间件,自2012年诞生以来在去哪儿网一切事务场景中广泛的运用,包括跟买卖休戚相关的订单场景; 也包括报价查找等高吞吐量场景。官网
QMQ 的根本构成组件如下:

算法技巧-时间轮(二)

  • Meta Server:供给集群办理和集群发现的效果
  • Server:供给实时音讯服务
  • Delay Server:供给延时 / 守时音讯服务,延时音讯先在 delay server 排队,时刻到之后再发送给 server
  • Producer:音讯出产者
  • Consumer:音讯顾客

QMQ供给恣意时刻的延时/守时音讯,你能够指定音讯在未来两年内(可装备)恣意时刻内投递。里边规划的中心简单来说便是 多级时刻轮 + 延时加载 + 延时音讯独自磁盘存储

QMQ的延时/守时音讯运用的是两层 hash wheel 来完成的。

算法技巧-时间轮(二)

  • message log:和实时音讯里的 message log 类似,收到音讯后 append 到该 log 就返回给 producer,相当于 WAL。
  • schedule log:按照投递时刻安排,每个小时一个。该 log 是回放 message log 后依据延时时刻放置对应的 log 上,这是上面描绘的两层 hash wheel 的 第一层,坐落磁盘上。schedule log 里是包括完好的音讯内容的,由于音讯内容从 message log 同步到了 schedule log,所以历史 message log 都 能够删去 (所以 message log 只需求占用极小的存储空间,所以咱们能够运用低容量高性能的 ssd 来获取极高的吞吐量)。另外,schedule log 是按照延时 时刻安排的,所以延时时刻已过的 schedule log 文件也能够删去。
    • 第一层坐落磁盘上,每个小时为一个刻度(默认为一个小时一个刻度,能够依据实践情况在装备里进行调整),每个刻度会生成一个日志文件(schedule log),由于QMQ支撑两年内的延时音讯(默认支撑两年内,能够进行装备修正),则最多会生成 2 * 366 * 24 = 17568 个文件(假如需求支撑的最大延时时刻更短,则生成的文件更少)。
    • 第二层在内存中,当音讯的投递时刻即将到来的时分,会将这个小时的音讯索引(索引包括音讯在schedule log中的offset和size)从磁盘文件加载到内存中的hash wheel上,内存中的hash wheel则是以500ms为一个刻度
  • dispatch log:延时 / 守时音讯投递成功后写入,首要用于在运用重启后能够确认哪些音讯现已投递,dispatch log 里写入的是音讯的 offset,不包括音讯 内容。当延时 server 中途重启时,咱们需求判断出当时这个刻度 (比方一个小时) 里的音讯有哪些现已投递了则不重复投递。

QMQ中心完成

首要完成如下:

算法技巧-时间轮(二)

大体流程图
算法技巧-时间轮(二)

  1. Delay Server 中包括的几个周期守时使命
  • messageLogFlushService:担任 delay server 承受音讯后,将 messagelog 刷盘
  • dispatchLogFlushService:delay message 到期发送后,写 offset 到 dispatchlog,其首要担任将 dispatchlog 刷盘
  • iterateOffsetFlushService:首要担任回放 messagelog,并办理回放进展,进展保存在 message_log_iterate_checkpoint.json
  1. WheelTickManager 首要作业
  • start timer: 初始化并启动时刻轮
  • recover:依据 dispatchlog 和 回放进展恢复时刻轮数据
  • load schedulelog:周期加载 schedulelog 数据来填充时刻轮数据
  • 监听 messagelog 的回放事情,回放增加 schedulelog 的时分判断 (改延时音讯是否归于当时推迟刻度,eg. 1h 内) 是否需求将其增加到时刻轮中
  1. HashedWheelTimer 时刻轮完成
    时刻轮 和 Dubbo 大体是一致的,都是先将使命增加到 Queuetimeouts 中,然后周期从这个列表中获取 100000 个来增加到 HashedWheel 中对应的 HashedWheelBucket 中。

总结

时刻轮是一种高效的延时行列,能够参阅上述业界完成来运用到实践场景中。

参阅
1.RocketMQ 音讯集成:多类型事务音讯——守时音讯
2.19 TimingWheel:探究Kafka守时器背面的高效时刻轮算法
3.透彻了解Kafka(十)——时刻轮调度
4.延时音讯常见完成方案
5.深入 RocketMQ- 音讯原理篇
6.延时使命一锅端