导语
Apache Pulsar 是一个多租户、高性能的服务间音讯传输解决方案,支撑多租户、低延时、读写分离、跨地域复制、快速扩容、灵敏容错等特性。腾讯云MQ Oteam Pulsar工作组对 Pulsar 做了深化调研以及大量的性能和稳定性方面优化,现在现已在TDBank、腾讯云TDMQ落地上线。本篇将简单介绍Pulsar服务端音讯承认的一些概念和原理,欢迎咱们阅览。
作者简介
林琳
腾讯云中间件专家工程师
Apache Pulsar PMC,《深化解析Apache Pulsar》作者。现在专注于中间件范畴,在音讯行列和微服务方向具有丰厚的经验。担任 TDMQ的规划与开发工作,现在致力于打造稳定、高效和可扩展的基础组件与服务。
前语
在业务音讯未呈现前,Pulsar中支撑的最高等级的音讯传递确保,是经过Broker的音讯去重机制,来确保Producer在单个分区上的音讯只准确保存一次。当Producer发送音讯失利后,即便重试发送音讯,Broker也能确保音讯只被耐久化一次。但在Partitioned Topic的场景下,Producer没有办法确保多个分区的音讯原子性。
当Broker 宕机时,Producer或许会发送音讯失利,假如Producer没有重试或已竭尽重试次数,则音讯不会写入 Pulsar。在顾客方面,现在的音讯承认是尽力而为的操作,并不能确保音讯一定被承认成功,假如音讯承认失利,这将导致音讯从头投递,顾客将收到重复的音讯, Pulsar 只能确保顾客至少消费一次。
类似地,Pulsar Functions 仅确保对幂等函数上的单个音讯处理一次,即需求业务确保幂等。它不能确保处理多个音讯或输出多个成果只产生一次。
举个比如,某个Function的履行过程是:从Topic-A1、Topic-A2中消费音讯,然后Function中对音讯进行聚合处理(如:时间窗口聚合核算),成果存储到Topic-B,最终别离承认(ACK) Topic-A1和Topic-A2中的音讯。该Function或许会在“输出成果到Topic-B”和“承认音讯”之间失利,甚至在承认单个音讯时失利。这将导致一切(或部分)Topic-A1、Topic-A2的音讯被从头传递和从头处理,并生成新的成果,然后导致整个时间窗口的核算成果错误。
因而,Pulsar需求业务机制来确保准确一次的语义(Exactly-once),出产和消费都能确保准确一次,不会重复,也不会丢掉数据,即便在Broker宕机或Function处理失利的状况下。
业务简介
Pulsar业务音讯的规划初衷是用于确保Pulsar Function的准确一次语义,能够确保Producer发送多条音讯到不同的Partition时,能够一起悉数成功或许一起悉数失利。也能够确保Consumer消费多条音讯在时,能够一起悉数承认成功或一起悉数失利。当然,也能够把出产、消费都包含在同一个业务中,要么悉数成功,要么悉数失利。
咱们以本末节最初处的Function场景为例,演示出产、消费在同一个业务中的场景:
首要,咱们需求在broker.conf中启用业务。
\transactionCoordinatorEnabled=true
然后,咱们别离创立PulsarClient和业务目标。出产者和顾客API中都需求带上这个业务目标,才干确保它们在同一个业务中。
//创立client,并启用业务
PulsarClient pulsarClient = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.enableTransaction(true)
.build();
// 创立业务Transaction txn = pulsarClient
.newTransaction()
.withTransactionTimeout(1, TimeUnit.MINUTES)
.build()
.get();
String sourceTopic = "public/default/source-topic";
String sinkTopic = "public/default/sink-topic";
//创立出产者和顾客Consumer<String> sourceConsumer = pulsarClient.newConsumer(Schema.STRING)
.topic(sourceTopic)
.subscriptionName("my-sub")
.subscribe();
Producer<String> sinkProducer = pulsarClient.newProducer(Schema.STRING)
.topic(sinkTopic)
.create();
// 从原Topic中消费一条音讯,并发送到别的一个Topic中,它们在同一个业务内 Message<String> message = sourceConsumer.receive();
sinkProducer.newMessage(txn).value("sink data").sendAsync();
sourceConsumer.acknowledgeAsync(message.getMessageId(), txn);
// 提交业务
txn.commit().get();
咱们以本末节最初处的Function比如来说:
当未敞开业务时,假如Function先把成果写入SinkTopic,可是音讯承认失利(下图Step-4失利),这会导致音讯被从头被投递(下图Step-1),Function会从头核算一个成果再发送到SinkTopic,这样就会呈现一条数据被重复核算并投递了两次。
假如没有敞开业务,Function会先承认音讯,再把数据写入SinkTopic(先履行Step-4 再履行Step-3),此刻假如写入SinkTopic失利,而SourceTopic的音讯又现已被承认,则会形成数据丢掉,终究的核算成果也不准确。
假如敞开了业务,只要最终没有commit,前面一切的过程都会被回滚,出产的音讯、承认过的音讯都被回滚,然后让整个流程能够从头再来一遍,不会重复核算,也不会丢掉数据。整个时序图如下所示:
咱们只需求依据上面过程,了解每一步详细做了什么,就能清楚整个业务的完成方法。在下面的末节中,咱们将逐渐介绍。
业务流程
在了解整个业务流程之前,咱们先介绍Pulsar中业务的组件,常见的分布式业务中都会有TC、TM、RM等组件:
-
TM:业务发起者。界说业务的鸿沟,担任奉告 TC,分布式业务的开端,提交,回滚。在Pulsar业务中,由每个PulsarClient来扮演这个人物。
-
RM:每个节点的资源办理者。办理每个分支业务的资源,每一个 RM 都会作为一个分支业务注册在 TC。在Pulsar中界说了一个TopicTransactionBuffer和PendingAckHandle来别离办理出产、消费的资源。
-
TC :业务协调者。TC用于处理来自Pulsar Client的业务恳求以盯梢其业务状态的模块。每个TC都有一个 仅有id (TCID) 标识,TC之间独立维护自己的业务元数据存储。TCID 用于生成业务 ID,播送告诉不同节点提交、回滚业务。
下面,咱们以一个Producer来介绍整个业务的流程,图中灰色部分代表存储,现有内存和Bookkeeper两种存储完成:
-
挑选TC。一个Pulsar集群中或许存在多个TC(默许16个),PulsarClient在创立业务时需求先挑选用哪个TC,后续一切业务的创立、提交、回滚等操作都会发往这个TC。挑选的规则很简单,因为TC的Topic是固定的,首要Lookup查看一切分区地点的Broker(每个分区就是一个TC),然后每次Client创立新业务,轮询挑选一个TC即可。
-
敞开业务。代码中经过pulsarClient.newTransaction()敞开一个业务,Client会往对应的TC中发送一个newTxn指令,TC生成并回来一个新业务的ID目标,目标里保存了TC的ID(用于后续恳求找节点)和业务的ID,业务ID是递加的,同一个TC生成ID不会重复。
-
注册分区。Topic有或许是分区主题,音讯会被发往不同的Broker节点,为了让TC知道音讯会发送到哪些节点(后续业务提交、回滚时TC需求告诉这些节点),Producer在发送音讯之前,会先往TC上注册分区信息。这样一来,后续TC就知道要告诉哪些节点的RM来提交、回滚业务了。
-
发送音讯。这一步和一般的音讯发送没有太大的差异,不过音讯需求先经过每个Broker上的RM,Pulsar中RM被界说为TopicTransactionBuffer,RM里边会记载一些元数据,最终音讯仍是会被写入原始的Topic中。此刻虽然音讯现已被写入了原始Topic,但顾客是不可见的,Pulsar中的业务隔离级别是Read Commit。
-
提交业务。Producer发送完一切的音讯后,提交业务,TC会收到提交恳求后,会播送告诉RM节点提交业务,更新对应的元数据,让音讯能够被顾客消费。
Setp-4中的音讯是怎么确保耐久化到Topic中又不可见的呢?
每个Topic中都会保存一个maxReadPosition属性,用来标识当前顾客能够读取的最大方位,当业务还未提交之前,虽然数据现已耐久化到Topic中,可是maxReadPosition是不会改动的。因而Consumer无法消费到未提交的数据。
音讯现已耐久化了,最终业务要回滚,这部分数据怎么处理?
假如业务要回滚,RM中会记载这个业务为Aborted状态。每条音讯的元数据中都会保存业务的ID等信息,Dispatcher中会依据业务ID判别这条音讯是否需求投递给Consumer。假如发现业务现已完毕,则直接过滤掉(内部承认掉音讯)。
最终提交业务时假如部分成功、部分失利,怎么处理?
TC中有一个名为TransactionOpRetryTimer的守时目标,一切未悉数成功播送的业务都会交给它来重试,直到一切节点终究悉数成功或超过重试次数。那这个过程不会呈现一致性问题吗?首要咱们想想,呈现这种状况的场景是什么。通常是某些Broker节点宕机导致这些节点不可用,或是网络抖动导致暂时不可达。在Pulsar中假如呈现Broker宕机,Topic的归属是会搬运的,除非整个集群不可用,否则总是能够找到一个新的Broker,经过重试来解决。在Topic归属搬运过程中,maxReadPosition没有改动,顾客也消费不到音讯。即便整个集群不可用,后续等到集群康复后,Timer仍是会经过重试让业务提交。
假如业务未完成,会堵塞一般音讯的消费吗?
会。假定咱们敞开业务,发送了几条业务音讯,可是并未提交或回滚业务。此刻持续往Topic中发送一般音讯,因为业务音讯一直没有提交,maxReadPosition不会变化,顾客会消费不到新的音讯,会堵塞一般音讯的消费。这是契合预期的行为,为了确保音讯的顺序。而不同Topic之间不会相互影响,因为每个Topic都有自己的maxReadPosition。
业务的完成
咱们能够把业务的完成分为五部分:环境、TC、出产者RM、顾客RM、客户端。因为出产和消费资源的办理是分隔的,因而咱们会别离介绍。
环境设定
业务协调者的设置,需求从Pulsar集群的初始化时开端,咱们在第一章中有介绍怎么建立集群,第一次需求履行一段指令,初始化ZooKeeper中的集群元数据。此刻,Pulsar会主动创立一个SystemNamespace,并在里边创立一个Topic,完好的Topic如下所示:
persistent://pulsar/system/transaction_coordinator_assign
这是一个PartitionedTopic,默许有16个分区,每个分区就是一个独立的TC。咱们能够经过–initial-num-transaction-coordinators参数来设置TC的数量。
TC与RM
接下来,咱们看看服务端的业务组件,如下图所示:
-
TransactionMetadataStoreService 是Broker上业务的整体协调者,咱们能够以为它是TC。
-
TransactionMetadataStore 被TC用来保存业务的元数据,如:新创立的业务,Producer注册上来的分区。这个接口有两个完成类,一个是把数据保存到Bookkeeper的完成,别的一个则直接把数据保存在内存中。
-
TransactionTimeoutTracker 服务端用于追寻超时的业务。
-
各种Provider,它们都归于工厂类,无需特别重视。
-
TopicTransactionBuffer 出产者的RM,当业务音讯被发送到Broker,RM作为署理会记载一些元数据,然后把音讯存入原始Topic。内部包含了TopicTransactionBufferRecover和TransactionBufferSnapshotService,RM的元数据会被结构化为快照并守时刷盘,这两个目标别离担任快照的康复和快照的保存。因为出产音讯是以Topic为单位,因而每个Topic/Partition都会有一个。
-
PendingAckHandle 顾客的RM,因为消费是以订阅为单位的,因而每个订阅都有一个。
因为线上环境通常会运用耐久化的业务,因而下面的原理都依据耐久化完成。
一切业务相关的服务,在BrokerService启动时会初始化。TC主题中,每个Partition都是一个Topic,TransactionMetadataStoreService在初始化时,会依据当前Broker纳管的TC Partition,从Bookkeeper中康复之前耐久化的元数据。每个TC会保存以下元数据:
-
newTransaction。新建一个业务,回来一个仅有的业务ID目标。
-
addProducedPartitionToTxn。注册出产者要发送音讯的Partition信息,用于后续TC告诉对应节点的RM提交/回滚业务。
-
addAckedPartitionToTxn。注册顾客要消费音讯的Partition信息,用于后续TC告诉对应节点的RM提交/回滚业务。
-
endTransaction。完毕一个业务,能够是提交、回滚或许超时等。
咱们在初始化PulsarClient时,假如设置了enableTransaction=true,则Client初始化时,还会额定初始化一个TransactionCoordinatorClient。因为TC的Tenant、Namespace以及Topic称号都是固定的,因而TC客户端能够经过Lookup发现一切的Partition信息并缓存到本地,后续Client创立业务时,会轮询从这个缓存列表中选取下一个业务要运用的TC。
Producer业务办理
接下来咱们会敞开一个业务:
// 创立业务
Transaction txn = pulsarClient
.newTransaction()
.withTransactionTimeout(1, TimeUnit.MINUTES)
.build()
.get();
上面这段代码中,会发送一个newTxn给某个TC,并得到一个Transaction目标。
敞开业务时,TransactionCoordinatorClient会从缓存中选取一个TC,然后往选定的TC地点的Broker发送一个newTxn指令,指令的结构界说如下所示:
message CommandNewTxn {
required uint64 request_id = 1;
optional uint64 txn_ttl_seconds = 2 [default = 0];
optional uint64 tc_id = 3 [default = 0];
}
因为指令中包含了TCID,因而即便多个TC被同一个Broker纳管也没有问题。Broker会依据TCID找到对应的TC并处理恳求。
Producer发送音讯之前,会先发送一个AddPartitionToTxn指令给Broker,只有成功后,才会持续发送实在的音讯。业务音讯抵达Broker后,传递给TransactionBuffer进行处理。期间Broker必定会对音讯进行去重校验,经过校验后,数据会保存到TransactionBuffer里,而TransactionBuffer只是一个署理(会保存一些元数据),它终究会调用原始Topic保存音讯,TransactionBuffer在初始化时,结构方法需求传入原始Topic目标。咱们能够把TransactionBuffer看作是Producer端的RM。
TransactionBuffer中会保存两种信息,一种是原始音讯,直接运用Topic保存。别的一种是快照,快照中保存了Topic称号,最大可读方位信息(避免Consumer读到未提交的数据)、该Topic中现已中止(aborted)的业务列表。
其中,中止的业务,是由TC播送奉告其他Broker节点的,TransactionBuffer接到信息后,会直接在原始Topic中写入一个abortMarker,符号业务现已中止,然后更新内存中的列表。abortMarker也是一条一般的音讯,可是音讯头中的元数据和一般音讯不一样。这些数据保存在快照中,主要是为了Broker重启后数据能快速康复。假如快照数据丢掉,TopicTransactionBufferRecover会从尾到头读取Topic中的一切数据,每遇到一个abortMarker都会更新内存中的中止列表。假如有了快照,咱们只需求从快照处的起点开端读即可康复数据。
Consumer业务办理
顾客需求在音讯承认时带上业务目标,标识运用业务Ack:
\consumer.acknowledge(message, txn);
服务端每个订阅都有一个PendingAckHandle目标用于办理业务Ack信息,咱们能够以为它是办理顾客数据的RM。当Broker发现音讯承认恳求中带有业务信息,则会把这个恳求转交给对应的PendingAckHandle处理。
一切敞开了业务的音讯承认,不会直接修改游标上的MarkDeleted方位,而是先耐久化到一个额定的Ledger中,Broker内存中也会缓存一份。这个Ledger由pendingAckStore办理,咱们能够以为是Consumer RM的日志。
业务提交时,RM会调用顾客对应的Subscription,履行方才一切的音讯承认操作。一起,也会在日志Ledger中写入一个特别的Marker,标识业务需求提交。在业务回滚时,也会先在日志中记载一个AbortMarker,然后触发Message从头投递。
pendingAckStore中保存的日志是redo log,该组件在初始化时,会先从日志Ledger中读取一切redo log,然后在内存中重建从前的音讯承认信息。因为音讯承认是幂等操作,假如Broker不慎宕机,只需求把redo log中的操作从头履行一遍。当订阅中的音讯被真实承认掉后,pendingAckStore中对应的redo log也能够被整理了。整理方法很简单,只需求移动pendingAckStore中Ledger的MarkDelete方位即可。
再谈TC
一切的业务提交、回滚,因为Client端奉告TC,或许因为超时TC主动感知。TC的日志中保存了Producer的音讯要发往哪些Partition,也保存了Consumer会Ack哪些Partition。RM涣散在每个Broker上,记载了整个业务中发送的音讯和要承认的音讯。当业务完毕时,TC则以TCID为key,找到一切的元数据,经过元数据得知需求告诉哪些Broker上的RM,最终发起播送,告诉这些Broker上的RM,业务需求提交/回滚。
结尾
Pulsar中的规划细节十分多,因为篇幅有限,作者会整理一系列的文章进行技术共享,敬请期待。假如各位期望体系性地学习Pulsar,能够购买作者出版的新书《深化解析Apache Pulsar》。
one more thing
现在,腾讯云音讯行列 TDMQ Pulsar版(TDMQ for Pulsar,简称 TDMQ Pulsar 版)已开端正式商业化。音讯行列 Pulsar 版是一款依据 Apache Pulsar 自研的音讯中间件,具有极好的云原生和 Serverless 特性,兼容 Pulsar 的各个组件与概念,具有核算存储分离,灵敏扩缩容的底层优势。
各位想要了解的请点击**官网**。