不了解rabbitmq的能够看看我上篇文章rabbitmq入门。
欢迎重视个人公众号【好好学技能】交流学习
如何确保音讯不丢失
rabbitmq音讯投递途径
生产者->交换机->行列->顾客
总的来说分为三个阶段。
- 1.生产者确保音讯投递可靠性。
- 2.mq内部音讯不丢失。
- 3.顾客消费成功。
什么是音讯投递可靠性
简单点说就是音讯百分百发送到音讯行列中。
我们能够敞开confirmCallback
生产者投递音讯后,mq会给生产者一个ack.依据ack,生产者就能够承认这条音讯是否发送到mq.
敞开confirmCallback
修正装备文件
#NONE:禁用发布承认形式,是默认值,CORRELATED:发布音讯成功到交换器后会触发回调办法
spring:
rabbitmq:
publisher-confirm-type: correlated
测验代码
@Test
public void testConfirmCallback() throws InterruptedException {
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
*
* @param correlationData 装备
* @param ack 交换机是否收到音讯,true是成功,false是失利
* @param cause 失利的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("confirm=====>");
System.out.println("confirm==== ack="+ack);
System.out.println("confirm==== cause="+cause);
//依据ACK状况做对应的音讯更新操作 TODO
}
});
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"ikun.mei", "鸡你太美");
Thread.sleep(10000);
}
经过returnCallback确保音讯从交换器发送到行列成功。
修正装备文件
spring:
rabbitmq:
#敞开returnCallback
publisher-returns: true
#交换机处理音讯到路由失利,则会回来给生产者
template:
mandatory: true
测验代码
@Test
void testReturnCallback() {
//为true,则交换机处理音讯到路由失利,则会回来给生产者 装备文件指定,则这儿不需指定
rabbitTemplate.setMandatory(true);
//敞开强制音讯投递(mandatory为设置为true),但音讯未被路由至任何一个queue,则回退一条音讯
rabbitTemplate.setReturnsCallback(returned -> {
int code = returned.getReplyCode();
System.out.println("code="+code);
System.out.println("returned="+ returned);
});
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"123456","测验returnCallback");
}
顾客消费音讯时需要经过ack手动承认音讯已消费。
修正装备文件
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual
编写测验代码
@RabbitHandler
public void consumer(String body, Message message, Channel channel) throws IOException {
long msgTag = message.getMessageProperties().getDeliveryTag();
System.out.println("msgTag="+msgTag);
System.out.println("message="+ message);
System.out.println("body="+body);
//成功承认,运用此回执办法后,音讯会被 rabbitmq broker 删去
channel.basicAck(msgTag,false);
// channel.basicNack(msgTag,false,true);
}
deliveryTags是音讯投递序号,每次消费音讯或者音讯从头投递后,deliveryTag都会添加
ttl死信行列
什么是死信行列
没有被及时消费的音讯寄存的行列
音讯有哪几种情况成为死信
- 顾客拒收音讯 (basic.reject/ basic.nack) ,而且没有从头入队requeue=false
- 音讯在行列中未被消费,且超过行列或者音讯本身的过期时刻TTL(time-to-live)
- 行列的音讯长度达到极限
- 成果:音讯成为死信后,如果该行列绑定了死信交换机,则音讯会被死信交换机从头路由到死信行列
死信行列经常用来做推迟行列消费。
推迟行列
生产者投递到mq中并不希望这条音讯立马被消费,而是等待一段时刻后再去消费。
springboot整合rabbitmq实现订单超时主动关闭
package com.fandf.test.rabbit;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* @author fandongfeng
* @date 2023/4/15 15:38
*/
@Configuration
public class RabbitMQConfig {
/**
* 订单交换机
*/
public static final String ORDER_EXCHANGE = "order_exchange";
/**
* 订单行列
*/
public static final String ORDER_QUEUE = "order_queue";
/**
* 订单路由key
*/
public static final String ORDER_QUEUE_ROUTING_KEY = "order.#";
/**
* 死信交换机
*/
public static final String ORDER_DEAD_LETTER_EXCHANGE = "order_dead_letter_exchange";
/**
* 死信行列 routingKey
*/
public static final String ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY = "order_dead_letter_queue_routing_key";
/**
* 死信行列
*/
public static final String ORDER_DEAD_LETTER_QUEUE = "order_dead_letter_queue";
/**
* 创立死信交换机
*/
@Bean("orderDeadLetterExchange")
public Exchange orderDeadLetterExchange() {
return new TopicExchange(ORDER_DEAD_LETTER_EXCHANGE, true, false);
}
/**
* 创立死信行列
*/
@Bean("orderDeadLetterQueue")
public Queue orderDeadLetterQueue() {
return QueueBuilder.durable(ORDER_DEAD_LETTER_QUEUE).build();
}
/**
* 绑定死信交换机和死信行列
*/
@Bean("orderDeadLetterBinding")
public Binding orderDeadLetterBinding(@Qualifier("orderDeadLetterQueue") Queue queue, @Qualifier("orderDeadLetterExchange")Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY).noargs();
}
/**
* 创立订单交换机
*/
@Bean("orderExchange")
public Exchange orderExchange() {
return new TopicExchange(ORDER_EXCHANGE, true, false);
}
/**
* 创立订单行列
*/
@Bean("orderQueue")
public Queue orderQueue() {
Map<String, Object> args = new HashMap<>(3);
//音讯过期后,进入到死信交换机
args.put("x-dead-letter-exchange", ORDER_DEAD_LETTER_EXCHANGE);
//音讯过期后,进入到死信交换机的路由key
args.put("x-dead-letter-routing-key", ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY);
//过期时刻,单位毫秒
args.put("x-message-ttl", 10000);
return QueueBuilder.durable(ORDER_QUEUE).withArguments(args).build();
}
/**
* 绑定订单交换机和行列
*/
@Bean("orderBinding")
public Binding orderBinding(@Qualifier("orderQueue") Queue queue, @Qualifier("orderExchange")Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(ORDER_QUEUE_ROUTING_KEY).noargs();
}
}
顾客
package com.fandf.test.rabbit;
import cn.hutool.core.date.DateUtil;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author fandongfeng
* @date 2023/4/15 15:42
*/
@Component
@RabbitListener(queues = RabbitMQConfig.ORDER_DEAD_LETTER_QUEUE)
public class OrderMQListener {
@RabbitHandler
public void consumer(String body, Message message, Channel channel) throws IOException {
System.out.println("收到音讯:" + DateUtil.now());
long msgTag = message.getMessageProperties().getDeliveryTag();
System.out.println("msgTag=" + msgTag);
System.out.println("message=" + message);
System.out.println("body=" + body);
channel.basicAck(msgTag, false);
}
}
测验类
@Test
void testOrder() throws InterruptedException {
//为true,则交换机处理音讯到路由失利,则会回来给生产者 装备文件指定,则这儿不需指定
rabbitTemplate.setMandatory(true);
//敞开强制音讯投递(mandatory为设置为true),但音讯未被路由至任何一个queue,则回退一条音讯
rabbitTemplate.setReturnsCallback(returned -> {
int code = returned.getReplyCode();
System.out.println("code=" + code);
System.out.println("returned=" + returned);
});
rabbitTemplate.convertAndSend(RabbitMQConfig.ORDER_EXCHANGE, "order", "测验订单推迟");
System.out.println("发送音讯:" + DateUtil.now());
Thread.sleep(20000);
}
程序输出
发送音讯:2023-04-16 15:14:34
收到音讯:2023-04-16 15:14:44
msgTag=1
message=(Body:'测验订单推迟' MessageProperties [headers={spring_listener_return_correlation=03169cfc-5061-41fe-be47-c98e36d17eac, x-first-death-exchange=order_exchange, x-death=[{reason=expired, count=1, exchange=order_exchange, time=Mon Apr 16 15:14:44 CST 2023, routing-keys=[order], queue=order_queue}], x-first-death-reason=expired, x-first-death-queue=order_queue}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=order_dead_letter_exchange, receivedRoutingKey=order_dead_letter_queue_routing_key, deliveryTag=1, consumerTag=amq.ctag-Eh8GMgrsrAH1rvtGj7ykOQ, consumerQueue=order_dead_letter_queue])
body=测验订单推迟