我正在参加「启航方案」

1、MQ导言

1.1 什么是MQ

MQ(Message Quene) : 翻译为音讯行列,经过典型的 出产者顾客模型,出产者不断向音讯行列中出产音讯,顾客不断的从行列中获取音讯。由于音讯的出产和消费都是异步的,而且只关怀音讯的发送和接纳,没有事务逻辑的侵入,轻松的完结体系间解耦。别名为 音讯中间件经过运用高效牢靠的音讯传递机制进行渠道无关的数据交流,并依据数据通信来进行分布式体系的集成。

1.2 MQ有哪些

当今市道上有许多干流的音讯中间件,如老牌的ActiveMQRabbitMQ,炙手可热的Kafka,阿里巴巴自主开发RocketMQ等。

1.3 不同MQ特色

# 1.ActiveMQ
        ActiveMQ 是Apache出品,最盛行的,能力强劲的开源音讯总线。它是一个彻底支撑JMS标准的的音讯中间件。丰厚的API,多种集群架构方式让ActiveMQ在业界成为老牌的音讯中间件,在中小型企业颇受欢迎!
​
# 2.Kafka
        Kafka是LinkedIn开源的分布式发布-订阅音讯体系,现在归属于Apache顶级项目。Kafka首要特色是依据Pull的方式来处理音讯消费,
        寻求高吞吐量,一开始的意图便是用于日志搜集和传输。0.8版别开始支撑仿制,不支撑事务,对音讯的重复、丢掉、过错没有严格要求,
        合适产生大量数据的互联网服务的数据搜集事务。
​
# 3.RocketMQ
        RocketMQ是阿里开源的音讯中间件,它是纯Java开发,具有高吞吐量、高可用性、合适大规模分布式体系应用的特色。RocketMQ思路起
        源于Kafka,但并不是Kafka的一个Copy,它对音讯的牢靠传输及事务性做了优化,现在在阿里集团被广泛应用于交易、充值、流计算、消
        息推送、日志流式处理、binglog分发等场景。
​
# 4.RabbitMQ
        RabbitMQ是运用Erlang言语开发的开源音讯行列体系,依据AMQP协议来完结。AMQP的首要特征是面向音讯、行列、路由(包括点对点和
        发布/订阅)、牢靠性、安全。AMQP协议更多用在企业体系内对数据一致性、稳定性和牢靠性要求很高的场景,对功能和吞吐量的要求还在
        其次。
        

RabbitMQ比Kafka牢靠,Kafka更合适IO高吞吐的处理,一般应用在大数据日志处理或对实时性(少量延迟),牢靠性(少量丢数据)要求稍低的场景运用,比方ELK日志搜集。

2、RabbitMQ导言

2.1 RabbitMQ

依据AMQP协议,erlang言语开发,是布置最广泛的开源音讯中间件,是最受欢迎的开源音讯中间件之一。

RabbitMQ基础复习

官网: www.rabbitmq.com/

官方教程: www.rabbitmq.com/#getstarted

AMQP协议:

AMQP(advanced message queuing protocol)`在2003年时被提出,最早用于解决金融领不同渠道之间的音讯传递交互问题。望文生义,AMQP是一种协议,更准确的说是一种binary wire-level protocol(链接协议)。这是其和JMS的实质不同,AMQP不从API层进行限制,而是直接界说网络交换的数据格式。这使得完结了AMQP的provider天然性便是跨渠道的。以下是AMQP协议模型:

RabbitMQ基础复习

2.2 RabbitMQ装置

这个曾经写过,不重复介绍了 codeleader.blog.csdn.net/article/det…

3、RabbitMQ配置

3.1 RabbitMQ指令行

# 1.服务发动相关
    systemctl start|restart|stop|status rabbitmq-server
​
# 2.办理指令行  用来在不运用web办理界面状况下指令操作RabbitMQ
    rabbitmqctl  help  可以检查更多指令
​
# 3.插件办理指令行
    rabbitmq-plugins enable|list|disable 

3.2 Web办理界面

3.2.1 overview概览

RabbitMQ基础复习

  • connections:不管出产者还是顾客,都需求与RabbitMQ树立衔接后才可以完结音讯的出产和消费,在这儿可以检查衔接状况
  • channels:通道,树立衔接后,会构成通道,音讯的投递获取依赖通道。
  • Exchanges:交换机,用来完结音讯的路由
  • Queues:行列,即音讯行列,音讯存放在行列中,等候消费,消费后被移除行列。

3.2.2 Admin用户和虚拟主机办理

1、增加用户

RabbitMQ基础复习

上面的Tags选项,其实是指定用户的人物,可选的有以下几个:

  • 超级办理员(administrator)

    可登陆办理控制台,可检查一切的信息,而且可以对用户,战略(policy)进行操作。

  • 监控者(monitoring)

    可登陆办理控制台,同时可以检查rabbitmq节点的相关信息(进程数,内存运用状况,磁盘运用状况等)

  • 战略制定者(policymaker)

    可登陆办理控制台, 同时可以对policy进行办理。但无法检查节点的相关信息(上图红框标识的部分)。

  • 普通办理者(management)

    仅可登陆办理控制台,无法看到节点信息,也无法对战略进行办理。

  • 其他

    无法登陆办理控制台,一般便是普通的出产者和顾客。

    2、创立虚拟主机

    RabbitMQ基础复习

默许创立虚拟主机之后,没有用户可以运用,需求绑定用户

3、绑定虚拟主机和用户

创立好虚拟主机,咱们还要给用户增加拜访权限:

点击增加好的虚拟主机:

RabbitMQ基础复习

这儿给admin和ems都授权,授权之后就能在页面上看到了,如下图

RabbitMQ基础复习

4、RabbitMQ常用音讯模型测验

4.1 RabbitMQ支撑的音讯模型

RabbitMQ基础复习

RabbitMQ基础复习

4.2 引进依赖

<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
  <groupId>com.rabbitmq</groupId>
  <artifactId>amqp-client</artifactId>
  <version>5.16.0</version>
</dependency>

4.3 第一种模型:直连

RabbitMQ基础复习

在上图的模型中,有以下概念:

  • P:出产者,也便是要发送音讯的程序
  • C:顾客:音讯的承受者,会一向等候音讯到来。
  • queue:音讯行列,图中赤色部分。类似一个邮箱,可以缓存音讯;出产者向其间投递音讯,顾客从其间取出音讯。

4.3.1 自界说衔接东西类

ublic class RabbitUtils {
  //创立衔接MQ的衔接工厂  重量级资源
  public static ConnectionFactory connectionFactory=new ConnectionFactory();
  static {  //类加载履行  只履行一次
    //设置衔接rabbitmq主机
    connectionFactory.setHost("ip");
    //设置端口号
    connectionFactory.setPort(5672);
    //设置衔接哪个虚拟主机
    connectionFactory.setVirtualHost("/ems");
    //设置拜访虚拟主机的用户名和暗码
    connectionFactory.setUsername("ems");
    connectionFactory.setPassword("暗码");
   }
​
  //界说提供衔接目标的方法
  public static Connection getConnection() {
    Connection connection = null;
    try {
​
      //获取衔接目标
      connection = connectionFactory.newConnection();
     } catch (IOException | TimeoutException e) {
      e.printStackTrace();
     }
    return connection;
   }
​
  //封闭通道和封闭衔接的方法
  public static void closeConnectionAndChannel(Channel channel, Connection connection) {
    try {
      if (channel != null) {
        channel.close();
       }
      if (connection != null) {
        connection.close();
       }
     } catch (IOException | TimeoutException e) {
      e.printStackTrace();
     }
   }
}

4.3.2 出产者

public class Provider {
​
  //出产音讯  HelloWorld:直连方式public static void main(String[] args) throws IOException {
 
    Connection connection= RabbitUtils.getConnection();
    //获取衔接中的通道
    Channel channel = connection.createChannel();
    //通道绑定对应的音讯行列
    //参数1:行列称号,假如不存在,主动创立。
    //参数2:界说行列特性是否耐久化 true :耐久化,false:不耐久化
    //参数3:是否独占行列
    //参数4:是否在消费完结后主动删去行列 true:主动删去,false:不主动删去
    //参数5:额定附加参数
    channel.queueDeclare("hello",false,false,false,null);
    //发布音讯
    //交换机称号,行列称号,传递音讯的额定设置,音讯的具体内容
    channel.basicPublish("","hello",null,"hello rabbitmq".getBytes());
    RabbitUtils.closeConnectionAndChannel(channel,connection);
   }
}

这儿不指定交换机称号,用的便是默许交换机。

4.3.3 顾客

public class Consumer {
​
  public static void main(String[] args) throws IOException, TimeoutException {
    //调用自界说东西类
    Connection connection= RabbitUtils.getConnection();
​
    //创立通道
    Channel channel = connection.createChannel();
    //通道绑定目标
    channel.queueDeclare("hello",true,false,true,null);
​
    //消费音讯
    //参数1:消费哪个行列的音讯
    //参数2:敞开音讯的主动承认机制
    //参数3:消费音讯时的回调接口
    channel.basicConsume("hello",true,new DefaultConsumer(channel){
      //body:音讯行列中取出的音讯
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("=============="+new String(body));
       }
     });
​
    //调用东西类
    RabbitUtils.closeConnectionAndChannel(channel,connection);
​
   }
}

发动出产者:

RabbitMQ基础复习

发动顾客:

RabbitMQ基础复习

可以看到,音讯现已收到了

4.4 第二种模型:Work Queue

Work queues,也被称为(Task queues`),使命模型。当音讯处理比较耗时的时分,或许出产音讯的速度会远远大于音讯的消费速度。久而久之,音讯就会堆积越来越多,无法及时处理。此时就可以运用work 模型:让多个顾客绑定到一个行列,共同消费行列中的音讯。行列中的音讯一旦消费,就会消失,因而使命是不会被重复履行的。

RabbitMQ基础复习

人物:

  • P:出产者:使命的发布者
  • C1:顾客-1,收取使命而且完结使命,假定完结速度较慢
  • C2:顾客-2:收取使命并完结使命,假定完结速度快

4.4.1 出产者:

public class Provider {
  public static void main(String[] args) throws IOException {
    //获取衔接目标
    Connection connection = RabbitUtils.getConnection();
    Channel channel = connection.createChannel();
    //经过通道声明行列
    channel.queueDeclare("work",true,false,false,null);
    //出产音讯
    for (int i = 1; i <=20 ; i++) {
      channel.basicPublish("","work", null,(i+" hello work queue").getBytes());
     }
​
    //封闭资源
    RabbitUtils.closeConnectionAndChannel(channel,connection);
   }
}

4.4.2 顾客

顾客1:

//轮询分发测验
public class Consumer1 {
  public static void main(String[] args) throws IOException {
    Connection connection = RabbitUtils.getConnection();
    Channel channel = connection.createChannel();
​
    channel.queueDeclare("work",true,false,false,null);
​
    channel.basicConsume("work",true,new DefaultConsumer(channel){
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("顾客-1:"+new String(body));
        try {
          Thread.sleep(1000);
         } catch (InterruptedException e) {
          e.printStackTrace();
         }
       }
     });
   }
}

顾客2:

public class Consumer2 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("work",true,false,false,null);
        //参数1:行列称号,参数2:音讯主动承认 true:顾客主动向rabbitmq承认音讯消费了,false:不会主动承认音讯
        channel.basicConsume("work",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("顾客-2:"+new String(body));
            }
        });
    }
}

先发动两个顾客

RabbitMQ基础复习

再发动出产者:

RabbitMQ基础复习

RabbitMQ基础复习

总结:默许状况下,RabbitMQ将按次序将每个音讯发送给下一个运用者。平均而言,每个顾客都会收到相同数量的音讯。这种分发音讯的方法称为循环。

可以看到,默许是轮询分发的,但是这样子不好,咱们的顾客1运用线程休眠了1s处理的很慢仍然和顾客2五五开。

咱们想要的结果是能者多劳,也便是处理速度快的就尽量多处理几条音讯。

改善如下:

  • 设置一次只承受一条未承认的音讯
  • 封闭音讯主动承认,改为手动承认

4.4.3 改善为能者多劳

出产者不动,改动顾客

顾客1:

//能者多劳测验
public class Consumer1 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.basicQos(1);//每次只能消费一个音讯
        channel.queueDeclare("work",true,false,false,null);
        channel.basicConsume("work",false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("顾客-1:"+new String(body));
                //手动承认
                //参数1:手动承认音讯标识, 参数2:false 每次承认一个
                channel.basicAck(envelope.getDeliveryTag(),false);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

顾客2:

public class Consumer2 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.basicQos(1);//每次只能消费一个音讯
        channel.queueDeclare("work",true,false,false,null);
        //参数1:行列称号,参数2:音讯主动承认 true:顾客主动向rabbitmq承认音讯消费了,false:不会主动承认音讯
        channel.basicConsume("work",false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("顾客-2:"+new String(body));
                //手动承认
                //参数1:手动承认音讯标识, 参数2:false 每次承认一个
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }
}

顾客2:

RabbitMQ基础复习

顾客2:

RabbitMQ基础复习

可以看到,达到了能者多劳的效果

4.5 第三种模型:Fanout

fanout 扇出 也称为播送

RabbitMQ基础复习

RabbitMQ基础复习

在播送方式下,音讯发送流程是这样的:

  • 可以有多个顾客
  • 每个顾客有自己的queue(行列)
  • 每个行列都要绑定到Exchange(交换机)
  • 出产者发送的音讯,只能发送到交换机,交换机来决定要发给哪个行列,出产者无法决定。
  • 交换机把音讯发送给绑定过的一切行列
  • 行列的顾客都能拿到音讯。完结一条音讯被多个顾客消费

4.5.1 出产者

public class Provider {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        //将通道声明指定的交换机
        //参数1:交换机称号,参数2:交换机类型,fanout:播送类型
        channel.exchangeDeclare("logs","fanout");
        //发送音讯  fanout中的routingkey没啥效果
        channel.basicPublish("logs","",null,"fanout type message".getBytes());
        RabbitUtils.closeConnectionAndChannel(channel,connection);
    }

4.5.2 开发3个顾客

顾客1:

public class Consumer1 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        //声明交换机
        channel.exchangeDeclare("logs","fanout");
        //暂时行列
        String queueName = channel.queueDeclare().getQueue();
        //绑定交换机和行列
        channel.queueBind(queueName,"logs","");
        //消费音讯
        channel.basicConsume(queueName,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("顾客1:"+new String(body));
            }
        });
    }
}

顾客2:

public class Consumer2 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        //声明交换机
        channel.exchangeDeclare("logs","fanout");
        //暂时行列
        String queueName = channel.queueDeclare().getQueue();
        //绑定交换机和行列
        channel.queueBind(queueName,"logs","");
        //消费音讯
        channel.basicConsume(queueName,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("顾客2:"+new String(body));
            }
        });
    }
}
public class Consumer3 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        //声明交换机
        channel.exchangeDeclare("logs","fanout");
        //暂时行列
        String queueName = channel.queueDeclare().getQueue();
        //绑定交换机和行列
        channel.queueBind(queueName,"logs","");
        //消费音讯
        channel.basicConsume(queueName,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("顾客3:"+new String(body));
            }
        });
    }
}

先发动3个顾客

发动出产者之后调查3个顾客是否都接纳到了音讯:

RabbitMQ基础复习

RabbitMQ基础复习

RabbitMQ基础复习

调查是否创立了对应的交换机:

RabbitMQ基础复习

4.6 第四种模型:Routing

其实Routing和Topics很像,一个是写死了RoutingKey,另一个运用了通配符。

在Fanout方式中,一条音讯,会被一切订阅的行列都消费。但是,在某些场景下,咱们期望不同的音讯被不同的行列消费。这时就要用到Direct类型的Exchange。

在Direct模型下:

  • 行列与交换机的绑定,不能是恣意绑定了,而是要指定一个RoutingKey(路由key)
  • 音讯的发送方在 向 Exchange发送音讯时,也有必要指定音讯的 RoutingKey
  • Exchange不再把音讯交给每一个绑定的行列,而是依据音讯的Routing Key进行判断,只要行列的Routingkey与音讯的 Routing key彻底一致,才会接纳到音讯

RabbitMQ基础复习

图解:

  • P:出产者,向Exchange发送音讯,发送音讯时,会指定一个routing key。
  • X:Exchange(交换机),接纳出产者的音讯,然后把音讯递交给 与routing key彻底匹配的行列
  • C1:顾客,其地点行列指定了需求routing key 为 error 的音讯
  • C2:顾客,其地点行列指定了需求routing key 为 info、error、warning 的音讯

4.6.1 出产者

public class Provider {
    public static final String EXCHANGE_NAME="logs_direct";
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        //声明交换机  参数1:交换机称号,参数2:direct 路由方式
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        //发送音讯
//        String routingKey="info";
//        String routingKey="error";
        String routingKey="warning";
//        String routingKey="trade";
        channel.basicPublish(EXCHANGE_NAME,routingKey,null,("这是direct模型发布的依据routingKey:["+routingKey+"]").getBytes());
        //封闭资源
        RabbitUtils.closeConnectionAndChannel(channel,connection);
    }
}

4.6.2 顾客1

public class Consumer1 {
    public static final String EXCHANGE_NAME="logs_direct";
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        //通道声明交换机以及交换机的类型
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        //创立一个暂时行列
        String queue = channel.queueDeclare().getQueue();
        //依据routingKey去绑定行列和交换机
        channel.queueBind(queue,EXCHANGE_NAME,"error");
        //消费音讯
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("顾客1:"+new String(body));
            }
        });
    }
}

4.6.3 顾客2

public class Consumer2 {
    public static final String EXCHANGE_NAME="logs_direct";
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        //通道声明交换机以及交换机的类型
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        //创立一个暂时行列
        String queue = channel.queueDeclare().getQueue();
        //依据routingKey去绑定行列和交换机
        channel.queueBind(queue,EXCHANGE_NAME,"info");
        channel.queueBind(queue,EXCHANGE_NAME,"error");
        channel.queueBind(queue,EXCHANGE_NAME,"warning");
        //消费音讯
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("顾客2:"+new String(body));
            }
        });
    }
}

先发动两个顾客:

RabbitMQ基础复习

发动出产者之后调查顾客是否收到了音讯:

RabbitMQ基础复习

RabbitMQ基础复习

可以看到,顾客1没有收到音讯,由于咱们出产者的routintKey为warning,而顾客1行列的routingKey是error,顾客2行列的routingKey是warning

所以只要顾客2可以接纳到音讯,只要行列的Routingkey与音讯的 Routing key彻底一致,才会接纳到音讯

4.7 第五种模型:Topics

Topic类型的ExchangeDirect相比,都是可以依据RoutingKey把音讯路由到不同的行列。只不过Topic类型Exchange可以让行列在绑定Routing key 的时分运用通配符!这种模型Routingkey 一般都是由一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

RabbitMQ基础复习

# 统配符
		* (star) can substitute for exactly one word.    匹配不多不少刚好1个词
		# (hash) can substitute for zero or more words.  匹配一个或多个词
# 如:
	audit.#    匹配audit.irs.corporate或许 audit.irs 等
    audit.*   只能匹配 audit.irs

4.7.1 出产者

public class Provider {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        //声明交换机以及交换机类型 topic
        channel.exchangeDeclare("topics", BuiltinExchangeType.TOPIC);
        //发布音讯
        String routingKey="user.save";
//        String routingKey="user.save.findAll";
    //    String routingKey="user";
        channel.basicPublish("topics",routingKey,null,("这儿是topic动态路由模型,routingKey:["+routingKey+"]").getBytes());
        //封闭资源
        RabbitUtils.closeConnectionAndChannel(channel,connection);
    }
}

4.7.2 顾客1:

public class Cosumer1 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        //声明交换机以及交换机类型
        channel.exchangeDeclare("topics", BuiltinExchangeType.TOPIC);
        //创立暂时行列
        String queue = channel.queueDeclare().getQueue();
        //创立行列和交换机,动态通配符方式 routingKey
        channel.queueBind(queue,"topics","user.*");
        //消费音讯
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("顾客1:"+new String(body));
            }
        });
    }
}

4.7.3 顾客2:

public class Cosumer2 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        //声明交换机以及交换机类型
        channel.exchangeDeclare("topics", BuiltinExchangeType.TOPIC);
        //创立暂时行列
        String queue = channel.queueDeclare().getQueue();
        //创立行列和交换机,动态通配符方式 routingKey
        channel.queueBind(queue,"topics","user.#");
        //消费音讯
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("顾客2:"+new String(body));
            }
        });
    }
}

咱们注意到顾客1的routingKey为user.*,顾客2的routingKey为user.#

发动两个顾客,再发动出产者

RabbitMQ基础复习

RabbitMQ基础复习

此时都收到了音讯,是由于两个规矩都能匹配到。

咱们现在将出产者交换机的routingKey改为user.save.findAll,发动出产者,调查结果:

RabbitMQ基础复习

RabbitMQ基础复习

是由于顾客2中行列的routingKey为user.#,user后边可以匹配一个或许多个,而顾客1中行列的routingKey为user.*,user后边只能匹配一个词,所以收不到音讯。

就先介绍到这儿,后边的RPC暂时不搞了,至于Publisher Confirms看我专栏曾经的文章,这几种方式足够敷衍绝大多数的事务场景了。

5、RabbitMQ与SpringBoot整合

真正写代码的时分都是与现有结构进行集成,很少用上面那种原生的写法。

5.0 树立环境

5.0.1 引进依赖

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

5.0.2 配置文件

spring:
  application:
    name: rabbitmq-springboot
  rabbitmq:
    host: 你的ip
    port: 5672
    username: 用户名
    password: 暗码
    virtual-host: /ems  # 虚拟主机

5.1 第一种:HelloWorld模型

出产者:

//注入rabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;
//hello world
@Test
public void testHelloWorld(){
    rabbitTemplate.convertAndSend("hello","hello world");
}

顾客:

@Component
@RabbitListener(queuesToDeclare = @Queue(value = "hello"))
public class HelloCustomer {
    @RabbitHandler
    public void receive(String message){
        System.out.println("message="+message);
    }
}

由于@RabbitListener注解会一向监听音讯,所以这儿不必像上面一样分别发动顾客和出产者了。

咱们直接发动出产者:

RabbitMQ基础复习

RabbitMQ基础复习

办理界面却是存在hello行列,控制台也看到音讯现已被顾客接纳。

5.2 第二种:Work Queue

出产者:

//work queues
@Test
public void testWorkQueue(){
    for (int i = 0; i < 10; i++) {
        rabbitTemplate.convertAndSend("work","work模型"+i);
    }
}

顾客:

@Component
public class WorkCustomer {
    //顾客1
    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void receive(String message){
        System.out.println("message1="+message);
    }
    //顾客2
    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void receive1(String message){
        System.out.println("message2="+message);
    }
}

这儿我树立两个顾客来测验轮询分发方式

RabbitMQ基础复习

RabbitMQ基础复习

办理界面中看到了绑定的work行列,控制台也看到了音讯现已被两个顾客接纳。

5.3 第三种:Fanout

出产者:

//fanout 播送
@Test
public void testFanout(){
    //这种方式的routingKey没啥效果
    rabbitTemplate.convertAndSend("logs","","Fanout的模型发送的音讯");
}

界说一个交换机:logs

顾客:

@Component
public class FanoutCustomer {
    @RabbitListener(bindings = {
           @QueueBinding(
                   value = @Queue,//不写value代表暂时行列
                   exchange =@Exchange(value = "logs",type ="fanout")   //绑定的叫喊及
           )
    })
    public void receive1(String message){
        System.out.println("message1= "+message);
    }
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,//不写value代表暂时行列
                    exchange =@Exchange(value = "logs",type ="fanout")   //绑定的交换机
            )
    })
    public void receive2(String message){
        System.out.println("message2= "+message);
    }
}

这儿两个暂时行列都与logs交换机进行绑定,所以咱们出产者将音讯发送到logs交换机上面之后,两个顾客都能接纳到音讯。

RabbitMQ基础复习

RabbitMQ基础复习

5.4 第四种:Routing

出产者:

 //routing 路由方式
    @Test
    public void testRoute(){
//        rabbitTemplate.convertAndSend("directs","info","发送info的key的路由信息");
        rabbitTemplate.convertAndSend("directs","error","发送info的key的路由信息");
    }

顾客:

@Component
public class RouteCustomer {
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue, //创立暂时行列
                    exchange = @Exchange(value = "directs",type = "direct"),//指定交换机
                    key = {"info","error","warn"}
            )
    })
    public void receive1(String message){
        System.out.println("message1= "+message);
    }
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue, //创立暂时行列
                    exchange = @Exchange(value = "directs",type = "direct"),//指定交换机
                    key = {"error"}
            )
    })
    public void receive2(String message){
        System.out.println("message2= "+message);
    }
}

当routingKey=error的时分,两个顾客都可以接纳到:

RabbitMQ基础复习

咱们现在将routingKey改为info,再次发送:

rabbitTemplate.convertAndSend("directs","info","发送info的key的路由信息");

RabbitMQ基础复习

可以看到,只要顾客1接纳到了音讯,由于只要顾客1的行列和交换机进行绑定的routingKey"info","error","warn",包括了info,而顾客2中行列和交换机绑定的routingKeyerror,所以顾客2接纳不到这条音讯。

5.4 第五种:Topics

也叫动态路由模型,便是在第四种模型的根底之上加了通配符罢了。

出产者:

  //topic 动态路由  订阅方式
    @Test
    public void testTopic(){
        rabbitTemplate.convertAndSend("topics","user.save","user.save 路由音讯");
//        rabbitTemplate.convertAndSend("topics","order","user.save 路由音讯");
//        rabbitTemplate.convertAndSend("topics","product.save.add","product.save.add 路由音讯");
    }

顾客:

@Component
public class TopicCustomer {
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,
                    exchange = @Exchange(value = "topics",type = "topic"),
                    key = {"user.save","user.*"}
            )
    })
    public void reveive1(String message){
        System.out.println("message1 = "+message);
    }
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,
                    exchange = @Exchange(value = "topics",type = "topic"),
                    key = {"order.#","product.#","user.*"}
            )
    })
    public void reveive2(String message){
        System.out.println("message2 = "+message);
    }
}

此时,出产者中的界说的routingKeyuser.save,而顾客1有user.save和user.*,顾客2有:user.*,所以两个都能接纳到音讯:

RabbitMQ基础复习

RabbitMQ基础复习

控制台看到两个顾客都输出了音讯,办理界面中 也看到了新建的交换机。

现在修改出产者音讯的routingKey如下:

rabbitTemplate.convertAndSend("topics","order","user.save 路由音讯");

RabbitMQ基础复习

可以看到,只要顾客2接纳到了音讯,这是由于顾客中的routingKey包括"order.#",#代表有一个或许多个单词,所以匹配到。

将出产者代码修改如下:

rabbitTemplate.convertAndSend("topics","product.save.add","product.save.add 路由音讯");

RabbitMQ基础复习

顾客2中有routingKeyproduct.#,所以可以接纳到。