上篇文章描述了运用RabbitMQ原生API来完成音讯的处理,在实际项目开发中,略显繁琐。咱们就能够运用springboot的RabbitMQ插件进行简略装备就能够完成。
一、springboot集成RabbitMQ
1-1、增加依靠
首要创立一个springboot项目,然后增加下面依靠
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
1-2、增加装备信息
spring.rabbitmq.host=192.168.253.131
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.virtual-host=/mirror
1-3、创立行列及交换机
创立行列及交换机并进行绑定除了在控制台进行操作外,还能够利用程序进行创立,下面来分别介绍一下
1-3-1、运用@Configuration在项目发动时进行创立
运用@Configuration需求留意的是下面声明queue和exchange以及绑定 的办法需求增加@Bean
这种办法创立的优点便是项目发动之后就能够完成创立,缺陷是假如想动态创立就无法完成了
绑定交换机和行列运用BindingBuilder.bind(行列).to(交换机)
1-3-1-1、创立fanout交换机
@Configuration
public class FanoutConfig {
//声明行列
@Bean
public Queue fanoutQ1() {
return new Queue("finout.queue1");
}
//声明exchange
@Bean
public FanoutExchange setFanoutExchange() {
return new FanoutExchange("fanout.exchange");
}
//声明Binding,exchange与queue的绑定联系
@Bean
public Binding bindQ1() {
return BindingBuilder.bind(fanoutQ1()).to(setFanoutExchange());
}
}
1-3-1-2、创立topic交换机
绑定交换机和行列运用BindingBuilder.bind(行列).to(交换机).with(路由键)
@Configuration
public class TopicConfig {
//声明行列
@Bean
public Queue topicQ1() {
return new Queue("topic.queue1");
}
//声明exchange
@Bean
public TopicExchange setTopicExchange() {
return new TopicExchange("topic.exchange");
}
//声明binding,需求声明一个roytingKey
@Bean
public Binding bindTopicHebei1() {
return BindingBuilder.bind(topicQ1()).to(setTopicExchange()).with("hunan.*");
}
}
1-3-1-3、创立header交换机
绑定交换机和行列有三种办法如下:
BindingBuilder.bind(行列).to(交换机).where(key).matches(val)
BindingBuilder.bind(行列).to(交换机).whereAny(Map).match()
BindingBuilder.bind(行列).to(交换机).whereAll(Map).match()
完成代码
@Configuration
public class HeaderConfig {
//声明queue
@Bean
public Queue headQueueTxTyp1() {
return new Queue("txTyp1");
}
@Bean
public Queue headQueueBusTyp1() {
return new Queue("busTyp1");
}
@Bean
public Queue headQueueTxBusTyp() {
return new Queue("txbusTyp1");
}
//声明exchange
@Bean
public HeadersExchange setHeaderExchange() {
return new HeadersExchange("headerExchange");
}
//声明Binding
//绑定header中txtyp=1的行列。header的行列匹配能够用mathces和exisits
@Bean
public Binding bindHeaderTxTyp1() {
return BindingBuilder.bind(headQueueTxTyp1()).to(setHeaderExchange()).where("txTyp").matches("1");
}
//绑定Header中busTyp=1的行列。
@Bean
public Binding bindHeaderBusTyp1() {
return BindingBuilder.bind(headQueueBusTyp1()).to(setHeaderExchange()).where("busTyp").matches("1");
}
//绑定Header中txtyp=1或者busTyp=1的行列。
@Bean
public Binding bindHeaderTxBusTyp1() {
Map<String,Object> condMap = new HashMap<>();
condMap.put("txTyp", "1");
condMap.put("busTyp", "1");
return BindingBuilder.bind(headQueueTxBusTyp()).to(setHeaderExchange()).whereAny(condMap).match();
}
}
1-3-2、运用AmqpAdmin
运用AmqpAdmin优点是能够随时进行行列及路由的创立和绑定
需求留意的是,创立交换机需求运用各自类型的交换机进行创立:
new FanoutExchange()
new TopicExchange()
new HeadersExchange()
@Autowired
private AmqpAdmin amqpAdmin;
@GetMapping("initFanoutExchangeQueue")
public Object initQueue(){
String queueName="fanout.queue1";
String exchangeName="fanout.exchange";
//创立行列
Queue queue = new Queue(queueName, false, false, false, null);
amqpAdmin.declareQueue(queue);
//创立交换机
FanoutExchange fanoutExchange = new FanoutExchange("fanout.exchange", false, false, null);
amqpAdmin.declareExchange(fanoutExchange);
//绑定
amqpAdmin.declareBinding(new Binding(queueName, Binding.DestinationType.QUEUE,exchangeName,"",null));
return "success";
}
1-4、发送音讯
RabbitMQ运用起来和redis很类似,redis运用RedisTemplate,RabbitMQ则运用RabbitTemplate
,下面来看下如何运用
在springboot中运用RabbitTemplate无法发送stream行列音讯
,由于发送stream行列音讯,需求设置basicQos
属性,而basicQos属性需求经过channel来设置,RabbitTemplate现已进行封装,运用的时候无法获取channel,因此无法发送stream类型行列音讯;
1-4-2、给指定行列发送音讯
首要设置音讯的相关参数,最终调用rabbitTemplate.send(行列称号,音讯内容,恳求参数)
进行发送
String message="hello word";
//设置部分恳求参数
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
//发音讯
rabbitTemplate.send("directqueue",new Message(message.getBytes("UTF-8"),messageProperties));
1-4-3、运用exchange的fanout发送音讯
向交换机发送音讯和向行列发送类似
第一步:设置音讯相关参数
MessageProperties();
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
第二步:发送音讯
rabbitTemplate.send(行列称号,路由称号,音讯,音讯相关参数)
String message="fanoutmessage";
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
//fanout模式只往exchange里发送音讯。分发到exchange下的所有queue
rabbitTemplate.send(MyConstants.EXCHANGE_FANOUT, "", new Message(message.getBytes("UTF-8"),messageProperties));
1-4-4、运用exchange的topic发送音讯
String routingKey="routingkey";
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
//发送音讯
rabbitTemplate.send("topicExchange", routingKey, new Message(message.getBytes("UTF-8"),messageProperties));
1-4-5、运用exchange的header发送音讯
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
//设置header信息
messageProperties.setHeader("name", "admin");
messageProperties.setHeader("pass", "123");
//发送音讯
rabbitTemplate.send("headerExchange", "uselessRoutingKey", new Message(message.getBytes("UTF-8"),messageProperties));
1-4-6、运用quorum行列发送音讯
//设置部分恳求参数
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
//发音讯
rabbitTemplate.send("QUEUE_QUORUM",new Message(message.getBytes("UTF-8"),messageProperties));
1-5、接纳音讯
顾客都是经过@RabbitListener注解来声明。注解中包含了声明顾客行列时所需求的重点参数。对照原生API,这些参数就不难了解了。
可是当要消费Stream行列时,仍是要重点留意他的三个必要的进程:
- channel必须设置basicQos属性。 channel目标能够在@RabbitListener声明的顾客办法中直接引证,Spring结构会进行注入。
- 正确声明Stream行列。 经过往Spring容器中注入Queue目标的办法声明行列。在Queue目标中传入声明Stream行列所需求的参数。
- 消费时需求指定offset。 能够经过注入Channel目标,运用原生API传入offset属性。
运用SpringBoot结构集成RabbitMQ后,开发进程能够得到很大的简化,所以运用进程并不难,对照一下示例就能很快上手。可是,需求了解一下的是,SpringBoot集成后的RabbitMQ中的很多概念,虽然都能跟原生API对应上,可是这些模型中心都是做了转换的,比方Message,就不是原生RabbitMQ中的音讯了。运用SpringBoot结构,尤其需求加深对RabbitMQ原生API的了解,这样才干以不变应万变,深入了解各种看起来简略,可是其实坑很多的各种目标声明办法。
首要经过给办法加上如下注解,并设置行列即可消费对应的音讯
@RabbitListener(queues=MyConstants.QUEUE_Name)
1-5-1、接纳音讯
接纳音讯的办法,不论是直接接纳发送到行列的,仍是发送到exchange交换机,办法都是一样,首要差异便是绑定交换机的行列,依据绑定的行列不同而接纳对应的音讯
@RabbitListener(queues=MyConstants.QUEUE_Name)
public void directReceive2(String message) {
System.out.println("consumer2 received message : " +message);
}
1-5-2、普通行列音讯
发送音讯,第一个参数为行列称号,假如有多个消费此行列的消费端,只有一个消费端能够进行消费
rabbitTemplate.send("directqueue",new Message(message.getBytes("UTF-8"),messageProperties));
接纳音讯注解
@RabbitListener(queues="directqueue")
1-5-3、交换机fanout类型接纳音讯
发送音讯,第一个参数为交换机称号,和交换机绑定的行列都能够消费音讯
rabbitTemplate.send("EXCHANGE_FANOUT", routingkey, new Message(message.getBytes("UTF-8"),messageProperties));
接纳音讯,传入和上面EXCHANGE_FANOUT
交换机绑定的行列,即可消费音讯
@RabbitListener(queues="fanout_queue1")
1-5-4、交换机topic类型接纳音讯
发送音讯,指定一个topic类型的交换机,而且设置routingkey
rabbitTemplate.send("topicExchange", routingKey, new Message(message.getBytes("UTF-8"),messageProperties));
接纳音讯:
留意这个模式会有优先匹配原则。例如发送routingKey=beijing.haidian,那匹配到beijing.* (beijing.haidian,beijing.chaoyang),之后就不会再去匹配*.haidian(XXX.haidian)
@RabbitListener(queues="beijing.haidian")
1-5-5、交换机header类型接纳音讯
如下图三个行列绑定了headerExchange交换机
busType1:当header中有busTyp=1则接纳音讯,即使设置其他key-val也能够
txType1:当header中有txTyp=1则接纳音讯,即使设置其他key-val也能够
txbusType1:当header中有busTyp=1或者txTyp:1则能够接纳音讯(由于设置的x-match:any,假如是all则两个都必须满意)