RocketMQ 5.X PopAck 源码拆解

目录

一、RocketMQ 5.X 架构

  1. RocketMQ 5.X 架构
  2. RocketMQ 5.X 为什么发明 Pop

二、 Pop流程

  1. Pop 流程
  • 锁 Consumer Queue
  • 计算 Pop Offset
  • 读取消息
  • 添加 Check Point
  • 释放 Consumer Queue 锁
  1. Pop 关键数据结构介绍
  • Pop Offset
  • Check Point
  • ReceiptHandle
  • StartOffsetInfo
  • MsgOffsetInfo
  • OrderCountInfo

三、Ack流程

  • Proxy 提交 Ack 请求
  • 内存标记消费进度
  • 持久化 Ack 到 Revive Topic
  • 异步标记消费进度
  • 可见时间过后,消息恢复消费

四、结语

RocketMQ 5.X 架构

RocketMQ 已经开启 5.X 时代,4.X 已成为 LTS 版本。

各大云厂商也推出支持 RocketMQ 5.X 版本的产品,在介绍 Pop,Ack 之前需要先了解 RocketMQ 5.X 的架构。

1. RocketMQ 5.X 架构

image.png

如上图,在 RocketMQ 5.X 的架构中,新增组件如下:

  • 02 Controller:控制器,帮助 Broker 做主从切换。

  • 04 Proxy:RocketMQ 的代理服务,支持 gRPC 协议客户端、Remoting 客户端收发消息。

  • 05 gRPC Client:RocketMQ 5.X 的新客户端,使用 gRpc 协议访问RocketMQ Proxy。

注意:社区很多朋友咨询这个客户端是否可以访问 4.X 集群,答案是不支持。

5.X 其实有2个客户端,优先推荐使用 gRPC 客户端:

5.X gRPC客户端源码: https://github.com/apache/rocketmq-clients

5.X Remoting客户端源码:

https://github.com/apache/rocketmq/tree/develop/client

5.X gRPC 客户端使用 gRPC 协议访问 Proxy,5.X Remoting 客户端可以使用 Remoting 协议访问 Namesrv 和 Proxy。

其余组件是 RocketMQ 4.X 的原组件。想要了解更多 5.X 和 4.X 的差别,请看:RocketMQ 5.0 vs 4.9.X 图解架构对比。

2. RocketMQ 5.X为什么发明 Pop

Pop主要解决 Push、Pull 消费者常见的4种问题:

  • 消费者卡住问题

下图是 Push 消费者订阅 Consumer Queue 的情况,如果 Push Client 2客户端由于 GC 等原因执行特别慢,此时 Broker 1-1 中 Queue 1,Broker2-1 中的Queue1 将出现堆积。

图片

下图是 Pop 消费者订阅 Consumer Queue 的情况:

图片

从上图可以看出来,每个 Pop Client 消费全部 Broker 的全部 Consumer Queue。
如果 Pop Client2 卡住了,其他的 Pop Client 会消费全部的 Consumer Queue,在 Push 消费中 Queue 由于消费卡住或者无人消费而堆积的问题得到解决。

  • 负载均衡慢的问题

如果出现 Push Consumer 卡住,或者 GC 导致消费慢,此时我们一般通过重启消费者程序来临时解决。

消费者重启后 Reblance,消费者数量越多,Reblance 花费的时间越长,在 Reblance 时消费者无法消费消息。

Pop 消费时,一个消费者的上线、下线不会触发 Reblance,故而不存在负载均衡慢的问题。

  • 有状态变无状态

Pop 是无状态消费者。在云原生大环境下,无状态服务在扩/缩容时更方便、快捷。

  • 消费实例数上限问题

Push 消费者的消费者个数最大不能超过 Consumer Queue 的个数,在 Pop 时不再有这个限制,但是也有自身限制。

Broker 在 Pop 时通过 Lock Consumer Queue 实现 Pop 消息,多个 Pop 消费者客户端进行锁竞争的时间和 Pop 消费者的个数成此起彼伏的形态,故而 Pop 消费者个数不能无限扩张。

如何锁 Consumer Queue,接下来讲解。

Pop 流程

在 Broker 中,Pop 的实现代码从

PopMessageProcessor.processRequest(ChannelHandlerContext, RemotingCommand) 方法开始的。

图片

1. Pop 流程

图片

笔者把 Pop message 主要分为5个流程,这里忽略数据校验、Pop 前参数准备。

这里大家直接看代码 PopMessageProcessor.processRequest()中调用了 PopMsgFromQueue()方法,此方法为 Pop 消息的实现关键,笔者将其分解为5步:

1、锁 Consumer Queue

图片

在 Pop 消息的时候,调用 QueueLockManager.tryLock(lockKey) 方法,实现加锁。 锁 Key 的格式如下:

String lockKey =topic + // topic名字            PopAckConstants.SPLIT +  // 分隔符            requestHeader.getConsumerGroup() + // 消费者组            PopAckConstants.SPLIT +  // 分隔符            queueId; // consumer queue id

可以得知:一个 Consumer Queue 同时只会被同一个消费者组中的某一个1个消费者实例锁住。

如果同一个消费者组中同时有2个消费者实例来 Pop 消息,只有一个会锁成功。

这个锁的实现代码如下:

图片

关于这把锁有2点需要注意:

  • 这是一把 TimedLock。 锁是有超时时间的,超过锁的时间自动释放。通过图中标注1可以看出, 当前锁服务是一个 ServiceThread,在 RocketMQ 中这个代表这个服务是一个后台线程,会自动执行检查。

  • 高效锁。锁是通过 ConcurrentHashMap 的线程安全实现的,在一般的 Java面试中相信大家也经常被问到。

2、计算 Pop Offset

Pop Offset 表示当前需要从这个 Consumer Queue 的哪个 Offset 开始拉取消息。

在 Pop 消息流程中有2处地方计算了 Pop Offset。

图片

  • 第一处。在加锁 QueueXX 前计算 Pop Offset。在加锁失败时,会根据 Pop Offset 估算在这个 Consumer Queue 中还有多少个消息没有被消费。

也就是返回字段 :RestNum。

  • 第二处。在加锁 QueueXX 成功后计算 Pop Offset。

为什么需要重新计算?从第一次执行计算 Pop Offset 到加锁成功后这段时间可能有其他人更新了消费位点,导致第一次计算的 Pop Offset 不准确,在加锁 QueueXX 成功后,QueueXX 只会被当前客户端 Pop 消息,此时重新计算 Pop Offset 的值是准确值,根据这个再去 Store 中读取消息。

3、读取消息

读取消息调用this.brokerController.getMessageStore().getMessageAsync()方法读取,下次有机会再细聊。

4、添加 Check point

Check Point 消息(简称 CK 消息),其中记录了每次 Pop 的消息信息。

在读取完消息后,将生成一个 Check point 消息(简称 CK 消息)。CK 消息将写入一个 Buffer 中。调用代码如下:

图片

写入 Buffer 后,这些消息将进入不可见时间,也就是同一个消费者组的其他消费者实例无法再读取到,为什么呢?

在写入 Buffer 后,其他消费者实例计算 Pop Offset 时,会把 Buffer 中已经 Pop 的消息计算在内,所以就不会读取到消息。

如果不可见时间到了,用户也没有 Ack,这些被 Pop 的消息会被 Revive 服务再次恢复到用户的 Topic 中被用户消费

5、释放 Consumer queue 锁

这个没啥说的,直接看加锁的那个图。

2. Pop 关键数据结构介绍

  • Pop Offset

Pop Offset 是每个消费者实例在 Pop 消息的时候会计算的,被 Pop 的 Queue 中可以被消费的消息的起始位点。

RocketMQ 会用这个 Pop Offset 去存储中读取消息。

读取消息的流程和 4.X 差不多,这里不再赘述。

下图展示了 Pop Offset 是如何计算的:

图片

Pop Offset 的值计算有3个来源:
第一步:查询已经提交的位点。每次用户消费完成,提交消费位点后,会更新到这里。

第二步:检查重置消费位点。目前5.1.4版本的重置消费位点也会单独存储。这个是 5.X 中新增的逻辑, 如果 Broker 配置 useServerSideResetOffset=true,则通过 Admin API 可以直接重置位点, 重置的位点会临时保存,提供给 Pop 这个时候使用。

第三步:检查 Ack 提交的消费位点。一次 Pop 一般会 Pop 一批消息, 而 Ack 可能是一条一条的 Ack 的,所以需要检查当前 Ack 提交到哪条消息,已经被 Pop 而没有被 Ack 的不能再次 Pop,直到被重试或者被恢复到用户 Topic。

  • Check point

数据结构如下:

public class PopCheckPoint implements Comparable<PopCheckPoint> {
// 本次pop消息d的起始consumer queue offset
private long startOffset;  // 本次pop时的时间戳,单位毫秒
private long popTime;
// 本次pop消息d的不可见时间,单位毫秒    // 一般来自pop客户端请求的request header
private long invisibleTime;    // 特别重要    // 记录本次pop消息的ack情况
private int bitMap;
// 本次pop消息d的条数
private byte num;
// 本次pop的consumer queue id
private int queueId;    // 本次pop 的topic
private String topic;    // 本次pop 的消费者组
private String cid;
// 特别重要    // revieve topic的位点,后面详细讲解
private long reviveOffset;
// 特别重要    // 本次拉取消息d的每个消息的queue offset 减去 pop offset    // 的差值
private List<Integer> queueOffsetDiff;
// 本次pop 消息所在d的broker
private String brokerName;}

下面将一些特别重要的字段详细说明:

  • BitMap

这个字段是一个 int 类型,1个 Int 是由32个 Bit 表示,每个 Bit 其实就是0,1,RocketMQ 利用 BitMap 标记本次 Pop 的消息哪些被 Ack(标记为1),哪些未 Ack(标记为0)。具体过程详见后面讲解 Ack 的过程。

  • ReviveOffset

Revive 英文翻译是恢复的含义,那些不可见消息基础信息(非消息 Body)会保存到 Revive Topic 中,到时间后会被 Revive 服务恢复到用户的原始 Topic 中让用户再次消费。ReviveOffset 就是这个 Revive Topic 的 Consumer Queue 位点。

  • QueueOffsetDiff

QueueOffsetDiff 是一个数组,保存了本次 Pop 的每条消息的消费位点和 Pop Offset 的差值,用来辅助 RocketMQ 实现 Ack。
大家感兴趣了解详细的可以翻看源码中是如何使用的这个字段的就知道了。

  • ReceiptHandle

这个值叫一条消息的句柄,每个消息一条,Ack 的时候会给到 Broker,Broker 通过解析判断 Ack 的哪次 Pop 的哪条消息, 格式如下:

图片

通过上图我们可以知道,所谓的句柄其实是消息的一堆属性拼接起来的一个字符串。

这个字符串实际长这个样子:

图片

  • StartOffsetInfo

这个值是一次 Pop 一个值, 记录了 Pop 的起始位点信息, 实际格式如下:

图片

这个数据结构主要在 Proxy 中被用到,用来帮助构造 Pop_ck, 也就是 Pop 消息的句柄。因为数据简单, 样例大家自行 Debug 看看吧。Proxy 中使用的代码如下:

图片

上文不是说句柄 Broker 已经构造了, 为什么 Proxy 还需要再构造一次呢? 大家可以思考下。

  • MsgOffsetInfo

这个值是一次 Pop 一个值, 记录了 Pop 的每个消息的位点信息, 实际格式如下:

图片

这个数据结构主要在 Proxy 中被用到,用来帮助构造 Pop_ck, 也就是 Pop 消息的句柄。因为数据简单, 样例大家自行 Debug 看看吧。Proxy 中使用的代码如下:

图片

  • OrderCountInfo

这个值是一次 Pop 一个值, 记录了 Pop 顺序消息的每个消息 Reconsume Times,格式如下:

图片

Proxy 中使用的代码如下:

图片

通过以上核心数据结构,我们可以看出来:Broker 针对 Pop 输出了很多数据结构给 Proxy 使用。

这里笔者也有一个疑问:这些数据结构加大了 Proxy 与 Broker 的耦合逻辑,这使得 Proxy 做纯粹的无状态变得困难。

是否可以只做到接口耦合,不用做逻辑耦合?

Ack 流程

Ack 是针对 Pop 的, 一次 Pop 可以 Pop 出多条消息, 但是 Ack 的需要解决以下几种情况:

  • 用户每次 Ack 一条消息。
  • 用户每次 Ack 一批消息。
  • 用户出错了, 永远没有 Ack。

上面3种情况的结果有4种:

  • 用户 Ack 完成了这次 Pop 的全部消息。
  • 用户 Ack 完成了部分消息, 并且 Ack 的位点有空洞。
  • 用户 Ack 完成了部分消息, 并且 Ack 的位点无空洞。
  • 用户没有 Ack 任何一条消息。

基于上面的几种场景, RocketMQ Ack 是如何实现的呢?

笔者总结了 Ack 的流程:

图片

在 Broker 中, Ack 的入口是 AckMessageProcessor.processRequest() 方法,其中虚线是异步的流程,实线是同步流程。笔者将其分为以下5步。

第一步:Proxy 提交 Ack 请求

用户提交 Ack 请求,Ack 请求被 Broker 的AckMessageProcessor.processRequest(Channel, RemotingCommand, boolean)方法处理,并解析 AckMessageRequestHeader。

AckMessageRequestHeader 中包含 Pop ck 信息, 这里逻辑上区分单个消息Ack 还是批量消息 Ack:

图片

标记1: 单个消息 Ack。

标记2: 批量消息 Ack。

标记3: AppendAck() 方法是 Ack 核心逻辑,后面的全部逻辑都在这个方法中实现。

标记4: 批量执行 AppendAck() 方法。

可以看到标记4处理非原子操作是一种风险,批量提交结果未知,以最终结果一致为准。

第二步:内存标记消费进度

经过第一步后,我们知道核心逻辑在 AppendAck() 中: RocketMQ 将 Ack Request Header 解析为 AckMsg,并且调用 PopBufferMergeService.addAk() 将 AckMsg 写入 PopBufferMergeService 的缓存中。

PopBufferMergeService 顾名思义,是一个在内存中提供合并的服务。

合并什么呢, 合并 Ack和 CK 消息,也就是用 Ack 的 Consumer Queue Offset 去标记 CK 中的 BitMap。

其实就是标记一个 CK 中的哪些消息被 Ack 了,也就是标记了消费进度。

图片

下面讲解一些关键变量:
Point:是当前 Ack 对应的 Pop Check Point 对象,里面有一个 BitMap 用来标记每个消息是否被 Ack,
具体如何标记呢:

  • 假设拉取了4个消息,组成一个数组,每个“消息的下标”分别为:0,1,2,3
  • 4个消息是否消费的标记由4个“二进制标记”组成一个数组
  • 二进制标记数组,可以转化为“1个10进制数int”保存ck对象中

图示如下:

图片

我们从 Pop Check Point 对象初始化的时候可以知道, BitMap 是一个 Int,并且初始化的值为0。将0转化为二进制,可以知道每一个 Bit 都是0。

我们用这个 BitMap 的前4个 Bit 来举例说明是如何标记每条消息是否 Ack 的。

将 Int 转化为 BitMap,是一个 Bit 数组,每个数组元素的下标表示 Pop 的消息的下标。
比如 Pop 了4条消息,按照 Consumer Queue Offset 从小到大排序就会有4个 Consumer Queue Offset 的下标。

假如在时间t1 Pop 了4条消息,Consumer Queue Offset 为[100, 101, 102, 103]。
如果第一次 Ack 了100,则 BitMap 中下标=0的 Bit 设置为1。

Bit 数组的结果就是上图第一列。

图片

如果第一次 Ack 了101,则 BitMap 中下标=1的 Bit 设置为1。
Bit数组的结果就是上图第列二列。

图片

如果分别 Ack 了第一个、第三个消息,则 BitMap 的结果如上图最右一列。

图片

每次 Ack 后,BitMap 都可以转化为 Int,并且将这个 Int 保存到 Pop Check中。
这里会有3个问题

  • 全部的消息都 Ack。
  • 用户在允许的时间内没有 Ack 完成全部消息。
  • 用户 Ack 的时候, Check Point 消息已经不存在了。

这些问题在下一步会被处理。

第三步:持久化 Ack 到 Revive Topic

在上一步中, 如果消息全部被 Ack 了, 这个是正常情况, 将最终的消费位点提交到 Consumer Offset Manager 中,Consumer Offset Manager 会定时自动持久化消费位点。

如果用户在允许的时间内,没有 Ack 完成全部的消息, 此时 Pop Check Point 会被删除,这些消息用户可以继续 Pop。

图片

下面介绍了这个超时时间是如何计算的:来自 Pop 时间和不可见时间。这里可以解释不可见时间超过后, 为什么可以再次 Pop 到消息了。

图片

如果用户在 Ack 的时候, Pop Check Point 消息不存在了怎么办?
首先是为什么 Pop Check Point 会不存在?

  • 内存不能保存全部的 CK。Pop Check Point 信息会保存到内存中, 这里不可能保存全部的 Pop Check Point, Broker 提供配置 PopCkMaxBufferSize 内存最大可以保存的 Pop Check Point 数,默认20w。

超过后, Pop Check Point 消息会直接持久化到 Revieve Topic。

  • 允许时间内没有 Ack 的的 CK 需要丢弃,这个 CK 对应的全部消息全部对用户再次可见。

图片

如果 Check Point 不存在了, 则将 Ack 消息保存到 Revieve Topic 中,方便与持久化的 Pop Check Point 再次匹配标记哪些消息被 Ack 了。

图片

第四步:异步标记消费进度

经过上一步,我们知道有一些 Check Point 信息和 Ack 信息会被持久化到 Revieve Topic。

PopBufferMergeService 服务是一个后台服务, 会消费 Revieve Topic 中的 Ack、CK 信息,然后做异步匹配, 来标记 CK 信息中的用户消息哪些被 Ack 了。

图片

这里细节特别多, 建议大家 Debug 查看,这里如果需要细讲大家留言我们再出一期。

经过 Scan 后,可以知道哪些 CK 中的用户消息被全部 Ack了, 就会提交消费位点到 Consumer Queue Offset Manager:

图片

如果经过这一步,还是有 CK 没有完全被 Ack 呢?请看下一步。

第五步:可见时间过后,消息恢复消费

如果经过上一步还有 CK 没有被 Ack 完全匹配,此时这些 CK 对应的用户消息将被重新可见,用户可以重新 Pop。

图片

这个过程是在 PopReviveService 服务中实现的, 这也是一个后台服务, 会定时检查哪些 CK 没有被完全 Ack, 然后根据 CK 将这个 CK 包含的全部消息重新恢复到重试 Topic 中。

结语

PopBufferMergeService 还有大量的细节, 建议大家通过在每个关键点打日志,然后生产消费模拟 Ack 的几种情况再查看日志输出,再结合代码很快就会了解更多的细节。
结尾也留2个问题,欢迎大家讨论

  1. 同一个 Pop CK,多次重复 Ack 会出现什么情况, Broker 是如何处理的?
  2. 如果 Pop 没有读取到消息需要写 CK 信息吗, 为什么?
  3. 下期准备讲 Proxy 或者基于时间轮的任意定时消息,想看什么请留言。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/277251.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[NAND Flash] 3.3 Flash闪存工艺知识深度解析

依公知及经验整理,原创保护,禁止转载。 专栏 《深入理解NAND Flash》 <<<< 返回总目录 <<<< 全文 4400 字。 Wafer即晶圆,是半导体组件“晶片”或“芯片”的基材,从沙子里面高温拉伸生长出来的高纯度硅晶体柱(Crystal Ingot)上切下来的圆形薄片称…

计算机网络:物理层(编码与调制)

今天又学会了一个知识&#xff0c;加油&#xff01; 目录 一、基带信号与宽带信号 1、基带信号 2、宽带信号 3、选择 4、关系 二、数字数据编码为数字信号 1、非归零编码【NRZ】 2、曼彻斯特编码 3、差分曼彻斯特编码 4、归零编码【RZ】 5、反向不归零编码【NRZI】 …

Linux Zabbix企业级监控平台本地部署并实现远程访问

前言 Zabbix是一个基于WEB界面的提供分布式系统监视以及网络监视功能的企业级的开源解决方案。能监视各种网络参数&#xff0c;保证服务器系统的安全运营&#xff1b;并提供灵活的通知机制以让系统管理员快速定位/解决存在的各种问题。 本地zabbix web管理界面限制在只能局域…

Elasticsearch:相关性工作台 - BM25 及 ELSER 的相关性比较

我们知道 Elastics Learned Sparse EncoderR (ELSER) 可以被用来做语义搜索。它是一个 out-of-domain 的语义搜索模型。无需训练&#xff0c;我们就可以得到很好的相关性。有关 ELSER 的更多知识&#xff0c;请参考文章 “Elastic Learned Sparse Encoder 简介&#xff1a;Elas…

猫头虎博主深度解析:Tomcat中的`IllegalArgumentException`异常处理全攻略 ️

&#x1f337;&#x1f341; 博主猫头虎 带您 Go to New World.✨&#x1f341; &#x1f984; 博客首页——猫头虎的博客&#x1f390; &#x1f433;《面试题大全专栏》 文章图文并茂&#x1f995;生动形象&#x1f996;简单易学&#xff01;欢迎大家来踩踩~&#x1f33a; &a…

uniapp点击按钮,防止按钮多次点击多次触发事件【防抖操作】

图片、 一、在根目录下新建common文件并创建common.js文件&#xff0c;输入下面代码 // 防止处理多次点击function noMultipleClicks(methods, info) {// methods是需要点击后需要执行的函数&#xff0c; info是点击需要传的参数let that this;if (that.noClick) {// 第一次点…

活动预告 | 微盟技术沙龙 - Elasticsearch 在微盟的实践 12/21/2023

微盟技术沙龙 「微盟技术沙龙」是由微盟研发中心发起并联合各方小伙伴为开发者举办的系列技术沙龙&#xff0c;从用户&#xff0c;产品&#xff0c;技术等方面与开发者进行交流。 微盟技术沙龙关注开发者在实际应用中遇到的问题。提供最真实的干货&#xff0c;以技术会友&…

DeepStream--调试Gstreamer

DeepStream是基于Gstreamer开发的。有时候需要在Gstreamer加日志&#xff0c;比如想在rtpjitterbuffer里加日志。 首先&#xff0c;执行gst-inspect-1.0 rtpjitterbuffer命令。 从结果中可以看到&#xff0c;rtpjitterbuffer插件的源码是gst-plugins-good&#xff0c;版本是1…

Halcon参考手册异常检测知识总结

1.1异常检测介绍 本章将介绍如何使用基于深度学习的异常检测和全局上下文异常检测。通过这两种方法&#xff0c;我们想要检测图像是否包含异常(异常是指偏离正常的事物&#xff0c;未知的事物)。 异常检测或全局上下文异常检测模型学习无异常图像的共同特征。经过训练的模型将…

map 和 multimap 存储区别 、取消自动排序 unordered_map

测试代码 std::map<int, CString > Map1;Map1.insert({ 6, L"HN400*200*11*8" });Map1.insert({ 5, L"HN200*200*11*8" });Map1.insert({ 7, L"HN100*200*11*8" });Map1.insert({ 4, L"HN200*200*11*8" });Map1.insert({ 4, L…

Spring对JUnit4和junit5的支持

Junit4支持 第一步&#xff1a;准备工作&#xff1a; 引入JUnit4的依赖&#xff0c;Spring对JUnit支持的依赖还是&#xff1a;spring-test&#xff0c;如下&#xff1a; <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://ma…

致命解药,冒险游戏剧情!

致命解药&#xff1a;僵尸末日的生存探索 《致命解药》是一款由两人团队精心制作的单机冒险游戏。这款游戏的故事发生在僵尸横行的末日世界&#xff0c;人类在绝望中寻找希望&#xff0c;在黑暗中寻找光明。而玩家的使命&#xff0c;就是在这个残酷的世界中&#xff0c;寻找能…