敞开生长之旅!这是我参加「日新方案 2 月更文应战」的第 5 天,点击检查活动概况

在上一篇 Java 完成订单未支付超时主动撤销,运用Java自带的守时使命TimeTask完成订单超时撤销,但是有小伙伴提出这种完成,会有以下几个问题:

  • 线上服务挂了,导致服务下一切的守时使命失效。
  • 服务重启,守时使命也会失效。
  • 服务上线需要发布新的服务,本来服务也会封闭。

针对上述服务挂了、或许服务重启导致音讯失效的问题,需要运用独立于项目的服务,比方音讯中间件,比方Redis或许RabbitMQ。本文主要解说音讯行列RabbitMQ

完成效果

创建一个订单,超时30分钟未支付就撤销订单。

延迟队列实现订单超时自动取消

RabbitMQ自身是不支撑推迟行列的,但能够使用RabbitMQ存活时刻 + 死信行列来完成音讯推迟。

TTL + DLX

存活时刻 TTL

TTL全称为:time to live,意思为存活时刻,当音讯没有配置顾客,音讯就一向停留在行列中,停留时刻超越存活时刻后,音讯会被主动删除

RabbitMQ支撑两种TTL设置:

  • 对音讯自身设置存活时刻,每条音讯的存活时刻能够灵活设置为不同的存活时刻。
  • 对传递的行列设置存活时刻,每条传到到行列的过期时刻都共同。

当音讯过期还没有被消费,此刻音讯会变成死信音讯dead letter这是完成推迟行列的要害

音讯变为死信的条件:

  • 音讯被回绝basic.reject/basic.nack,而且requeue=false
  • 音讯的过期时刻到期了。
  • 行列到达最大长度。

死信交流机 DLX

当上面的音讯变成死信音讯之后,它不会立即被删除,首要它要看有没有对应的死信交流机,如果有绑定的死信交流机,音讯就会从发送到对应的死信交流机上。

DLX全程为Dead Letter Exchanges,意思为死信交流机。

死信交流机和普通交流机没什么差异,不同的是死信交流时机绑定在其他行列上,当行列的音讯变成死信音讯后,死信音讯会发送到死信交流上。

行列绑定死信交流机需要两个参数:

  • x-dead-letter-exchange: 绑定的死信交流机称号。
  • x-dead-letter-routing-key: 绑定的死信交流机routingKey

死信交流机和普通交流机的差异便是死信交流机的ExchangeroutingKey作为绑定参数,绑定在其他行列上。

项目实战

音讯发送的流程图:

延迟队列实现订单超时自动取消

  • 生产者将带有TTL的音讯发送给交流机,由交流机路由到行列中。
  • 行列由于没有消费,音讯一向停留在行列中,一向等到音讯超时,变成死信音讯。
  • 死信音讯转发到死信交流机在路由到死信行列上,最终给顾客消费。

创建死信行列

@Configuration
public class DelayQueueRabbitConfig {
  // 下面是死信行列
	/**
	 * 死信行列
	 */
	public static final String DLX_QUEUE = "queue.dlx";
	/**
	 * 死信交流机
	 */
	public static final String DLX_EXCHANGE = "exchange.dlx";
	/**
	 * 死信routing-key
	 */
	public static final String DLX_ROUTING_KEY = "routingKey.dlx";
	/**
	 * 死信行列
	 * @return
	 */
	@Bean
	public Queue dlxQueue() {
		return new Queue(DLX_QUEUE,true);
	}
	/**
	 * 死信交流机
	 * @return
	 */
	@Bean
	public DirectExchange dlxExchange() {
		return new DirectExchange(DLX_EXCHANGE,true,false);
	}
	/**
	 * 死信行列和交流机绑定
	 * @return
	 */
	@Bean
	public Binding bindingDLX() {
		return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(DLX_ROUTING_KEY);
	}
}

创建推迟行列,并绑定死信行列

  // 下面的是推迟行列
	/**
	 * 订单推迟行列
	 */
	public static final String ORDER_QUEUE = "queue.order";
	/**
	 * 订单交流机
	 */
	public static final String ORDER_EXCHANGE = "exchange.order";
	/**
	 * 订单routing-key
	 */
	public static final String ORDER_ROUTING_KEY = "routingkey.order";
	/**
	 * 订单推迟行列
	 * @return
	 */
	@Bean
	public Queue orderQueue() {
		Map<String,Object> params = new HashMap<>();
		params.put("x-dead-letter-exchange", DLX_EXCHANGE);
		params.put("x-dead-letter-routing-key", DLX_ROUTING_KEY);
		return new Queue(ORDER_QUEUE, true, false, false, params);
	}
	/**
	 * 订单交流机
	 * @return
	 */
	@Bean
	public DirectExchange orderExchange() {
		return new DirectExchange(ORDER_EXCHANGE,true,false);
	}
	/**
	 * 订单行列和交流机绑定
	 * @return
	 */
	@Bean
	public Binding orderBinding() {
		return BindingBuilder.bind(orderQueue()).to(orderExchange()).with(ORDER_ROUTING_KEY);
	}

绑定死信交流通过增加x-dead-letter-exchangex-dead-letter-routing-key参数指定对应的交流机和路由。

发送音讯

设置五秒超时时刻

@RestController
public class SendController {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @GetMapping("/dlx")
    public String dlx() {
        String date = DateUtil.dateFormat(new Date());
        String delayTime = "5000";
        System.out.println("【发送音讯】推迟 5 秒 发送时刻 " + date);
        rabbitTemplate.convertAndSend(DelayQueueRabbitConfig.ORDER_EXCHANGE,DelayQueueRabbitConfig.ORDER_ROUTING_KEY,
                message, message1 -> {
                    message1.getMessageProperties().setExpiration(delayTime);
                    return message1;
                });
       return "ok";         
    }
    class DateUtil{
       public static String dateFormat(Date date) {
        SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
        return sdf.format(date);
      }
    } 
}    

消费音讯

@RabbitListener(queues = DelayQueueRabbitConfig.DLX_QUEUE)
public void delayPrecss(String msg,Channel channel,Message message){
    System.out.println("【接纳音讯】" + msg + " 接纳时刻" + DateUtil.dateFormat(new Date()));
}

控制台输出

【发送音讯】推迟5 秒 发送时刻 21:32:15
【接纳音讯】推迟5 秒 发送时刻 21:32:15 接纳时刻21:32:20

发送音讯,5秒之后顾客后会收到音讯。说明推迟成功。

行列都有先进先出的特色,如果行列前面的音讯推迟比后的音讯推迟更长,会出现什么情况。

音讯时序问题

发送三条音讯,推迟分别是10s2s5s

@RestController
public class SendController {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @GetMapping("/dlx")
    public String dlx() {
         dlxSend("推迟10秒","10000");
         dlxSend("推迟2 秒","2000");
         dlxSend("推迟5 秒","5000");
         return "ok";
    }
    private void dlxSend(String message,String delayTime) {
         System.out.println("【发送音讯】" + message +  "当时时刻" + DateUtil.dateFormat(new Date()));
         rabbitTemplate.convertAndSend(DelayQueueRabbitConfig.ORDER_EXCHANGE,DelayQueueRabbitConfig.ORDER_ROUTING_KEY,
                message, message1 -> {
                    message1.getMessageProperties().setExpiration(delayTime);
                    return message1;
                });
    }

控制台输出:

【发送音讯】推迟10秒当时时刻21:54:36
【发送音讯】推迟2 秒当时时刻21:54:36
【发送音讯】推迟5 秒当时时刻21:54:36
【接纳音讯】推迟10秒 当时时刻21:54:46
【接纳音讯】推迟2 秒 当时时刻21:54:46
【接纳音讯】推迟5 秒 当时时刻21:54:46

一切的音讯都要等10s的音讯消费完才干消费,当10s音讯未被消费,其他音讯也会堵塞,即便音讯设置了更短的推迟。由于行列有先进先出的特征,当行列有多条音讯,推迟时刻就没用作用了,前面的音讯消费后,后的音讯才干被消费,不然会被堵塞到行列中。

插件完成处理音讯时序问题

针对上面音讯的时序问题,RabbitMQ开发一个推迟音讯的插件delayed_message_exchange,推迟音讯交流机。运用该插件能够处理上面时序的问题。

在Github官网找到对应的版别,我选择的是3.8.17

延迟队列实现订单超时自动取消

将文件下载下来放到服务器/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.9/plugins目录下,履行以下命令,发动插件:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

发动插件,交流时机有新的类型x-delayed-message:

延迟队列实现订单超时自动取消

x-delayed-message类型的交流机,支撑推迟投递音讯。发送音讯给x-delayed-message类型的交流流程图:

延迟队列实现订单超时自动取消

  • x-delayed-message类型的交流机接纳音讯投递后,并未将直接路由到行列中,而是存储mnesia(一个分布式数据体系),该体系会检测音讯推迟时刻。
  • 音讯到达可投递时刻,音讯会被投递到方针行列。

配置推迟行列

@Configuration
public class XDelayedMessageConfig {
  /**
	 * 行列
	 */
	public static final String DIRECT_QUEUE = "queue.delayed";
	/**
	 * 推迟交流机
	 */
	public static final String DELAYED_EXCHANGE = "exchange.delayed";
	/**
	 * 绑定的routing key
	 */
	public static final String ROUTING_KEY = "routingKey.bind";
	@Bean
	public Queue directQueue() {
		return new Queue(DIRECT_QUEUE,true);
	}
	/**
	 * 定义推迟交流机
	 * 交流机的类型为 x-delayed-message
	 * @return
	 */
	@Bean
	public CustomExchange delayedExchange() {
		Map<String,Object> map = new HashMap<>();
		map.put("x-delayed-type","direct");
		return new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message",true,false,map);
	}
	@Bean
	public Binding delayOrderBinding() {
		return BindingBuilder.bind(directQueue()).to(delayedExchange()).with(ROUTING_KEY).noargs();
	}
}

发送音讯:

    @GetMapping("/delay")
    public String delay() {
	    delaySend("推迟行列10 秒",10000);
	    delaySend("推迟行列5 秒",5000);
	    delaySend("推迟行列2 秒",2000);
        return "ok";
    }
    private void delaySend(String message,Integer delayTime) {
        message = message + " " + DateUtil.dateFormat(new Date());
        System.out.println("【发送音讯】" + message);
        rabbitTemplate.convertAndSend(XDelayedMessageConfig.DELAYED_EXCHANGE,XDelayedMessageConfig.ROUTING_KEY,
                message, message1 -> {
                    message1.getMessageProperties().setDelay(delayTime);
                    //message1.getMessageProperties().setHeader("x-delay",delayTime);
                    return message1;
                });
    }    

消费音讯:

    @RabbitListener(queues = XDelayedMessageConfig.DIRECT_QUEUE)
    public void delayProcess(String msg,Channel channel, Message message) {
        System.out.println("【接纳音讯】" + msg + " 当时时刻" + DateUtil.dateFormat(new Date()));
   }

控制台输出:

【发送音讯】推迟行列10 秒 22:00:01
【发送音讯】推迟行列5 秒 22:00:01
【发送音讯】推迟行列2 秒 22:00:01
【接纳音讯】推迟行列2 秒 22:00:01 当时时刻22:00:03
【接纳音讯】推迟行列5 秒 22:00:01 当时时刻22:00:05
【接纳音讯】推迟行列10 秒 22:00:01 当时时刻22:00:10

处理了音讯的时序问题。

总结

  • 运用Java自带的推迟音讯,体系重启或许挂了之后,音讯就无法发送,不适于用在生产环境上。
  • RabbitMQ自身不支撑推迟行列,能够运用存活时刻ttl + 死信行列dlx完成音讯推迟。
    • 发送的音讯设置ttl,地点的行列不设置顾客。
    • 行列绑定死信行列,音讯超时之后,变成死信音讯,再发送给死信行列,最终发送给顾客。
  • 发送多条不同推迟时刻音讯,前面音讯没有到推迟时刻,会堵塞后面推迟更低的音讯,由于行列有先进先出的特性。
  • RabbitMQx-delay-message插件能够处理音讯时序问题。
    • 带有ttl的音讯发送x-delayed-message类型的交流机,音讯不会直接路由到行列中。而且存储到分布式数据体系中,该体系会检测音讯推迟时刻。
    • 音讯到达推迟时刻,音讯才干会投递到行列中,最终发送给顾客。

Github 源码

  • Github 源码

参阅

  • Time-To-Live and Expiration

  • Dead Letter Exchanges

  • 领导看了我写的封闭超时订单,让我出门左转!

敞开生长之旅!这是我参加「日新方案 2 月更文应战」的第 5 天,点击检查活动概况