作者:绍舒
导言
Apache RocketMQ 诞生至今,历经十余年大规划事务稳定性打磨,服务了阿里集团内部事务以及阿里云数以万计的企业客户。作为金融级牢靠的事务音讯计划,RocketMQ 从创立之初就一向专注于事务集成范畴的异步通信才能构建。本篇将继续事务音讯集成的场景,从功用原理、运用事例、最佳实践以及实战等视点介绍 RocketMQ 的次序音讯功用。
简介
次序音讯是音讯行列 RocketMQ 版提供的一种对音讯发送和消费次序有严厉要求的音讯。关于一个指定的 Topic,同一 MessageGroup 的音讯依照严厉的先进先出(FIFO)准则进行发布和消费,即先发布的音讯先消费,后发布的音讯后消费,服务端严厉依照发送次序进行存储、消费。同一 MessageGroup 的音讯确保次序,不同 MessageGroup 之间的音讯次序不做要求,因而需做到两点,发送的次序性和消费的次序性。
功用原理
在这儿首先抛出一个问题,在日常的接触中,许多 RocketMQ 运用者会以为,已然次序音讯能在一般音讯的基础上完成次序,看起来就是一般音讯的加强版,那么为什么不全部都运用次序音讯呢?接下来就会围绕这个问题,对比一般音讯和次序音讯进行论述。
次序发送
在分布式环境下,确保音讯的大局次序性是十分困难的,例如两个 RocketMQ Producer A 与 Producer B,它们在没有交流的情况下各自向 RocketMQ 服务端发送音讯 a 和音讯 b,因为分布式系统的约束,咱们无法确保 a 和 b 的次序。因而业界音讯系统一般确保的是分区的次序性,即确保带有同一特点的音讯的次序,咱们将该特点称之为 MessageGroup。如图所示,ProducerA 发送了 MessageGroup 特点为 A 的两条音讯 A1,A2 和 MessageGroup 特点为 B 的 B1,B2,而 ProducerB 发送了 MessageGroup 特点为 C 的两条特点 C1,C2。
同时,关于同一 MessageGroup,为了确保其发送次序的先后性,比较简单的做法是结构一个单线程的场景,即不同的 MessageGroup 由不同的 Producer 担任,并且关于每一个 Producer 而言,次序音讯是同步发送的。同步发送的好处是显而易见的,在客户端得到上一条音讯的发送成果后再发送下一条,即能精确确保发送次序,若运用异步发送或多线程则很难确保这一点。
因而能够看到,虽然在底层原理上,次序音讯发送和一般音讯发送并无二异,但是为了确保次序音讯的发送次序性,同步发送的方法相比较一般音讯,实际上降低了音讯的最大吞吐。
次序消费
与次序音讯不同的是,一般音讯的消费实际上没有任何约束,顾客拉取的音讯是被异步、并发消费的,而次序音讯,需求确保关于同一个 MessageGroup,同一时间只要一个客户端在消费音讯,并且在该条音讯被承认消费完成之前(或者进入死信行列),顾客无法消费同一 MessageGroup 的下一条音讯,不然消费的次序性将得不到确保。因而这儿存在着一个消费瓶颈,该瓶颈取决于用户自身的事务处理逻辑。极端情况下当某一 MessageGroup 的音讯过多时,就可能导致消费堆积。当然也需求清晰的是,这儿的语境都指的是同一 MessageGroup,不同 MessageGroup 的音讯之间并不存在次序性的相关,是能够进行并发消费的。因而全文中说到的次序实际上是一种偏序。
小结
不管关于发送还是消费,咱们经过 MessageGroup 的方法将音讯分组,即并发的根本单元是 MessageGroup,不同的 MessageGroup 能够并发的发送和消费,从而必定程度具有了可拓展性,支持多行列存储、水平拆分、并发消费,且不受影响。回顾一般音讯,站在次序音讯的视角,能够以为一般音讯的并发根本单元是单条音讯,即每条音讯均拥有不同的 MessageGroup。
咱们回到最初那个问题:
已然次序音讯能在一般音讯的基础上完成次序,看起来就是一般音讯的加强版,那么为什么不全部都运用次序音讯呢?
现在我们关于这个问题可能有一个根本的形象了,音讯的次序性当然很好,但是为了完成次序性也是有代价的。
下述是一个表格,扼要对比了次序音讯和一般音讯。
最佳实践
合理设置MessageGroup
MessageGroup 会有很多过错的挑选,以某电商渠道为例,某电商渠道将商家 ID 作为 MessageGroup,因为部分规划较大的商家会产出较多订单,因为下流消费才能的约束,因而这部分商家所对应的订单就发生了严重的堆积。正确的做法应当是将订单号作为 MessageGroup,而且站在背面的事务逻辑上来说,同一订单才有次序性的要求。即挑选 MessageGroup 的最佳实践是:MessageGroup 生命周期最好较为时间短,且不同 MessageGroup 的数量应当尽量相同且均匀。
同步发送和发送重试
如之前章节所述,需运用同步发送和发送重试来确保发送的次序性。
消费幂等
音讯传输链路在异常场景下会有少数重复,事务消费是需求做消费幂等,避免重复处理带来的危险。
运用事例
-
用户注册需求发送验证码,以用户 ID 作为 MessageGroup,那么同一个用户发送的音讯都会依照发布的先后次序来消费。
-
电商的订单创立,以订单 ID 作为 MessageGroup,那么同一个订单相关的创立订单音讯、订单支付音讯、订单退款音讯、订单物流音讯都会依照发布的先后次序来消费。
实战
发送
能够看到,该发送事例设置了 MessageGroup 并且运用了同步发送,发送的代码如下:
public class ProducerFifoMessageExample {
private static final Logger LOGGER = LoggerFactory.getLogger(ProducerFifoMessageExample.class);
private ProducerFifoMessageExample() {
}
public static void main(String[] args) throws ClientException, IOException {
final ClientServiceProvider provider = ClientServiceProvider.loadService();
// Credential provider is optional for client configuration.
String accessKey = "yourAccessKey";
String secretKey = "yourSecretKey";
SessionCredentialsProvider sessionCredentialsProvider =
new StaticSessionCredentialsProvider(accessKey, secretKey);
String endpoints = "foobar.com:8080";
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endpoints)
.setCredentialProvider(sessionCredentialsProvider)
.build();
String topic = "yourFifoTopic";
final Producer producer = provider.newProducerBuilder()
.setClientConfiguration(clientConfiguration)
// Set the topic name(s), which is optional. It makes producer could prefetch the topic route before
// message publishing.
.setTopics(topic)
// May throw {@link ClientException} if the producer is not initialized.
.build();
// Define your message body.
byte[] body = "This is a FIFO message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8);
String tag = "yourMessageTagA";
final Message message = provider.newMessageBuilder()
// Set topic for the current message.
.setTopic(topic)
// Message secondary classifier of message besides topic.
.setTag(tag)
// Key(s) of the message, another way to mark message besides message id.
.setKeys("yourMessageKey-1ff69ada8e0e")
// Message group decides the message delivery order.
.setMessageGroup("youMessageGroup0")
.setBody(body)
.build();
try {
final SendReceipt sendReceipt = producer.send(message);
LOGGER.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
} catch (Throwable t) {
LOGGER.error("Failed to send message", t);
}
// Close the producer when you don't need it anymore.
producer.close();
}
}
消费
消费的代码如下:
public class SimpleConsumerExample {
private static final Logger LOGGER = LoggerFactory.getLogger(SimpleConsumerExample.class);
private SimpleConsumerExample() {
}
public static void main(String[] args) throws ClientException, IOException {
final ClientServiceProvider provider = ClientServiceProvider.loadService();
// Credential provider is optional for client configuration.
String accessKey = "yourAccessKey";
String secretKey = "yourSecretKey";
SessionCredentialsProvider sessionCredentialsProvider =
new StaticSessionCredentialsProvider(accessKey, secretKey);
String endpoints = "foobar.com:8080";
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endpoints)
.setCredentialProvider(sessionCredentialsProvider)
.build();
String consumerGroup = "yourConsumerGroup";
Duration awaitDuration = Duration.ofSeconds(30);
String tag = "yourMessageTagA";
String topic = "yourTopic";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
SimpleConsumer consumer = provider.newSimpleConsumerBuilder()
.setClientConfiguration(clientConfiguration)
// Set the consumer group name.
.setConsumerGroup(consumerGroup)
// set await duration for long-polling.
.setAwaitDuration(awaitDuration)
// Set the subscription for the consumer.
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
.build();
// Max message num for each long polling.
int maxMessageNum = 16;
// Set message invisible duration after it is received.
Duration invisibleDuration = Duration.ofSeconds(5);
final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration);
for (MessageView message : messages) {
try {
consumer.ack(message);
} catch (Throwable t) {
LOGGER.error("Failed to acknowledge message, messageId={}", message.getMessageId(), t);
}
}
// Close the simple consumer when you don't need it anymore.
consumer.close();
}
}
今天经过对 RocketMQ 次序音讯的介绍,希望能够帮我们对次序音讯的原理和运用有更深化的了解,同时也期望 RocketMQ 的次序音讯能够帮助您更有效的处理事务问题。假如您对 RocktMQ 的事务音讯感兴趣,也欢迎您扫描下方二维码参加钉钉群一同交流交流~
点击此处,进入官网了解更多概况~