前言

从服务端来说,rocketmq5.x的布置方法和之前有所不同,新增了rocketmq-proxy人物。详细布置方法,参阅官方文档(rocketmq.apache.org/docs/deploy…)。

RocketMQ5源码(五)新架构下的一般音讯收发

从客户端来说,官方主张运用新版grpc轻量级客户端github.com/apache/rock…),本来的私有remoting协议将作为服务端内部协议运用,详细兼容性问题见官方文档(rocketmq.apache.org/docs/sdk/01…)。

在前面的几个章节,我运用的都是remoting客户端+nameserver+broker的布置方法,其实兼容性没有什么大问题。

架构之下,依靠两个重要特性,之前已经依据remoting协议做了铺垫(服务端内部运用相同的协议,不影响了解新架构):

  1. POP消费(依靠恣意时刻推迟音讯)
  2. 恣意时刻推迟音讯

本文首要剖析,grpc客户端+proxy下一般音讯的收发流程,broker侧逻辑将疏忽。

RocketMQ5源码(五)新架构下的一般音讯收发

注:本文服务端依据rocketmq 5.1.1版别,客户端依据rocketmq-clients java-5.0.5版别;

一、事例

rocketmq-proxy

装备文件rmq-proxy.json:

  1. namesrvAddr:NameServer地址,分号切割;
  2. proxyMode:cluster形式;
  3. rocketMQClusterName:方针broker集群名称(proxy是集群等级的);
  4. remotingListenPort:remoting协议端口,默许8080;
  5. grpcServerPort:grpc协议端口,默许8081;
  6. useEndpointPortFromRequest:查询路由,是否运用入参endpoint端点的端口;
  7. enableACL:是否敞开acl(client和proxy之间),这儿为了阐明proxy职责规模,所以敞开了;
{
  "namesrvAddr": "127.0.0.1:9876",
  "proxyMode": "cluster",
  "rocketMQClusterName": "DefaultCluster",
  "remotingListenPort": 8080,
  "grpcServerPort": 8081,
  "useEndpointPortFromRequest": true,
  "enableACL": true
}

本地发动vm参数:

-Dcom.rocketmq.proxy.configPath=rmq-proxy.json
-Drocketmq.client.localOffsetStoreDir=/tmp/rmqstore/proxy1/local-offset
-Drocketmq.home.dir=path/to/rocketmq_home

出产者

运用出产者发送简略音讯。

// 1. 经过SPI加载客户端完成
final ClientServiceProvider provider = ClientServiceProvider.loadService();
// 2. ACL 设置ak/sk
String accessKey = "RocketMQ";
String secretKey = "12345678";
SessionCredentialsProvider sessionCredentialsProvider =
    new StaticSessionCredentialsProvider(accessKey, secretKey);
// 3. 设置proxy端点,分号切割
String endpoints = "127.0.0.1:8081;127.0.0.1:8091";
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
    .setEndpoints(endpoints) // proxy端点,分号切割
    .setRequestTimeout(Duration.ofDays(1)) // 恳求超时时刻
    .setCredentialProvider(sessionCredentialsProvider) // ak sk
    .build();
String topic = "MyTopic";
// 4. 创立producer
final Producer producer = provider.newProducerBuilder()
    .setClientConfiguration(clientConfiguration)
    .setTopics(topic)
    .build();
byte[] body = "Hello".getBytes(StandardCharsets.UTF_8);
String tag = "TagA";
// 5. 创立音讯
final Message message = provider.newMessageBuilder()
    .setTopic(topic)
    .setTag(tag)
    .setKeys("123")
    .setBody(body)
    .build();
while (true) {
    try {
        Thread.sleep(1000);
        // 6. 发送音讯
        final SendReceipt sendReceipt = producer.send(message);
        log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
    } catch (Throwable t) {
        log.error("Failed to send message", t);
        break;
    }
}
// 7. 封闭出产者
producer.close();

顾客

一般Push顾客。

// 1. 经过SPI加载客户端完成
final ClientServiceProvider provider = ClientServiceProvider.loadService();
// 2. ACL 设置ak/sk
String accessKey = "RocketMQ";
String secretKey = "12345678";
SessionCredentialsProvider sessionCredentialsProvider =
    new StaticSessionCredentialsProvider(accessKey, secretKey);
// 3. 设置proxy端点,分号切割
String endpoints = "127.0.0.1:8081;127.0.0.1:8091";
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
    .setEndpoints(endpoints)
    .setCredentialProvider(sessionCredentialsProvider)
    .build();
// 4. 创立Push顾客
String tag = "*";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
String consumerGroup = "groupA";
String topic = "MyTopic";
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
    .setClientConfiguration(clientConfiguration)
    // 消费组
    .setConsumerGroup(consumerGroup)
    // 订阅
    .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
    .setMessageListener(messageView -> {
        // 业务逻辑
        log.info("Consume message={}", messageView);
        return ConsumeResult.SUCCESS;
    })
    .build();
Thread.sleep(Long.MAX_VALUE);
pushConsumer.close();

二、客户端概览

客户端完成

rocketmq-client-apis模块界说了producer、consumer、message笼统api接口。

RocketMQ5源码(五)新架构下的一般音讯收发

ClientServiceProvider是一切客户端api进口,loadService经过JDK SPI加载ClientServiceProdiver完成。

RocketMQ5源码(五)新架构下的一般音讯收发

现在ClientServiceProvider完成只要一个ClientServiceProviderImplrocketmq-client-java-noshade模块中。

ClientServiceProvider结构出来的客户端:

  1. producer有一种:ProducerImpl
  2. consumer有两种:
    1. PushConsumerImpl:PUSH形式;
    2. SImpleConsumerImpl:疏忽;

RocketMQ5源码(五)新架构下的一般音讯收发

RPC客户端

往往一个进程中既有出产者又有顾客。

MQClientManager#getOrCreateMQClientInstance:

传统remoting协议客户端(DefaultMQProducer/DefaultMQPushConsumer等)默许instanceName都是DEFAULT,终究底层都会共用一个netty客户端MQClientInstance(MQClientAPIImpl)。

RocketMQ5源码(五)新架构下的一般音讯收发

新版客户端,ProducerImpl和ConsumerImpl都承继自ClientImpl。

ClientImpl中各自会保护一个ClientManagerImpl实例。

RocketMQ5源码(五)新架构下的一般音讯收发

每个ClientManagerImpl依照proxy端点Endpoints纬度,单独保护RpcClient。

grpc出产和顾客实例独立保护一套底层rpc客户端

RocketMQ5源码(五)新架构下的一般音讯收发

在RpcClient的完成是RpcClientImpl

RpcClientImpl结构操作grpc api构建ManagedChannelStub

这儿需求留意,grpc客户端负载均衡战略并没有指定(defaultLoadBalancePolicy),默许会取pick_first,挑选一个可用的address树立长衔接,即除非出现问题,将只与这个proxy通讯。

这意味着,经过简略ip列表指定proxy,客户端无负载均衡

RocketMQ5源码(五)新架构下的一般音讯收发

如下面单元测试:虽然地址列表有多个address,终究传入grpc api的端点是ipv4:127.0.0.1:8080,127.0.0.2:8081,由grpc客户端经过pick_first战略挑选一个可用proxy树立长衔接。

@Test
public void testEndpointsWithMultipleIpv4() {
    final Endpoints endpoints = new Endpoints("127.0.0.1:8080;127.0.0.2:8081");
    Assert.assertEquals(AddressScheme.IPv4, endpoints.getScheme());
    Assert.assertEquals(2, endpoints.getAddresses().size());
    final Iterator<Address> iterator = endpoints.getAddresses().iterator();
    final Address address0 = iterator.next();
    Assert.assertEquals("127.0.0.1", address0.getHost());
    Assert.assertEquals(8080, address0.getPort());
    final Address address1 = iterator.next();
    Assert.assertEquals("127.0.0.2", address1.getHost());
    Assert.assertEquals(8081, address1.getPort());
    Assert.assertEquals(AddressScheme.IPv4, endpoints.getScheme());
    Assert.assertEquals("ipv4:127.0.0.1:8080,127.0.0.2:8081", endpoints.getFacade());
    Assert.assertEquals("ipv4:127.0.0.1:8080,127.0.0.2:8081", endpoints.getGrpcTarget());
}

Endpoints也能够经过域名结构,在服务端完成负载均衡,如nginx。

终究传入grpc api的端点是rocketmq.apache.org:80。

@Test
public void testEndpointsWithDomain() {
    final Endpoints endpoints = new Endpoints("rocketmq.apache.org");
    Assert.assertEquals(AddressScheme.DOMAIN_NAME, endpoints.getScheme());
    final Iterator<Address> iterator = endpoints.getAddresses().iterator();
    final Address address = iterator.next();
    Assert.assertEquals("rocketmq.apache.org", address.getHost());
    Assert.assertEquals(80, address.getPort());
    Assert.assertEquals("dns:rocketmq.apache.org:80", endpoints.getFacade());
    Assert.assertEquals("rocketmq.apache.org:80", endpoints.getGrpcTarget());
}

详细逻辑见org.apache.rocketmq.client.java.route.Endpoints结构。

三、路由

1、proxy路由办理

TopicRouteService:proxy侧路由办理服务。

结构时界说topic路由本地缓存(caffeine):

  1. 缓存20s后主动改写;
  2. 运转阶段,假如topic缓存不存在,从nameserver加载;
  3. 运转阶段,假如nameserver回来topic不存在,本地缓存一个空方针WRAPPED_EMPTY_QUEUE;

RocketMQ5源码(五)新架构下的一般音讯收发

TopicRouteCacheLoader经过remoting协议与nameserver通讯。

RocketMQ5源码(五)新架构下的一般音讯收发

本来nameserver要为n个客户端供给路由服务,现在只需求为m个proxy供给路由服务。

往往m<n吧,proxy能减轻nameserver压力。

可是现在client和proxy都有本地缓存。

2、客户端查询路由

每个producer和consumer实例针对自己关心的topic单独缓存一份路由数据

注:传统客户端出产和消费路由混合在一个MQClientInstance实例里。

RocketMQ5源码(五)新架构下的一般音讯收发

ClientImpl#startUp:

  1. 支撑发动阶段主动加载路由(producer设置topic、consumer发动前订阅topic),假如改写路由失利,发动失利;(运转阶段新增topic懒加载)
  2. 后期每隔30s改写路由缓存;

RocketMQ5源码(五)新架构下的一般音讯收发

ClientImpl#fetchTopicRoute0:客户端发送topic+proxy的endpoints

RocketMQ5源码(五)新架构下的一般音讯收发

当然运转阶段,假如客户端新增topic,也会懒加载路由。

比方producer在运转阶段发送新topic,consumer运转阶段新增订阅topic。

路由在新版客户端中是十分重要的,一切动作都离不开路由,包括心跳

3、proxy回来路由

RouteActivity#queryRoute:查询路由

  1. 入参模型转化;
  2. 查询;
  3. 出参模型转化;

RocketMQ5源码(五)新架构下的一般音讯收发

RouteActivity#convertToAddressList:
客户端会带着自己装备的proxy端点进来,在broker侧会对端点的adress地址列表做转化。

默许情况下,host不变,port会修改为当时proxy实例装备的grpcServerPort

可是假如经过端口转化(各类负载均衡,如nginx、k8s),能够经过设置useEndpointPortFromRequest=true,选用客户端传入port。

RocketMQ5源码(五)新架构下的一般音讯收发

ClusterTopicRouteService#getTopicRouteForProxy:Cluster形式下

  1. 查询topic对应路由信息,先走本地caffeine缓存,cache miss走nameserver;
  2. 一切broker地址,都替换为客户端传入endpoint,即rocketmq-proxy实例地址

留意,这儿一切broker地址都是客户端传入endpoints,很重要,由于一个客户端实例只会对应一个Endpoints

RocketMQ5源码(五)新架构下的一般音讯收发

出参是一组MessageQueue

  1. topic;
  2. id:queueId;
  3. permission:读写权限;
  4. broker:broker名、地址(proxy地址)等;
  5. MessageType:5.x多了MessageType,在topic纬度界说音讯类型,分红四类:一般NORMAL、次序FIFO、推迟DELAY、业务TRANSACTION。

RocketMQ5源码(五)新架构下的一般音讯收发

四、心跳

1、客户端

ClientManagerImpl#startUp:

每个客户端实例(每个producer和consumer实例)每隔10s向proxy发送独立心跳

注:传统客户端producer和consumer的instanceName相同,每隔30s会发送一个心跳(包括producer和consumer信息)一切相关master broker。

RocketMQ5源码(五)新架构下的一般音讯收发

ClientImpl#doHeartbeat:

值得留意的是,心跳包方针端点需求从路由数据中获取,即心跳依靠路由

而路由中的endpoints是客户端查路由传入的endpoints(见路由部分),所以理论上只要一个

RocketMQ5源码(五)新架构下的一般音讯收发

grpc客户端的心跳包HeartbeatRequest是十分单纯的心跳包:

  1. group:消费组(PRODUCER不传);
  2. client_type:客户端类型,PRODUCER/PUSH_CONSUMER/SIMPLE_CONSUMER;

注:传统客户端的心跳包HeartbeatData有许多数据,包括consumer的订阅联系

message HeartbeatRequest {
  optional Resource group = 1;
  ClientType client_type = 2;
}

ClientImpl#doHeartbeat:客户端侧多了一个阻隔逻辑,假如心跳成功,endpoints将从阻隔集合中移除,这与producer相关。

RocketMQ5源码(五)新架构下的一般音讯收发

2、proxy端

ClientActivity#heartbeat:

proxy端处理心跳,依靠于一个新概念Settings。(后边再看)

关于producer和consumer履行不同的注册逻辑。

RocketMQ5源码(五)新架构下的一般音讯收发

GrpcClientSettingsManager#getClientSettings:
从缓存CLIENT_SETTINGS_MAP中获取clientId对应Settings,依照producer或consumer别离兼并回来。

注:每个客户端恳求都会在header里传入一些信息,其间包括clientId,见客户端Signature#sign。

RocketMQ5源码(五)新架构下的一般音讯收发

producer注册

ClientActivity#heartbeat:producer注册,循环settings中的topic。

RocketMQ5源码(五)新架构下的一般音讯收发

ClientActivity#registerProducer:grpc模型适配remoting协议模型

  1. GrpcClientChannel,承继netty的AbstractChannel;
  2. ClientChannelInfo,和remoting协议的模型共同;

注:业务音讯逻辑后边再说。

RocketMQ5源码(五)新架构下的一般音讯收发

org.apache.rocketmq.broker.client.ProducerManager#registerProducer:

终究走的是broker模块的ProducerManager,即和broker原始逻辑共同,将ClientChannelInfo放入内存table。

需求留意的是,producer自身心跳恳求不包括group,这儿传入的group是Settings中的topic,弦外之音,producer没有出产组概念了

也能了解,5.x在topic纬度设置了音讯类型TRANSACTIONAL,直接将topic作为producerGroup。

RocketMQ5源码(五)新架构下的一般音讯收发

consumer注册

ClientActivity#heartbeat:

consumer注册,依靠于Settings中的订阅联系,可是updateSubscription=false,不更新订阅联系

RocketMQ5源码(五)新架构下的一般音讯收发

ClientActivity#registerConsumer:相同适配remoting协议模型。

RocketMQ5源码(五)新架构下的一般音讯收发

ClusterConsumerManager#registerConsumer:

终究consumer注册走ClusterConsumerManager。

和producer相似,底层逻辑走的也是broker的ConsumerManager,将consumer信息保存到内存table,可是consumer多了HeartbeatSyncer

RocketMQ5源码(五)新架构下的一般音讯收发

ConsumerManager#registerConsumer:

此处仅更新心跳时刻,不会更新订阅联系。

RocketMQ5源码(五)新架构下的一般音讯收发

HeartbeatSyncer#onConsumerRegister:向系统topic发送一条心跳同步音讯

音讯包括:

  1. ClientChannelInfo,客户端信息,包括clientId;
  2. consumerGroup,消费组;
  3. localProxyId,当时proxy的标识(本机ip+remoting端口+grpc端口);
  4. RemoteChannel,经过GrpcClientChannel转化而来,包括当时grpc客户端与proxy的衔接信息,比方双方address等;
  5. subscriptionDataSet,消费组订阅联系;

RocketMQ5源码(五)新架构下的一般音讯收发

AbstractSystemMessageSyncer#sendSystemMessage:

这个系统topic默许为heartbeatSyncerTopicName=DefaultHeartBeatSyncerTopic

proxy侧,依照broker纬度做负载均衡(onlyBroker=true),queueId都是-1;

broker侧,会依照可写queue数量做负载均衡(随机)。

RocketMQ5源码(五)新架构下的一般音讯收发

consumer心跳同步

传统客户端,循环向一切broker发送心跳包。

proxy客户端,只会向一个proxy实例发送心跳包。

proxy之间需求彼此同步consumer心跳

AbstractSystemMessageSyncer#start:

proxy发动阶段会发动一个广播顾客,订阅topic=DefaultHeartBeatSyncerTopic。

注:所以proxy也不是彻底无状态,由于广播消费进展存储在本地

RocketMQ5源码(五)新架构下的一般音讯收发

AbstractSystemMessageSyncer#createSysTopic:

proxy发动会判别心跳同步topic是否存在,假如不存在,会在指定cluster下创立,

cluster下每个broker一个queue。

RocketMQ5源码(五)新架构下的一般音讯收发

HeartbeatSyncer#consumeMessage:消费心跳同步音讯

  1. 反序列化;
  2. 判别localProxyId,假如是自己发送的音讯,疏忽;
  3. 履行consumer注册逻辑;

RocketMQ5源码(五)新架构下的一般音讯收发

ConsumerManager#registerConsumer:数据同步需求更新订阅联系。

RocketMQ5源码(五)新架构下的一般音讯收发

HeartbeatSyncer#onConsumerRegister:数据同步不需求再次发送心跳同步音讯。

RocketMQ5源码(五)新架构下的一般音讯收发

心跳超时

ClusterServiceManager#init:心跳超时由proxy扫描。

producer超时时刻写死2分钟;

consumer超时时刻默许2分钟,能够经过channelExpiredTimeout装备;

RocketMQ5源码(五)新架构下的一般音讯收发

超时逻辑与broker原始逻辑大致共同,超时从内存table中移除producer或consumer。

不同的是,直连proxy的grpc客户端衔接,不会由于扫描producer/consumer心跳超时而断开。

SimpleChannel#close:直连proxy的GrpcClientChannel和同步proxy的RemoteChannel都承继自SimpleChannel,close办法什么都不做。

RocketMQ5源码(五)新架构下的一般音讯收发

GrpcServerBuilder:

grpc客户端衔接是否断开,取决于grpc服务端装备的空闲时长grpcClientIdleTimeMills=120s。

RocketMQ5源码(五)新架构下的一般音讯收发

五、Settings

1、模型

从客户端侧来看,settings依照客户端类型(ClientType)分为三类:

  1. PublishingSettings:PRODUCER;
  2. PushSubscriptionSettings:PUSH_CONSUMER;
  3. SimpleSubscriptionSettings:SIMPLE_CONSUMER(pull);

RocketMQ5源码(五)新架构下的一般音讯收发

基类Settings,包括客户端类型,ClientId等属性。

RocketMQ5源码(五)新架构下的一般音讯收发

ClientId由hostname+pid+index+startTime四部分组成,是客户端的仅有标识。

RocketMQ5源码(五)新架构下的一般音讯收发

PublishingSettings:producer的Settings包括发布的topic。

RocketMQ5源码(五)新架构下的一般音讯收发

PushSubscriptionSettings:Push顾客Settings

  1. group:消费组;
  2. subscriptionExpressions:订阅联系,key是topic;
  3. fifo:是否次序消费;
  4. receiveBatchSize:批处理大小;
  5. longPollingTimeout:长轮询超时时刻,30s;

RocketMQ5源码(五)新架构下的一般音讯收发

2、客户端发送Settings

ClientManagerImpl#startUp:

每隔5分钟,producer/consumerImpl与proxy履行settings同步

RocketMQ5源码(五)新架构下的一般音讯收发

ClientImpl#syncSettings:

  1. 封装Settings到TelemetryCommand;
  2. 从路由搜集proxy的endpoints;
  3. 发送TelemetryCommand到proxy;

RocketMQ5源码(五)新架构下的一般音讯收发

RocketMQ5源码(五)新架构下的一般音讯收发

ClientSessionImpl:proxy的endpoints会对应一个单例ClientSessionImpl

初次发送TelemetryCommand,客户端与proxy会树立一个grpc bistream双向流

RocketMQ5源码(五)新架构下的一般音讯收发

ClientSessionImpl#write:操作grpc api向proxy发送Settings。

RocketMQ5源码(五)新架构下的一般音讯收发

3、proxy收到Settings

ClientActivity#processAndWriteClientSettings:

不管是producer仍是consumer的Settings过来,都会先履行一次注册逻辑。

所以心跳(注册/续期)依靠Settings,Settings依靠注册/续期。

承认注册完成后,将输出流注入GrpcClientChannel。

留意,consumer的订阅联系,会在Settings同步中更新,而不是在心跳部分

RocketMQ5源码(五)新架构下的一般音讯收发

ClientActivity#processAndWriteClientSettings:

注册完成后,proxy会对settings做处理再写回客户端

RocketMQ5源码(五)新架构下的一般音讯收发

ClientActivity#processClientSettings:处理Settings

  1. updateClientSettings:将客户端Settings放入内存table(GrpcClientSettingsManager.CLIENT_SETTINGS_MAP);
  2. getClientSettings:兼并客户端Settings和proxy装备
  3. 回来兼并后的Settings;

RocketMQ5源码(五)新架构下的一般音讯收发

GrpcClientSettingsManager#mergeProducerData:

关于producer的Settings,用proxy装备掩盖,比方音讯体大小限制等。

RocketMQ5源码(五)新架构下的一般音讯收发

GrpcClientSettingsManager#mergeSubscriptionData:

关于consumer的Settings,用proxy的装备broker的订阅组装备(caffeine缓存)掩盖。

RocketMQ5源码(五)新架构下的一般音讯收发

4、客户端收到Settings

ClientSessionImpl#onNext:

客户端收到兼并后的Settings。

RocketMQ5源码(五)新架构下的一般音讯收发

ClientImpl#onSettingsCommand:producer和consumer履行不同的兼并逻辑。

RocketMQ5源码(五)新架构下的一般音讯收发

PublishingSettings#sync:

producer兼并proxy回来的Settings。

RocketMQ5源码(五)新架构下的一般音讯收发

PushSubscriptionSettings#sync:
push consumer相似。

RocketMQ5源码(五)新架构下的一般音讯收发

六、一般音讯发送

1、客户端发送

ProducerImpl#send:发音讯,疏忽前置各种校验

  1. 获取路由:一组可写MessageQueue,broker地址都是proxy端点;

这儿和传统客户端不相同,传统客户端能够获取topic=TBW102作为默许路由,主动创立topic,可是新客户端不支撑,所以新客户端必定要手动创立topic

  1. 选行列:其实这儿选哪个都不太重要,由于树立grpc长衔接的proxy只要一个,后续在proxy侧会做实践负载均衡,这儿首要是为了拿到路由回来的endpoints;
  2. send0:调用proxy发送音讯;

RocketMQ5源码(五)新架构下的一般音讯收发

ProducerImpl#send0:模型转化,发送proxy。

注:messageId仍是由客户端生成,见Message模型PublishingMessageImpl结构。

RocketMQ5源码(五)新架构下的一般音讯收发

2、proxy发送

SendMessageActivity#sendMessage:模型转化。

RocketMQ5源码(五)新架构下的一般音讯收发

ProducerProcessor#sendMessage:

  1. 依据topic路由选queue;
  2. 调用queue对应broker,发送音讯;

RocketMQ5源码(五)新架构下的一般音讯收发

SendMessageQueueSelector#select:proxy担任发送音讯queue等级负载均衡

  1. 一般音讯,从可写行列里,轮询选一个queue;
  2. 次序音讯,依据messageGroup.hashCode,从可写行列里选一个queue;

RocketMQ5源码(五)新架构下的一般音讯收发

3、客户端重试

假如客户端发送音讯失利,依照战略履行重试。

ProducerImpl#send0:

  1. 阻隔endpoints,直到proxy能正确处理心跳;
  2. 超出最大重试次数3(proxy指定,经过Settings同步),失利;
  3. 业务音讯,快速失利;
  4. 假如由于服务端流控(TooManyRequests)失利,依照指数退避战略(proxy指定,经过Settings同步),推迟10ms、20ms、40ms重试;
  5. 其他,当即重试;

RocketMQ5源码(五)新架构下的一般音讯收发

七、一般音讯消费(Push)

1、概览

POP消费概览

新架构之下,消费流程都遵循POP消费形式( POP消费源码剖析

  1. queryAssignment:查询Assignment。Push顾客需求,Simple顾客不需求;
  2. receiveMessage:拉音讯;
  3. 处理音讯:
    1. ackMessage:消费成功,ack;
    2. changeInvisibleTime:消费失利,nack,过n时长后重试;

新版客户端

从新版客户端的线程模型上来看,相较于传统客户端简略了许多,分为三组线程:

RocketMQ5源码(五)新架构下的一般音讯收发

回顾broker

回顾POP消费形式下,broker关于pop消费的处理逻辑。

queryAssignment

以4.x的角度来说,这阶段也是客户端rebalance。

办法进口:QueryAssignmentProcessor#queryAssignment。

broker默许为每个consumer实例分配一切queue,仅仅queueId=-1

(broker侧默许装备defaultPopShareQueueNum=-1的行为)

RocketMQ5源码(五)新架构下的一般音讯收发

拉音讯

pop拉音讯特色:

  1. 客户端针对每个master broker提交一个Pop恳求,而恳求中未清晰指定queueId,也未清晰指定消费进展
  2. broker需求担任挑选queue和消费进展;
  3. 关于一次pop恳求,broker需求记录checkpoint信息,topic=rmq_sys_REVIVE_LOG_{clusterName},queueId=random(8),tag=ck,要运用timer音讯,方针投递时刻戳=pop恳求时刻+invisibleTime(60s)-1s
  4. 回来用户音讯的properties中会追加checkpoint信息,用于后续ack和changeInvisibleTime;

RocketMQ5源码(五)新架构下的一般音讯收发

处理音讯

pop处理音讯特色:

  1. 客户端做tag准确匹配(broker只能依据consumequeue记录的hascode过滤一遍),假如不匹配,直接异步ack不匹配音讯;
  2. 不管消费成功仍是失利,都需求回复broker,ack代表消费成功,changeInvisibleTime代表消费失利,需求重试,重试时刻由consumer定夺;
  3. broker关于ack恳求发送一条ack音讯,topic=rmq_sys_REVIVE_LOG_{clusterName},queueId=checkpoint音讯所在queueId,tag=ack,要运用timer音讯,方针投递时刻戳=pop恳求时刻+invisibleTime(60s)
  4. broker关于changeInvisibleTime恳求先创立一个新的checkpoint音讯,方针投递时刻戳=当时时刻+invisibleTime(客户端指定);然后针对本次pop恳求发送一个ack音讯

RocketMQ5源码(五)新架构下的一般音讯收发

consumer消费进展提交

broker发动PopBufferMergeService线程扫描缓存的checkpoint。

承认ck音讯发送成功,对这些用户音讯主动提交offset

重试

broker发动n个PopReviveService线程,消费revive topic的n个行列。

假如ck音讯超过invisibleTime未匹配ack音讯,从头投递用户音讯到重试topic。

2、queryAssignment

Assignments模型

从领域上来说,Assignments也被归入路由,只针对PUSH_CONSUMER有用

从客户端来看,Assignments本质上是n个MessageQueue

RocketMQ5源码(五)新架构下的一般音讯收发

RocketMQ5源码(五)新架构下的一般音讯收发
RocketMQ5源码(五)新架构下的一般音讯收发

客户端查询Assignments

PushConsumerImpl#startUp:

PUSH_CONSUMER客户端每隔5秒,改写本地缓存Assignments。

RocketMQ5源码(五)新架构下的一般音讯收发

PushConsumerImpl#scanAssignments:

循环每个订阅topic,调用proxy查询Assignments。

RocketMQ5源码(五)新架构下的一般音讯收发

PushConsumerImpl#queryAssignment:

  1. 从topic路由中搜集endpoints;
  2. 恳求包括:topic+endpoints+group;

RocketMQ5源码(五)新架构下的一般音讯收发

TopicRouteData#pickEndpointsToQueryAssignments:从一切queue中选一个queue的endpoints,要求:

  1. master上线;
  2. 有权限;

RocketMQ5源码(五)新架构下的一般音讯收发

proxy回来Assignments

RouteActivity#queryAssignment:proxy查询Assignment

  1. 将入参endpoints转化为地址列表,和路由查询共同;
  2. 查询路由,和路由查询共同;
  3. 出参转化

留意:proxy没有将查询Assignment恳求代理到broker,假如运用传统客户端敞开pop消费,查询Assignment逻辑走的是broker,见POP消费源码剖析

RocketMQ5源码(五)新架构下的一般音讯收发

RouteActivity#queryAssignment:出参转化是重点

  1. 挑选可读行列;
  2. 选master broker,与路由同理,地址是入参的endpoints,即proxy地址;
  3. 行列id都设置为-1(由broker做queue负载均衡);

RocketMQ5源码(五)新架构下的一般音讯收发

从成果上来看,终究回来客户端的Assignments只包括topic下master broker数量个queueId=-1的MessageQueue,且地址都为proxy端点

客户端处理Assignments

PushConsumerImpl#scanAssignments:

  1. 更新topic对应缓存Assignments;
  2. 更新ProcessQueue;

RocketMQ5源码(五)新架构下的一般音讯收发

PushConsumerImpl#syncProcessQueue:

依照Assignment指示:

删去queue,标记ProcessQueue为drop,从table中移除。

新增queue,依照proxy回来,一个topic,一个master broker会有一个ProcessQueue,queueId=-1,关于每个queue履行fetchMessageImmediately当即开端拉音讯。

RocketMQ5源码(五)新架构下的一般音讯收发

3、receiveMessage

客户端拉音讯

至此,consumer针对每个ProcessQueue向proxy建议拉音讯恳求。

ProcessQueueImpl#receiveMessageImmediately:

  1. 决策本次拉取音讯数量;
  2. 向proxy建议拉音讯恳求,scheduler线程切rpc线程;

RocketMQ5源码(五)新架构下的一般音讯收发

ProcessQueueImpl#getReceptionBatchSize:

拉取音讯数量,包括必定流控逻辑,不超过32条(Settings同步由proxy确认)。

RocketMQ5源码(五)新架构下的一般音讯收发

proxy处理拉音讯

proxy向broker建议长轮询

ReceiveMessageActivity#receiveMessage:proxy处理receiveMessage恳求进口

  1. actualInvisibleTime:pop消费的invisibleTime,proxy操控,60s;
  2. pollingTime:proxy与broker之间长轮询挂起时刻,proxy操控,20s;

RocketMQ5源码(五)新架构下的一般音讯收发

ConsumerProcessor#popMessage:先选行列,再向broker建议长轮询。

RocketMQ5源码(五)新架构下的一般音讯收发

ReceiveMessageQueueSelector#select:基本上归于透传。

broker侧直接取consumer指定的broker的queue,queueId相同等于-1

之后由broker做pop消费负载均衡(随机选一个queue)。

RocketMQ5源码(五)新架构下的一般音讯收发

ConsumerProcessor#popMessage:

proxy侧,协议转化,向broker建议长轮询。

RocketMQ5源码(五)新架构下的一般音讯收发

broker回来音讯

broker侧,pop消费处理逻辑见POP消费源码剖析

重点是,broker将checkpoint信息放在回来的每条message的properties的POP_CK里

包括ack/unack音讯需求的必要信息,比方queueId、offset等信息。

RocketMQ5源码(五)新架构下的一般音讯收发

proxy收到broker回来音讯

ConsumerProcessor#popMessage:

tag准确过滤逻辑放在了proxy侧,不匹配的音讯会直接发送ack给broker。

RocketMQ5源码(五)新架构下的一般音讯收发

MessagingProcessor#popMessage:

  1. push消费,有一个renew逻辑(autoRenew=true,默许敞开),将checkpoint信息(receiptHandle)缓存;(后边再看,这段逻辑没有也不影响流程)
  2. 将呼应写回客户端;

RocketMQ5源码(五)新架构下的一般音讯收发

ReceiveMessageResponseStreamWriter#writeAndComplete:写回客户端。

能够看到拉音讯走的是ServerStream,proxy会按次序写三种包呼应客户端:

  1. STATUS:1个状态包,标识本次receiveMessage呼应是否成功;
  2. MESSAGE:0-n个音讯包,即拉到的音讯;
  3. DELIVERY_TIMESTAMP:1个呼应时刻戳包,这个首要是metrics收集用,能够疏忽;

RocketMQ5源码(五)新架构下的一般音讯收发

RocketMQ5源码(五)新架构下的一般音讯收发

proxy侧,checkpoint信息被转化为ReceiptHandle

RocketMQ5源码(五)新架构下的一般音讯收发

客户端收到音讯

RpcClientImpl#receiveMessage:客户端收到proxy呼应。

RocketMQ5源码(五)新架构下的一般音讯收发

ConsumerImpl#receiveMessage:proto模型转化。

RocketMQ5源码(五)新架构下的一般音讯收发

ProcessQueueImpl#onReceiveMessageResult:ProcessQueue处理拉到的音讯

  1. 缓存音讯;
  2. 提交音讯到消费线程;
  3. 再次提交receiveMessage恳求到rpc线程,形成循环;

RocketMQ5源码(五)新架构下的一般音讯收发

StandardConsumeService#consume:一般并发消费(非次序)

  1. 每条音讯别离丢到消费线程池(20线程)履行;
  2. 处理消费成果;

RocketMQ5源码(五)新架构下的一般音讯收发

ProcessQueueImpl#eraseMessage:

消费成果分红两种情况,成功-ack,失利-nack。

RocketMQ5源码(五)新架构下的一般音讯收发

4、ack

客户端

ProcessQueueImpl#ackMessage:

客户端ack有重试逻辑,假如ack呼应失利,推迟1s重试,无限次数重试。

RocketMQ5源码(五)新架构下的一般音讯收发

ConsumerImpl#wrapAckMessageRequest:客户端会带着ReceiptHandle建议ack。

RocketMQ5源码(五)新架构下的一般音讯收发

proxy

AckMessageActivity#processAckMessage:

  1. 移除缓存句柄(归于renew逻辑,后边看);
  2. 履行ack;

RocketMQ5源码(五)新架构下的一般音讯收发

ConsumerProcessor#ackMessage:

校验句柄未过期(默许invisibleTime=60s,60s收到音讯没ack,句柄就失效)。

转化remoting协议,恳求broker,ReceiptHandle便是broker收到的ExtraInfo。

RocketMQ5源码(五)新架构下的一般音讯收发

5、unack

客户端

ProcessQueueImpl#nackMessage:unack便是POP消费剖析的changeInvisibleTime。

  1. 不行见时刻由proxy经过settings同步回来,从1s到7200s。
  2. changeInvisibleTime恳求也支撑重试,推迟1s,无限次数。

RocketMQ5源码(五)新架构下的一般音讯收发
ConsumerImpl#wrapChangeInvisibleDuration:

恳求包括group、topic、ReceiptHandle、不行见时刻。

RocketMQ5源码(五)新架构下的一般音讯收发

proxy

ChangeInvisibleDurationActivity#changeInvisibleDuration:

proxy侧,和ack逻辑相似,仅仅调用broker办法不同。

RocketMQ5源码(五)新架构下的一般音讯收发

6、renew

renew是proxy独有的逻辑。

假如consumer收到音讯后,长时刻不回来消费成果(ack/nack),proxy将主意向broker建议changeInvisibleTime(nack)

至于为什么这么做,我猜测是这样。假设下面这种场景:

  1. consumer-proxy-broker之间能正常通讯;
  2. 由于consumer消费慢,broker超时(invisibleTime)未收到消费成果(ack或nack都行);
  3. broker会持续向重试topic投递音讯;

重试音讯将进一步加重consumer的音讯积压。

传统非pop消费形式,是否sendMessageBack重试,彻底取决于客户端,不会有这种问题。

ReceiptHandleProcessor:担任办理缓存的receiptHandle(checkpoint)

RocketMQ5源码(五)新架构下的一般音讯收发

ReceiptHandleProcessor只要两个public办法:

  1. receiveMessage时,缓存receiptHandle;
  2. ack/unack时,清除缓存的receiptHandle;

RocketMQ5源码(五)新架构下的一般音讯收发

ReceiptHandleProcessor#init:

发动阶段,敞开5s守时使命,扫描缓存handle。

RocketMQ5源码(五)新架构下的一般音讯收发

ReceiptHandleProcessor#scheduleRenewTask:

  1. clientIsOffline:假如channel+group找不到consumer心跳,代表consumer失联,移除consumer下一切handle,对这些handle履行renew;
  2. 假如音讯即将被从头投递,履行renew;是否即将从头被投递,取决于pop时刻、invisible时刻、proxy装备renewAheadTimeMillis(默许10s),即broker从头投递前10s内renew;

RocketMQ5源码(五)新架构下的一般音讯收发

ReceiptHandleProcessor#startRenewMessage:

renew底层是用handle回复broker一个changeInvisibleTime,代表consumer正常收到音讯,需求过段时刻重试(默许最多renew3次,不行见时刻1m、3m、5m)。

留意,proxy内存中原始handle会映射到broker回来的新handle(checkpoint)

RocketMQ5源码(五)新架构下的一般音讯收发

假如changeInvisibleTime的新handle不ack或nack,相同会导致音讯重投。

所以必定要在consumer随后的ack或nack时,将consumer的原始handle转化为renew后的handle,传给broker。

RocketMQ5源码(五)新架构下的一般音讯收发

总结

读完新架构下的client和proxy,总体来说,模型和代码结构都更清晰易读了。

RocketMQ5源码(五)新架构下的一般音讯收发

1、独立实例客户端

新版客户端将producer和consumer实例彻底分开了

  1. rpc客户端:producer和consumer运用不同rpc客户端;
  2. 路由:producer和consumer各自缓存自己关心的topic的路由数据;
  3. 心跳:producer和consumer各自发送独立心跳;

2、路由

proxy侧,对路由做了caffeine本地缓存:

  1. 运转时cache miss从nameserver加载;
  2. 假如nameserver回来空,缓存空方针;
  3. 缓存写入20s后主动改写;

client侧:

  1. 发动时,支撑提前从proxy加载路由;
  2. 运转时,本地缓存cache miss,从proxy加载路由;
  3. 每隔30s,改写本地缓存路由;

需求留意的是,client查询路由会传入proxy的endpoints地址,proxy会将一切broker地址替换为proxy地址。

3、心跳

客户端

客户端心跳依靠路由,发送端点决于路由回来的broker地址,即client传入的proxy地址。

客户端心跳包十分轻量

  1. producer:只包括客户端类型(producer);
  2. consumer:除了客户端类型之外,还需求带着group消费组,可是不包括订阅联系

新版客户端,每隔10s发送心跳包给一个proxy端点

注:remoting客户端心跳包较重,每隔30s才发送一个,且需求循环一切broker发送。

proxy端

proxy端收到心跳,需求先依据clientId拿到Settings,即心跳依靠Settings

依据客户端类型履行不同的注册(续期)逻辑:

  1. producer:适配remoting协议,将客户端信息放入ProducerManager(rocketmq-broker模块);
  2. consumer:适配remoting协议,将客户端信息放入ConsumerManager(rocketmq-broker模块);

特殊的是,consumer的心跳需求经过HeartbeatSyncer同步至其他proxy节点

consumer心跳同步方法是:

  1. 收到心跳的proxy节点,HeartbeatSyncer发送心跳同步音讯到一个系统topic(heartbeatSyncerTopicName=DefaultHeartBeatSyncerTopic);
  2. 其他proxy节点,HeartbeatSyncer广播消费心跳音讯(remoting客户端) ,存储到ConsumerManager中;

4、Settings

Settings是新客户端和proxy之间出现的新模型,首要是一些装备信息。

两头敞开grpc双向流,守时同步装备,部分装备由client决定,部分装备由proxy决定。

Settings同步由client建议,每隔5分钟履行一次

  1. 客户端发送本地settings到proxy;
  2. proxy将自己的装备兼并到客户端settings,回来客户端;

consumer的订阅联系是经过Settings同步给proxy的,这点比较重要。

5、发音讯

发音讯逻辑比较简略,首要区别在于:

  1. 负载均衡:producer不会选行列,一切音讯都直接发送给proxy,一般音讯由proxy轮询挑选queue;
  2. 主动创立topic:新版客户端不支撑主动创立topic;
  3. 重试战略:由proxy经过Settings同步确认,默许依照指数退避战略重试3次(10ms、20ms、40ms);

6、收音讯

新架构下消费逻辑都走POP形式

queryAssignments

客户端查询分配给自己的queue:

  1. 客户端,每隔5秒,改写本地缓存Assignments;
  2. proxy端,依据topic,回来MessageQueue=master broker+可读queue(-1);
  3. 客户端,关于每个MessageQueue创立ProcessQueue,针对每个ProcessQueue开端拉音讯,即一个topic+一个master broker有一个id=-1的ProcessQueue

receiveMessage

  1. consumer,针对每个ProcessQueue向proxy建议拉音讯恳求,打开一个grpc server stream
  2. proxy,用remoting协议,向broker建议长轮询,queueId=-1由broker随机选queue消费
  3. broker,履行pop消费拉音讯逻辑,回来音讯;
  4. proxy,履行tag准确匹配,关于匹配失利音讯向broker发送ack;
  5. proxy,将收到的音讯经过grpc server stream呼应consumer;
  6. consumer,提交音讯集合到消费线程池(20线程),提交新的receiveMessage恳求到rpc线程池;

处理音讯

假如消费成功,客户端发送ack给proxy,proxy简直透传给broker;

假如消费失利,客户端发送unack给proxy,本质上是changeInvisibleTime(推迟时刻由Settings同步proxy确认,1s-7200s),proxy也是简直透传给broker;

renew

proxy侧针对push顾客默许敞开renew逻辑:

  1. 在客户端receiveMessage时,proxy会缓存broker回来的每条音讯的句柄ReceiptHandle(checkpoint信息);
  2. proxy守时扫描缓存handle,假如consumer收到音讯后,长时刻(消费超时前10s)不回来消费成果(ack/nack),proxy将主意向broker建议changeInvisibleTime(nack),更新本地缓存ReceiptHandle;
  3. 在客户端ack/unack时,proxy会移除缓存句柄,运用renew后的句柄ack/unack broker;

参阅文档

  1. proxy支撑gRPC协议:RIP-39
  2. proxy支撑remoting协议:RIP-55
  3. 5.x布置方法:rocketmq.apache.org/docs/deploy…
  4. 5.x客户端SDK概览:rocketmq.apache.org/docs/sdk/01…
  5. 5.x客户端仓库:github.com/apache/rock…

欢迎我们谈论或私信讨论问题。

本文原创,未经许可不得转载。

欢迎重视大众号【程序猿阿越】。