1 先回忆一下Java原生自带的DelayQueue

  1. 自己完成java.util.concurrent.Delayed这个接口,重写getDelay(核算剩余时刻)办法,重写compareTo(比较哪个使命更早过期,核算优先级)办法。
  2. 调用add()办法增加推迟使命,add()办法里边会调用offer()办法,offer()办法会调用q.offer(),q是PriorityQueue优先级行列,所以数据最终是存放在PriorityQueue优先级行列里边的。offer()办法会先加锁,同一时刻只能有一个线程增加数据。 q.offer()办法内部会调用咱们自己重写的compareTo()办法进行从头排序。
  3. add()办法在调用q.offer()办法增加完结并排序成功之后,add()办法会将排序后的第一个推迟使命拿出来跟咱们其时的推迟使命进行比较,假如是同一个推迟使命阐明最早到期的推迟使命便是咱们刚增加进去的这个推迟使命而且一起也阐明之前行列里边的优先级有变化,此刻需求调用signal()办法告知(唤醒)等候获取推迟使命的线程,推迟使命的优先级发生变化了,能够来取新的推迟使命了。 至此,咱们往推迟行列里边增加推迟使命的工作就现已完全做完了。
  4. 从推迟行列DelayQueue里边取出推迟行列使命,会调用DelayQueue.take()办法,take()办法里边是一个死循环。循环里边会调用q.peek()办法,从优先级行列里边取出排在第一位的推迟使命,假如取不到推迟使命,阐明没有推迟使命,调用await()办法,让线程永久堵塞下去。假如取到推迟使命,调用推迟使命对象的getDelay()办法获取推迟使命的剩余时刻,假如剩余时刻小于等于0,阐明延期现已过期了,调用p.poll()办法,将推迟使命return出去。假如剩余时刻大于0,调用awaitNanos(推迟使命剩余时刻)办法,让线程堵塞,堵塞时刻为推迟使命的剩余时刻。到时刻之后,由于take办法是死循环,代码会从头调用p.peek取出元素并判断剩余时刻是否小于等于0。take()办法相同也会加锁,同一个时刻只能有一个线程从里边取出数据。
  • Java原生自带的DelayQueue的代码示例如下:
importjava.util.concurrent.BlockingQueue;
importjava.util.concurrent.DelayQueue;
importjava.util.concurrent.Delayed;
importjava.util.concurrent.TimeUnit;
publicclassDelayQueueDemo{
staticDelayQueue<Delayed>queue=newDelayQueue();

publicstaticvoidmain(String[]args)throwsInterruptedException{

queue.add(newMyDelay(100,TimeUnit.SECONDS,"第一次增加使命"));
queue.add(newMyDelay(1,TimeUnit.SECONDS,"第2次增加使命"));
queue.add(newMyDelay(5,TimeUnit.SECONDS,"第三次增加使命"));
queue.add(newMyDelay(10000,TimeUnit.MILLISECONDS,"第四次增加使命,只要到了指定的推迟时刻才能调用queue.take()办法,把这个使命取出来"));

while(!queue.isEmpty()){
//queue.take()从推迟行列中取出使命,假如使命指定的推迟时刻还没有到,这儿是取不出来的,线程将一向堵塞
//线程状况将处于java.lang.Thread.State:TIMED_WAITING(parking),会释放CPU,底层调用的是UNSAFE.park办法。
Delayeddelayed=queue.take();
System.out.println("这么快就取出来了??"+delayed);
}
}
}
importjava.util.concurrent.Delayed;
importjava.util.concurrent.TimeUnit;
publicclassMyDelay<T>implementsDelayed{

//推迟时刻,(时刻单位会在核算剩余时刻的办法getDelay里边,由你自己指定,一般来说都会运用毫秒,更准确一点。)
longdelayTime;

//过期时刻,(时刻单位会在核算剩余时刻的办法getDelay里边,由你自己指定,一般来说都会运用毫秒,更准确一点。)
longexpire;

//你自己放进行列里的数据
Tdata;

publicMyDelay(longdelayTime,TimeUnitdelayTimeUnit,Tt){
//将用户传进来的时刻转换为毫秒
this.delayTime=TimeUnit.MILLISECONDS.convert(delayTime,delayTimeUnit);
//过期时刻=其时时刻+推迟时刻(时刻单位会在核算剩余时刻的办法getDelay里边,由你自己指定,一般来说都会运用毫秒,更准确一点。)
//当然你也能够运用其他时刻,随意的
this.expire=System.currentTimeMillis()+this.delayTime;
data=t;
}

/**
*剩余时刻=过期时刻-其时时刻
*
*/
@Override
publiclonggetDelay(TimeUnitunit){
//留意convert这个办法,第一个参数是一个long类型的数值,第二个参数的意思是告知convert第一个long类型的值的单位是毫秒
returnunit.convert(this.expire-System.currentTimeMillis(),TimeUnit.MILLISECONDS);
}

/**
*优先级:俩个使命比较,时刻短的优先履行
*
*/
@Override
publicintcompareTo(Delayedo){
longf=this.getDelay(TimeUnit.MILLISECONDS)-o.getDelay(TimeUnit.MILLISECONDS);
return(int)f;
}

@Override
publicStringtoString(){
//这个toString()办法不是有必要的,你能够不重写。写不写都无所谓,我这儿为了测试,将数据打印出来了。
return"delayTime="+delayTime+",expire="+expire+",data="+data;
}
}
  • 源码分析参阅这篇文章《DelayQueue推迟行列》

Redisson推迟行列

少废话,先看代码

  • pom.xml
<?xmlversion="1.0"encoding="UTF-8"?>
<projectxmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.redisson</groupId>
<artifactId>demo</artifactId>
<version>1.0-SNAPSHOT</version>
<description>DemoprojectforSpringBoot</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.1.RELEASE</version>
<!--lookupparentfromrepository-->
<relativePath/>
</parent>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.10.7</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
</dependencies>
</project>
  • SpringBoot的application.yml配置文件
server:
port:8080
spring:
redis:
host:192.168.212.132
port:6379
password:123321
application:
name:redissonTest
  • Redisson配置类:RedissonConfig.java
importorg.redisson.Redisson;
importorg.redisson.api.RedissonClient;
importorg.redisson.config.Config;
importorg.redisson.config.SentinelServersConfig;
importorg.redisson.config.SingleServerConfig;
importorg.springframework.beans.factory.annotation.Value;
importorg.springframework.boot.autoconfigure.data.redis.RedisProperties;
importorg.springframework.context.annotation.Bean;
importorg.springframework.context.annotation.Configuration;
@Configuration
publicclassRedissonConfig{
@Value("${spring.application.name}")
privateStringserverName;
@Bean
publicRedissonClientredisson(RedisPropertiesredisProperties){
Configconfig=newConfig();
SingleServerConfigsingleServerConfig=config.useSingleServer();
singleServerConfig.setAddress("redis://"+redisProperties.getHost()+":"+redisProperties.getPort());
singleServerConfig.setPassword(redisProperties.getPassword());
singleServerConfig.setKeepAlive(true);
singleServerConfig.setDatabase(redisProperties.getDatabase());
singleServerConfig.setClientName(serverName);
returnRedisson.create(config);
}
}
  • 运用Redisson增加使命到推迟行列里边
importorg.redisson.api.RBlockingDeque;
importorg.redisson.api.RDelayedQueue;
importorg.redisson.api.RedissonClient;
importorg.springframework.beans.factory.annotation.Autowired;
importorg.springframework.stereotype.Component;
importjava.time.LocalDateTime;
importjava.time.format.DateTimeFormatter;
importjava.util.concurrent.TimeUnit;
@Component
publicclassAddTaskToDelayQueue{
@Autowired
RedissonClientredissonClient;
/**
*增加使命到延时行列里边
*
*@paramorderId订单ID
*/
publicvoidaddTaskToDelayQueue(StringorderId){
//RBlockingDeque的完成类为:newRedissonBlockingDeque
RBlockingDeque<String>blockingDeque=redissonClient.getBlockingDeque("orderQueue");
//RDelayedQueue的完成类为:newRedissonDelayedQueue
RDelayedQueue<String>delayedQueue=redissonClient.getDelayedQueue(blockingDeque);
System.out.println(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-ddHH:mm:ss"))+"增加使命到延时行列里边");
delayedQueue.offer(orderId+"增加一个使命",3,TimeUnit.SECONDS);
delayedQueue.offer(orderId+"增加二个使命",6,TimeUnit.SECONDS);
delayedQueue.offer(orderId+"增加三个使命",9,TimeUnit.SECONDS);
}
}
  • Controller代码
importcom.redisson.demo.AddTaskToDelayQueue;
importorg.redisson.api.RBlockingDeque;
importorg.redisson.api.RDelayedQueue;
importorg.redisson.api.RedissonClient;
importorg.springframework.beans.factory.annotation.Autowired;
importorg.springframework.data.redis.core.RedisTemplate;
importorg.springframework.web.bind.annotation.GetMapping;
importorg.springframework.web.bind.annotation.RequestMapping;
importorg.springframework.web.bind.annotation.RestController;
importjava.time.LocalDateTime;
importjava.time.format.DateTimeFormatter;
importjava.util.concurrent.TimeUnit;
@RestController
@RequestMapping("order")
publicclassTestController{
@Autowired
privateRedissonClientredissonClient;
@Autowired
AddTaskToDelayQueueaddTaskToDelayQueue;
@GetMapping("testRedissonDelayQueueTake")
publicvoidtestRedissonDelayQueueTake(){
RBlockingDeque<String>blockingDeque=redissonClient.getBlockingDeque("orderQueue");
//留意尽管delayedQueue在这个办法里边没有用到,可是这行代码也是必不可少的。
RDelayedQueue<String>delayedQueue=redissonClient.getDelayedQueue(blockingDeque);
while(true){
StringorderId=null;
try{
orderId=blockingDeque.take();
}catch(Exceptione){
System.err.println(e.getStackTrace());
continue;
}
if(orderId==null){
continue;
}
System.out.println(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-ddHH:mm:ss"))+"延时行列收到:"+orderId);
}
}
@GetMapping("testRedissonDelayQueueOffer")
publicvoidtestRedissonDelayQueueOffer(){
addTaskToDelayQueue.addTaskToDelayQueue("testRedissonDelayQueue");
}
}

解析代码

咱们先看增加数据delayedQueue.offer()办法的源码

  • **offer()**办法是往Redisson的RDelayedQueue推迟行列里边增加推迟使命的,所以先看这个办法。
//RBlockingDeque的完成类为:newRedissonBlockingDeque
RBlockingDeque<String>blockingDeque=redissonClient.getBlockingDeque("orderQueue");
//RDelayedQueue的完成类为:newRedissonDelayedQueue
RDelayedQueue<String>delayedQueue=redissonClient.getDelayedQueue(blockingDeque);
System.out.println(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-ddHH:mm:ss"))+"增加使命到延时行列里边");
delayedQueue.offer(orderId+"999999999999999111111111111111",90,TimeUnit.SECONDS);
delayedQueue.offer(orderId+"999999999999992222222222",180,TimeUnit.SECONDS);
delayedQueue.offer(orderId+"9999999999999333333333333",280,TimeUnit.SECONDS);

在IDEA里边按住鼠标左键点击redissonClient.getDelayedQueue(blockingDeque)办法,很简略知道RDelayedQueue的完成类为:org.redisson.RedissonDelayedQueue.java。那么offer办法的源码也在org.redisson.RedissonDelayedQueue.java里边,如下截图:

Redisson分布式延时队列 RedissonDelayedQueue

offer办法的源码

Redisson分布式延时队列 RedissonDelayedQueue

offer办法的源码

其实,offer办法的中心原理很简略。源码里边的Lua脚本咱们不必管,咱们只看咱们能看懂的JAVA代码和Redis指令。

  1. 首先,用户传进来的推迟时刻有必要大于0,小于0抛出异常代码完毕。
  2. 将用户传进来的时刻转换为毫秒,并加上系统其时时刻,核算出来的便是过期时刻。到了过期时刻顾客就能够把该使命取出来消费了。
  3. 最为中心的来了,第一条Redis指令:zadd KEYS[2], ARGV[1], value。留意:Lua的数组下标是从1开端的,跟JAVA的不相同,不是从0开端的。所以这条指令翻译过来便是:zadd timeoutSetName timeout value,运用zadd指令将用户的推迟使命存放在Redis的SortedSet数据结构里边,score的值为推迟使命的过期时刻。Redis的SortedSet数据结构是一个主动排序的Set调集,所以运用zadd指令将推迟使命放进Redis的SortedSet数据结构之后,Redis的SortedSet会主动依照score(推迟使命的过期时刻)从小到大主动排序,最早过期的使命排在最上面
  4. 第二条Redis指令:rpush KEYS[3], value。这条指令翻译过来便是:rpush queueName value,运用Redis的rpush指令,将推迟使命存放在Redis的List数据结构里边。留意:这个list里边只存了详细的使命,没有存使命的过期时刻
  5. 第三条Redis指令:zrange KEYS[2], 0, 0。这条指令翻译过来便是:zrange timeoutSetName, 0, 0。从Redis的SortedSet数据结构取出第一条也便是最早过期的推迟使命。
  6. if v[1] == value then的意思是:将排序后的第一个推迟使命拿出来跟咱们其时的推迟使命进行比较,假如是同一个推迟使命阐明最早到期的推迟使命便是咱们刚增加进去的这个推迟使命而且一起也阐明之前行列里边的优先级有变化,此刻运用Redis的发布订阅指令:publish KEYS[4], ARGV[1] => publish channelName timeout,将推迟使命的过期时刻timeout发布出去,告知订阅者,推迟使命的优先级发生变化了,能够来取新的推迟使命了。 至此,咱们往推迟行列里边增加推迟使命的工作就现已完全做完了。

其实,Redisson的这个流程跟Java原生自带的DelayQueue的offer办法流程是相同的。

接下来看取数据blockingDeque.take()办法的源码

  • **take()**办法是从Redisson的RDelayedQueue推迟行列里边取出推迟使命的。
RBlockingDeque<String>blockingDeque=redissonClient.getBlockingDeque("orderQueue");
//留意尽管delayedQueue在这个办法里边没有用到,可是这行代码也是必不可少的。
RDelayedQueue<String>delayedQueue=redissonClient.getDelayedQueue(blockingDeque);
StringorderId=blockingDeque.take();

take()这个办法的中心原理十分简略,中心代码便是运用Redis的BLPOP指令,从Redis的List数据结构里边取数据,取不到就堵塞,一向比及有数据进来。源码方位:org.redisson.RedissonBlockingDeque,源码如下截图:

Redisson分布式延时队列 RedissonDelayedQueue

org.redisson.RedissonBlockingDeque源码

Redisson分布式延时队列 RedissonDelayedQueue

org.redisson.RedissonBlockingQueue源码

Redisson分布式延时队列 RedissonDelayedQueue

org.redisson.client.protocol.RedisCommands源码

所以blockingDeque.take()办法的中心代码便是一条Redis指令:blpop 行列名字 0,行列名字便是你在 redissonClient.getBlockingDeque(“orderQueue”)这个办法里边传入的名字:orderQueue。发送给Redis履行的指令便是这个:blpop orderQueue 0,0代表不限时,一向堵塞下去。Redisson会把一切现已过期的使命,都存放在这个List里边,所以只要这个List里边有数据,就代表这个数据现已过期了,顾客能够消费了。Redisson往这个List里边放的时候运用的rpush指令,rpush指令的意思是往List的右边放。比如有A和B俩个使命,先放A,再放B,List里边的数据为:AB。可是留意先放进去的肯定是最早过期的,所以咱们消费的时候要先消费A,再消费B。 而blpop这个指令便是从List的左面开端消费的。遥相呼应,十分完美。

那过期的数据是怎么放进Redis的List数据结构里边的呢?

  • 还记得下面这段代码吗?
RBlockingDeque<String>blockingDeque=redissonClient.getBlockingDeque("orderQueue");
//留意尽管delayedQueue在这个办法里边没有用到,可是这行代码也是必不可少的。
RDelayedQueue<String>delayedQueue=redissonClient.getDelayedQueue(blockingDeque);

留意:上面这俩行代码,无论你是在调用offer办法存放数据还是调用take()办法取出数据,这俩行代码都是必不可少的。

过期的数据便是在这俩行代码里边放进去的,中心代码在这儿:org.redisson.Redisson

redissonClient.getDelayedQueue(blockingDeque);

源码如下截图:

Redisson分布式延时队列 RedissonDelayedQueue

redissonClient.getDelayedQueue(blockingDeque)办法源码

redissonClient.getDelayedQueue(blockingDeque)办法调用了RedissonDelayedQueue类的结构办法,过期的数据便是在这个结构办法里边放进去的。

Redisson分布式延时队列 RedissonDelayedQueue

org.redisson.RedissonDelayedQueue结构办法的源码

  1. 咱们先看上图里边pushTaskAsync办法里边的Redis指令。
  2. 第一条Redis指令:zrangebyscore KEYS[2] 0 ARGV[1] limit 0 ARGV[2],翻译过来为:zrangebyscore timeoutSetName 0 System.currentTimeMillis() limit 0 100。意思便是从timeoutSetName这个SortedSet数据结构里边取出score的值在0到系统其时时刻的一切推迟使命。别忘了,咱们之前调用offer办法存数据的时候,推迟使命便是存储在这儿的,score的值咱们其时存储的是推迟使命的过期时刻。所以,假如score的值小于系统其时时刻,阐明这个推迟使命现已过期了,能够让顾客取出来了。limit 0 100的意思是,一次最多只取100条数据。
  3. if #expiredValues > 0 then这行代码的意思是,假如能取出来数据,就运用for循环处理这些现已过期的数据。
  4. 在for循环里边履行了俩条Redis指令。第一条为:rpush KEYS[1], value翻译过来为:rpush getName() value => rpush orderQueue 用户数据,意思便是将取出来的数据继续放进Redis的List数据结构里边,value便是用户其时存进来的详细数据。这个List里边存放的全是现已过期的数据,take办法便是从这儿获取数据的。过期的数据便是在这儿放进去的,这个是最中心的代码了。 第二条Redis指令为:lrem KEYS[3] 1 v => lrem queueName 1 v 从queueName里边删除一个数据。由于之前offer办法往这儿边存数据了,所以过期之后这儿要再把它删除去。
  5. 循环完毕之后,又履行了一条指令:zrem KEYS[2] unpack(expiredValues) => zrem timeoutSetName 一切取出来的数据,意思便是把一切的过期数据,从SortedSet这个数据结构中删除去。
  6. 最终再履行zrange KEYS[2] 0 0 WITHSCORES=>zrange timeoutSetName 0 0 withscores意思便是从Redis的SortedSet数据结构取出第一条也便是最早过期的推迟使命。
  7. if v[1] ~= nil then return v[2]; 这行代码的意思是,假如上面的zrange能取出来数据,就把取出来的数据的score值(使命的过期时刻)返回出去。

那过期的数据是在什么时候放进Redis的List数据结构里边的呢?

  • 在RedissonDelayedQueue的结构办法会触发使命的调度,在这个使命里边会动态的触发守时使命的履行,这些守时使命会在使命过期时调用pushTaskAsync()办法,履行上面的Redis指令,将过期数据放入方针推迟行列供顾客消费。

Redisson分布式延时队列 RedissonDelayedQueue

调度使命

Redisson分布式延时队列 RedissonDelayedQueue

调度使命开端履行

Redisson分布式延时队列 RedissonDelayedQueue

履行逻辑

留意上图中RTopic schedulerTopic = getTopic();这行代码,调用的便是RedissonDelayedQueue结构办法里边定义的getTopic()办法。

Redisson分布式延时队列 RedissonDelayedQueue

getTopic()办法

Redisson分布式延时队列 RedissonDelayedQueue

pushTask()办法

Redisson分布式延时队列 RedissonDelayedQueue

scheduleTask()办法

总结如下:

  1. 在RedissonDelayedQueue类的结构办法里边会直接调用QueueTransferTask这个类的start()办法。
  2. 在QueueTransferTask这个类的start()办法里边会给RTopic schedulerTopic = getTopic()其实便是RedissonTopic这个类注册俩个Listener监听事情:1.onSubscribe(订阅监听)2.onMessage(音讯监听)
  3. schedulerTopic.addListener()这个办法在增加订阅监听的一起也会主动去订阅一下,订阅完直接就触发onSubscribe(订阅监听)监听事情了。触发onSubscribe(订阅监听)事情之后,onSubscribe办法会调用pushTask()办法,pushTask()办法会调用pushTaskAsync()去履行Redis指令,过期数据就会放到方针行列里边了。
  4. 假如pushTaskAsync()办法有返回值(使命过期时刻),会调用scheduleTask(res)办法,scheduleTask(res)办法会发动一个守时使命,守时使命到时刻会履行pushTask()办法。

最终一步,验证一下究竟是不是schedulerTopic.addListener()这个办法会不会主动触发监听Subscribe这个指令呢?

Redisson分布式延时队列 RedissonDelayedQueue

new RedissonTopic

Redisson分布式延时队列 RedissonDelayedQueue

addListenert增加监听事情

接下来咱们,连接上Redis,运用呢monitor指令,监控指令的履行。monitor指令能够监控redis履行了哪些指令,生产环境禁止运用这个指令

Redisson分布式延时队列 RedissonDelayedQueue

连接Redis履行指令监听

然后创立一个Controller办法,发动项目并拜访接口,主要是为了触发一下咱们代码的履行,Controller的代码如下


importorg.redisson.Redisson;
importorg.redisson.RedissonTopic;
importorg.redisson.api.RBlockingDeque;
importorg.redisson.api.RDelayedQueue;
importorg.redisson.api.RedissonClient;
importorg.redisson.api.listener.BaseStatusListener;
importorg.redisson.api.listener.MessageListener;
importorg.redisson.client.codec.LongCodec;
importorg.redisson.command.CommandExecutor;
importorg.springframework.beans.factory.annotation.Autowired;
importorg.springframework.data.redis.core.RedisTemplate;
importorg.springframework.web.bind.annotation.GetMapping;
importorg.springframework.web.bind.annotation.RequestMapping;
importorg.springframework.web.bind.annotation.RestController;
importjava.time.LocalDateTime;
importjava.time.format.DateTimeFormatter;
importjava.util.concurrent.TimeUnit;
@RestController
@RequestMapping("order")
publicclassTestController{
@Autowired
privateRedissonClientredissonClient;
@GetMapping("testRedissonDelayQueueTake")
publicvoidtestRedissonDelayQueueTake(){
RedissonTopictestTopic=newRedissonTopic(LongCodec.INSTANCE,((Redisson)redissonClient).getCommandExecutor(),"delayQueueChannelTest");
intstatusListenerId=testTopic.addListener(newBaseStatusListener(){
@Override
publicvoidonSubscribe(Stringchannel){
//搞一个空办法就行
//pushTask();
}
});
}
}

拜访这个Controller接口,http://localhost:8080/order/testRedissonDelayQueueTake。 然后去Redis服务器上面看是否监控到SUBSCRIBE订阅指令。

Redisson分布式延时队列 RedissonDelayedQueue

SUBSCRIBE指令

能够看到,拜访接口之后Redisson向Redis发出了SUBSCRIBE订阅的指令。所以,其实真实触发SUBSCRIBE订阅指令的是这行代码testTopic.addListener()。

给我们画个流程图吧:

Redisson分布式延时队列 RedissonDelayedQueue

结构办法触发守时使命流程图

Redisson分布式延时队列 RedissonDelayedQueue

offer办法触发守时使命流程图

不知道我们能不能看懂,横竖我是现已十分清楚了,我自己看源码看了俩天。我的文章只是提供一些线索,你们依据这一点点小提示、小线索,自己去看源码,很快就明白了。

Redisson分布式延时队列 RedissonDelayedQueue