作者:绍舒

导言

Apache RocketMQ 诞生至今,历经十余年大规划事务稳定性打磨,服务了阿里集团内部事务以及阿里云数以万计的企业客户。作为金融级牢靠的事务音讯计划,RocketMQ 从创立之初就一向专注于事务集成范畴的异步通信才能构建。本篇将继续事务音讯集成的场景,从功用原理、运用事例、最佳实践以及实战等视点介绍 RocketMQ 的次序音讯功用。

简介

次序音讯是音讯行列 RocketMQ 版提供的一种对音讯发送和消费次序有严厉要求的音讯。关于一个指定的 Topic,同一 MessageGroup 的音讯依照严厉的先进先出(FIFO)准则进行发布和消费,即先发布的音讯先消费,后发布的音讯后消费,服务端严厉依照发送次序进行存储、消费。同一 MessageGroup 的音讯确保次序,不同 MessageGroup 之间的音讯次序不做要求,因而需做到两点,发送的次序性和消费的次序性。

解析 RocketMQ 业务消息--“顺序消息”

功用原理

在这儿首先抛出一个问题,在日常的接触中,许多 RocketMQ 运用者会以为,已然次序音讯能在一般音讯的基础上完成次序,看起来就是一般音讯的加强版,那么为什么不全部都运用次序音讯呢?接下来就会围绕这个问题,对比一般音讯和次序音讯进行论述。

次序发送

在分布式环境下,确保音讯的大局次序性是十分困难的,例如两个 RocketMQ Producer A 与 Producer B,它们在没有交流的情况下各自向 RocketMQ 服务端发送音讯 a 和音讯 b,因为分布式系统的约束,咱们无法确保 a 和 b 的次序。因而业界音讯系统一般确保的是分区的次序性,即确保带有同一特点的音讯的次序,咱们将该特点称之为 MessageGroup。如图所示,ProducerA 发送了 MessageGroup 特点为 A 的两条音讯 A1,A2 和 MessageGroup 特点为 B 的 B1,B2,而 ProducerB 发送了 MessageGroup 特点为 C 的两条特点 C1,C2。

解析 RocketMQ 业务消息--“顺序消息”

同时,关于同一 MessageGroup,为了确保其发送次序的先后性,比较简单的做法是结构一个单线程的场景,即不同的 MessageGroup 由不同的 Producer 担任,并且关于每一个 Producer 而言,次序音讯是同步发送的。同步发送的好处是显而易见的,在客户端得到上一条音讯的发送成果后再发送下一条,即能精确确保发送次序,若运用异步发送或多线程则很难确保这一点。

解析 RocketMQ 业务消息--“顺序消息”

因而能够看到,虽然在底层原理上,次序音讯发送和一般音讯发送并无二异,但是为了确保次序音讯的发送次序性,同步发送的方法相比较一般音讯,实际上降低了音讯的最大吞吐。

次序消费

与次序音讯不同的是,一般音讯的消费实际上没有任何约束,顾客拉取的音讯是被异步、并发消费的,而次序音讯,需求确保关于同一个 MessageGroup,同一时间只要一个客户端在消费音讯,并且在该条音讯被承认消费完成之前(或者进入死信行列),顾客无法消费同一 MessageGroup 的下一条音讯,不然消费的次序性将得不到确保。因而这儿存在着一个消费瓶颈,该瓶颈取决于用户自身的事务处理逻辑。极端情况下当某一 MessageGroup 的音讯过多时,就可能导致消费堆积。当然也需求清晰的是,这儿的语境都指的是同一 MessageGroup,不同 MessageGroup 的音讯之间并不存在次序性的相关,是能够进行并发消费的。因而全文中说到的次序实际上是一种偏序。

解析 RocketMQ 业务消息--“顺序消息”

小结

不管关于发送还是消费,咱们经过 MessageGroup 的方法将音讯分组,即并发的根本单元是 MessageGroup,不同的 MessageGroup 能够并发的发送和消费,从而必定程度具有了可拓展性,支持多行列存储、水平拆分、并发消费,且不受影响。回顾一般音讯,站在次序音讯的视角,能够以为一般音讯的并发根本单元是单条音讯,即每条音讯均拥有不同的 MessageGroup。

咱们回到最初那个问题:

已然次序音讯能在一般音讯的基础上完成次序,看起来就是一般音讯的加强版,那么为什么不全部都运用次序音讯呢?

现在我们关于这个问题可能有一个根本的形象了,音讯的次序性当然很好,但是为了完成次序性也是有代价的。

下述是一个表格,扼要对比了次序音讯和一般音讯。

解析 RocketMQ 业务消息--“顺序消息”

最佳实践

合理设置MessageGroup

MessageGroup 会有很多过错的挑选,以某电商渠道为例,某电商渠道将商家 ID 作为 MessageGroup,因为部分规划较大的商家会产出较多订单,因为下流消费才能的约束,因而这部分商家所对应的订单就发生了严重的堆积。正确的做法应当是将订单号作为 MessageGroup,而且站在背面的事务逻辑上来说,同一订单才有次序性的要求。即挑选 MessageGroup 的最佳实践是:MessageGroup 生命周期最好较为时间短,且不同 MessageGroup 的数量应当尽量相同且均匀。

同步发送和发送重试

如之前章节所述,需运用同步发送和发送重试来确保发送的次序性。

消费幂等

音讯传输链路在异常场景下会有少数重复,事务消费是需求做消费幂等,避免重复处理带来的危险。

运用事例

  • 用户注册需求发送验证码,以用户 ID 作为 MessageGroup,那么同一个用户发送的音讯都会依照发布的先后次序来消费。

  • 电商的订单创立,以订单 ID 作为 MessageGroup,那么同一个订单相关的创立订单音讯、订单支付音讯、订单退款音讯、订单物流音讯都会依照发布的先后次序来消费。

解析 RocketMQ 业务消息--“顺序消息”

实战

发送

能够看到,该发送事例设置了 MessageGroup 并且运用了同步发送,发送的代码如下:

public class ProducerFifoMessageExample {
    private static final Logger LOGGER = LoggerFactory.getLogger(ProducerFifoMessageExample.class);
    private ProducerFifoMessageExample() {
    }
    public static void main(String[] args) throws ClientException, IOException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        // Credential provider is optional for client configuration.
        String accessKey = "yourAccessKey";
        String secretKey = "yourSecretKey";
        SessionCredentialsProvider sessionCredentialsProvider =
            new StaticSessionCredentialsProvider(accessKey, secretKey);
        String endpoints = "foobar.com:8080";
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
            .setEndpoints(endpoints)
            .setCredentialProvider(sessionCredentialsProvider)
            .build();
        String topic = "yourFifoTopic";
        final Producer producer = provider.newProducerBuilder()
            .setClientConfiguration(clientConfiguration)
            // Set the topic name(s), which is optional. It makes producer could prefetch the topic route before 
            // message publishing.
            .setTopics(topic)
            // May throw {@link ClientException} if the producer is not initialized.
            .build();
        // Define your message body.
        byte[] body = "This is a FIFO message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8);
        String tag = "yourMessageTagA";
        final Message message = provider.newMessageBuilder()
            // Set topic for the current message.
            .setTopic(topic)
            // Message secondary classifier of message besides topic.
            .setTag(tag)
            // Key(s) of the message, another way to mark message besides message id.
            .setKeys("yourMessageKey-1ff69ada8e0e")
            // Message group decides the message delivery order.
            .setMessageGroup("youMessageGroup0")
            .setBody(body)
            .build();
        try {
            final SendReceipt sendReceipt = producer.send(message);
            LOGGER.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
        } catch (Throwable t) {
            LOGGER.error("Failed to send message", t);
        }
        // Close the producer when you don't need it anymore.
        producer.close();
    }
}

消费

消费的代码如下:

public class SimpleConsumerExample {
    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleConsumerExample.class);
    private SimpleConsumerExample() {
    }
    public static void main(String[] args) throws ClientException, IOException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        // Credential provider is optional for client configuration.
        String accessKey = "yourAccessKey";
        String secretKey = "yourSecretKey";
        SessionCredentialsProvider sessionCredentialsProvider =
            new StaticSessionCredentialsProvider(accessKey, secretKey);
        String endpoints = "foobar.com:8080";
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
            .setEndpoints(endpoints)
            .setCredentialProvider(sessionCredentialsProvider)
            .build();
        String consumerGroup = "yourConsumerGroup";
        Duration awaitDuration = Duration.ofSeconds(30);
        String tag = "yourMessageTagA";
        String topic = "yourTopic";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        SimpleConsumer consumer = provider.newSimpleConsumerBuilder()
            .setClientConfiguration(clientConfiguration)
            // Set the consumer group name.
            .setConsumerGroup(consumerGroup)
            // set await duration for long-polling.
            .setAwaitDuration(awaitDuration)
            // Set the subscription for the consumer.
            .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
            .build();
        // Max message num for each long polling.
        int maxMessageNum = 16;
        // Set message invisible duration after it is received.
        Duration invisibleDuration = Duration.ofSeconds(5);
        final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration);
        for (MessageView message : messages) {
            try {
                consumer.ack(message);
            } catch (Throwable t) {
                LOGGER.error("Failed to acknowledge message, messageId={}", message.getMessageId(), t);
            }
        }
        // Close the simple consumer when you don't need it anymore.
        consumer.close();
    }
}

今天经过对 RocketMQ 次序音讯的介绍,希望能够帮我们对次序音讯的原理和运用有更深化的了解,同时也期望 RocketMQ 的次序音讯能够帮助您更有效的处理事务问题。假如您对 RocktMQ 的事务音讯感兴趣,也欢迎您扫描下方二维码参加钉钉群一同交流交流~

解析 RocketMQ 业务消息--“顺序消息”

点击此处,进入官网了解更多概况~