深入分析 RocketMQ 的 push 消费办法完成

深入分析 RocketMQ 的 push 消费办法完成

前语

RocketMQ 是阿里巴巴旗下一款开源的 MQ 框架,经历过双十一考验,由 Java 编程语言完成,有非常完好的生态系统。RocketMQ 作为一款纯 Java、分布式、行列模型的开源音讯中间件,支撑事务音讯、次序音讯、批量音讯、定时音讯、音讯回溯等。

RocketMQ 首要由以下四个部分组成:

深入分析 RocketMQ 的 push 消费办法完成

核心概念简述

  • NameServer:可以了解为是一个注册中心,首要是用来保存 Topic 路由信息,办理 Broker,支撑 Broker 的动态注册和发现,保存 Topic 和 Borker 之间的联系。在 NameServer 的集群中,NameServer 与 NameServer 之间是没有任何通信的。
  • Broker:核心的一个角色,首要是担任音讯的存储、查询消费,在发动时会向 NameServer 进行注册。Broker 实例可以有许多个,相同的 BrokerName 可以称为一个 Broker 组,每个 Broker 组只保存一部分音讯。
  • Topic:可以了解为一个音讯的调集的名字,一个 Topic 可以分布在不同的 Broker 组下。
  • 行列(Queue) :一个 Topic 可以有许多行列,默认是一个 Topic 在同一个 Broker 组中是4个。假如一个Topic 在2个 Broker 组中,那么就有可能有8个行列。
  • 生产者:生产音讯的一方便是生产者。
  • 生产者组:一个生产者组可以有许多生产者,只需求在创立生产者的时分指定生产者组,那么这个生产者就在那个生产者组。
  • 顾客:用来消费生产者音讯的一方。
  • 顾客组:跟生产者一样,每个顾客都有所在的顾客组,一个顾客组可以有许多的顾客,不同的顾客组消费音讯是互不影响的。

MQ 的消费办法

RocketMQ 消费办法便是指顾客怎么从 MQ 中获取到音讯,分为两种办法,Push(推办法)和 Pull(拉办法)。

Push(推办法)

Push,便是推音讯。当 Broker 收到生产者的音讯时,会自动的将音讯推送给顾客端进行消费,这种消费形式就叫 Push,也便是 MQ 将音讯推给到顾客的意思。

深入分析 RocketMQ 的 push 消费办法完成

Push(推办法)形式的优势便是呼应速度快,音讯的实时性比较高,一旦 Brocker 收到音讯后,就能立马将音讯推送给顾客,顾客也就能立马对收到的音讯进行消费。

可是这种 Push 的消费形式存在必定的缺点,便是一旦音讯量比较大时,对顾客性能要求较高,由于顾客无法操控 MQ 音讯的推送速度,因而一旦音讯量大,那么顾客消费的压力就比较大。

此外,Push 形式还会面对以下一些问题:

1) Broker 端需求保护 Consumer 的状况,这不利于 Broker 支撑很多 Consumer 的场景。 2) Consumer 的消费速度是不一致的,独自经过 Broker 推送音讯,难以处理不同的 Consumer 的状况。 3) Broker 难以处理 Consumer 无法消费音讯的状况,由于Broker 无法确定 Consumer 仅仅暂时的毛病仍是永久性的毛病。

4) 很多的推送音讯会加剧 Consumer 的负载,乃至冲垮 Consumer。

Pull(拉办法)

Pull 刚好跟 Push 相反,便是顾客自动去 MQ 中拉取音讯。

深入分析 RocketMQ 的 push 消费办法完成

天然, Pull 的优缺点也和 Push 正好相反。顾客端可以依据自身的消费状况,来决定是否去拉音讯,何时去拉音讯,这种自动去 MQ 拉取音讯的形式,使得顾客端的压力相对较小。可是,由于拉取的时机和频率需求自己操控,拉取频频简单形成服务端和客户端的压力,拉取距离长又简单形成消费不及时。

Pull形式由 Consumer 自动从 Broker 获取音讯,其长处为: 1) Broker 不再需求保护 Consumer 的状况(每一次 Pull 都包含了其实偏移量等必要的信息)。

2) 状况保护在 Consumer,所以 Consumer 可以很简单的依据自身的负载等状况来决定从 Broker 获取音讯的频率。 3) 由于 Broker 无法猜测写一条音讯发生的时刻,所以在收到音讯之后只能立即推送给 Consumer,所以无法对音讯聚合后再推送给 Consumer。 而 Pull 形式由 Consumer 自动来获取音讯,每一次 Pull 时都尽可能多的获取已经在 Broker 上的音讯。

此外,Pull 形式还会面对以下一些问题:

1) 实时性的问题,自动的拉取音讯会发生无法猜测的推迟,假如单纯提高 Pull 的履行频率,可能会形成很多的无效 Pull 恳求,而频率过低时,就会形成消费的推迟。

RocketMQ 中关于这两种消费办法的调用办法

RocketMQ 作为阿里开源的一款高性能、功能丰富的 MQ,天然一起完成了 Push 和 Pull 的两种消费办法,用户可以挑选在项目中使用 Push 仍是 Pull。

深入分析 RocketMQ 的 push 消费办法完成

在一般状况下,项目中都是使用 Push 的办法来消费,由于 Pull 除了时实性差外,Pull 办法还得让开发人员自动去保护音讯消费进展,添加额定的操作。

所以接下来就着重讲一下 RocketMQ 是怎么完成 Push 的逻辑。

RocketMQ 怎么完成 Push

RocketMQ 经过一种伪 Push 的形式,完成了 Brocker 和 Comsumer 之间的实时性和压力平衡,而这种伪 Push 形式其底层仍是依据 Pull 来完成的。这种完成办法就称之为长轮询机制。

轮询与长轮询

轮询和长轮询都是依据客户端自意向服务端发送恳求来自动获取数据的办法,属于一种拉取数据的完成办法。

轮询

轮询是指客户端每隔必定时刻发送恳求,不管服务端的数据是否有更新,都会回来给客户端。这种办法可能会形成很多无用的恳求,糟蹋服务器的资源,而且可能形成数据的推迟。

长轮询

长轮询是客户端发送恳求给服务端,假如服务端有数据更新,则立即回来;假如服务端没有数据更新,则将恳求坚持住,直到有新数据时再回来给客户端。长轮询可以处理频频恳求但无更新数据的问题,一起也可以使顾客在有新数据抵达时即时获取到数据,类似于推送的效果。

需求注意的是,长轮询可能会添加服务端代码完成的复杂度,但从效果上来看,它可以处理轮询频频恳求的问题,并具有必定的即时性。

Push消费办法源码探求

深入分析 RocketMQ 的 push 消费办法完成

顾客长轮询逻辑

  1. 顾客调用 PullKernelImpl 发送拉取恳求,调用时用 BrokerSuspendMaxTimeMillis 指定了 Broker 挂起的最长时刻,默以为 20s
  2. Broker 中 PullMessageProcess 处理拉取恳求,从 ConsumeQueue 中查询音讯。
  3. 假如没有查询到音讯,判别是否启用长轮询,调用 PullRequestHoldService#SuspendPullRequest() 办法将该恳求挂起。
  4. PullRequestHoldService 线程 Run() 办法循环等候轮询时刻,然后周期性调用 CheckHoldRequest() 办法检查挂起的恳求是否有音讯可以拉取。
  5. 假如检查到有新音讯可以拉取,调用 NotifyMessageArriving() 办法。
  6. ReputMessageService 的 DoReput() 假如被调用,阐明也有新音讯抵达,需求唤醒挂起的拉取恳求。这里也会发送一个 Notify,从而调用 NotifyMessageArriving() 办法。
  7. NotifyMessageArriving() 办法中也会查询 ConsumeQueue 的最大 Offset,假如的确有新音讯,那么将唤醒对应的拉取恳求,详细的办法是调用 ExecuteRequestWhenWakeup() 办法。
  8. ExecuteRequestWhenWakeup() 办法唤醒拉取恳求,调用 ProcessRequest() 办法处理该恳求。
1.PullMessageService#PullMessage
private void pullMessage(final PullRequest pullRequest) {
 //从pullRequest中获取顾客组
 final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
 if (consumer != null) {
  //强转为push形式顾客
  DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
  //真实履行拉取音讯的办法
  impl.pullMessage(pullRequest);
  } else {
  log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
  }
}
/**
 * 处理拉取音讯的恳求
 */
@Override
public void run() {
 log.info(this.getServiceName() + " service started");
​
 //在它的run办法中,循环不断的从pullRequestQueue中堵塞式的获取并移除行列的头部数据,即拉取音讯的恳求,
 // 然后调用pullMessage办法依据该恳求去broker拉取音讯。
 while (!this.isStopped()) {
  try {
   PullRequest pullRequest = this.pullRequestQueue.take();
   //调用pullMessage办法
   this.pullMessage(pullRequest);
   } catch (InterruptedException ignored) {
   } catch (Exception e) {
   log.error("Pull Message Service Run Method exception", e);
   }
  }
 log.info(this.getServiceName() + " service end");
}

在 Consumer 端, Push 形式的音讯拉取由 PullMessageService 类完成, PullMessageService 承继了 ServiceThread 类,并完成了 Run 办法,经过异步的办法,循环从 PullRequestQueue 中堵塞式的获取并移除行列头部的数据,最终调用了 DefaultMQPushConsumerImpl 类的 PullMessage 办法。其中,PullRequestQueue 行列是在负载均衡之时关于新分配到的音讯行列而创立的,因而只要该行列中有拉取的恳求,就会去 Brocker 拉取音讯,假如没有就会堵塞。

2.DefaultMQPushConsumerImpl#pullMessage
/**
 * 处理正在拉取音讯的代码
 */
public void pullMessage(final PullRequest pullRequest) {
 //服务状况校验
 //...
 //流控校验
 //取得processQueue中已缓存的音讯总数量
 long cachedMessageCount = processQueue.getMsgCount().get();
 //获取processQueue中已缓存的音讯总巨细MB
 long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
​
 // 判别还未音讯的数量,数量太多就等会再履行从头履行拉取音讯的逻辑.
 if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
  // 等会再履行从头履行拉取音讯的逻辑.
  this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
  if ((queueFlowControlTimes++ % 1000) == 0) {
    log.warn("the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
  this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
   }
  return;
  }
​
 // 判别还未音讯的巨细,假如还未音讯的音讯占用的内存过大,就等会再履行从头履行拉取音讯的逻辑.
 if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
 // 等会再履行从头履行拉取音讯的逻辑.
 this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
 if ((queueFlowControlTimes++ % 1000) == 0) {
  log.warn("the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
  this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
  }
 return;
  }
 //...
 //次序消费和并发消费的校验
 //调用pullAPIWrapper.pullKernelImpl办法,拉取音讯
}
  

承接上文,这里是 DefaultMQPushConsumerImpl 的 PullMessage 办法的源码,该类中首要做了以下操作:

  1. 服务状况校验。在拉取音讯之前,会对顾客的状况进行校验,假如顾客已被丢掉或许处于暂停状况,会推迟发送拉取音讯的恳求。

  2. 流控校验。当顾客准备去拉音讯的时分,会先去判别当前顾客消费的压力再决定是否去拉取音讯。

    RocketMQ 供给了两种判别消费压力逻辑,一种是依据还未消费的音讯的数量的巨细,还有一种是依据还未消费的音讯所占内存的巨细。

    • 判别还未消费音讯的数量,数量 > 1000 就等会等候 50ms,并将此次 Pull 恳求放回行列中,Return 掉。并再履行从头履行拉取音讯的逻辑
    • 判别还未消费音讯的巨细,假如还未音讯的音讯占用的内存过大 > 100mb,就等会再履行从头履行拉取音讯的逻辑
  3. 次序消费和并发消费的校验。依据消费形式的不同,对消费音讯的次序性进行校验。假如是并发消费且未消费音讯的offset跨度大于设定的阈值,则推迟发送拉取音讯的恳求。假如是次序消费而且之前未锁定消费点方位,则需求设置消费点位。

  4. 创立拉取音讯的回调函数目标 PullCallback,在真实进行拉取音讯的恳求之前,会创立一个回调函数目标,用于在拉取恳求回来后履行相应的回调操作。

  5. 判别是否答应将消费点位上报给 Broker 进行耐久化。假如是集群消费形式且本地内存中存在与当前音讯行列相关的消费进展信息,则设置 CommitOffsetEnable 为 True,表示在拉取音讯时可以将消费点位上报给 Broker 进行耐久化。

  6. 调用 PullAPIWrapper.PullKernelImpl() 办法来实际履行拉取音讯的操作。

    这些过程用于确保在拉取音讯的过程中可以满足各种校验条件,并完成音讯的牢靠消费和流量操控等功能。

3.PullRequestHoldService#SuspendPullRequest
/*****************PullMessageProcessor#processRequest*****************/
case ResponseCode.PULL_NOT_FOUND:
 // 音讯没找到,假如答应恳求挂起的话,那么就会将恳求挂起,等有音讯的时分,再将音讯回来给客户端.
 if (brokerAllowSuspend && hasSuspendFlag) {
  long pollingTimeMills = suspendTimeoutMillisLong;
  if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
   pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
   }
  String topic = requestHeader.getTopic();
  long offset = requestHeader.getQueueOffset();
  int queueId = requestHeader.getQueueId();
  PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
  this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
  // 将拉音讯的恳求存起来
  this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
  // response 设置为null,就不会给客户端呼应的意思
  response = null;
  break;
  }
​
/********************PullRequestHoldService#suspendPullRequest**************************/
​
​
protected ConcurrentMap<String, ManyPullRequest> pullRequestTable =
 new ConcurrentHashMap<String, ManyPullRequest>(1024);
​
/**
 * 将拉取音讯的恳求挂起
 *
 * @param topic
 * @param queueId
 * @param pullRequest
 */
public void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) {
 String key = this.buildKey(topic, queueId);
 ManyPullRequest mpr = this.pullRequestTable.get(key);
 if (null == mpr) {
  mpr = new ManyPullRequest();
  ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);
  if (prev != null) {
   mpr = prev;
   }
  }
 mpr.addPullRequest(pullRequest);
}

假如在 Broker 端中没有查询到音讯,会经过呼应码为ResponseCode.PULL_NOT_FOUND 的代码块,而且发动长轮询。该代码块会调用 PullRequestHoldService 类的 SuspendPullRequest 办法将拉取音讯的恳求存储起来。PullRequestHoldService 是用来存储拉取恳求的类,该办法会将恳求进行分类并放在一个 ConcurrentHashMap 中。

4.PullRequestHoldService#NotifyMessageArriving
protected void checkHoldRequest() {
 for (String key : this.pullRequestTable.keySet()) {
  String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
  if (2 == kArray.length) {
   String topic = kArray[0];
   int queueId = Integer.parseInt(kArray[1]);
   //获取 这个topic 的 这个queueId的queue音讯的最大的offset
   final long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
   try {
    //测验唤醒等候线程.
    this.notifyMessageArriving(topic, queueId, offset);
    } catch (Throwable e) {
    log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);
    }
   }
  }
}
/**
 * 这个办法也会在 {@link NotifyMessageArrivingListener} 中调用,意思便是一旦有音讯来了,那么就测验唤醒长轮询的恳求
 */
public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode,
 long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
 String key = this.buildKey(topic, queueId);
 ManyPullRequest mpr = this.pullRequestTable.get(key);
 if (mpr != null) {
  List<PullRequest> requestList = mpr.cloneListAndClear();
  if (requestList != null) {
   List<PullRequest> replayList = new ArrayList<PullRequest>();
   for (PullRequest request : requestList) {
    long newestOffset = maxOffset;
    if (newestOffset <= request.getPullFromThisOffset()) {
    //传过来的offset小于恳求拉取音讯的开始的offset,那么就从头读取音讯最大的offset
    //这一步其实是为了确保必定能拉取的需求的音讯
    newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
    }
​
    if (newestOffset > request.getPullFromThisOffset()) {
    //只有当行列音讯最大的offset大于顾客需求拉取的音讯的offset,那么才履行
    //其实很好了解,假定当前行列音讯的最大offset是10,可是顾客要拉取第11位的音讯,那么此时肯定没有音讯,就不用处理了
    boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode,
     new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));
    // match by bit map, need eval again when properties is not null.
    if (match && properties != null) {
     match = request.getMessageFilter().isMatchedByCommitLog(null, properties);
     }
​
    if (match) {
     try {
      // 从头履行一遍拉取的恳求,这样就能拉取到音讯了.
     this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(
        request.getClientChannel(),request.getRequestCommand());
      } catch (Throwable e) {
      log.error("execute request when wakeup failed.", e);
      }
     continue;
     }
    //...
    }
   //...
   }
  }
}

在 Broker 端,存在 PullRequestHoldService 服务来办理长轮询恳求的线程。当一个拉取恳求被挂起时,它将被保存在这个服务中。每隔一段时刻(长轮询或短轮询等候时刻),该服务会检查挂起的恳求中是否有可拉取的音讯。

PullRequestHoldService 会从本地缓存变量 PullRequestTable 中获取 PullRequest 恳求,并检查条件是否满足轮询条件(待拉取音讯的偏移量是否小于消费行列的最大偏移量)。假如条件成立,表示 Broker 端有新音讯抵达,那么就会经过 PullMessageProcessor 的 ExecuteRequestWhenWakeup() 办法从头测验发起 Pull 音讯的 RPC 恳求。

在 ExecuteRequestWhenWakeup() 办法中,会经过事务线程池 PullMessageExecutor 异步提交从头 Pull 音讯的恳求使命。这个使命会再次调用 PullMessageProcessor 的 ProcessRequest() 办法,完成对 Pull 音讯恳求的二次处理。

这样的设计可以确保在长轮询过程中,当有新音讯抵达 Broker 端时,可以及时触发从头 Pull 音讯的恳求,使顾客可以即时获取到新音讯。一起,经过异步提交使命的办法,避免了堵塞主线程,提高了系统的并发处理才能。

总结

本次讲解了 DefaultMQPushConsumer 顾客客户端怎么发起的拉取音讯恳求。

大多数音讯行列系统不管是 Pull 仍是 Push 机制,都借鉴了一些一起的理念和思想,特别是在处理很多音讯和确保音讯牢靠性方面的问题。类似的机制,在其他系统中也会得到应用,如 Nacos 中的 Push 和长轮询机制。

虽然在详细完成上可能会有所不同,但这些办法背后的思想类似,都是为了确保音讯在传递的过程中可以高效、牢靠地被处理,一起确保系统性能的高效性。关于这些机制的了解和掌握,关于开发人员来说是非常重要的。

推荐阅读

浅谈MySQL分页查询的工作原理

shardingjdbc发动优化

权限办理——多系统下的数据权限通用操控

SpringBoot自动装配

Java线程和CPU调度

招贤纳士

政采云技能团队(Zero),Base 杭州,一个赋有热情和技能匠心精神的生长型团队。规划 500 人左右,在日常事务开发之外,还分别在云原生、区块链、人工智能、低代码平台、中间件、大数据、物料系统、工程平台、性能体会、可视化等范畴进行技能探究和实践,推动并落地了一系列的内部技能产品,继续探究技能的新边界。此外,团队还纷纷投身社区建造,现在已经是 google flutter、scikit-learn、Apache Dubbo、Apache Rocketmq、Apache Pulsar、CNCF Dapr、Apache DolphinScheduler、alibaba Seata 等很多优秀开源社区的贡献者。

假如你想改动一向被事折腾,期望开始折腾事;假如你想改动一向被告诫需求多些想法,却无从破局;假如你想改动你有才能去做成那个成果,却不需求你;假如你想改动你想做成的事需求一个团队去支撑,但没你带人的方位;假如你想改动原本悟性不错,但总是有那一层窗户纸的模糊……假如你信任信任的力量,信任平凡人能成就非凡事,信任能遇到更好的自己。假如你期望参加到随着事务腾飞的过程,亲手推动一个有着深入的事务了解、完善的技能系统、技能创造价值、影响力外溢的技能团队的生长过程,我觉得我们该聊聊。任何时刻,等着你写点什么,发给 zcy-tc@cai-inc.com

微信大众号

文章同步发布,政采云技能团队大众号,欢迎关注

深入分析 RocketMQ 的 push 消费办法完成