敞开成长之旅!这是我参与「日新计划 12 月更文应战」的第23天,点击查看活动概况

什么是死信行列?

死信行列,英文缩写DLX,Dead Letter Exchange(死信交换机),当音讯成为Dead message(音讯过期)后,能够被从头发送到另一个交换机,这个交换机就算是DLX,其实死信交换机(行列)和正常交换机(行列)没有什么差异

为什么叫死信行列可是翻译过来叫死信交换机呢,由于RabbitMQ比较特殊,其他MQ只要行列没有交换机这个概念的

正常来说,行列设置了过期时刻,当音讯到了行列之后,在过期时刻内没有被消费,那么这个音讯就被丢掉了,可是假如这个行列绑定了一个DLX死信行列(交换机),那么就算音讯过期了也不会被直接丢掉掉,而是会发送给死信交换机,那么死信交换机又能够绑定其他行列,将这些音讯存储到其他行列,然后又能够进行音讯消费,就算这个意思,过程如图所示

RabbitMQ高级特性:死信队列

什么情况下音讯成为死信行列

音讯成为死信行列的三种情况

1 行列音讯长度抵达约束

比方说给行列最大存储长度为10,当11条音讯进来的时分,第11条音讯进不去了,那么第11条音讯就是死信

2 顾客回绝消费音讯,basicNack/basicReject,而且不把音讯从头放入原目标行列,requeue=false,

顾客运用basicNack/basicReject,而且requeue=false,表示顾客回绝从头消费该音讯

3 原行列存在音讯过期设置,音讯抵达超时时刻未被消费

本来的行列存在过期时刻,可是到了过期时刻还没有消费该音讯

行列绑定交换机的办法

给行列设置两个参数 x-dead-letter-exchange(设置交换机的称号)和x-dead-letter-routing-key(发送音讯时指定的routingkey)

死信行列代码完成

声明两套交换机行列,一套是正常的交换机行列,一套是死信的交换机行列

RabbitMQ高级特性:死信队列

1 声明正常行列和交换机test_queue_dlx test_exchange_dlx

2 声明死信行列和死信交换机 queue_dlx exchange_dlx

3 正常行列绑定死信交换机

设置两个参数

1死信交换机称号 x-dead-letter-exchange

2发送给死信交换机的路由键 x-dead-letter-routing-key

1 音讯过期时刻的测验

测验过期时刻的死信 给正常的交换机发送音讯,过了存活时刻音讯主动从正常行列跑到死信行列

RabbitMQ高级特性:死信队列

RabbitMQ高级特性:死信队列

2 行列长度约束的测验

假如发送成功 那么由于设置了最大长度是10,只会有10条进行正常行列剩下的会跑到死信行列,过了10s后正常行列中的音讯也会主动跑到死信行列中

RabbitMQ高级特性:死信队列

RabbitMQ高级特性:死信队列

3 顾客音讯拒收的约束

顾客Consumer监听正常的行列,然后让音讯回绝接纳而且不重回行列

由于顾客拒收音讯,音讯会直接跑到私信行列中

RabbitMQ高级特性:死信队列

RabbitMQ高级特性:死信队列

RabbitMQ高级特性:死信队列

终究相关测验代码

出产者装备

 <? xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xmlns:context="http://www.springframework.org/schema/context"
 xmlns:rabbit="http://www.springframework.org/schema/rabbit"
 xsi:schemaLocation="http://www.springframework.org/schema/beans
 http://www.springframework.org/schema/beans/spring-beans.xsd
 http://www.springframework.org/schema/context
 https://www.springframework.org/schema/context/spring-context.xsd
 http://www.springframework.org/schema/rabbit
 http://www.springframework.org/schema/rabbit/spring-rabbit.xsd" >
    <!--加载装备文件-->
 <context:property-placeholder location="classpath:application.properties" />
    <!-- 界说rabbitmq connectionFactory -->
 <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
 port="${rabbitmq.port}"
 username="${rabbitmq.username}"
 password="${rabbitmq.password}"
 virtual-host="${rabbitmq.virtual-host}"
 publisher-confirms="true"
 publisher-returns="true"
 />
    <!--界说管理交换机,行列-->
 <rabbit:admin connection-factory="connectionFactory" />
    <!--界说rabbitTemplate对象操作能够在代码中调用api发送音讯-->
 <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" />
    <!--音讯的可靠性投递(出产端)-->
 <!--行列-->
 <rabbit:queue id="test_queue_confirm" name="test_queue_confirm" ></rabbit:queue>
    <!--交换机 广播-->
 <rabbit:direct-exchange name="test_exchange_confirm" >
        <!--绑定queue-->
 <rabbit:bindings>
            <rabbit:binding queue="test_queue_confirm" key="confirm" ></rabbit:binding>
        </rabbit:bindings>
    </rabbit:direct-exchange>
    <!--ttl音讯存活时刻-->
 <!--声明行列queue-->
 <rabbit:queue name="test_queue_ttl" id="test_queue_ttl" >
            <!--设置queue行列存活时刻-->
 <rabbit:queue-arguments>
            <!--经过entry设置属性 键值对应的格式 key=键 value=值 value-type表示值的数据类型-->
 <!--设置行列存活为10s 默许单位为毫秒
 由于存换时刻是int类型 这边声明的是字符串 需求运用value-type表名值是整形的 不指定会报错的 -->
 <!--x-message-ttl 指行列的过期时刻-->
 <entry key="x-message-ttl" value="100000" value-type="java.lang.Integer" ></entry>
        </rabbit:queue-arguments>
    </rabbit:queue>
    <!--声明交换机-->
 <rabbit:topic-exchange name="test_exchange_ttl" >
        <!--绑定交换机和行列-->
 <rabbit:bindings>
            <rabbit:binding pattern="ttl.#" queue="test_queue_ttl" ></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>
    <!--
 死信行列
 1 声明正常行列和交换机test_queue_dlx   test_exchange_dlx
 2 声明死信行列和死信交换机  queue_dlx exchange_dlx
 3 正常行列绑定死信交换机
 设置两个参数
 1死信交换机称号  x-dead-letter-exchange
 2发送给死信交换机的路由键 x-dead-letter-routing-key
 -->
 <!-- 1 声明正常行列和交换机test_queue_dlx   test_exchange_dlx-->
 <rabbit:queue id="test_queue_dlx" name="test_queue_dlx" >
        <!-- 3正常行列绑定死信交换机 运用arguments和entry设置参数-->
 <rabbit:queue-arguments>
            <!--3.1死信交换机称号  x-dead-letter-exchange-->
 <entry key="x-dead-letter-exchange" value="exchange_dlx" />
            <!--3.2发送给死信交换机的路由键 x-dead-letter-routing-key-->
 <entry key="x-dead-letter-routing-key" value="dlx.hehe" />
            <!--4让音讯完成死信-->
 <!--4.1 设置行列过期时刻 ttl 10s-->
 <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" />
            <!--4.2 设置行列的长度约束 max_length-->
 <entry key="x-max-length" value="10" value-type="java.lang.Integer" />
        </rabbit:queue-arguments>
    </rabbit:queue>
    <!--设置正常的交换机-->
 <rabbit:topic-exchange name="test_exchange_dlx" >
        <!--正常交换机绑定正常行列-->
 <rabbit:bindings>
            <rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx" />
        </rabbit:bindings>
    </rabbit:topic-exchange>
    <!-- 2 声明死信行列和死信交换机  queue_dlx exchange_dlx-->
 <rabbit:queue id="queue_dlx" name="queue_dlx" />
    <!--设置正常的交换机-->
 <rabbit:topic-exchange name="exchange_dlx" >
        <!--正常交换机绑定正常行列-->
 <rabbit:bindings>
            <rabbit:binding pattern="dlx.#" queue="queue_dlx" />
        </rabbit:bindings>
    </rabbit:topic-exchange>
</beans>

顾客装备

 <? xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xmlns:context="http://www.springframework.org/schema/context"
 xmlns:rabbit="http://www.springframework.org/schema/rabbit"
 xsi:schemaLocation="http://www.springframework.org/schema/beans
 http://www.springframework.org/schema/beans/spring-beans.xsd
 http://www.springframework.org/schema/context
 https://www.springframework.org/schema/context/spring-context.xsd
 http://www.springframework.org/schema/rabbit
 http://www.springframework.org/schema/rabbit/spring-rabbit.xsd" >
    <!--加载装备文件-->
 <context:property-placeholder location="classpath:application.properties" />
    <!-- 界说rabbitmq connectionFactory -->
 <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
 port="${rabbitmq.port}"
 username="${rabbitmq.username}"
 password="${rabbitmq.password}"
 virtual-host="${rabbitmq.virtual-host}"
 />
    <!--装备监听器bean 扫描这个包的途径 装备类需求加注解-->
 <context:component-scan base-package="com.wyh.listener" />
    <!--界说监听器    acknowledge="" 设置签收办法 -->
 <rabbit:listener-container  connection-factory="connectionFactory"  acknowledge="manual" prefetch="5" >
        <!--加载对应监听器的类 详细的类名和监听的行列名-->
 <!--  <rabbit:listener ref="qasCKListener" queue-names="test_queue_confirm"  />-->
 <!--
 <rabbit:listener ref="rabbitMQACKListener" queue-names="test_queue_confirm"  />
-->
 <!--监听正常行列 回绝签收并不允许重返行列 成为死信行列-->
 <rabbit:listener ref="DLXCKListener" queue-names="test_queue_dlx" />
    </rabbit:listener-container>
</beans>

出产者发送音讯


package com.producer.test;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
 /**
 *  @prog ram: SpringBoot_RabbitMQ_Advanced
 *  @description:  测验承认形式音讯是否发送成功
 *  @author:  魏一鹤
 *  @createDate:  2022-04-04 23:10
 **/
 //spring装备文件
@RunWith(SpringJUnit4ClassRunner.class)
 //加载文件的途径
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml" )
public class ProducerTest {
    //注入 RabbitTemplate
 @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
 * 承认形式:
 * 步骤
 * 1承认形式的敞开:在connectionFactory中敞开,默许是false不敞开的 publisher-confirms="true"
 * 2回调函数的编写:在RabbitTemplate模板工具类界说ConfirmCallBack(回调函数).当音讯发送出去的时分回调函数会主动履行,回来true(成功)或者false(失利)
 **/
 @Test
    public void testConfirm() {
        //界说承认形式的回调函数
  rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            //匿名内部类
 /**
 * confirm有三个参数 下面一一说明
 * CorrelationData correlationData 相关的装备信息
 * boolean ack  exchange交换机是否成功收到了音讯 true成功false失利
 * String cause 失利原因 ack=false
 **/
  public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println( "承认形式的回调函数被履行了!" ); //承认形式的回调函数被履行了!
 System.out.println( "音讯是否发送成功?" +ack);
                if(ack){
                    //交换机接纳成功出产者发送的音讯
 System.out.println( "接纳成功音讯!原因是:" +cause);
                }else{
                    //交换机接纳没有成功出产者发送的音讯
 System.out.println( "接纳失利音讯!原因是:" +cause);
                    //将来会做处理,就算音讯发送失利也会从头去发送音讯,确保音讯第2次发送成功
 }
            }
        });
        //发送音讯
  rabbitTemplate.convertAndSend( "test_exchange_confirm" , "confirm" , "message confirm..." );
    }
    /**
 * 回退形式:当音讯发送给Exchange交换机后,交换机路由到queue失利时才会履行ReturnCallBack
 * 步骤
 * 1回退形式的敞开:在connectionFactory中敞开,默许是false不敞开的 publisher-returns="true"
 * 2回调函数的编写:在RabbitTemplate模板工具类界说ConfirmCallBack(回调函数).当音讯发送出去的时分回调函数会主动履行,回来true(成功)或者false(失利)
 * 3设置Exchange处理音讯的形式 它有两种形式
 *   3.1 第一种形式:假如音讯没有路由到queue行列,则会丢掉音讯(默许的办法)
 *   3.2 第二种形式:假如音讯没有路由到queue行列,则回来给音讯的发送方ReturnCallBack
 **/
 @Test
    public void testReturn() {
        //由于处理音讯的形式默许是假如音讯没有路由到queue行列,则会丢掉音讯
 //所以需求设置交换机处理音讯的形式,交换机会把音讯回来给对应的出产者,出产者经过监听就能拿到音讯
  rabbitTemplate.setMandatory(true);
        //编写returnCallBack回调函数
  rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback(){
            @Override
            //匿名内部类
 /**
 * returnedMessage有五个参数 下面一一说明
 * Message message 音讯对象
 * int replyCode    回来编码 错误码
 * String replyText 错误信息
 * String exchange  交换机
 * String routingKey 路由键
 **/
  public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println( "回来形式的回调函数被履行了!" );
                System.out.println( "音讯对象:" +message);
                System.out.println( "回来编码(错误码):" +replyCode);
                System.out.println( "错误信息:" +replyText);
                System.out.println( "交换机:" +exchange);
                System.out.println( "路由键:" +routingKey);
                //将来会做处理 把信息从头路由
 //-----------------------------------------打印信息
 //回来形式的回调函数被履行了!
 //音讯对象:(Body:'message confirm...' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
 //回来编码(错误码):312
 //错误信息:NO_ROUTE
 //交换机:test_exchange_confirm
 //路由键:confirm111
 }
        });
        //发送音讯
  rabbitTemplate.convertAndSend( "test_exchange_confirm" , "confirm" , "message confirm..." );
    }
    //专门测验限流循环发送音讯
 @Test
    public void testSend() {
        for (int i = 0; i < 10; i++) {
            //发送音讯
  rabbitTemplate.convertAndSend( "test_exchange_confirm" , "confirm" , "message confirm..." );
        }
    }
    //测验ttl
 /**
 *  ttl 过期时刻分为两种
 *  1行列一致过期
 *  2音讯独自过期
 *  假如设置了音讯的过期时刻,也设置了行列的过期时刻,它以时刻短的为准!
 *  假如行列过期后,会将行列所有音讯悉数移除,由于是一致的
 *  音讯过期后,只要音讯在行列顶端(快要被消费),才会判断其是否过期,假如过期就会被移除掉
 **/
 @Test
    public void testTTL() {
        //音讯独自过期 在convertAndSend办法中心新增一个参数 MessagePostProcessor 加载匿名内部类和它的重写办法
 //MessagePostProcessor 音讯处理对象 设置一些音讯参数信息
 //发送音讯
  for (int i = 0; i < 10; i++) {
            if(i==5){
                //过期的音讯
  rabbitTemplate.convertAndSend( "test_exchange_ttl" , "ttl.weiyihe" , "message ttl..." ,new MessagePostProcessor(){
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        //设置message的信息  setExpiration 设置音讯过期时刻
 message.getMessageProperties().setExpiration( "10000" );
                        //回来该音讯
  return message;
                    }
                });
            }else{
                //不过期的音讯
  rabbitTemplate.convertAndSend( "test_exchange_ttl" , "ttl.weiyihe" , "message ttl..." );
            }
        }
    }
    /**
 * 发送测验私信音讯
 * 1、音讯过期时刻的测验
 * 2、行列长度约束的测验
 * 3、顾客音讯拒收的约束
 **/
 @Test
    public void testDlx(){
        //1、测验过期时刻的死信 给正常的交换机发送音讯
 //假如音讯正常发送到正常的交换机,过了10s会主动去死信行列
 //发送音讯
 //rabbitTemplate.convertAndSend("test_exchange_dlx", "test.dlx.haha", "我是一条音讯,我会死吗?");
 //2、行列长度约束的测验
 //发送20条信息 现在x-max-length是10
 //假如发送成功 那么由于设置了最大长度是10,只会有10条进行正常行列
 // 剩下的会跑到死信行列,过了10s后正常行列中的音讯也会主动跑到死信行列中
 //for (int i = 0; i < 20; i++) {
 //    //发送音讯
 //    rabbitTemplate.convertAndSend("test_exchange_dlx", "test.dlx.haha", "我是一条音讯,我会死吗?");
 //}
 //3 顾客音讯拒收的约束
 //顾客Consumer监听正常的行列,然后让音讯回绝接纳而且不重回行列
  rabbitTemplate.convertAndSend( "test_exchange_dlx" , "test.dlx.haha" , "我是一条音讯,我会死吗?" );
    }
}

顾客拒收音讯

package com.wyh.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
 /**
 *  @program:  SpringBoot_RabbitMQ_Advanced
 *  @description:  RabbitMQ 死信通信
 *  @author:  魏一鹤
 *  @createDate:  2022-04-06 20:30
 **/
 //包扫描注解 把bean加载到spring容器中
@Component
 //完成MessageListener接口并重写onMessage办法
public class DLXCKListener implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            //1 接纳并打印顾客收到(消费)的音讯
 System.out.println(new String(message.getBody()));
            //2 处理事务逻辑
 System.out.println( "处理事务逻辑......" );
            //故意使得事务报错
  int num=3/0;
            channel.basicAck(deliveryTag,true);
        } catch (Exception e) {
            System.out.println( "出现异常,顾客回绝签收!" );
            //死信行列 回绝签收requeue=false 将音讯路由到死信行列中
 channel.basicNack(deliveryTag,true,false);
        }
    }
}

DLX小结

1 死信交换机(行列)和普通交换机(行列)没什么差异

2 当音讯成为死信后,假如该行列绑定了死信行列,那么该音讯就会被死信交换机路由到死信行列,假如没有绑定死信行列,那么音讯就会根据音讯过期时刻丢失,就不会被消费

成为死信行列的三种情况

1 行列音讯长度抵达最大约束

2 音讯者拒收音讯,而且不允许重回路由

3 原行列设置过期时刻,音讯抵达超时时刻未被消费