RocketMQ系统性学习-RocketMQ高级特性之消息存储在Broker的文件布局

🌈🌈🌈🌈🌈🌈🌈🌈
【11来了】文章导读地址:点击查看文章导读!
🍁🍁🍁🍁🍁🍁🍁🍁

RocketMQ 高级特性

消息在 Broker 的文件布局

RocketMQ 的混合存储

在 RocketMQ 存储架构中,采用混合存储,其中有 3 个重要的存储文件:Commitlog、ConsumeQueue、IndexFile

  • Topic 的消息实体存储在 Commitlog 中,顺序进行写入
  • ConsumeQueue 可以看作是基于 Topic 的 Commitlog 的索引文件,在 ConsumeQueue 中记录了消息在 Commitlog 中的偏移量、消息大小的信息,用于进行消费
  • IndexFile 提供了可以通过 key 来查询消息的功能,key 是由 topic + msgId 组成的,可以很方便地根据 key 查询具体的消息

消费者去 Broker 中消费数据流程如下:

  1. 先读取 ConsumeQueue,拿到具体消息在 Commitlog 中的偏移量
  2. 通过偏移量在 Commitlog 读取具体 Topic 的信息

消费者去寻找 Commitlog 中的数据流程图如下:

在这里插入图片描述

那么先来看一下 Commitlog 文件在哪里进行写入

SendMessageProcessor # processRequest 作为入口,

经过层层调用 this.sendMessage() -> this.brokerController.getMessageStore().putMessage(msgInner) -> DefaultMessageStore # asyncPutMessage ,最终到达 asyncPutMessage() 方法中,在这里会进行消息的磁盘写的操作:

  1. 创建消息存储所对应的 ByteBuffer:putMessageThreadLocal.getEncoder().encode(msg)

    在这个方法中,会对 Commitlog 文件进行写入:

    在这里插入图片描述

    这里的 byteBuffer 也就是 Commitlog 文件的结构如下:

    在这里插入图片描述

  2. 将创建的 ByteBuffer 设置到 msg 中去: msg.setEncodedBuff(putMessageThreadLocal.getEncoder().getEncoderBuffer())

  3. 开始向文件中追加消息: result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext)

appendMessage 方法中主要是写入消息之后,Commitlog 中一些数据会发生变化,因此需要进行修改,还是经过层层调用 appendMessage()-> appendMessagesInner()-> cb.doAppend(),最终到达 doAppend 方法,接下来看这个方法都做了些什么:

  1. 首先取出来在上边创建消息对应的 ByteBuffer:ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff()

  2. 接下来修改这个 ByteBuffer 中的一些数据:

    这个 ByteBuffer 在创建的时候已经将一些默认信息设置好了,这里只需要对写入消息后会变化的信息进行修改!

    • 先修改 QueueOffset (偏移量为 20 字节):preEncodeBuffer.putLong(20, queueOffset)
    • 再修改 PhysiclOffset (偏移量为 28 字节):preEncodeBuffer.putLong(28, fileFromOffset + byteBuffer.position())
    • 再修改 SysFlag、BornTimeStamp、BornHost 等等信息,都是通过偏移量在 ByteBuffer 中进行定位,再修改

那么通过上边就 完成了对 Commitlog 文件的追加操作 ,ReputMessageService 线程中的 run 方法,会每隔 1ms 就会去 Commitlog 中取出数据,写入到 ConsumeQueue 和 IndexFile 中

那么接下来寻找写 ConsumerQueue 的地方,也是通过调用链直接找到核心方法:

  • DefaultMessageStore # ReputMessageService # run
  • -> this.doReput()
  • -> DefaultMessageStore.this.doDispatch(dispatchRequest)
  • -> dispatcher.dispatch(req)
  • -> 这里进入到构建 ConsumeQueue 类的 dispatch 方法中:CommitLogDispatcherBuildConsumeQueue # dispatch()
  • -> DefaultMessageStore.this.putMessagePositionInfo(request)
  • -> this.consumeQueueStore.putMessagePositionInfoWrapper(dispatchRequest)
  • -> this.putMessagePositionInfoWrapper(cq, dispatchRequest)
  • -> consumeQueue.putMessagePositionInfoWrapper(request)
  • -> this.putMessagePositionInfo()

这个调用链比较长,如果不想一步一步点的话,直接找到 ConsumeQueue # this.putMessagePositionInfo() 这个方法即可,在这个方法中向 byteBufferIndex 中放了 3 个数据,就是 ConsumeQueue 的组成 = Offset + Size + TagsCode

在这里插入图片描述

那么 ConsumeQueue 的组成结构就如下所示,通过 ConsumeQueue 主要用于寻找 Topic 下的消息在 Commitlog 中的位置:

在这里插入图片描述

IndexFile 主要是通过 Key(Topic+msgId) 来寻找消息在 Commitlog 中的位置

接下来看一下 IndexFile 结构是怎样的,在上边寻找 ConsumeQueue 的调用链中,有一个 dispatcher.dispatch() 方法,这次我们进入到构建 IndexFile 的实现类的 dispatch 方法中,即:CommitLogDispatcherBuildIndex # dispatch(),那么接下来还是经过调用链到达核心方法:

  • CommitLogDispatcherBuildIndex # dispatch()
  • -> DefaultMessageStore.this.indexService.buildIndex(request)
  • -> indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey()))
  • -> indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp())

那么核心方法就在 IndexFile # putKey() 中:

  1. 首先根据 key 计算出哈希值,key 也就是 Topic + 消息的 msgId

  2. 再通过哈希值对哈希槽的数量取模,计算出在哈希槽中的相对位置:slotPos = keyHash % this.hashSlotNum

  3. 计算 key 在 IndexFile 中的绝对位置,通过 哈希槽的位置 * 每个哈希槽的大小(4B) + IndexFile 头部的大小(40B)

    代码即:

    absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize

  4. 计算索引在 IndexFile 中的绝对位置,通过 absIndexPos = IndexFile 头部大小(40B) + 哈希槽位置 * 哈希槽大小(4B) + 消息的数量 * 消息索引的大小(20B)

    int absIndexPos =IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize+ this.indexHeader.getIndexCount() * indexSize;
    
  5. 向 IndexFile 的第三部分(索引列表)中放入数据的索引,索引包含 4 部分,共 20B:keyHash、phyOffset、timeDiff、slotValue

    在这里插入图片描述

  6. 向 IndexFile 的第二部分(哈希槽)中放入数据

    在这里插入图片描述

IndexFile 的结构如下图所示:

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/295356.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【go-zero】 go-zero API 如何接入 Nacos 被 java 服务调用 | go集成java服务

一、场景 外层使用的是springcloud alibaba 这一套java的分布式架构 然后需要接入go-zero的api服务 这里我们将对api服务接入Nacos进行一个说明 二、实战 1、package 因为使用的是go-zero框架 这里我们会优先使用go-zero生态的包 github 包如下: github.com/nacos-group/naco…

yarn : 无法将“yarn”项识别为 cmdlet、函数、脚本文件或可运行程序的名称。‘yarn‘ 不是内部或外部命令,也不是可运行的程序.解决方案

文章目录 报错截图介绍方法一方法二评论截图 报错截图 介绍 我的npm已经安装好了, 是可以运行npm -v 来查看版本的 这个时候报 yarn 不是内部或外部命令 相信你的npm也已经安装好了 我下面两个方法都进行了, 具体起作用的我也不知道是哪个, 都试试吧, 我成功了 注意尝试后关…

python区块链简单模拟【01】

完整代码 https://gitee.com/ihan1001 https://github.com/ihan1001 重点:时间戳,MD5哈希,SHA256哈希,base64一种用64个字符表示任意二进制数据的方法,ECC椭圆曲线算法 import time time.time()datetime.now().strfti…

WPF中使用ListView封装组合控件TreeView+DataGrid-粉丝专栏

wpf的功能非常强大,很多控件都是原生的,但是要使用TreeViewDataGrid的组合,就需要我们自己去封装实现。 我们需要的效果如图所示: 这2个图都是第三方控件自带的,并且都是收费使用。 现在我们就用原生的控件进行封装一…

使用Springboot做测试的步骤详解

​ 📢专注于分享软件测试干货内容,欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!📢交流讨论:欢迎加入我们一起学习!📢资源分享:耗时200小时精选的「软件测试…

CV算法面试题学习

本文记录了CV算法题的学习。 CV算法面试题学习 点在多边形内(point in polygon)高斯滤波器 点在多边形内(point in polygon) 参考自文章1,其提供的代码没有考虑一些特殊情况,所以做了改进。 做法&#xff…

Zookeeper-应用实战

Zookeeper Java客户端实战 ZooKeeper应用的开发主要通过Java客户端API去连接和操作ZooKeeper集群。 ZooKeeper官方的Java客户端API。 第三方的Java客户端API,比如Curator。 ZooKeeper官方的客户端API提供了基本的操作:创建会话、创建节点、读取节点、更新数据、…

【Unity基础】9.地形系统Terrain

【Unity基础】9.地形系统Terrain 大家好,我是Lampard~~ 欢迎来到Unity基础系列博客,所学知识来自B站阿发老师~感谢 (一)地形编辑器Terrain (1)创建地形 游戏场景中大多数的山川河流地表地貌都是基…

16-高并发-队列术

队列,在数据结构中是一种线性表,从一端插入数据,然后从另一端删除数据。 在我们的系统中,不是所有的处理都必须实时处理,不是所有的请求都必须实时反馈结果给用户,不是所有的请求都必须100%一次性处理成功…

数据结构(超详细讲解!!)第二十八节 排序

1.排序的几个基本概念 排序就是将一个数据元素(或记录)的任意序列,重新排列成一个按关键字有序的序列。 数据表(Data List) :待排序的数据对象的有限集合。 关键字(Key):数据元素(或记录)中…

GPT2代码运行,个人文本生成助手,不依赖OpenAI API调用

0.前言: 感觉GPT很好玩,所以想要有个自己搭建GPT的写法,不依赖于OpenAI,需要翻墙太麻烦了,近日日本已经结合GPT4和机器,可以让他吓人,做出丰富的表情,如果自己训练的话,会塑造出什么样的机器人尚未可知…抱着好奇的心态,去github openai下载了个gpt2的模型来玩玩(其中遇到了许多…

HackTheBox - Medium - Linux - Sandworm (我的创作纪念日

Sandworm Sandworm 是一台中等难度的 Linux 机器,它托管了一个具有“PGP”验证服务的 Web 应用程序,该服务容易受到服务器端模板注入 (SSTI) 的攻击,导致“Firejail”监狱内的远程代码执行 (RCE&#xff0…