我正在参加「启航方案」
1、MQ导言
1.1 什么是MQ
MQ
(Message Quene) : 翻译为音讯行列
,经过典型的 出产者
和顾客
模型,出产者不断向音讯行列中出产音讯,顾客不断的从行列中获取音讯。由于音讯的出产和消费都是异步的,而且只关怀音讯的发送和接纳,没有事务逻辑的侵入,轻松的完结体系间解耦。别名为 音讯中间件
经过运用高效牢靠的音讯传递机制进行渠道无关的数据交流,并依据数据通信来进行分布式体系的集成。
1.2 MQ有哪些
当今市道上有许多干流的音讯中间件,如老牌的ActiveMQ
、RabbitMQ
,炙手可热的Kafka
,阿里巴巴自主开发RocketMQ
等。
1.3 不同MQ特色
# 1.ActiveMQ
ActiveMQ 是Apache出品,最盛行的,能力强劲的开源音讯总线。它是一个彻底支撑JMS标准的的音讯中间件。丰厚的API,多种集群架构方式让ActiveMQ在业界成为老牌的音讯中间件,在中小型企业颇受欢迎!
# 2.Kafka
Kafka是LinkedIn开源的分布式发布-订阅音讯体系,现在归属于Apache顶级项目。Kafka首要特色是依据Pull的方式来处理音讯消费,
寻求高吞吐量,一开始的意图便是用于日志搜集和传输。0.8版别开始支撑仿制,不支撑事务,对音讯的重复、丢掉、过错没有严格要求,
合适产生大量数据的互联网服务的数据搜集事务。
# 3.RocketMQ
RocketMQ是阿里开源的音讯中间件,它是纯Java开发,具有高吞吐量、高可用性、合适大规模分布式体系应用的特色。RocketMQ思路起
源于Kafka,但并不是Kafka的一个Copy,它对音讯的牢靠传输及事务性做了优化,现在在阿里集团被广泛应用于交易、充值、流计算、消
息推送、日志流式处理、binglog分发等场景。
# 4.RabbitMQ
RabbitMQ是运用Erlang言语开发的开源音讯行列体系,依据AMQP协议来完结。AMQP的首要特征是面向音讯、行列、路由(包括点对点和
发布/订阅)、牢靠性、安全。AMQP协议更多用在企业体系内对数据一致性、稳定性和牢靠性要求很高的场景,对功能和吞吐量的要求还在
其次。
RabbitMQ比Kafka牢靠,Kafka更合适IO高吞吐的处理,一般应用在大数据日志处理或对实时性(少量延迟),牢靠性(少量丢数据)要求稍低的场景运用,比方ELK日志搜集。
2、RabbitMQ导言
2.1 RabbitMQ
依据
AMQP
协议,erlang言语开发,是布置最广泛的开源音讯中间件,是最受欢迎的开源音讯中间件之一。
官网
: www.rabbitmq.com/
官方教程
: www.rabbitmq.com/#getstarted
AMQP协议:
AMQP(advanced message queuing protocol)`在2003年时被提出,最早用于解决金融领不同渠道之间的音讯传递交互问题。望文生义,AMQP是一种协议,更准确的说是一种binary wire-level protocol(链接协议)。这是其和JMS的实质不同,AMQP不从API层进行限制,而是直接界说网络交换的数据格式。这使得完结了AMQP的provider天然性便是跨渠道的。以下是AMQP协议模型:
2.2 RabbitMQ装置
这个曾经写过,不重复介绍了 codeleader.blog.csdn.net/article/det…
3、RabbitMQ配置
3.1 RabbitMQ指令行
# 1.服务发动相关
systemctl start|restart|stop|status rabbitmq-server
# 2.办理指令行 用来在不运用web办理界面状况下指令操作RabbitMQ
rabbitmqctl help 可以检查更多指令
# 3.插件办理指令行
rabbitmq-plugins enable|list|disable
3.2 Web办理界面
3.2.1 overview概览
connections:不管出产者还是顾客,都需求与RabbitMQ树立衔接后才可以完结音讯的出产和消费,在这儿可以检查衔接状况
channels:通道,树立衔接后,会构成通道,音讯的投递获取依赖通道。
Exchanges:交换机,用来完结音讯的路由
Queues:行列,即音讯行列,音讯存放在行列中,等候消费,消费后被移除行列。
3.2.2 Admin用户和虚拟主机办理
1、增加用户
上面的Tags选项,其实是指定用户的人物,可选的有以下几个:
-
超级办理员(administrator)
可登陆办理控制台,可检查一切的信息,而且可以对用户,战略(policy)进行操作。
-
监控者(monitoring)
可登陆办理控制台,同时可以检查rabbitmq节点的相关信息(进程数,内存运用状况,磁盘运用状况等)
-
战略制定者(policymaker)
可登陆办理控制台, 同时可以对policy进行办理。但无法检查节点的相关信息(上图红框标识的部分)。
-
普通办理者(management)
仅可登陆办理控制台,无法看到节点信息,也无法对战略进行办理。
-
其他
无法登陆办理控制台,一般便是普通的出产者和顾客。
2、创立虚拟主机
默许创立虚拟主机之后,没有用户可以运用,需求绑定用户
3、绑定虚拟主机和用户
创立好虚拟主机,咱们还要给用户增加拜访权限:
点击增加好的虚拟主机:
这儿给admin和ems都授权,授权之后就能在页面上看到了,如下图
4、RabbitMQ常用音讯模型测验
4.1 RabbitMQ支撑的音讯模型
4.2 引进依赖
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.16.0</version>
</dependency>
4.3 第一种模型:直连
在上图的模型中,有以下概念:
- P:出产者,也便是要发送音讯的程序
- C:顾客:音讯的承受者,会一向等候音讯到来。
- queue:音讯行列,图中赤色部分。类似一个邮箱,可以缓存音讯;出产者向其间投递音讯,顾客从其间取出音讯。
4.3.1 自界说衔接东西类
ublic class RabbitUtils {
//创立衔接MQ的衔接工厂 重量级资源
public static ConnectionFactory connectionFactory=new ConnectionFactory();
static { //类加载履行 只履行一次
//设置衔接rabbitmq主机
connectionFactory.setHost("ip");
//设置端口号
connectionFactory.setPort(5672);
//设置衔接哪个虚拟主机
connectionFactory.setVirtualHost("/ems");
//设置拜访虚拟主机的用户名和暗码
connectionFactory.setUsername("ems");
connectionFactory.setPassword("暗码");
}
//界说提供衔接目标的方法
public static Connection getConnection() {
Connection connection = null;
try {
//获取衔接目标
connection = connectionFactory.newConnection();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
return connection;
}
//封闭通道和封闭衔接的方法
public static void closeConnectionAndChannel(Channel channel, Connection connection) {
try {
if (channel != null) {
channel.close();
}
if (connection != null) {
connection.close();
}
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
4.3.2 出产者
public class Provider {
//出产音讯 HelloWorld:直连方式
public static void main(String[] args) throws IOException {
Connection connection= RabbitUtils.getConnection();
//获取衔接中的通道
Channel channel = connection.createChannel();
//通道绑定对应的音讯行列
//参数1:行列称号,假如不存在,主动创立。
//参数2:界说行列特性是否耐久化 true :耐久化,false:不耐久化
//参数3:是否独占行列
//参数4:是否在消费完结后主动删去行列 true:主动删去,false:不主动删去
//参数5:额定附加参数
channel.queueDeclare("hello",false,false,false,null);
//发布音讯
//交换机称号,行列称号,传递音讯的额定设置,音讯的具体内容
channel.basicPublish("","hello",null,"hello rabbitmq".getBytes());
RabbitUtils.closeConnectionAndChannel(channel,connection);
}
}
这儿不指定交换机称号,用的便是默许交换机。
4.3.3 顾客
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
//调用自界说东西类
Connection connection= RabbitUtils.getConnection();
//创立通道
Channel channel = connection.createChannel();
//通道绑定目标
channel.queueDeclare("hello",true,false,true,null);
//消费音讯
//参数1:消费哪个行列的音讯
//参数2:敞开音讯的主动承认机制
//参数3:消费音讯时的回调接口
channel.basicConsume("hello",true,new DefaultConsumer(channel){
//body:音讯行列中取出的音讯
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("=============="+new String(body));
}
});
//调用东西类
RabbitUtils.closeConnectionAndChannel(channel,connection);
}
}
发动出产者:
发动顾客:
可以看到,音讯现已收到了
4.4 第二种模型:Work Queue
Work queues,也被称为(
Task queues`),使命模型。当音讯处理比较耗时的时分,或许出产音讯的速度会远远大于音讯的消费速度。久而久之,音讯就会堆积越来越多,无法及时处理。此时就可以运用work 模型:让多个顾客绑定到一个行列,共同消费行列中的音讯。行列中的音讯一旦消费,就会消失,因而使命是不会被重复履行的。
人物:
- P:出产者:使命的发布者
- C1:顾客-1,收取使命而且完结使命,假定完结速度较慢
- C2:顾客-2:收取使命并完结使命,假定完结速度快
4.4.1 出产者:
public class Provider {
public static void main(String[] args) throws IOException {
//获取衔接目标
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
//经过通道声明行列
channel.queueDeclare("work",true,false,false,null);
//出产音讯
for (int i = 1; i <=20 ; i++) {
channel.basicPublish("","work", null,(i+" hello work queue").getBytes());
}
//封闭资源
RabbitUtils.closeConnectionAndChannel(channel,connection);
}
}
4.4.2 顾客
顾客1:
//轮询分发测验
public class Consumer1 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work",true,false,false,null);
channel.basicConsume("work",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("顾客-1:"+new String(body));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
顾客2:
public class Consumer2 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work",true,false,false,null);
//参数1:行列称号,参数2:音讯主动承认 true:顾客主动向rabbitmq承认音讯消费了,false:不会主动承认音讯
channel.basicConsume("work",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("顾客-2:"+new String(body));
}
});
}
}
先发动两个顾客
再发动出产者:
总结:默许状况下,RabbitMQ将按次序将每个音讯发送给下一个运用者。平均而言,每个顾客都会收到相同数量的音讯。这种分发音讯的方法称为循环。
可以看到,默许是轮询分发的,但是这样子不好,咱们的顾客1运用线程休眠了1s处理的很慢仍然和顾客2五五开。
咱们想要的结果是能者多劳,也便是处理速度快的就尽量多处理几条音讯。
改善如下:
- 设置一次只承受一条未承认的音讯
- 封闭音讯主动承认,改为手动承认
4.4.3 改善为能者多劳
出产者不动,改动顾客
顾客1:
//能者多劳测验
public class Consumer1 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
channel.basicQos(1);//每次只能消费一个音讯
channel.queueDeclare("work",true,false,false,null);
channel.basicConsume("work",false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("顾客-1:"+new String(body));
//手动承认
//参数1:手动承认音讯标识, 参数2:false 每次承认一个
channel.basicAck(envelope.getDeliveryTag(),false);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
顾客2:
public class Consumer2 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
channel.basicQos(1);//每次只能消费一个音讯
channel.queueDeclare("work",true,false,false,null);
//参数1:行列称号,参数2:音讯主动承认 true:顾客主动向rabbitmq承认音讯消费了,false:不会主动承认音讯
channel.basicConsume("work",false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("顾客-2:"+new String(body));
//手动承认
//参数1:手动承认音讯标识, 参数2:false 每次承认一个
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
}
顾客2:
顾客2:
可以看到,达到了能者多劳的效果
4.5 第三种模型:Fanout
fanout 扇出 也称为播送
在播送方式下,音讯发送流程是这样的:
- 可以有多个顾客
- 每个顾客有自己的queue(行列)
- 每个行列都要绑定到Exchange(交换机)
- 出产者发送的音讯,只能发送到交换机,交换机来决定要发给哪个行列,出产者无法决定。
- 交换机把音讯发送给绑定过的一切行列
- 行列的顾客都能拿到音讯。完结一条音讯被多个顾客消费
4.5.1 出产者
public class Provider {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
//将通道声明指定的交换机
//参数1:交换机称号,参数2:交换机类型,fanout:播送类型
channel.exchangeDeclare("logs","fanout");
//发送音讯 fanout中的routingkey没啥效果
channel.basicPublish("logs","",null,"fanout type message".getBytes());
RabbitUtils.closeConnectionAndChannel(channel,connection);
}
4.5.2 开发3个顾客
顾客1:
public class Consumer1 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare("logs","fanout");
//暂时行列
String queueName = channel.queueDeclare().getQueue();
//绑定交换机和行列
channel.queueBind(queueName,"logs","");
//消费音讯
channel.basicConsume(queueName,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("顾客1:"+new String(body));
}
});
}
}
顾客2:
public class Consumer2 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare("logs","fanout");
//暂时行列
String queueName = channel.queueDeclare().getQueue();
//绑定交换机和行列
channel.queueBind(queueName,"logs","");
//消费音讯
channel.basicConsume(queueName,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("顾客2:"+new String(body));
}
});
}
}
public class Consumer3 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare("logs","fanout");
//暂时行列
String queueName = channel.queueDeclare().getQueue();
//绑定交换机和行列
channel.queueBind(queueName,"logs","");
//消费音讯
channel.basicConsume(queueName,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("顾客3:"+new String(body));
}
});
}
}
先发动3个顾客
发动出产者之后调查3个顾客是否都接纳到了音讯:
调查是否创立了对应的交换机:
4.6 第四种模型:Routing
其实Routing和Topics很像,一个是写死了RoutingKey,另一个运用了通配符。
在Fanout方式中,一条音讯,会被一切订阅的行列都消费。但是,在某些场景下,咱们期望不同的音讯被不同的行列消费。这时就要用到Direct类型的Exchange。
在Direct模型下:
- 行列与交换机的绑定,不能是恣意绑定了,而是要指定一个
RoutingKey
(路由key) - 音讯的发送方在 向 Exchange发送音讯时,也有必要指定音讯的
RoutingKey
。 - Exchange不再把音讯交给每一个绑定的行列,而是依据音讯的
Routing Key
进行判断,只要行列的Routingkey
与音讯的Routing key
彻底一致,才会接纳到音讯
图解:
- P:出产者,向Exchange发送音讯,发送音讯时,会指定一个routing key。
- X:Exchange(交换机),接纳出产者的音讯,然后把音讯递交给 与routing key彻底匹配的行列
- C1:顾客,其地点行列指定了需求routing key 为 error 的音讯
- C2:顾客,其地点行列指定了需求routing key 为 info、error、warning 的音讯
4.6.1 出产者
public class Provider {
public static final String EXCHANGE_NAME="logs_direct";
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
//声明交换机 参数1:交换机称号,参数2:direct 路由方式
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//发送音讯
// String routingKey="info";
// String routingKey="error";
String routingKey="warning";
// String routingKey="trade";
channel.basicPublish(EXCHANGE_NAME,routingKey,null,("这是direct模型发布的依据routingKey:["+routingKey+"]").getBytes());
//封闭资源
RabbitUtils.closeConnectionAndChannel(channel,connection);
}
}
4.6.2 顾客1
public class Consumer1 {
public static final String EXCHANGE_NAME="logs_direct";
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
//通道声明交换机以及交换机的类型
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//创立一个暂时行列
String queue = channel.queueDeclare().getQueue();
//依据routingKey去绑定行列和交换机
channel.queueBind(queue,EXCHANGE_NAME,"error");
//消费音讯
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("顾客1:"+new String(body));
}
});
}
}
4.6.3 顾客2
public class Consumer2 {
public static final String EXCHANGE_NAME="logs_direct";
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
//通道声明交换机以及交换机的类型
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//创立一个暂时行列
String queue = channel.queueDeclare().getQueue();
//依据routingKey去绑定行列和交换机
channel.queueBind(queue,EXCHANGE_NAME,"info");
channel.queueBind(queue,EXCHANGE_NAME,"error");
channel.queueBind(queue,EXCHANGE_NAME,"warning");
//消费音讯
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("顾客2:"+new String(body));
}
});
}
}
先发动两个顾客:
发动出产者之后调查顾客是否收到了音讯:
可以看到,顾客1没有收到音讯,由于咱们出产者的routintKey为warning,而顾客1行列的routingKey是error,顾客2行列的routingKey是warning
所以只要顾客2可以接纳到音讯,只要行列的Routingkey
与音讯的 Routing key
彻底一致,才会接纳到音讯。
4.7 第五种模型:Topics
Topic
类型的Exchange
与Direct
相比,都是可以依据RoutingKey
把音讯路由到不同的行列。只不过Topic
类型Exchange
可以让行列在绑定Routing key
的时分运用通配符!这种模型Routingkey
一般都是由一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
# 统配符
* (star) can substitute for exactly one word. 匹配不多不少刚好1个词
# (hash) can substitute for zero or more words. 匹配一个或多个词
# 如:
audit.# 匹配audit.irs.corporate或许 audit.irs 等
audit.* 只能匹配 audit.irs
4.7.1 出产者
public class Provider {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
//声明交换机以及交换机类型 topic
channel.exchangeDeclare("topics", BuiltinExchangeType.TOPIC);
//发布音讯
String routingKey="user.save";
// String routingKey="user.save.findAll";
// String routingKey="user";
channel.basicPublish("topics",routingKey,null,("这儿是topic动态路由模型,routingKey:["+routingKey+"]").getBytes());
//封闭资源
RabbitUtils.closeConnectionAndChannel(channel,connection);
}
}
4.7.2 顾客1:
public class Cosumer1 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
//声明交换机以及交换机类型
channel.exchangeDeclare("topics", BuiltinExchangeType.TOPIC);
//创立暂时行列
String queue = channel.queueDeclare().getQueue();
//创立行列和交换机,动态通配符方式 routingKey
channel.queueBind(queue,"topics","user.*");
//消费音讯
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("顾客1:"+new String(body));
}
});
}
}
4.7.3 顾客2:
public class Cosumer2 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
//声明交换机以及交换机类型
channel.exchangeDeclare("topics", BuiltinExchangeType.TOPIC);
//创立暂时行列
String queue = channel.queueDeclare().getQueue();
//创立行列和交换机,动态通配符方式 routingKey
channel.queueBind(queue,"topics","user.#");
//消费音讯
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("顾客2:"+new String(body));
}
});
}
}
咱们注意到顾客1的routingKey为user.*
,顾客2的routingKey为user.#
发动两个顾客,再发动出产者
此时都收到了音讯,是由于两个规矩都能匹配到。
咱们现在将出产者交换机的routingKey
改为user.save.findAll
,发动出产者,调查结果:
是由于顾客2中行列的routingKey为user.#
,user后边可以匹配一个或许多个,而顾客1中行列的routingKey为user.*
,user后边只能匹配一个词,所以收不到音讯。
就先介绍到这儿,后边的RPC暂时不搞了,至于Publisher Confirms看我专栏曾经的文章,这几种方式足够敷衍绝大多数的事务场景了。
5、RabbitMQ与SpringBoot整合
真正写代码的时分都是与现有结构进行集成,很少用上面那种原生的写法。
5.0 树立环境
5.0.1 引进依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
5.0.2 配置文件
spring:
application:
name: rabbitmq-springboot
rabbitmq:
host: 你的ip
port: 5672
username: 用户名
password: 暗码
virtual-host: /ems # 虚拟主机
5.1 第一种:HelloWorld模型
出产者:
//注入rabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;
//hello world
@Test
public void testHelloWorld(){
rabbitTemplate.convertAndSend("hello","hello world");
}
顾客:
@Component
@RabbitListener(queuesToDeclare = @Queue(value = "hello"))
public class HelloCustomer {
@RabbitHandler
public void receive(String message){
System.out.println("message="+message);
}
}
由于
@RabbitListener
注解会一向监听音讯,所以这儿不必像上面一样分别发动顾客和出产者了。
咱们直接发动出产者:
办理界面却是存在hello行列,控制台也看到音讯现已被顾客接纳。
5.2 第二种:Work Queue
出产者:
//work queues
@Test
public void testWorkQueue(){
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("work","work模型"+i);
}
}
顾客:
@Component
public class WorkCustomer {
//顾客1
@RabbitListener(queuesToDeclare = @Queue("work"))
public void receive(String message){
System.out.println("message1="+message);
}
//顾客2
@RabbitListener(queuesToDeclare = @Queue("work"))
public void receive1(String message){
System.out.println("message2="+message);
}
}
这儿我树立两个顾客来测验轮询分发方式
办理界面中看到了绑定的work行列,控制台也看到了音讯现已被两个顾客接纳。
5.3 第三种:Fanout
出产者:
//fanout 播送
@Test
public void testFanout(){
//这种方式的routingKey没啥效果
rabbitTemplate.convertAndSend("logs","","Fanout的模型发送的音讯");
}
界说一个交换机:logs
顾客:
@Component
public class FanoutCustomer {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,//不写value代表暂时行列
exchange =@Exchange(value = "logs",type ="fanout") //绑定的叫喊及
)
})
public void receive1(String message){
System.out.println("message1= "+message);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,//不写value代表暂时行列
exchange =@Exchange(value = "logs",type ="fanout") //绑定的交换机
)
})
public void receive2(String message){
System.out.println("message2= "+message);
}
}
这儿两个暂时行列都与logs交换机进行绑定,所以咱们出产者将音讯发送到logs交换机上面之后,两个顾客都能接纳到音讯。
5.4 第四种:Routing
出产者:
//routing 路由方式
@Test
public void testRoute(){
// rabbitTemplate.convertAndSend("directs","info","发送info的key的路由信息");
rabbitTemplate.convertAndSend("directs","error","发送info的key的路由信息");
}
顾客:
@Component
public class RouteCustomer {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue, //创立暂时行列
exchange = @Exchange(value = "directs",type = "direct"),//指定交换机
key = {"info","error","warn"}
)
})
public void receive1(String message){
System.out.println("message1= "+message);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue, //创立暂时行列
exchange = @Exchange(value = "directs",type = "direct"),//指定交换机
key = {"error"}
)
})
public void receive2(String message){
System.out.println("message2= "+message);
}
}
当routingKey=error的时分,两个顾客都可以接纳到:
咱们现在将routingKey改为info,再次发送:
rabbitTemplate.convertAndSend("directs","info","发送info的key的路由信息");
可以看到,只要顾客1接纳到了音讯,由于只要顾客1的行列和交换机进行绑定的
routingKey
是"info","error","warn"
,包括了info
,而顾客2中行列和交换机绑定的routingKey
为error
,所以顾客2接纳不到这条音讯。
5.4 第五种:Topics
也叫动态路由模型,便是在第四种模型的根底之上加了通配符罢了。
出产者:
//topic 动态路由 订阅方式
@Test
public void testTopic(){
rabbitTemplate.convertAndSend("topics","user.save","user.save 路由音讯");
// rabbitTemplate.convertAndSend("topics","order","user.save 路由音讯");
// rabbitTemplate.convertAndSend("topics","product.save.add","product.save.add 路由音讯");
}
顾客:
@Component
public class TopicCustomer {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(value = "topics",type = "topic"),
key = {"user.save","user.*"}
)
})
public void reveive1(String message){
System.out.println("message1 = "+message);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(value = "topics",type = "topic"),
key = {"order.#","product.#","user.*"}
)
})
public void reveive2(String message){
System.out.println("message2 = "+message);
}
}
此时,出产者中的界说的routingKey
为user.save
,而顾客1有user.save和user.*
,顾客2有:user.*
,所以两个都能接纳到音讯:
控制台看到两个顾客都输出了音讯,办理界面中 也看到了新建的交换机。
现在修改出产者音讯的routingKey如下:
rabbitTemplate.convertAndSend("topics","order","user.save 路由音讯");
可以看到,只要顾客2接纳到了音讯,这是由于顾客中的routingKey
包括"order.#"
,#
代表有一个或许多个单词,所以匹配到。
将出产者代码修改如下:
rabbitTemplate.convertAndSend("topics","product.save.add","product.save.add 路由音讯");
顾客2中有
routingKey
为product.#
,所以可以接纳到。