前语
Rabbitmq拉形式是什么?为什么有推形式还要拉形式?怎样在spring中完成批量拉音讯消费呢?本文针对这些问题来着手完成一个Rabbitmq拉形式,顾客自动请求服务端接口来批量消费音讯。
Rabbitmq的消费形式有两种:
- 推形式
- 拉形式
在spring中,加入封装好的amqp:
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.1.2.RELEASE</version>
</dependency>
只需求在顾客方法上加上@RabbitListener(queues = XXXX)就能完成顾客监听订阅的主题;比方如下,监听咱们订阅的队列,当有音讯的时分,顾客执行发送邮件方法
@RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
public void receiveMessage(String message) {
log.info("开始发送邮件信息:" message);
mailService.sendMail(message);
}
探求原理
什么是拉形式?
先了解一下推形式:音讯中间件自动将音讯推送给顾客;能够结合观察者形式来理解,条件是顾客订阅(注册)了需求的主题,当出产者在该主题内出产了新的音讯,那么服务端就会自动推送新的音讯到顾客端,这样就完成了音讯队列的推形式。由于推形式是信息到达RabbitMQ后,就会立即被投递给匹配的顾客,所以实时性非常好,顾客能及时得到最新的音讯。
拉形式:顾客自动从音讯中间件拉取音讯,相较于推形式的差异,其实就跟pull和push一样,是顾客自动去服务端拉取音讯消费,是自动的;由于拉形式需求顾客手动去RabbitMQ中拉取音讯,所以实时性较差;顾客难以获取实时音讯,具体什么时分能拿到新音讯彻底取决于顾客什么时分去拉取音讯。
为什么要拉形式?
既然拉形式的的实时性欠好,为什么还要用呢?因为在某些场景下,由于某些限制,顾客只要在某些条件成立下才能去音讯中间件中批量获取音讯;并且相较于推形式重视于音讯的实时性,拉形式愈加重视顾客的消费能力,所以是因为场景不同,咱们需求拉形式。
怎样完成批量拉音讯消费呢?
引进spring完成的amqp包,其中RabbitTemplate供给了获取单条音讯的方法,咱们能够自己完成批量消费。
着手完成
配置
@Configuration
public class RabbitConfig {
public static final String TOPIC_EXCHANGE_NAME = "topic.exchange";
public static final String TOPIC_QUEUE_A = "topic.queue.a";
public static final String TOPIC_QUEUE_B = "topic.queue.b";
@Resource
private RabbitAdmin rabbitAdmin;
@Bean
public RabbitTemplate rabbitTemplate() {
return rabbitAdmin.getRabbitTemplate();
}
public ConnectionFactory getConnectFactory() {
com.rabbitmq.client.ConnectionFactory connectionFactory = new com.rabbitmq.client.ConnectionFactory();
connectionFactory.setHost("XXXX");
connectionFactory.setPort(xXXX);
connectionFactory.setUsername("XXXX");
connectionFactory.setPassword("XXXX");
connectionFactory.setVirtualHost("XXXX");
return new CachingConnectionFactory(connectionFactory);
}
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
@Bean
public RabbitAdmin rabbitAdmin() {
RabbitAdmin rabbitAdmin = new RabbitAdmin(getConnectFactory());
// 只要设置为 true,spring 才会加载 RabbitAdmin 这个类
rabbitAdmin.setAutoStartup(true);
rabbitAdmin.declareExchange(rabbitTopicExchange());
rabbitAdmin.declareQueue(topicQueue(TOPIC_QUEUE_A));
rabbitAdmin.declareQueue(topicQueue(TOPIC_QUEUE_B));
return rabbitAdmin;
}
public TopicExchange rabbitTopicExchange() {
return new TopicExchange(TOPIC_EXCHANGE_NAME, true, false);
}
public Queue topicQueue(String queueName) {
return new Queue(queueName, true, false, false);
}
@Bean
public Binding bindTopicExchangeToMail() {
return BindingBuilder.bind(topicQueue(TOPIC_QUEUE_A))
.to(rabbitTopicExchange())
.with("XXXX");
}
@Bean
public Binding bindTopicExchangeToSms() {
return BindingBuilder.bind(topicQueue(TOPIC_QUEUE_B))
.to(rabbitTopicExchange())
.with("XXXX");
}
}
完成类
RabbitMQService
public interface RabbitMQService {
String consumeOneMessage(String queueName);
List<String> consumeBatchMessage(String queueName, int batchSize);
}
RabbitMQServiceImpl
@Service
public class RabbitMQServiceImpl implements RabbitMQService {
@Resource
private RabbitTemplate rabbitTemplate;
// 消费单条音讯
@Override
public String consumeOneMessage(String queueName) {
Object o = rabbitTemplate.receiveAndConvert(queueName);
return (String) o;
}
// 批量消费音讯
@Override
public List<String> consumeBatchMessage(String queueName, int batchSize) {
List<String> result = new ArrayList<>();
for (int i = 0; i < batchSize; i ) {
String message = (String) rabbitTemplate.receiveAndConvert(queueName);
if (message == null) {
break;
// 取200条音讯,大概25ms continue;
}
result.add(message);
}
return result;
}
}
接口
@RestController
@RequestMapping("/rabbitmq")
public class RabbitMQController {
@Resource
private RabbitMQService rabbitMQService;
@GetMapping("/batchConsume")
public List<String> consumeBatch(@RequestParam(name = "queueName") String queueName, @RequestParam(name = "batchSize") int batchSize) {
return rabbitMQService.consumeBatchMessage(queueName, batchSize);
}
@GetMapping("/singleConsume")
public String consumeSingle(@RequestParam(name = "queueName") String queueName) {
return rabbitMQService.consumeOneMessage(queueName);
}
问题
1. 守时使命不生效的问题;当咱们的配置文件完成了BeanPostProcessor后,守时使命不生效?
2. RabbitTemplate完成获取单条音讯的原理呢?ack机制是什么?
3. 推磨式下怎样保证音讯不丢掉呢?