作者:京东零售张路瑶
本文正在参加「金石方案」
1.运用场景
现在体系中有许多需求用到延时处理的功用:支付超时撤销、排队超时、短信、微信等提示推迟发送、token改写、会员卡过期等等。经过延时处理,极大的节省体系的资源,不用轮询数据库处理使命。
现在大部分功用经过守时使命完结,守时使命还分运用quartz及xxljob两种类型轮询时刻短,每秒履行一次,对数据库造成必定的压力,而且会有1秒的差错。轮询时刻久,如30分钟一次,03:01刺进一条数据,正常3:31履行过期,可是3:30履行轮询时,扫描3:00-3:30的数据,是扫描不到3:31的数据的,需求4:00的时分才干扫描到,相当于多推迟了29分钟!
2.延时处理方法调研
1.DelayQueue
1.完结方法:
jvm供给的推迟堵塞行列,经过优先级行列对不同推迟时刻使命进行排序,经过condition进行堵塞、睡觉dealy时刻 获取推迟使命。
当有新使命参加时,会判别新使命是否是第一个待履行的使命,若是,会免除行列睡觉,防止新参加的元素时需求履行的元素而不能正常被履行线程获取到。
2.存在的问题:
1.单机运转,体系宕机后,无法进行有用的重试
2.没有履行记载和备份
3.没有重试机制
4.体系重启时,会将使命清空!
5.不能分片消费
3.优势:完结简略,无使命时堵塞,节省资源,履行时刻精确
2.推迟行列mq
完结方法:依托mq,经过设置推迟消费时刻,到达推迟消费功用。像rabbitMq、jmq都能够设置推迟消费时刻。RabbitMq经过将音讯设置过期时刻,放入死信行列进行消费完结。
存在的问题:
1.时刻设置不灵敏,每个queue是固定的到期时刻,每次新创建延时行列,需求创建新的音讯行列
长处:依托jmq,能够有用的监控、消费记载、重试,具有多机同时消费才能,不惧怕宕机
3.守时使命
经过守时使命轮询符合条件的数据
缺陷:
1.必须要读事务数据库,对数据库造成必定的压力,
2.存在延时
3.一次扫描数据量过大时,占用过多的体系资源。
4. 无法分片消费
长处:
1.消费失利后,下次还能持续消费,具有重试才能,
2.消费才能安稳
4.redis
使命存储在redis中,运用redis的 zset行列依据score进行排序,程序经过线程不断获取行列数据消费,完结延时行列
长处:
1、查询redis相比较数据库快,set行列长度过大,会依据跳表结构进行查询,效率高
2、redis可依据时刻戳进行排序,只需求查询当时时刻戳内的分数的使命即可
3、无惧机器重启
4、分布式消费
缺陷:
1.受限于redis功能,并发10W
2.多个命令无法确保原子性,运用lua脚本会要求一切数据都在一个redis分片上。
5. 时刻轮
经过时刻轮完结的推迟使命履行,也是依据jvm单机运转,如kafka、netty都有完结时刻轮,redisson的看门狗也是经过netty的时刻轮完结的。
缺陷:不适合分布式服务的运用,宕机后,会丢掉使命。
3.完结目标
兼容现在在运用的异步事情组件,并供给更牢靠,可重试、有记载、可监控报警、高功能的推迟组件。
•音讯传输牢靠性:音讯进入到推迟行列后,确保至少被消费一次。
•Client支撑丰厚:支撑多重语言。
•高可用性:支撑多实例布置。挂掉一个实例后,还有后备实例持续供给服务。
•实时性:允许存在必定的时刻差错。
•支撑音讯删去:事务运用方,能够随时删去指定音讯。
•支撑消费查询
•支撑手动重试
•对当时异步事情的履行增加监控
4.架构规划
5.推迟组件完结方法
1.完结原理
现在挑选运用jimdb经过zset完结延时功用,将使命id和对应的履行时刻作为score存在在zset行列中,默认会按照score排序,每次取0-当时时刻内的score的使命id,
发送推迟使命时,会依据时刻戳+机器ip+queueName+sequence 生成唯一的id,结构音讯体,加密后放入zset行列中。
经过搬运线程,将到达履行时刻的使命移动到发布行列中,等待顾客获取。
监控方经过集成ump
消费记载经过redis备份+数据库持久化完结。
经过缓存完结的方法,仅仅完结的一种,能够经过参数操控运用哪一种完结方法,并可经过spi自在扩展。
2.音讯结构
每个Job必须包括一下几个属性:
•Topic:Job类型,即QueueName
•Id:Job的唯一标识。用来检索和删去指定的Job信息。
•Delay:Job需求推迟的时刻。单位:秒。(服务端会将其转换为绝对时刻)
•Body:Job的内容,供顾客做详细的事务处理,以json格式存储。
•traceId:发送线程的traceId,待后续pfinder支撑设置traceId后,可与发送线程共用同一个traceiD,便于日志追踪
详细结构如下图表示:
TTR的规划意图是为了确保音讯传输的牢靠性。
3.数据流转及流程图
依据redis-disruptor方法进行发布、消费,能够作为音讯来进行运用,顾客选用原有异步事情的disruptor无锁行列消费,不同运用、不同queue之间无锁
1.支撑运用只发布,不消费,到达音讯行列的功用。
2:支撑分桶,针对大key问题,若事情多,能够设置推迟行列和使命行列桶的数量,减小因大key造成的redis堵塞问题。
3: 经过ducc装备,进行功能的扩展,现在只支撑开启消费和关闭消费。
4: 支撑设置超时时刻装备,防止消费线程履行过久
瓶颈: 消费速度慢,生产速度过快,会导致ringbuffer行列占满,当时运用既是生产者也是顾客时,生产者会休眠,功能取决于消费速度,可经过水平扩展机器,直接提升功能。监控redis行列的长度,若不断增加,可考虑增加顾客,直接提高功能。
可能出现的状况: 因一个运用共用一个disruptor,拥有64个顾客线程,如果某一个事情消费过慢,导致64个线程都在消费这个事情,会导致其他事情无消费线程消费,生产者线程也被堵塞,导致一切事情的消费都被堵塞。
后期调查是否有这个功能瓶颈,可给每一个queue一个顾客线程池。
6.demo示例
增加装备文件
判别是否开启jd.event.enable:true
<dependency> <groupId>com.jd.car</groupId>
<artifactId>senna-event</artifactId>
<version>1.0-SNAPSHOT</version> </dependency>
装备
jd:
senna:
event:
enable: true
queue:
retryEventQueue:
bucketNum: 1
handleBean: retryHandle
消费代码:
package com.jd.car.senna.admin.event;
import com.jd.car.senna.event.EventHandler;
import com.jd.car.senna.event.annotation.SennaEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* @author zhangluyao
* @description
* @create 2022-02-21-9:54 下午
*/
@Slf4j
@Component("retryHandle")
public class RetryQueueEvent extends EventHandler {
@Override
protected void onHandle(String key, String eventType) {
log.info("Handler开端消费:{}", key);
}
@Override
protected void onDelayHandle(String key, String eventType) {
log.info("delayHandler开端消费:{}", key);
}
}
注解形式:
package com.jd.car.senna.admin.event;
import com.jd.car.senna.event.EventHandler;
import com.jd.car.senna.event.annotation.SennaEvent;
import lombok.extern.slf4j.Slf4j;
/**
* @author zhangluyao
* @description
* @create 2022-02-21-9:54 下午
*/
@Slf4j
@SennaEvent(queueName = "testQueue", bucketNum = 5,delayBucketNum = 5,delayEnable = true)
public class TestQueueEvent extends EventHandler {
@Override
protected void onHandle(String key, String eventType) {
log.info("Handler开端消费:{}", key);
}
@Override
protected void onDelayHandle(String key, String eventType) {
log.info("delayHandler开端消费:{}", key);
}
}
发送代码
package com.jd.car.senna.admin.controller;
import com.jd.car.senna.event.queue.IEventQueue;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Lazy;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.concurrent.CompletableFuture;
/**
* @author zly
*/
@RestController
@Slf4j
public class DemoController {
@Lazy
@Resource(name = "testQueue")
private IEventQueue eventQueue;
@ResponseBody
@GetMapping("/api/v1/demo")
public String demo() {
log.info("发送无推迟音讯");
eventQueue.push("no delay 5000 millseconds message 3");
return "ok";
}
@ResponseBody
@GetMapping("/api/v1/demo1")
public String demo1() {
log.info("发送推迟5秒音讯");
eventQueue.push(" delay 5000 millseconds message,name",1000*5L);
return "ok";
}
@ResponseBody
@GetMapping("/api/v1/demo2")
public String demo2() {
log.info("发送推迟到2022-04-02 00:00:00履行的音讯");
eventQueue.push(" delay message,name to 2022-04-02 00:00:00", new Date(1648828800000));
return "ok";
}
}
参考有赞规划:tech.youzan.com/queuing_del…
7.现在运用:
1.云修到店排队24小时后主动撤销
2..美团恳求token守时改写。
3.质保卡延期24小时生成
5. 结算单延期生成
6.短信推迟发送