前言
从服务端来说,rocketmq5.x的布置方法和之前有所不同,新增了rocketmq-proxy人物。详细布置方法,参阅官方文档(rocketmq.apache.org/docs/deploy…)。
从客户端来说,官方主张运用新版grpc轻量级客户端(github.com/apache/rock…),本来的私有remoting协议将作为服务端内部协议运用,详细兼容性问题见官方文档(rocketmq.apache.org/docs/sdk/01…)。
在前面的几个章节,我运用的都是remoting客户端+nameserver+broker的布置方法,其实兼容性没有什么大问题。
新架构之下,依靠两个重要特性,之前已经依据remoting协议做了铺垫(服务端内部运用相同的协议,不影响了解新架构):
本文首要剖析,grpc客户端+proxy下一般音讯的收发流程,broker侧逻辑将疏忽。
注:本文服务端依据rocketmq 5.1.1版别,客户端依据rocketmq-clients java-5.0.5版别;
一、事例
rocketmq-proxy
装备文件rmq-proxy.json:
- namesrvAddr:NameServer地址,分号切割;
- proxyMode:cluster形式;
- rocketMQClusterName:方针broker集群名称(proxy是集群等级的);
- remotingListenPort:remoting协议端口,默许8080;
- grpcServerPort:grpc协议端口,默许8081;
- useEndpointPortFromRequest:查询路由,是否运用入参endpoint端点的端口;
- enableACL:是否敞开acl(client和proxy之间),这儿为了阐明proxy职责规模,所以敞开了;
{
"namesrvAddr": "127.0.0.1:9876",
"proxyMode": "cluster",
"rocketMQClusterName": "DefaultCluster",
"remotingListenPort": 8080,
"grpcServerPort": 8081,
"useEndpointPortFromRequest": true,
"enableACL": true
}
本地发动vm参数:
-Dcom.rocketmq.proxy.configPath=rmq-proxy.json
-Drocketmq.client.localOffsetStoreDir=/tmp/rmqstore/proxy1/local-offset
-Drocketmq.home.dir=path/to/rocketmq_home
出产者
运用出产者发送简略音讯。
// 1. 经过SPI加载客户端完成
final ClientServiceProvider provider = ClientServiceProvider.loadService();
// 2. ACL 设置ak/sk
String accessKey = "RocketMQ";
String secretKey = "12345678";
SessionCredentialsProvider sessionCredentialsProvider =
new StaticSessionCredentialsProvider(accessKey, secretKey);
// 3. 设置proxy端点,分号切割
String endpoints = "127.0.0.1:8081;127.0.0.1:8091";
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endpoints) // proxy端点,分号切割
.setRequestTimeout(Duration.ofDays(1)) // 恳求超时时刻
.setCredentialProvider(sessionCredentialsProvider) // ak sk
.build();
String topic = "MyTopic";
// 4. 创立producer
final Producer producer = provider.newProducerBuilder()
.setClientConfiguration(clientConfiguration)
.setTopics(topic)
.build();
byte[] body = "Hello".getBytes(StandardCharsets.UTF_8);
String tag = "TagA";
// 5. 创立音讯
final Message message = provider.newMessageBuilder()
.setTopic(topic)
.setTag(tag)
.setKeys("123")
.setBody(body)
.build();
while (true) {
try {
Thread.sleep(1000);
// 6. 发送音讯
final SendReceipt sendReceipt = producer.send(message);
log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
} catch (Throwable t) {
log.error("Failed to send message", t);
break;
}
}
// 7. 封闭出产者
producer.close();
顾客
一般Push顾客。
// 1. 经过SPI加载客户端完成
final ClientServiceProvider provider = ClientServiceProvider.loadService();
// 2. ACL 设置ak/sk
String accessKey = "RocketMQ";
String secretKey = "12345678";
SessionCredentialsProvider sessionCredentialsProvider =
new StaticSessionCredentialsProvider(accessKey, secretKey);
// 3. 设置proxy端点,分号切割
String endpoints = "127.0.0.1:8081;127.0.0.1:8091";
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endpoints)
.setCredentialProvider(sessionCredentialsProvider)
.build();
// 4. 创立Push顾客
String tag = "*";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
String consumerGroup = "groupA";
String topic = "MyTopic";
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
.setClientConfiguration(clientConfiguration)
// 消费组
.setConsumerGroup(consumerGroup)
// 订阅
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
.setMessageListener(messageView -> {
// 业务逻辑
log.info("Consume message={}", messageView);
return ConsumeResult.SUCCESS;
})
.build();
Thread.sleep(Long.MAX_VALUE);
pushConsumer.close();
二、客户端概览
客户端完成
rocketmq-client-apis模块界说了producer、consumer、message笼统api接口。
ClientServiceProvider是一切客户端api进口,loadService经过JDK SPI加载ClientServiceProdiver完成。
现在ClientServiceProvider完成只要一个ClientServiceProviderImpl在rocketmq-client-java-noshade模块中。
ClientServiceProvider结构出来的客户端:
- producer有一种:ProducerImpl;
- consumer有两种:
-
- PushConsumerImpl:PUSH形式;
- SImpleConsumerImpl:疏忽;
RPC客户端
往往一个进程中既有出产者又有顾客。
MQClientManager#getOrCreateMQClientInstance:
传统remoting协议客户端(DefaultMQProducer/DefaultMQPushConsumer等)默许instanceName都是DEFAULT,终究底层都会共用一个netty客户端MQClientInstance(MQClientAPIImpl)。
新版客户端,ProducerImpl和ConsumerImpl都承继自ClientImpl。
ClientImpl中各自会保护一个ClientManagerImpl实例。
每个ClientManagerImpl依照proxy端点Endpoints纬度,单独保护RpcClient。
即grpc出产和顾客实例独立保护一套底层rpc客户端。
在RpcClient的完成是RpcClientImpl。
RpcClientImpl结构操作grpc api构建ManagedChannel和Stub。
这儿需求留意,grpc客户端负载均衡战略并没有指定(defaultLoadBalancePolicy),默许会取pick_first,挑选一个可用的address树立长衔接,即除非出现问题,将只与这个proxy通讯。
这意味着,经过简略ip列表指定proxy,客户端无负载均衡。
如下面单元测试:虽然地址列表有多个address,终究传入grpc api的端点是ipv4:127.0.0.1:8080,127.0.0.2:8081,由grpc客户端经过pick_first战略挑选一个可用proxy树立长衔接。
@Test
public void testEndpointsWithMultipleIpv4() {
final Endpoints endpoints = new Endpoints("127.0.0.1:8080;127.0.0.2:8081");
Assert.assertEquals(AddressScheme.IPv4, endpoints.getScheme());
Assert.assertEquals(2, endpoints.getAddresses().size());
final Iterator<Address> iterator = endpoints.getAddresses().iterator();
final Address address0 = iterator.next();
Assert.assertEquals("127.0.0.1", address0.getHost());
Assert.assertEquals(8080, address0.getPort());
final Address address1 = iterator.next();
Assert.assertEquals("127.0.0.2", address1.getHost());
Assert.assertEquals(8081, address1.getPort());
Assert.assertEquals(AddressScheme.IPv4, endpoints.getScheme());
Assert.assertEquals("ipv4:127.0.0.1:8080,127.0.0.2:8081", endpoints.getFacade());
Assert.assertEquals("ipv4:127.0.0.1:8080,127.0.0.2:8081", endpoints.getGrpcTarget());
}
Endpoints也能够经过域名结构,在服务端完成负载均衡,如nginx。
终究传入grpc api的端点是rocketmq.apache.org:80。
@Test
public void testEndpointsWithDomain() {
final Endpoints endpoints = new Endpoints("rocketmq.apache.org");
Assert.assertEquals(AddressScheme.DOMAIN_NAME, endpoints.getScheme());
final Iterator<Address> iterator = endpoints.getAddresses().iterator();
final Address address = iterator.next();
Assert.assertEquals("rocketmq.apache.org", address.getHost());
Assert.assertEquals(80, address.getPort());
Assert.assertEquals("dns:rocketmq.apache.org:80", endpoints.getFacade());
Assert.assertEquals("rocketmq.apache.org:80", endpoints.getGrpcTarget());
}
详细逻辑见org.apache.rocketmq.client.java.route.Endpoints结构。
三、路由
1、proxy路由办理
TopicRouteService:proxy侧路由办理服务。
结构时界说topic路由本地缓存(caffeine):
- 缓存20s后主动改写;
- 运转阶段,假如topic缓存不存在,从nameserver加载;
- 运转阶段,假如nameserver回来topic不存在,本地缓存一个空方针WRAPPED_EMPTY_QUEUE;
TopicRouteCacheLoader经过remoting协议与nameserver通讯。
本来nameserver要为n个客户端供给路由服务,现在只需求为m个proxy供给路由服务。
往往m<n吧,proxy能减轻nameserver压力。
可是现在client和proxy都有本地缓存。
2、客户端查询路由
每个producer和consumer实例针对自己关心的topic,单独缓存一份路由数据。
注:传统客户端出产和消费路由混合在一个MQClientInstance实例里。
ClientImpl#startUp:
- 支撑发动阶段主动加载路由(producer设置topic、consumer发动前订阅topic),假如改写路由失利,发动失利;(运转阶段新增topic懒加载)
- 后期每隔30s改写路由缓存;
ClientImpl#fetchTopicRoute0:客户端发送topic+proxy的endpoints。
当然运转阶段,假如客户端新增topic,也会懒加载路由。
比方producer在运转阶段发送新topic,consumer运转阶段新增订阅topic。
路由在新版客户端中是十分重要的,一切动作都离不开路由,包括心跳。
3、proxy回来路由
RouteActivity#queryRoute:查询路由
- 入参模型转化;
- 查询;
- 出参模型转化;
RouteActivity#convertToAddressList:
客户端会带着自己装备的proxy端点进来,在broker侧会对端点的adress地址列表做转化。
默许情况下,host不变,port会修改为当时proxy实例装备的grpcServerPort。
可是假如经过端口转化(各类负载均衡,如nginx、k8s),能够经过设置useEndpointPortFromRequest=true,选用客户端传入port。
ClusterTopicRouteService#getTopicRouteForProxy:Cluster形式下
- 查询topic对应路由信息,先走本地caffeine缓存,cache miss走nameserver;
- 一切broker地址,都替换为客户端传入endpoint,即rocketmq-proxy实例地址;
留意,这儿一切broker地址都是客户端传入endpoints,很重要,由于一个客户端实例只会对应一个Endpoints。
出参是一组MessageQueue。
- topic;
- id:queueId;
- permission:读写权限;
- broker:broker名、地址(proxy地址)等;
- MessageType:5.x多了MessageType,在topic纬度界说音讯类型,分红四类:一般NORMAL、次序FIFO、推迟DELAY、业务TRANSACTION。
四、心跳
1、客户端
ClientManagerImpl#startUp:
每个客户端实例(每个producer和consumer实例)每隔10s向proxy发送独立心跳。
注:传统客户端producer和consumer的instanceName相同,每隔30s会发送一个心跳(包括producer和consumer信息) 给一切相关master broker。
ClientImpl#doHeartbeat:
值得留意的是,心跳包方针端点需求从路由数据中获取,即心跳依靠路由。
而路由中的endpoints是客户端查路由传入的endpoints(见路由部分),所以理论上只要一个。
grpc客户端的心跳包HeartbeatRequest是十分单纯的心跳包:
- group:消费组(PRODUCER不传);
- client_type:客户端类型,PRODUCER/PUSH_CONSUMER/SIMPLE_CONSUMER;
注:传统客户端的心跳包HeartbeatData有许多数据,包括consumer的订阅联系。
message HeartbeatRequest {
optional Resource group = 1;
ClientType client_type = 2;
}
ClientImpl#doHeartbeat:客户端侧多了一个阻隔逻辑,假如心跳成功,endpoints将从阻隔集合中移除,这与producer相关。
2、proxy端
ClientActivity#heartbeat:
proxy端处理心跳,依靠于一个新概念Settings。(后边再看)
关于producer和consumer履行不同的注册逻辑。
GrpcClientSettingsManager#getClientSettings:
从缓存CLIENT_SETTINGS_MAP中获取clientId对应Settings,依照producer或consumer别离兼并回来。
注:每个客户端恳求都会在header里传入一些信息,其间包括clientId,见客户端Signature#sign。
producer注册
ClientActivity#heartbeat:producer注册,循环settings中的topic。
ClientActivity#registerProducer:grpc模型适配remoting协议模型
- GrpcClientChannel,承继netty的AbstractChannel;
- ClientChannelInfo,和remoting协议的模型共同;
注:业务音讯逻辑后边再说。
org.apache.rocketmq.broker.client.ProducerManager#registerProducer:
终究走的是broker模块的ProducerManager,即和broker原始逻辑共同,将ClientChannelInfo放入内存table。
需求留意的是,producer自身心跳恳求不包括group,这儿传入的group是Settings中的topic,弦外之音,producer没有出产组概念了。
也能了解,5.x在topic纬度设置了音讯类型TRANSACTIONAL,直接将topic作为producerGroup。
consumer注册
ClientActivity#heartbeat:
consumer注册,依靠于Settings中的订阅联系,可是updateSubscription=false,不更新订阅联系。
ClientActivity#registerConsumer:相同适配remoting协议模型。
ClusterConsumerManager#registerConsumer:
终究consumer注册走ClusterConsumerManager。
和producer相似,底层逻辑走的也是broker的ConsumerManager,将consumer信息保存到内存table,可是consumer多了HeartbeatSyncer。
ConsumerManager#registerConsumer:
此处仅更新心跳时刻,不会更新订阅联系。
HeartbeatSyncer#onConsumerRegister:向系统topic发送一条心跳同步音讯。
音讯包括:
- ClientChannelInfo,客户端信息,包括clientId;
- consumerGroup,消费组;
- localProxyId,当时proxy的标识(本机ip+remoting端口+grpc端口);
- RemoteChannel,经过GrpcClientChannel转化而来,包括当时grpc客户端与proxy的衔接信息,比方双方address等;
- subscriptionDataSet,消费组订阅联系;
AbstractSystemMessageSyncer#sendSystemMessage:
这个系统topic默许为heartbeatSyncerTopicName=DefaultHeartBeatSyncerTopic。
proxy侧,依照broker纬度做负载均衡(onlyBroker=true),queueId都是-1;
broker侧,会依照可写queue数量做负载均衡(随机)。
consumer心跳同步
传统客户端,循环向一切broker发送心跳包。
proxy客户端,只会向一个proxy实例发送心跳包。
proxy之间需求彼此同步consumer心跳。
AbstractSystemMessageSyncer#start:
proxy发动阶段会发动一个广播顾客,订阅topic=DefaultHeartBeatSyncerTopic。
注:所以proxy也不是彻底无状态,由于广播消费进展存储在本地。
AbstractSystemMessageSyncer#createSysTopic:
proxy发动会判别心跳同步topic是否存在,假如不存在,会在指定cluster下创立,
cluster下每个broker一个queue。
HeartbeatSyncer#consumeMessage:消费心跳同步音讯
- 反序列化;
- 判别localProxyId,假如是自己发送的音讯,疏忽;
- 履行consumer注册逻辑;
ConsumerManager#registerConsumer:数据同步需求更新订阅联系。
HeartbeatSyncer#onConsumerRegister:数据同步不需求再次发送心跳同步音讯。
心跳超时
ClusterServiceManager#init:心跳超时由proxy扫描。
producer超时时刻写死2分钟;
consumer超时时刻默许2分钟,能够经过channelExpiredTimeout装备;
超时逻辑与broker原始逻辑大致共同,超时从内存table中移除producer或consumer。
不同的是,直连proxy的grpc客户端衔接,不会由于扫描producer/consumer心跳超时而断开。
SimpleChannel#close:直连proxy的GrpcClientChannel和同步proxy的RemoteChannel都承继自SimpleChannel,close办法什么都不做。
GrpcServerBuilder:
grpc客户端衔接是否断开,取决于grpc服务端装备的空闲时长grpcClientIdleTimeMills=120s。
五、Settings
1、模型
从客户端侧来看,settings依照客户端类型(ClientType)分为三类:
- PublishingSettings:PRODUCER;
- PushSubscriptionSettings:PUSH_CONSUMER;
- SimpleSubscriptionSettings:SIMPLE_CONSUMER(pull);
基类Settings,包括客户端类型,ClientId等属性。
ClientId由hostname+pid+index+startTime四部分组成,是客户端的仅有标识。
PublishingSettings:producer的Settings包括发布的topic。
PushSubscriptionSettings:Push顾客Settings
- group:消费组;
- subscriptionExpressions:订阅联系,key是topic;
- fifo:是否次序消费;
- receiveBatchSize:批处理大小;
- longPollingTimeout:长轮询超时时刻,30s;
2、客户端发送Settings
ClientManagerImpl#startUp:
每隔5分钟,producer/consumerImpl与proxy履行settings同步。
ClientImpl#syncSettings:
- 封装Settings到TelemetryCommand;
- 从路由搜集proxy的endpoints;
- 发送TelemetryCommand到proxy;
ClientSessionImpl:proxy的endpoints会对应一个单例ClientSessionImpl。
初次发送TelemetryCommand,客户端与proxy会树立一个grpc bistream双向流。
ClientSessionImpl#write:操作grpc api向proxy发送Settings。
3、proxy收到Settings
ClientActivity#processAndWriteClientSettings:
不管是producer仍是consumer的Settings过来,都会先履行一次注册逻辑。
所以心跳(注册/续期)依靠Settings,Settings依靠注册/续期。
承认注册完成后,将输出流注入GrpcClientChannel。
留意,consumer的订阅联系,会在Settings同步中更新,而不是在心跳部分。
ClientActivity#processAndWriteClientSettings:
注册完成后,proxy会对settings做处理,再写回客户端。
ClientActivity#processClientSettings:处理Settings
- updateClientSettings:将客户端Settings放入内存table(GrpcClientSettingsManager.CLIENT_SETTINGS_MAP);
- getClientSettings:兼并客户端Settings和proxy装备;
- 回来兼并后的Settings;
GrpcClientSettingsManager#mergeProducerData:
关于producer的Settings,用proxy装备掩盖,比方音讯体大小限制等。
GrpcClientSettingsManager#mergeSubscriptionData:
关于consumer的Settings,用proxy的装备和broker的订阅组装备(caffeine缓存)掩盖。
4、客户端收到Settings
ClientSessionImpl#onNext:
客户端收到兼并后的Settings。
ClientImpl#onSettingsCommand:producer和consumer履行不同的兼并逻辑。
PublishingSettings#sync:
producer兼并proxy回来的Settings。
PushSubscriptionSettings#sync:
push consumer相似。
六、一般音讯发送
1、客户端发送
ProducerImpl#send:发音讯,疏忽前置各种校验
- 获取路由:一组可写MessageQueue,broker地址都是proxy端点;
这儿和传统客户端不相同,传统客户端能够获取topic=TBW102作为默许路由,主动创立topic,可是新客户端不支撑,所以新客户端必定要手动创立topic;
- 选行列:其实这儿选哪个都不太重要,由于树立grpc长衔接的proxy只要一个,后续在proxy侧会做实践负载均衡,这儿首要是为了拿到路由回来的endpoints;
- send0:调用proxy发送音讯;
ProducerImpl#send0:模型转化,发送proxy。
注:messageId仍是由客户端生成,见Message模型PublishingMessageImpl结构。
2、proxy发送
SendMessageActivity#sendMessage:模型转化。
ProducerProcessor#sendMessage:
- 依据topic路由选queue;
- 调用queue对应broker,发送音讯;
SendMessageQueueSelector#select:proxy担任发送音讯queue等级负载均衡
- 一般音讯,从可写行列里,轮询选一个queue;
- 次序音讯,依据messageGroup.hashCode,从可写行列里选一个queue;
3、客户端重试
假如客户端发送音讯失利,依照战略履行重试。
ProducerImpl#send0:
- 阻隔endpoints,直到proxy能正确处理心跳;
- 超出最大重试次数3(proxy指定,经过Settings同步),失利;
- 业务音讯,快速失利;
- 假如由于服务端流控(TooManyRequests)失利,依照指数退避战略(proxy指定,经过Settings同步),推迟10ms、20ms、40ms重试;
- 其他,当即重试;
七、一般音讯消费(Push)
1、概览
POP消费概览
新架构之下,消费流程都遵循POP消费形式( POP消费源码剖析 ) :
- queryAssignment:查询Assignment。Push顾客需求,Simple顾客不需求;
- receiveMessage:拉音讯;
- 处理音讯:
- ackMessage:消费成功,ack;
- changeInvisibleTime:消费失利,nack,过n时长后重试;
新版客户端
从新版客户端的线程模型上来看,相较于传统客户端简略了许多,分为三组线程:
回顾broker
回顾POP消费形式下,broker关于pop消费的处理逻辑。
queryAssignment
以4.x的角度来说,这阶段也是客户端rebalance。
办法进口:QueryAssignmentProcessor#queryAssignment。
broker默许为每个consumer实例分配一切queue,仅仅queueId=-1。
(broker侧默许装备defaultPopShareQueueNum=-1的行为)
拉音讯
pop拉音讯特色:
- 客户端针对每个master broker提交一个Pop恳求,而恳求中未清晰指定queueId,也未清晰指定消费进展;
- broker需求担任挑选queue和消费进展;
- 关于一次pop恳求,broker需求记录checkpoint信息,topic=rmq_sys_REVIVE_LOG_{clusterName},queueId=random(8),tag=ck,要运用timer音讯,方针投递时刻戳=pop恳求时刻+invisibleTime(60s)-1s;
- 回来用户音讯的properties中会追加checkpoint信息,用于后续ack和changeInvisibleTime;
处理音讯
pop处理音讯特色:
- 客户端做tag准确匹配(broker只能依据consumequeue记录的hascode过滤一遍),假如不匹配,直接异步ack不匹配音讯;
- 不管消费成功仍是失利,都需求回复broker,ack代表消费成功,changeInvisibleTime代表消费失利,需求重试,重试时刻由consumer定夺;
- broker关于ack恳求,发送一条ack音讯,topic=rmq_sys_REVIVE_LOG_{clusterName},queueId=checkpoint音讯所在queueId,tag=ack,要运用timer音讯,方针投递时刻戳=pop恳求时刻+invisibleTime(60s) ;
- broker关于changeInvisibleTime恳求,先创立一个新的checkpoint音讯,方针投递时刻戳=当时时刻+invisibleTime(客户端指定);然后针对本次pop恳求发送一个ack音讯;
consumer消费进展提交
broker发动PopBufferMergeService线程扫描缓存的checkpoint。
承认ck音讯发送成功,对这些用户音讯主动提交offset。
重试
broker发动n个PopReviveService线程,消费revive topic的n个行列。
假如ck音讯超过invisibleTime未匹配ack音讯,从头投递用户音讯到重试topic。
2、queryAssignment
Assignments模型
从领域上来说,Assignments也被归入路由,只针对PUSH_CONSUMER有用。
从客户端来看,Assignments本质上是n个MessageQueue。
客户端查询Assignments
PushConsumerImpl#startUp:
PUSH_CONSUMER客户端每隔5秒,改写本地缓存Assignments。
PushConsumerImpl#scanAssignments:
循环每个订阅topic,调用proxy查询Assignments。
PushConsumerImpl#queryAssignment:
- 从topic路由中搜集endpoints;
- 恳求包括:topic+endpoints+group;
TopicRouteData#pickEndpointsToQueryAssignments:从一切queue中选一个queue的endpoints,要求:
- master上线;
- 有权限;
proxy回来Assignments
RouteActivity#queryAssignment:proxy查询Assignment
- 将入参endpoints转化为地址列表,和路由查询共同;
- 查询路由,和路由查询共同;
- 出参转化;
留意:proxy没有将查询Assignment恳求代理到broker,假如运用传统客户端敞开pop消费,查询Assignment逻辑走的是broker,见POP消费源码剖析。
RouteActivity#queryAssignment:出参转化是重点
- 挑选可读行列;
- 选master broker,与路由同理,地址是入参的endpoints,即proxy地址;
- 行列id都设置为-1(由broker做queue负载均衡);
从成果上来看,终究回来客户端的Assignments只包括topic下master broker数量个queueId=-1的MessageQueue,且地址都为proxy端点。
客户端处理Assignments
PushConsumerImpl#scanAssignments:
- 更新topic对应缓存Assignments;
- 更新ProcessQueue;
PushConsumerImpl#syncProcessQueue:
依照Assignment指示:
删去queue,标记ProcessQueue为drop,从table中移除。
新增queue,依照proxy回来,一个topic,一个master broker会有一个ProcessQueue,queueId=-1,关于每个queue履行fetchMessageImmediately当即开端拉音讯。
3、receiveMessage
客户端拉音讯
至此,consumer针对每个ProcessQueue向proxy建议拉音讯恳求。
ProcessQueueImpl#receiveMessageImmediately:
- 决策本次拉取音讯数量;
- 向proxy建议拉音讯恳求,scheduler线程切rpc线程;
ProcessQueueImpl#getReceptionBatchSize:
拉取音讯数量,包括必定流控逻辑,不超过32条(Settings同步由proxy确认)。
proxy处理拉音讯
proxy向broker建议长轮询
ReceiveMessageActivity#receiveMessage:proxy处理receiveMessage恳求进口
- actualInvisibleTime:pop消费的invisibleTime,proxy操控,60s;
- pollingTime:proxy与broker之间长轮询挂起时刻,proxy操控,20s;
ConsumerProcessor#popMessage:先选行列,再向broker建议长轮询。
ReceiveMessageQueueSelector#select:基本上归于透传。
broker侧直接取consumer指定的broker的queue,queueId相同等于-1;
之后由broker做pop消费负载均衡(随机选一个queue)。
ConsumerProcessor#popMessage:
proxy侧,协议转化,向broker建议长轮询。
broker回来音讯
broker侧,pop消费处理逻辑见POP消费源码剖析。
重点是,broker将checkpoint信息放在回来的每条message的properties的POP_CK里。
包括ack/unack音讯需求的必要信息,比方queueId、offset等信息。
proxy收到broker回来音讯
ConsumerProcessor#popMessage:
tag准确过滤逻辑放在了proxy侧,不匹配的音讯会直接发送ack给broker。
MessagingProcessor#popMessage:
- push消费,有一个renew逻辑(autoRenew=true,默许敞开),将checkpoint信息(receiptHandle)缓存;(后边再看,这段逻辑没有也不影响流程)
- 将呼应写回客户端;
ReceiveMessageResponseStreamWriter#writeAndComplete:写回客户端。
能够看到拉音讯走的是ServerStream,proxy会按次序写三种包呼应客户端:
- STATUS:1个状态包,标识本次receiveMessage呼应是否成功;
- MESSAGE:0-n个音讯包,即拉到的音讯;
- DELIVERY_TIMESTAMP:1个呼应时刻戳包,这个首要是metrics收集用,能够疏忽;
proxy侧,checkpoint信息被转化为ReceiptHandle。
客户端收到音讯
RpcClientImpl#receiveMessage:客户端收到proxy呼应。
ConsumerImpl#receiveMessage:proto模型转化。
ProcessQueueImpl#onReceiveMessageResult:ProcessQueue处理拉到的音讯
- 缓存音讯;
- 提交音讯到消费线程;
- 再次提交receiveMessage恳求到rpc线程,形成循环;
StandardConsumeService#consume:一般并发消费(非次序)
- 每条音讯别离丢到消费线程池(20线程)履行;
- 处理消费成果;
ProcessQueueImpl#eraseMessage:
消费成果分红两种情况,成功-ack,失利-nack。
4、ack
客户端
ProcessQueueImpl#ackMessage:
客户端ack有重试逻辑,假如ack呼应失利,推迟1s重试,无限次数重试。
ConsumerImpl#wrapAckMessageRequest:客户端会带着ReceiptHandle建议ack。
proxy
AckMessageActivity#processAckMessage:
- 移除缓存句柄(归于renew逻辑,后边看);
- 履行ack;
ConsumerProcessor#ackMessage:
校验句柄未过期(默许invisibleTime=60s,60s收到音讯没ack,句柄就失效)。
转化remoting协议,恳求broker,ReceiptHandle便是broker收到的ExtraInfo。
5、unack
客户端
ProcessQueueImpl#nackMessage:unack便是POP消费剖析的changeInvisibleTime。
- 不行见时刻由proxy经过settings同步回来,从1s到7200s。
- changeInvisibleTime恳求也支撑重试,推迟1s,无限次数。
ConsumerImpl#wrapChangeInvisibleDuration:
恳求包括group、topic、ReceiptHandle、不行见时刻。
proxy
ChangeInvisibleDurationActivity#changeInvisibleDuration:
proxy侧,和ack逻辑相似,仅仅调用broker办法不同。
6、renew
renew是proxy独有的逻辑。
假如consumer收到音讯后,长时刻不回来消费成果(ack/nack),proxy将主意向broker建议changeInvisibleTime(nack) 。
至于为什么这么做,我猜测是这样。假设下面这种场景:
- consumer-proxy-broker之间能正常通讯;
- 由于consumer消费慢,broker超时(invisibleTime)未收到消费成果(ack或nack都行);
- broker会持续向重试topic投递音讯;
重试音讯将进一步加重consumer的音讯积压。
传统非pop消费形式,是否sendMessageBack重试,彻底取决于客户端,不会有这种问题。
ReceiptHandleProcessor:担任办理缓存的receiptHandle(checkpoint)
ReceiptHandleProcessor只要两个public办法:
- receiveMessage时,缓存receiptHandle;
- ack/unack时,清除缓存的receiptHandle;
ReceiptHandleProcessor#init:
发动阶段,敞开5s守时使命,扫描缓存handle。
ReceiptHandleProcessor#scheduleRenewTask:
- clientIsOffline:假如channel+group找不到consumer心跳,代表consumer失联,移除consumer下一切handle,对这些handle履行renew;
- 假如音讯即将被从头投递,履行renew;是否即将从头被投递,取决于pop时刻、invisible时刻、proxy装备renewAheadTimeMillis(默许10s),即broker从头投递前10s内renew;
ReceiptHandleProcessor#startRenewMessage:
renew底层是用handle回复broker一个changeInvisibleTime,代表consumer正常收到音讯,需求过段时刻重试(默许最多renew3次,不行见时刻1m、3m、5m)。
留意,proxy内存中原始handle会映射到broker回来的新handle(checkpoint) 。
假如changeInvisibleTime的新handle不ack或nack,相同会导致音讯重投。
所以必定要在consumer随后的ack或nack时,将consumer的原始handle转化为renew后的handle,传给broker。
总结
读完新架构下的client和proxy,总体来说,模型和代码结构都更清晰易读了。
1、独立实例客户端
新版客户端将producer和consumer实例彻底分开了:
- rpc客户端:producer和consumer运用不同rpc客户端;
- 路由:producer和consumer各自缓存自己关心的topic的路由数据;
- 心跳:producer和consumer各自发送独立心跳;
2、路由
proxy侧,对路由做了caffeine本地缓存:
- 运转时cache miss从nameserver加载;
- 假如nameserver回来空,缓存空方针;
- 缓存写入20s后主动改写;
client侧:
- 发动时,支撑提前从proxy加载路由;
- 运转时,本地缓存cache miss,从proxy加载路由;
- 每隔30s,改写本地缓存路由;
需求留意的是,client查询路由会传入proxy的endpoints地址,proxy会将一切broker地址替换为proxy地址。
3、心跳
客户端
客户端心跳依靠路由,发送端点决于路由回来的broker地址,即client传入的proxy地址。
客户端心跳包十分轻量:
- producer:只包括客户端类型(producer);
- consumer:除了客户端类型之外,还需求带着group消费组,可是不包括订阅联系;
新版客户端,每隔10s发送心跳包给一个proxy端点。
注:remoting客户端心跳包较重,每隔30s才发送一个,且需求循环一切broker发送。
proxy端
proxy端收到心跳,需求先依据clientId拿到Settings,即心跳依靠Settings。
依据客户端类型履行不同的注册(续期)逻辑:
- producer:适配remoting协议,将客户端信息放入ProducerManager(rocketmq-broker模块);
- consumer:适配remoting协议,将客户端信息放入ConsumerManager(rocketmq-broker模块);
特殊的是,consumer的心跳需求经过HeartbeatSyncer同步至其他proxy节点。
consumer心跳同步方法是:
- 收到心跳的proxy节点,HeartbeatSyncer发送心跳同步音讯到一个系统topic(heartbeatSyncerTopicName=DefaultHeartBeatSyncerTopic);
- 其他proxy节点,HeartbeatSyncer广播消费心跳音讯(remoting客户端) ,存储到ConsumerManager中;
4、Settings
Settings是新客户端和proxy之间出现的新模型,首要是一些装备信息。
两头敞开grpc双向流,守时同步装备,部分装备由client决定,部分装备由proxy决定。
Settings同步由client建议,每隔5分钟履行一次。
- 客户端发送本地settings到proxy;
- proxy将自己的装备兼并到客户端settings,回来客户端;
consumer的订阅联系是经过Settings同步给proxy的,这点比较重要。
5、发音讯
发音讯逻辑比较简略,首要区别在于:
- 负载均衡:producer不会选行列,一切音讯都直接发送给proxy,一般音讯由proxy轮询挑选queue;
- 主动创立topic:新版客户端不支撑主动创立topic;
- 重试战略:由proxy经过Settings同步确认,默许依照指数退避战略重试3次(10ms、20ms、40ms);
6、收音讯
新架构下消费逻辑都走POP形式。
queryAssignments
客户端查询分配给自己的queue:
- 客户端,每隔5秒,改写本地缓存Assignments;
- proxy端,依据topic,回来MessageQueue=master broker+可读queue(-1);
- 客户端,关于每个MessageQueue创立ProcessQueue,针对每个ProcessQueue开端拉音讯,即一个topic+一个master broker有一个id=-1的ProcessQueue;
receiveMessage
- consumer,针对每个ProcessQueue向proxy建议拉音讯恳求,打开一个grpc server stream;
- proxy,用remoting协议,向broker建议长轮询,queueId=-1由broker随机选queue消费;
- broker,履行pop消费拉音讯逻辑,回来音讯;
- proxy,履行tag准确匹配,关于匹配失利音讯向broker发送ack;
- proxy,将收到的音讯经过grpc server stream呼应consumer;
- consumer,提交音讯集合到消费线程池(20线程),提交新的receiveMessage恳求到rpc线程池;
处理音讯
假如消费成功,客户端发送ack给proxy,proxy简直透传给broker;
假如消费失利,客户端发送unack给proxy,本质上是changeInvisibleTime(推迟时刻由Settings同步proxy确认,1s-7200s),proxy也是简直透传给broker;
renew
proxy侧针对push顾客默许敞开renew逻辑:
- 在客户端receiveMessage时,proxy会缓存broker回来的每条音讯的句柄ReceiptHandle(checkpoint信息);
- proxy守时扫描缓存handle,假如consumer收到音讯后,长时刻(消费超时前10s)不回来消费成果(ack/nack),proxy将主意向broker建议changeInvisibleTime(nack),更新本地缓存ReceiptHandle;
- 在客户端ack/unack时,proxy会移除缓存句柄,运用renew后的句柄ack/unack broker;
参阅文档:
- proxy支撑gRPC协议:RIP-39
- proxy支撑remoting协议:RIP-55
- 5.x布置方法:rocketmq.apache.org/docs/deploy…
- 5.x客户端SDK概览:rocketmq.apache.org/docs/sdk/01…
- 5.x客户端仓库:github.com/apache/rock…
欢迎我们谈论或私信讨论问题。
本文原创,未经许可不得转载。
欢迎重视大众号【程序猿阿越】。