前语

Rabbitmq拉形式是什么?为什么有推形式还要拉形式?怎样在spring中完成批量拉音讯消费呢?本文针对这些问题来着手完成一个Rabbitmq拉形式,顾客自动请求服务端接口来批量消费音讯。

Rabbitmq的消费形式有两种:

  1. 推形式
  2. 拉形式

在spring中,加入封装好的amqp:

<dependency>
 <groupId>org.springframework.amqp</groupId>
 <artifactId>spring-rabbit</artifactId>
 <version>2.1.2.RELEASE</version>
</dependency>

只需求在顾客方法上加上@RabbitListener(queues = XXXX)就能完成顾客监听订阅的主题;比方如下,监听咱们订阅的队列,当有音讯的时分,顾客执行发送邮件方法

@RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
public void receiveMessage(String message) {  
 log.info("开始发送邮件信息:"   message);   
 mailService.sendMail(message);  
}

探求原理

什么是拉形式?

先了解一下推形式:音讯中间件自动将音讯推送给顾客;能够结合观察者形式来理解,条件是顾客订阅(注册)了需求的主题,当出产者在该主题内出产了新的音讯,那么服务端就会自动推送新的音讯到顾客端,这样就完成了音讯队列的推形式。由于推形式是信息到达RabbitMQ后,就会立即被投递给匹配的顾客,所以实时性非常好,顾客能及时得到最新的音讯。

拉形式:顾客自动从音讯中间件拉取音讯,相较于推形式的差异,其实就跟pull和push一样,是顾客自动去服务端拉取音讯消费,是自动的;由于拉形式需求顾客手动去RabbitMQ中拉取音讯,所以实时性较差;顾客难以获取实时音讯,具体什么时分能拿到新音讯彻底取决于顾客什么时分去拉取音讯。

为什么要拉形式?

既然拉形式的的实时性欠好,为什么还要用呢?因为在某些场景下,由于某些限制,顾客只要在某些条件成立下才能去音讯中间件中批量获取音讯;并且相较于推形式重视于音讯的实时性,拉形式愈加重视顾客的消费能力,所以是因为场景不同,咱们需求拉形式。

怎样完成批量拉音讯消费呢?

引进spring完成的amqp包,其中RabbitTemplate供给了获取单条音讯的方法,咱们能够自己完成批量消费。

着手完成

配置


@Configuration
public class RabbitConfig {
    public static final String TOPIC_EXCHANGE_NAME = "topic.exchange";
    public static final String TOPIC_QUEUE_A = "topic.queue.a";
    public static final String TOPIC_QUEUE_B = "topic.queue.b";
    @Resource
    private RabbitAdmin rabbitAdmin;
    @Bean
    public RabbitTemplate rabbitTemplate() {
        return rabbitAdmin.getRabbitTemplate();
    }
    public ConnectionFactory getConnectFactory() {
        com.rabbitmq.client.ConnectionFactory connectionFactory = new com.rabbitmq.client.ConnectionFactory();
        connectionFactory.setHost("XXXX");
        connectionFactory.setPort(xXXX);
        connectionFactory.setUsername("XXXX");
        connectionFactory.setPassword("XXXX");
        connectionFactory.setVirtualHost("XXXX");
        return new CachingConnectionFactory(connectionFactory);
    }
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    @Bean
    public RabbitAdmin  rabbitAdmin() {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(getConnectFactory());
        // 只要设置为 true,spring 才会加载 RabbitAdmin 这个类
        rabbitAdmin.setAutoStartup(true);
        rabbitAdmin.declareExchange(rabbitTopicExchange());
        rabbitAdmin.declareQueue(topicQueue(TOPIC_QUEUE_A));
        rabbitAdmin.declareQueue(topicQueue(TOPIC_QUEUE_B));
        return rabbitAdmin;
    }
    public TopicExchange rabbitTopicExchange() {
        return new TopicExchange(TOPIC_EXCHANGE_NAME, true, false);
    }
    public Queue topicQueue(String queueName) {
        return new Queue(queueName, true, false, false);
    }
    @Bean
    public Binding bindTopicExchangeToMail() {
        return BindingBuilder.bind(topicQueue(TOPIC_QUEUE_A))
                .to(rabbitTopicExchange())
                .with("XXXX");
    }
    @Bean
    public Binding bindTopicExchangeToSms() {
        return BindingBuilder.bind(topicQueue(TOPIC_QUEUE_B))
                .to(rabbitTopicExchange())
                .with("XXXX");
    }
}

完成类

RabbitMQService

public interface RabbitMQService {
    String consumeOneMessage(String queueName);
    List<String> consumeBatchMessage(String queueName, int batchSize);
}

RabbitMQServiceImpl

@Service
public class RabbitMQServiceImpl implements RabbitMQService {
    @Resource
    private RabbitTemplate rabbitTemplate;
    // 消费单条音讯
    @Override
    public String consumeOneMessage(String queueName) {
        Object o = rabbitTemplate.receiveAndConvert(queueName);
        return (String) o;
    }
    // 批量消费音讯
    @Override
    public List<String> consumeBatchMessage(String queueName, int batchSize) {
        List<String> result = new ArrayList<>();
        for (int i = 0; i < batchSize; i   ) {
            String message = (String) rabbitTemplate.receiveAndConvert(queueName);
            if (message == null) {
                break;
//            取200条音讯,大概25ms continue;
            }
            result.add(message);
        }
        return result;
    }
}

接口

@RestController
@RequestMapping("/rabbitmq")
public class RabbitMQController {
    @Resource
    private RabbitMQService rabbitMQService;
    @GetMapping("/batchConsume")
    public List<String> consumeBatch(@RequestParam(name = "queueName") String queueName, @RequestParam(name = "batchSize") int batchSize) {
        return rabbitMQService.consumeBatchMessage(queueName, batchSize);
    }
    @GetMapping("/singleConsume")
    public String consumeSingle(@RequestParam(name = "queueName") String queueName) {
        return rabbitMQService.consumeOneMessage(queueName);
    }

问题

1. 守时使命不生效的问题;当咱们的配置文件完成了BeanPostProcessor后,守时使命不生效?

2. RabbitTemplate完成获取单条音讯的原理呢?ack机制是什么?

3. 推磨式下怎样保证音讯不丢掉呢?