作者:京东零售张路瑶

本文正在参加「金石方案」

1.运用场景

现在体系中有许多需求用到延时处理的功用:支付超时撤销、排队超时、短信、微信等提示推迟发送、token改写、会员卡过期等等。经过延时处理,极大的节省体系的资源,不用轮询数据库处理使命。

现在大部分功用经过守时使命完结,守时使命还分运用quartz及xxljob两种类型轮询时刻短,每秒履行一次,对数据库造成必定的压力,而且会有1秒的差错。轮询时刻久,如30分钟一次,03:01刺进一条数据,正常3:31履行过期,可是3:30履行轮询时,扫描3:00-3:30的数据,是扫描不到3:31的数据的,需求4:00的时分才干扫描到,相当于多推迟了29分钟!

2.延时处理方法调研

1.DelayQueue

1.完结方法:

jvm供给的推迟堵塞行列,经过优先级行列对不同推迟时刻使命进行排序,经过condition进行堵塞、睡觉dealy时刻 获取推迟使命。

当有新使命参加时,会判别新使命是否是第一个待履行的使命,若是,会免除行列睡觉,防止新参加的元素时需求履行的元素而不能正常被履行线程获取到。

2.存在的问题:

1.单机运转,体系宕机后,无法进行有用的重试

2.没有履行记载和备份

3.没有重试机制

4.体系重启时,会将使命清空!

5.不能分片消费

3.优势:完结简略,无使命时堵塞,节省资源,履行时刻精确

2.推迟行列mq

完结方法:依托mq,经过设置推迟消费时刻,到达推迟消费功用。像rabbitMq、jmq都能够设置推迟消费时刻。RabbitMq经过将音讯设置过期时刻,放入死信行列进行消费完结。

存在的问题:

1.时刻设置不灵敏,每个queue是固定的到期时刻,每次新创建延时行列,需求创建新的音讯行列

长处:依托jmq,能够有用的监控、消费记载、重试,具有多机同时消费才能,不惧怕宕机

3.守时使命

经过守时使命轮询符合条件的数据

缺陷:

1.必须要读事务数据库,对数据库造成必定的压力,

2.存在延时

3.一次扫描数据量过大时,占用过多的体系资源。

4. 无法分片消费

长处:

1.消费失利后,下次还能持续消费,具有重试才能,

2.消费才能安稳

4.redis

使命存储在redis中,运用redis的 zset行列依据score进行排序,程序经过线程不断获取行列数据消费,完结延时行列

长处:

1、查询redis相比较数据库快,set行列长度过大,会依据跳表结构进行查询,效率高

2、redis可依据时刻戳进行排序,只需求查询当时时刻戳内的分数的使命即可

3、无惧机器重启

4、分布式消费

缺陷:

1.受限于redis功能,并发10W

2.多个命令无法确保原子性,运用lua脚本会要求一切数据都在一个redis分片上。

5. 时刻轮

经过时刻轮完结的推迟使命履行,也是依据jvm单机运转,如kafka、netty都有完结时刻轮,redisson的看门狗也是经过netty的时刻轮完结的。

缺陷:不适合分布式服务的运用,宕机后,会丢掉使命。

一种异步延迟队列的实现方式

3.完结目标

兼容现在在运用的异步事情组件,并供给更牢靠,可重试、有记载、可监控报警、高功能的推迟组件。

•音讯传输牢靠性:音讯进入到推迟行列后,确保至少被消费一次。

•Client支撑丰厚:支撑多重语言。

•高可用性:支撑多实例布置。挂掉一个实例后,还有后备实例持续供给服务。

•实时性:允许存在必定的时刻差错。

•支撑音讯删去:事务运用方,能够随时删去指定音讯。

•支撑消费查询

•支撑手动重试

•对当时异步事情的履行增加监控

4.架构规划

一种异步延迟队列的实现方式

5.推迟组件完结方法

1.完结原理

现在挑选运用jimdb经过zset完结延时功用,将使命id和对应的履行时刻作为score存在在zset行列中,默认会按照score排序,每次取0-当时时刻内的score的使命id,

发送推迟使命时,会依据时刻戳+机器ip+queueName+sequence 生成唯一的id,结构音讯体,加密后放入zset行列中。

经过搬运线程,将到达履行时刻的使命移动到发布行列中,等待顾客获取。

监控方经过集成ump

消费记载经过redis备份+数据库持久化完结。

经过缓存完结的方法,仅仅完结的一种,能够经过参数操控运用哪一种完结方法,并可经过spi自在扩展。

2.音讯结构

每个Job必须包括一下几个属性:

•Topic:Job类型,即QueueName

•Id:Job的唯一标识。用来检索和删去指定的Job信息。

•Delay:Job需求推迟的时刻。单位:秒。(服务端会将其转换为绝对时刻)

•Body:Job的内容,供顾客做详细的事务处理,以json格式存储。

•traceId:发送线程的traceId,待后续pfinder支撑设置traceId后,可与发送线程共用同一个traceiD,便于日志追踪

详细结构如下图表示:

一种异步延迟队列的实现方式

TTR的规划意图是为了确保音讯传输的牢靠性。

3.数据流转及流程图

一种异步延迟队列的实现方式

依据redis-disruptor方法进行发布、消费,能够作为音讯来进行运用,顾客选用原有异步事情的disruptor无锁行列消费,不同运用、不同queue之间无锁

1.支撑运用只发布,不消费,到达音讯行列的功用。

2:支撑分桶,针对大key问题,若事情多,能够设置推迟行列和使命行列桶的数量,减小因大key造成的redis堵塞问题。

3: 经过ducc装备,进行功能的扩展,现在只支撑开启消费和关闭消费。

4: 支撑设置超时时刻装备,防止消费线程履行过久

瓶颈: 消费速度慢,生产速度过快,会导致ringbuffer行列占满,当时运用既是生产者也是顾客时,生产者会休眠,功能取决于消费速度,可经过水平扩展机器,直接提升功能。监控redis行列的长度,若不断增加,可考虑增加顾客,直接提高功能。

可能出现的状况: 因一个运用共用一个disruptor,拥有64个顾客线程,如果某一个事情消费过慢,导致64个线程都在消费这个事情,会导致其他事情无消费线程消费,生产者线程也被堵塞,导致一切事情的消费都被堵塞。

后期调查是否有这个功能瓶颈,可给每一个queue一个顾客线程池。

6.demo示例

增加装备文件

判别是否开启jd.event.enable:true

<dependency> <groupId>com.jd.car</groupId>
 <artifactId>senna-event</artifactId>
 <version>1.0-SNAPSHOT</version> </dependency>

装备

jd:
senna:
event:
enable: true
queue:
retryEventQueue:
bucketNum: 1
handleBean: retryHandle

消费代码:

package com.jd.car.senna.admin.event;
import com.jd.car.senna.event.EventHandler;
import com.jd.car.senna.event.annotation.SennaEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* @author zhangluyao
* @description
* @create 2022-02-21-9:54 下午
*/
@Slf4j
@Component("retryHandle")
public class RetryQueueEvent extends EventHandler {
@Override
protected void onHandle(String key, String eventType) {
log.info("Handler开端消费:{}", key);
}
@Override
protected void onDelayHandle(String key, String eventType) {
log.info("delayHandler开端消费:{}", key);
}
}

注解形式:

package com.jd.car.senna.admin.event;
import com.jd.car.senna.event.EventHandler;
import com.jd.car.senna.event.annotation.SennaEvent;
import lombok.extern.slf4j.Slf4j;
/**
* @author zhangluyao
* @description
* @create 2022-02-21-9:54 下午
*/
@Slf4j
@SennaEvent(queueName = "testQueue", bucketNum = 5,delayBucketNum = 5,delayEnable = true)
public class TestQueueEvent extends EventHandler {
@Override
protected void onHandle(String key, String eventType) {
log.info("Handler开端消费:{}", key);
}
@Override
protected void onDelayHandle(String key, String eventType) {
log.info("delayHandler开端消费:{}", key);
}
}

发送代码


package com.jd.car.senna.admin.controller;
import com.jd.car.senna.event.queue.IEventQueue;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Lazy;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.concurrent.CompletableFuture;
/**
* @author zly
*/
@RestController
@Slf4j
public class DemoController {
@Lazy
@Resource(name = "testQueue")
private IEventQueue eventQueue;
@ResponseBody
@GetMapping("/api/v1/demo")
public String demo() {
log.info("发送无推迟音讯");
eventQueue.push("no delay 5000 millseconds message 3");
return "ok";
}
@ResponseBody
@GetMapping("/api/v1/demo1")
public String demo1() {
log.info("发送推迟5秒音讯");
eventQueue.push(" delay 5000 millseconds message,name",1000*5L);
return "ok";
}
@ResponseBody
@GetMapping("/api/v1/demo2")
public String demo2() {
log.info("发送推迟到2022-04-02 00:00:00履行的音讯");
eventQueue.push(" delay message,name to 2022-04-02 00:00:00", new Date(1648828800000));
return "ok";
} 
}

参考有赞规划:tech.youzan.com/queuing_del…

7.现在运用:

1.云修到店排队24小时后主动撤销

2..美团恳求token守时改写。

3.质保卡延期24小时生成

5. 结算单延期生成

6.短信推迟发送