一、前言
在我的日常开发工作中,有碰到过一些运用RocketMQ音讯堆积的场景,音讯堆积是某个出产者发送了音讯到topic下的行列中,但是顾客未及时消费,导致顾客组的音讯堆积的状况,本文将剖析常见的音讯堆积的场景以及实践的处理计划。
二、音讯堆积呈现的场景
1、顾客注册未成功
这种状况产生在新上线顾客组,咱们有遇到过消费组下的顾客实例设置的instanceName都共同的状况下,这个时候顾客组下某些顾客注册不成功,从而会产生音讯堆积的状况。
2、音讯出产过快,而顾客消费过慢
这种状况是咱们的事务消费逻辑消费慢的场景,比方消费用户访问app主页音讯,这种音讯出产速度是非常快的,假如咱们的消费逻辑非常慢的话,就呈现音讯堆积的状况。
三、音讯堆积处理计划
- 假如呈现了音讯堆积的状况,咱们需求怎样处理呢。
上面第一种状况是归于MQ运用姿态问题,顾客都没有注册上去,这种状况需求及时调整代码,将顾客注册好
,关于堆积的音讯进行抛弃或许加快消费。
- 假如是第二种状况,这是非运用姿态问题,顾客消费能力的问题。咱们能够怎样处理呢?
手法一、进步消费并行度
1、同一个ConsumerGroup,经过增加 Consumer 实例数量
来进步并行度(需求留意的是超过订阅行列数的 Consumer 实例无效
)。能够经过加机器,或许在已有机器发动多个进程的方法。
2、进步单个 Consumer 的消费并行线程
,经过修改参数 consumeThreadMin
、consumeThreadMax
完成。RocketMQ默许消费线程是20
个。咱们能够将他装备到分布式装备中心,比方apollo,完成动态的调整。
手法二、批量消费方法
某些事务流程假如支撑批量方法消费,则能够很大程度上进步消费吞吐量,例如订单扣款类使用,一次处理一个订单耗时 1 s,一次处理 10 个订单或许也只耗时 2 s,这样即可大幅度进步消费的吞吐量。
经过设置 consumer的 consumeMessageBatchMaxSize 参数
,默许是 1,即一次只消费一条音讯,例如设置为 N,那么每次消费的音讯数小于等于 N。
手法三、越过非重要音讯,追赶上出产者进展
产生音讯堆积时,假如消费速度一直追不上发送速度,假如事务对数据要求不高的话,能够挑选丢弃不重要的音讯。例如,当某个行列的音讯数堆积到100000条以上,则测验丢弃部分或悉数音讯,这样就能够快速追上发送音讯的速度。示例代码如下:
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
long offset = msgs.get(0).getQueueOffset();
String maxOffset =
msgs.get(0).getProperty(Message.PROPERTY_MAX_OFFSET);
long diff = Long.parseLong(maxOffset) - offset;
if (diff > 100000) {
// TODO 音讯堆积状况的特殊处理
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
// TODO 正常消费进程
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
手法四、优化消费代码事务逻辑
举例如下,某条音讯的消费进程如下:
这条音讯的消费进程中有4次与 DB的 交互,假如依照每次 5ms 核算,那么一共耗时 20ms,假定事务核算耗时 5ms,那么总过耗时 25ms,所以假如能把 4 次 DB 交互优化为 2 次,那么总耗时就能够优化到 15ms,即整体性能进步了 40%。所以使用假如对时延灵敏的话,能够把DB部署在SSD硬盘,相比于SCSI磁盘,前者的RT会小很多。
四、总结
音讯堆积通常由于运用姿态问题或许顾客消费速率过慢导致,其间顾客过慢的处理计划也有很多种,咱们的处理实践问题期间需求根据实践的运用场景挑选处理最快并且耗费更少资源的计划。
你有没有其他的计划呢?欢迎在谈论区和JY共享。