本文已参与「新人创造礼」活动,一同敞开创造之路。
场景描绘
①需求完成一个守时发布系统通告的功能,怎么完成? ②付出超时,订单主动撤销,怎么完成?
完成办法
一、挂起线程
引荐指数:★★☆ 长处: JDK原生(JUC包下)支撑,无需引进新的依靠; 缺陷: (1)根据内存,使用重启(或宕机)会导致使命丢掉 (2)根据内存挂起线程完成延时,不支撑集群 (3)代码耦合性大,不易维护 (4)一个使命就要新建一个线程绑定使命的履行,简单形成资源浪费
①装备推迟使命专用线程池
/**
* 线程池装备
*/
@Configuration
@EnableAsync
@EnableConfigurationProperties(ThreadPoolProperties.class)
public class ThreadPoolConfig {
//ThreadPoolProperties的装备根据需求和服务器装备自行装备
@Resource
private ThreadPoolProperties threadPoolProperties;
//推迟使命行列容量
private final static int DELAY_TASK_QUEUE_CAPACITY = 100;
@Bean
public ThreadPoolTaskExecutor delayTaskExecutor() {
log.info("start delayTaskExecutor");
ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
//装备核心线程数
threadPool.setCorePoolSize(threadPoolProperties.getCorePoolSize());
//装备最大线程数
threadPool.setMaxPoolSize(threadPoolProperties.getMaxPoolSize());
//装备行列大小
threadPool.setQueueCapacity(DELAY_TASK_QUEUE_CAPACITY);
//线程最大存活时刻
threadPool.setKeepAliveSeconds (threadPoolProperties.getKeepAliveSeconds());
//装备线程池中的线程的称号前缀
threadPool.setThreadNamePrefix(threadPoolProperties.getThreadNamePrefix());
// rejection-policy:当pool已经达到max size的时候履行的策略
threadPool.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
//履行初始化
threadPool.initialize();
return threadPool;
}
}
②创立延时使命
在需求履行的代码块创立延时使命
delayTaskExecutor.execute(() -> {
try {
//线程挂起指守时刻
TimeUnit.MINUTES.sleep(time);
//履行事务逻辑
doSomething();
} catch (InterruptedException e) {
log.error("线程被打断,履行事务逻辑失败");
}
});
二、ScheduledExecutorService 推迟使命线程池
引荐指数:★★★ 长处: 代码简练,JDK原生支撑 缺陷: (1)根据内存,使用重启(或宕机)会导致使命丢掉 (2)根据内存寄存使命,不支撑集群 (3)一个使命就要新建一个线程绑定使命的履行,简单形成资源浪费
class Task implements Runnable{
@Override
public void run() {
System.out.println(Thread.currentThread().getId()+":"+Thread.currentThread().getName());
System.out.println("scheduledExecutorService====>>>延时器");
}
}
public class ScheduleServiceTest {
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService=new ScheduledThreadPoolExecutor(10);
scheduledExecutorService.schedule(new Task(),1, TimeUnit.SECONDS);
scheduledExecutorService.schedule(new Task(),2, TimeUnit.SECONDS);
scheduledExecutorService.schedule(new Task(),1, TimeUnit.SECONDS);
}
}
三、DelayQueue(延时行列)
引荐指数:★★★☆ 长处: (1)JDK原生(JUC包下)支撑,无需引进新的依靠; (2)可以用一个线程对整个延时行列按序履行; 缺陷: (1)根据内存,使用重启(或宕机)会导致使命丢掉 (2)根据内存寄存行列,不支撑集群 (3)根据compareTo办法摆放行列,调用take堵塞式的取出第一个使命(不调用则不取出),比较不灵敏,会影响时刻的准确性
①新建一个延时使命
public class DelayTask implements Delayed {
private Integer taskId;
private long executeTime;
DelayTask(Integer taskId, long executeTime) {
this.taskId = taskId;
this.executeTime = executeTime;
}
/**
* 该使命的延时时长
* @param unit
* @return
*/
@Override
public long getDelay(TimeUnit unit) {
return executeTime - System.currentTimeMillis();
}
@Override
public int compareTo(Delayed o) {
DelayTask t = (DelayTask) o;
if (this.executeTime - t.executeTime <= 0) {
return -1;
} else {
return 1;
}
}
@Override
public String toString() {
return "延时使命{" +
"使命编号=" + taskId +
", 履行时刻=" + new Date(executeTime) +
'}';
}
/**
* 履行详细事务代码
*/
public void doTask(){
System.out.println(this+":");
System.out.println("线程ID-"+Thread.currentThread().getId()+":线程称号-"+Thread.currentThread().getName()+":do something!");
}
}
②履行延时使命
public class TestDelay {
public static void main(String[] args) throws InterruptedException {
// 新建3个使命,并依次设置超时时刻为 30s 10s 60s
DelayTask d1 = new DelayTask(1, System.currentTimeMillis() + 3000L);
DelayTask d2 = new DelayTask(2, System.currentTimeMillis() + 1000L);
DelayTask d3 = new DelayTask(3, System.currentTimeMillis() + 6000L);
DelayQueue<DelayTask> queue = new DelayQueue<>();
queue.add(d1);
queue.add(d2);
queue.add(d3);
System.out.println("敞开延时行列时刻:" + new Date()+"\n");
// 从延时行列中获取元素
while (!queue.isEmpty()) {
queue.take().doTask();
}
System.out.println("\n使命结束");
}
}
履行成果:
四、Redis-为key指定超时时长,并监听失效key
引荐指数:★★★☆ 长处: 关于有依靠redis的事务且有延时使命的需求,能够快速对接 缺陷: (1)客户端断开后重连会导致一切事情丢掉 (2)高并发场景下,存在很多的失效key场景会导出失效时刻存在推迟 (3)若有多个监听器监听该key,是会重复消费这个过期事情的,需求特定逻辑判别
① 修正Redis装备文件并重启Redis
notify-keyspace-events Ex
留意: redis装备文件不能有空格,否则会发动报错
②Java中关于Redis的装备类
redisTemplate实例bean需求自定义生成; RedisMessageListenerContainer 是redis-key过期监听需求的监听器容器;
@Configuration
@Slf4j
public class RedisConfiguration {
/**
* Redis装备
* @param factory
* @return
*/
@Bean(name = "redisTemplate")
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<Object, Object> template = new RedisTemplate<>();
RedisSerializer<String> redisSerializer = new StringRedisSerializer();
template.setConnectionFactory(factory);
//key序列化办法
template.setKeySerializer(redisSerializer);
//value序列化
template.setValueSerializer(redisSerializer);
//value hashmap序列化
template.setHashValueSerializer(redisSerializer);
//key hashmap序列化
template.setHashKeySerializer(redisSerializer);
return template;
}
/**
* 音讯监听器容器bean
* @param connectionFactory
* @return
*/
@Bean
public RedisMessageListenerContainer container(LettuceConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
return container;
}
}
③监听器代码
@Slf4j
@Component
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
private static final String TEST_REDIS_KEY = "testExpired";
public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer,
RedisTemplate redisTemplate) {
super(listenerContainer);
/**
* 设置一个Redis推迟过期key(key名:testExpired,过期时刻:30秒)
*/
redisTemplate.opsForValue().set(TEST_REDIS_KEY, "1", 20, TimeUnit.SECONDS);
log.info("设置redis-key");
}
@Override
public void onMessage(Message message, byte[] pattern) {
try {
String expiredKey = message.toString();
if (TEST_REDIS_KEY.equals(expiredKey)) {
//事务处理
log.info(expiredKey + "过期,触发回调");
}
} catch (Exception e) {
log.error("key 过期通知处理异常,{}", e);
}
}
}
测试成果:
五、时刻轮
引荐指数:★★★★ 长处: (1)关于很多守时使命,时刻轮可以仅用一个工作线程对编排的使命进行顺序运转; (2)主动运转,可以自定义时刻轮每轮的tick数,tick间隔,灵敏且时刻精度可控 缺陷: (1)根据内存,使用重启(或宕机)会导致使命丢掉 (2)根据内存寄存使命,不支撑集群
public class WheelTimerTest {
public static void main(String[] args) {
//设置每个格子是 100ms, 一共 256 个格子
HashedWheelTimer hashedWheelTimer = new HashedWheelTimer(100, TimeUnit.MILLISECONDS, 256);
//参加三个使命,依次设置超时时刻是 10s 5s 20s
System.out.println("参加一个使命,ID = 1, time= " + LocalDateTime.now());
hashedWheelTimer.newTimeout(timeout -> {
System.out.println(Thread.currentThread().getName());
System.out.println("履行一个使命,ID = 1, time= " + LocalDateTime.now());
}, 10, TimeUnit.SECONDS);
System.out.println("参加一个使命,ID = 2, time= " + LocalDateTime.now());
hashedWheelTimer.newTimeout(timeout -> {
System.out.println(Thread.currentThread().getName());
System.out.println("履行一个使命,ID = 2, time= " + LocalDateTime.now());
}, 5, TimeUnit.SECONDS);
System.out.println("参加一个使命,ID = 3, time= " + LocalDateTime.now());
hashedWheelTimer.newTimeout(timeout -> {
System.out.println(Thread.currentThread().getName());
System.out.println("履行一个使命,ID = 3, time= " + LocalDateTime.now());
}, 20, TimeUnit.SECONDS);
System.out.println("参加一个使命,ID = 4, time= " + LocalDateTime.now());
hashedWheelTimer.newTimeout(timeout -> {
System.out.println(Thread.currentThread().getName());
System.out.println("履行一个使命,ID = 4, time= " + LocalDateTime.now());
}, 20, TimeUnit.SECONDS);
System.out.println("等候使命履行===========");
}
}
六、音讯行列-推迟行列
针对使命丢掉的价值过大,高并发的场景 引荐指数:★★★★ 长处: 支撑集群,分布式,高并发场景; 缺陷: 引进额外的音讯行列,增加项目的部署和维护的复杂度。
场景:为一个托付指定期限,托付到期后,托付联系停止,相关事务权限移交回原拥有者 这儿选用的是RabbitMq的死信行列加TTL音讯转化为推迟行列的办法(RabbitMq没有延时行列)
①声明一个行列设定其的死信行列
@Configuration
public class MqConfig {
public static final String GLOBAL_RABBIT_TEMPLATE = "rabbitTemplateGlobal";
public static final String DLX_EXCHANGE_NAME = "dlxExchange";
public static final String AUTH_EXCHANGE_NAME = "authExchange";
public static final String DLX_QUEUE_NAME = "dlxQueue";
public static final String AUTH_QUEUE_NAME = "authQueue";
public static final String DLX_AUTH_QUEUE_NAME = "dlxAuthQueue";
@Bean
@Qualifier(GLOBAL_RABBIT_TEMPLATE)
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
@Bean
@Qualifier(AUTH_EXCHANGE_NAME)
public Exchange authExchange() {
return ExchangeBuilder.directExchange (AUTH_EXCHANGE_NAME).durable (true).build ();
}
/**
* 死信交换机
* @return
*/
@Bean
@Qualifier(DLX_EXCHANGE_NAME)
public Exchange dlxExchange() {
return ExchangeBuilder.directExchange (DLX_EXCHANGE_NAME).durable (true).build ();
}
/**
* 记录日志的死信行列
* @return
*/
@Bean
@Qualifier(DLX_QUEUE_NAME)
public Queue dlxQueue() {
// Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
return QueueBuilder.durable (DLX_QUEUE_NAME).build ();
}
/**
* 托付授权专用行列
* @return
*/
@Bean
@Qualifier(AUTH_QUEUE_NAME)
public Queue authQueue() {
return QueueBuilder
.durable (AUTH_QUEUE_NAME)
.withArgument("x-dead-letter-exchange", DLX_EXCHANGE_NAME)
.withArgument("x-dead-letter-routing-key", "dlx_auth")
.build ();
}
/**
* 托付授权专用死信行列
* @return
*/
@Bean
@Qualifier(DLX_AUTH_QUEUE_NAME)
public Queue dlxAuthQueue() {
// Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
return QueueBuilder
.durable (DLX_AUTH_QUEUE_NAME)
.withArgument("x-dead-letter-exchange", DLX_EXCHANGE_NAME)
.withArgument("x-dead-letter-routing-key", "dlx_key")
.build ();
}
@Bean
public Binding bindDlxQueueExchange(@Qualifier(DLX_QUEUE_NAME) Queue dlxQueue, @Qualifier(DLX_EXCHANGE_NAME) Exchange dlxExchange){
return BindingBuilder.bind (dlxQueue).to (dlxExchange).with ("dlx_key").noargs ();
}
/**
* 托付授权专用死信行列绑定联系
* @param dlxAuthQueue
* @param dlxExchange
* @return
*/
@Bean
public Binding bindDlxAuthQueueExchange(@Qualifier(DLX_AUTH_QUEUE_NAME) Queue dlxAuthQueue, @Qualifier(DLX_EXCHANGE_NAME) Exchange dlxExchange){
return BindingBuilder.bind (dlxAuthQueue).to (dlxExchange).with ("dlx_auth").noargs ();
}
/**
* 托付授权专用行列绑定联系
* @param authQueue
* @param authExchange
* @return
*/
@Bean
public Binding bindAuthQueueExchange(@Qualifier(AUTH_QUEUE_NAME) Queue authQueue, @Qualifier(AUTH_EXCHANGE_NAME) Exchange authExchange){
return BindingBuilder.bind (authQueue).to (authExchange).with ("auth").noargs ();
}
}
②发送含过期时刻的音讯
向授权交换机,发送路由为”auth”的音讯(指定了事务所需的超时时刻) =》发向MqConfig.AUTH_QUEUE_NAME 行列
rabbitTemplate.convertAndSend(MqConfig.AUTH_EXCHANGE_NAME, "auth", "类型:END,信息:{id:1,fromUserId:111,toUserId:222,beginData:20201204,endData:20211104}", message -> {
/**
* MessagePostProcessor:音讯后置处理
* 为音讯设置属性,然后回来音讯,相当于包装音讯的类
*/
//事务逻辑:过期时刻=xxxx
String ttl = "5000";
//设置音讯的过期时刻
message.getMessageProperties ().setExpiration (ttl);
return message;
});
③超时后行列MqConfig.AUTH_QUEUE_NAME会将音讯转发至其装备的死信路由”dlx_auth”,监听该死信行列即可消费守时的音讯
/**
* 授权守时处理
* @param channel
* @param message
*/
@RabbitListener(queues = MqConfig.DLX_AUTH_QUEUE_NAME)
public void dlxAuthQ(Channel channel, Message message) throws IOException {
System.out.println ("\n死信原因:" + message.getMessageProperties ().getHeaders ().get ("x-first-death-reason"));
//1.判别音讯类型:1.BEGIN 2.END
try {
//2.1 类型为授权到期(END)
//2.1.1 修正报件处理人
//2.1.2 修正授权状况为0(失效)
//2.2 类型为授权敞开(BEGIN)
//2.2.1 修正授权状况为1(敞开)
System.out.println (new String(message.getBody (), Charset.forName ("utf8")));
channel.basicAck (message.getMessageProperties ().getDeliveryTag (), false);
System.out.println ("已处理,授权相关信息修正成功");
} catch (Exception e) {
//拒签音讯
channel.basicNack (message.getMessageProperties ().getDeliveryTag (), false, false);
System.out.println ("授权相关信息处理失败, 进入死信行列记录日志");
}
}