本文紧接着算法技巧-时刻轮(一)
经典完成
Netty中时刻轮完成
dubbo中时刻轮完成参阅的便是netty中的,根本完成都差不多。这里不一一详述了。
时刻轮 | 时刻轮的格子 | 格子里的使命 | 时刻轮作业线程 |
---|---|---|---|
HashedWheelTimer | HashedWheelBucket | HashedWheelTimeout | Worker |
运用场景
-
推迟音讯发送
:当需求在必守时刻后发送音讯时,能够运用时刻轮来办理这些音讯的守时触发时刻。 -
心跳检测
:当需求守时向客户端发送心跳包时,能够运用时刻轮来办理心跳包的发送时刻。 -
连接超时检测
:当需求检测客户端连接是否超时时,能够运用时刻轮来办理连接的超时时刻。 -
断线重连
:当客户端与服务器的连接断开时,能够运用时刻轮来办理重连的时刻距离。 -
限流操控
:当需求约束某些操作的频率时,能够运用时刻轮来办理操作的触发时刻,然后完成限流操控。
KAFKA中时刻轮完成
Kafka内部有许多延时性的操作,如延时出产
,延时拉取
,延时数据删去
等,这些延时功能由内部的延时操作办理器来做专门的处理,其底层是采用时刻轮完成的。
根本概念
Kafka 中的时刻轮( TimingWheel )是一个存储守时使命的环形行列, 底层采用数组完成,数组中的每个元素能够寄存一个守时使命列表( TimerTaskList )。 TimerTaskList是一个环形的双向链表,链表中的每一项便是守时使命项( TimerTaskEntry ),其中封装了真实的守时使命( TimerTask )。
- tickMs:单位时刻跨度
- wheelSize:时刻轮槽长度
- interval:总体时刻跨度 tickMs * wheelSize
若整个时刻轮的总体时刻跨度interval = tickMs * wheelSize
,比方20ms,那么关于守时为350ms 的使命该如何处理?此刻现已超出了时刻轮能表明的时刻跨度。
除了前文DUBBO中的剩下履行轮数
remainingRounds
优化,还能够运用层级时刻轮
当使命的到期时刻超越了当时时刻轮所表明的时刻规模时,就会测验增加到上层时刻轮中。比方关于20ms跨度的时刻轮,它的上级是interval = 20 * 20 = 400ms
,关于400ms跨度的时刻轮,它的上级是interval = 400 * 20 = 8000ms
,以此类推:
举个比方,关于450ms 的守时使命:
- 首要,会升级寄存到第三层时刻轮中,被刺进到第三层时刻轮的时刻格1所对应的 TimerTaskList;
- 随着时刻的消逝,当此 TimerTaskList 到期之时,本来守时为 450ms 的使命还剩下 50ms 的时刻,还不能履行这个使命的到期操作;
- 于是履行时刻轮降级,将剩下时刻为 50ms 的守时使命重新提交到第二层到期时刻为 [40ms,60ms)的时刻格中;
- 再经历 40ms 之后,此刻这个使命又被 “发觉 ”,不过还剩下 10ms ,所以还要降级一次,放到第一层时刻轮的[ 10ms, 11ms)的时刻格中;
- 最后,经历 l0ms 后,此使命真实到期,最终履行相应的到期操作。
还存在一个问题,实践场景中,并不是每个槽都有守时使命,这个假如按照DUBBO时刻轮履行方式,每个槽都需求履行一下,然后再sleep,详细代码可参阅
private long waitForNextTick() {
long deadline = tickDuration * (tick + 1);
for (; ; ) {
final long currentTime = System.nanoTime() - startTime;
long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
if (sleepTimeMs <= 0) {
if (currentTime == Long.MIN_VALUE) {
return -Long.MAX_VALUE;
} else {
return currentTime;
}
}
if (isWindows()) {
sleepTimeMs = sleepTimeMs / 10 * 10;
}
try {
Thread.sleep(sleepTimeMs);
} catch (InterruptedException ignored) {
if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
return Long.MIN_VALUE;
}
}
}
}
那么在KAFKA中怎样做的呢,实践是运用了JDK 中的 DelayQueue 来推动时刻轮。详细做法是将每个运用到的TimerTaskList都加入到一个DelayQueue中,DelayQueue 会依据 TimerTaskList的超时时刻来排序,最短超时时刻的TimerTaskList会被排在 DelayQueue 的队头。
详细做法:
-
关于每个运用到的TimerTaskList 调用delayQueue.offer加入DelayQueue,超时时刻为TimerTaskList对应的expired;
-
DelayQueue会依据TimerTaskList 对应的超时时刻expiration来排序, 最短expiration 的TimerTaskList会被排在DelayQueue的队头。
-
Kafka 中会有一个线程通过调用delayQueue.take来获取DelayQueue中到期的使命列表,这个线程叫作“ExpiredOperationReaper”,能够直译为“过期操作收割机”。
-
对获取到的使命列表,履行详细的使命
PS:KAFKA时刻轮涉及来源于网上
去哪儿QMQ完成
讲到QMQ之前,咱们先讲一下ROCKETMQ
ROCKETMQ
RocketMQ 开源版别支撑延时音讯,可是只支撑 18 个 Level 的延时,并不支撑恣意时刻。只不过这个 Level 在 RocketMQ 中能够自定义的,所幸来说对一般事务算是够用的。默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18个level。
浅显的讲,设定了延时 Level 的音讯会被暂存在名为SCHEDULE_TOPIC_XXXX
的topic中,并依据 level 存入特定的queue,queueId = delayTimeLevel – 1,即一个queue只存相同延时的音讯,确保具有相同发送延时的音讯能够次序消费。 broker会调度地消费SCHEDULE_TOPIC_XXXX,将音讯写入真实的topic。
优点:
- Level 数固定,每个 Level 有自己的守时器,开支不大
- 将 Level 相同的音讯放入到同一个 Queue 中,确保了同一 Level 音讯的次序性;不同 Level 放到不同的 Queue 中,确保了投递的时刻准确性;
- 通过只支撑固定的Level,将不同延时音讯的排序变成了固定Level Topic 的追加写操作
缺点:
- Level 装备的修正代价太大,固定 Level 不灵活
- CommitLog 会由于延时音讯的存在变得很大
固定延时Level完成简单,可是实践运用时不太实用。ROCKETMQ商业版支撑恣意时刻消费延时音讯,不过需求运用阿里云中间件,付费。
QMQ概述
QMQ是去哪儿网内部广泛运用的音讯中间件,自2012年诞生以来在去哪儿网一切事务场景中广泛的运用,包括跟买卖休戚相关的订单场景; 也包括报价查找等高吞吐量场景。官网
QMQ 的根本构成组件如下:
- Meta Server:供给集群办理和集群发现的效果
- Server:供给实时音讯服务
- Delay Server:供给延时 / 守时音讯服务,延时音讯先在 delay server 排队,时刻到之后再发送给 server
- Producer:音讯出产者
- Consumer:音讯顾客
QMQ供给恣意时刻的延时/守时音讯,你能够指定音讯在未来两年内(可装备)恣意时刻内投递。里边规划的中心简单来说便是 多级时刻轮 + 延时加载 + 延时音讯独自磁盘存储 。
QMQ的延时/守时音讯运用的是两层 hash wheel 来完成的。
- message log:和实时音讯里的 message log 类似,收到音讯后 append 到该 log 就返回给 producer,相当于 WAL。
- schedule log:按照投递时刻安排,每个小时一个。该 log 是回放 message log 后依据延时时刻放置对应的 log 上,这是上面描绘的两层 hash wheel 的 第一层,坐落磁盘上。schedule log 里是包括完好的音讯内容的,由于音讯内容从 message log 同步到了 schedule log,所以历史 message log 都 能够删去 (所以 message log 只需求占用极小的存储空间,所以咱们能够运用低容量高性能的 ssd 来获取极高的吞吐量)。另外,schedule log 是按照延时 时刻安排的,所以延时时刻已过的 schedule log 文件也能够删去。
- 第一层坐落磁盘上,每个小时为一个刻度(默认为一个小时一个刻度,能够依据实践情况在装备里进行调整),每个刻度会生成一个日志文件(schedule log),由于QMQ支撑两年内的延时音讯(默认支撑两年内,能够进行装备修正),则最多会生成 2 * 366 * 24 = 17568 个文件(假如需求支撑的最大延时时刻更短,则生成的文件更少)。
- 第二层在内存中,当音讯的投递时刻即将到来的时分,会将这个小时的音讯索引(索引包括音讯在schedule log中的offset和size)从磁盘文件加载到内存中的hash wheel上,内存中的hash wheel则是以500ms为一个刻度 。
- dispatch log:延时 / 守时音讯投递成功后写入,首要用于在运用重启后能够确认哪些音讯现已投递,dispatch log 里写入的是音讯的 offset,不包括音讯 内容。当延时 server 中途重启时,咱们需求判断出当时这个刻度 (比方一个小时) 里的音讯有哪些现已投递了则不重复投递。
QMQ中心完成
首要完成如下:
大体流程图:
- Delay Server 中包括的几个周期守时使命
- messageLogFlushService:担任 delay server 承受音讯后,将 messagelog 刷盘
- dispatchLogFlushService:delay message 到期发送后,写 offset 到 dispatchlog,其首要担任将 dispatchlog 刷盘
- iterateOffsetFlushService:首要担任回放 messagelog,并办理回放进展,进展保存在 message_log_iterate_checkpoint.json
- WheelTickManager 首要作业
- start timer: 初始化并启动时刻轮
- recover:依据 dispatchlog 和 回放进展恢复时刻轮数据
- load schedulelog:周期加载 schedulelog 数据来填充时刻轮数据
- 监听 messagelog 的回放事情,回放增加 schedulelog 的时分判断 (改延时音讯是否归于当时推迟刻度,eg. 1h 内) 是否需求将其增加到时刻轮中
- HashedWheelTimer 时刻轮完成
时刻轮 和 Dubbo 大体是一致的,都是先将使命增加到 Queuetimeouts 中,然后周期从这个列表中获取 100000 个来增加到 HashedWheel 中对应的 HashedWheelBucket 中。
总结
时刻轮是一种高效的延时行列,能够参阅上述业界完成来运用到实践场景中。
参阅
1.RocketMQ 音讯集成:多类型事务音讯——守时音讯
2.19 TimingWheel:探究Kafka守时器背面的高效时刻轮算法
3.透彻了解Kafka(十)——时刻轮调度
4.延时音讯常见完成方案
5.深入 RocketMQ- 音讯原理篇
6.延时使命一锅端