34.RocketMQ之Broker端消息存储流程详解


highlight: arduino-light

Broker消息存储概要设计

RocketMQ主要存储的文件包括Commitlog文件,ConsumeQueue文件,IndexFile文件。

RMQ把所有主题的消息存储在同一个文件中,确保消息发送时顺序写文件。

为了提高消费效率引入了ConsumeQueue消息队列文件,每个消息主题包含多个消息消费队列,每一个消息队列有一个消息文件。

IndexFile索引文件,其主要设计理念就是为了加速消息的检索性能,根据消息的属性快速从Commitlog文件中检索消息。

初识Broker消息存储

消息存储实现类:org.apache.rocketmq.store.DefaultMessageStore,它是存储模块里面最重要的一个类,包含了很多对存储文件的操作API。 java // 消息存储配置属性     private final MessageStoreConfig messageStoreConfig; // CommitLog 文件的存储实现类 private final CommitLog commitLog; // 消息队列存储缓存表,按消息主题分组 private final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable; // ConsumeQueue刷盘线程 private final FlushConsumeQueueService flushConsumeQueueService; // 清除CommitLog文件服务 private final CleanCommitLogService cleanCommitLogService; // 清除ConsumeQueue文件服务 private final CleanConsumeQueueService cleanConsumeQueueService; // 索引文件实现类 private final IndexService indexService; // MappedFile分配服务 private final AllocateMappedFileService allocateMappedFileService; // CommitLog消息分发,根据CommitLog文件构建ConsumeQueue,IndexFile文件 private final ReputMessageService reputMessageService; // 存储HA机制 private final HAService haService; // 消息堆内存缓存 private final TransientStorePool transientStorePool; // 消息拉取长轮询模式消息达到监听器 private final MessageArrivingListener messageArrivingListener; // Broker属性配置类 private final BrokerConfig brokerConfig; // 文件刷盘监测点 private StoreCheckpoint storeCheckpoint; // CommitLog文件转发请求 private final LinkedList<CommitLogDispatcher> dispatcherList;

消息发送存储流程

Step1:如果Broker停止工作或者为Slave,则拒绝消息写入。

如果消息长度超过256个字符,消息属性长度超过65536个字符将拒绝改消息的写入。

Step2:如果消息的延迟级别大于0,将消息的原主题名称与原消息队列ID存入消息属性中,用延迟消息主题SCHEDULE_TOPIC、消息队列ID更新原先消息的主题与队列,这是并发消息消费重试关键的一步。

Step3:获取当前可以写入的Commitlog文件。每一个文件默认1G,一个文件写满后再创建另外一个,以该文件中第一个偏移量为文件名,偏移量小于20位用0补齐。第一个文件初始偏移量为0,第二个文件的偏移量是1073741824,代表该文件中的第一条消息的物理偏移量为1073741824,这样就能快速定位到消息。

Step4:对于rocketmq来说,存储消息的主要文件被称为CommitLog,因此就从该类入手。处理存储请求的入口方法是asyncPutMessage,主要流程如下: java public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) { //可能会有多个线程并发请求,虽然支持集群,但是对于每个单独的broker都是本地存储,所以内存锁就足够了 putMessageLock.lock(); try { //获取最新的文件 MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); ... //如果文件为空,或者已经存满,则创建一个新的commitlog文件 if (null == mappedFile || mappedFile.isFull()) { mappedFile = this.mappedFileQueue.getLastMappedFile(0); } ... //调用底层的mappedFile进行出处,但是注意此时还没有刷盘 result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext); ... } finally { putMessageLock.unlock(); } PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result); } 在写入CommitLog之前,先申请putMessageLock,也就是将消息存储到CommitLog文件中是串行的。

可能会有多个线程并发请求,虽然支持集群,但是对于每个单独的broker都是本地存储,所以单机内存锁就足够。

接口PutMessageLock java public interface PutMessageLock { void lock(); void unlock(); } 重入实现PutMessageReentrantLock ```java public class PutMessageReentrantLock implements PutMessageLock { private ReentrantLock putMessageNormalLock = new ReentrantLock(); // NonfairSync

@Overridepublic void lock() {putMessageNormalLock.lock();}@Overridepublic void unlock() {putMessageNormalLock.unlock();}
}

**自旋实现PutMessageSpinLock** java public class PutMessageSpinLock implements PutMessageLock { //true: Can lock, false : in lock. private AtomicBoolean putMessageSpinLock = new AtomicBoolean(true);

@Overridepublic void lock() {boolean flag;do {flag = this.putMessageSpinLock.compareAndSet(true, false);}while (!flag);}@Overridepublic void unlock() {this.putMessageSpinLock.compareAndSet(false, true);}
}

``` 异步刷盘建议用自旋锁,同步刷盘建议用重入锁:useReentrantLockWhenPutMessage默认false使用自旋锁;

异步刷盘建议开启TransientStorePoolEnable;建议关闭transferMsgByHeap,提高拉消息效率;

同步刷盘建议适当增大sendMessageThreadPoolNums,具体配置需要经过压测。

自旋锁

互斥同步对性能最大的影响是阻塞的实现,挂起线程和恢复线程的操作都需要转入内核态中完成,这些操作给Java虚拟机的并发性能带来了很大的压力,共享数据的锁定状态只会持续很短的一段时间,为了这段时间去挂起和恢复线程并不值得。

如果物理机器有一个以上的处理器或者处理器核心,能让两个或以上的线程同时并行执行,我们就可以让后面请求锁的那个线程“稍等一会”,但不放弃处理器的执行时间,看看持有锁的线程是否很快就会释放锁。为了让线程等待,我们只须让线程执行一个循环(自旋),这项技术就是所谓的自旋锁。

自旋等待本身虽然避免了线程切换的开销,但它是要占用处理器时间的,所以如果锁被占用的时间很短,自旋等待的效果就会非常好,反之如果锁被占用的时间很长,那么自旋的线程只会白白消耗处理器资源,而不会做任何有价值的工作,这就会带来性能的浪费。

也就是说我们要权衡自旋等待、线程的用户态与内核态切换的开销,哪个更大?

重入锁

当一个线程要获取一个被其他线程持有的独占锁时,该线程会被阻塞,那么当一个线程再次获取它自己己经获取的锁时是否会被阻塞呢?如果不被阻塞,那么我们说该锁是可重入的。

了解了自旋锁和可重入锁,我们在看看如何回答:为什么建议“异步刷盘建议使用自旋锁,同步刷盘建议使用重入锁”?

同步刷盘时,锁竞争激烈,会有较多的线程处于等待阻塞等待锁的状态,如果采用自旋锁会浪费很多的CPU时间,所以“同步刷盘建议使用重入锁”。

异步刷盘是间隔一定的时间刷一次盘,锁竞争不激烈,不会存在大量阻塞等待锁的线程,偶尔锁等待就自旋等待一下很短的时间,不要进行上下文切换了,所以采用自旋锁更合适。

Step5:设置消息的存储时间,如果mappedFile为空,表明 /commitlog 目录下不存在任何文件,说明本次消息是第一次消息发送,用偏移量0创建第一个 commit 文件。 java messageExtBatch.setStoreTimestamp(beginLockTimestamp); if (null == mappedFile || mappedFile.isFull()) { mappedFile = this.mappedFileQueue.getLastMappedFile(0); } if (null == mappedFile) { log.error("Create mapped file1 error"); beginTimeInLock = 0; return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null); } Step6:将消息追加到 MappedFile 中。首先先获取 MappedFile 当前写指针,如果currentPos大于或等于文件大小则表明文件已写满,抛出UNKNOWN_ERROR。如果currentPos小于文件大小,通过 slice() 方法创建一个与MappedFile的贡献内存区,并设置position为当前指针。 ```java int currentPos = this.wrotePosition.get();

if (currentPos < this.fileSize) { ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice(); byteBuffer.position(currentPos); AppendMessageResult result; if (messageExt instanceof MessageExtBrokerInner) { result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt); } else if (messageExt instanceof MessageExtBatch) { result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt); } else { return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR); } this.wrotePosition.addAndGet(result.getWroteBytes()); this.storeTimestamp = result.getStoreTimestamp(); return result; } **Step7:创建全局唯一消息ID,消息ID有16字节,消息ID前4字节是IP,中间4字节是端口号,最后8字节是消息偏移量。** java long wroteOffset = fileFromOffset + byteBuffer.position(); this.resetByteBuffer(storeHostHolder, storeHostLength); String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(storeHostHolder), wroteOffset); ``` 但是为了消息ID可读性,返回给应用程序的msgId为字符类型,可以通过UtilAll.byte2string方法将msgId字节数组转换成字符串,通过UtilAll.string2bytes方法将msgId字符串还原成16个字节的字节数组,从而根据提取消息偏移量,可以快速通过msgId找到消息内容。

Step8:获取该消息在消息队列的偏移量,CommitLog中保存了当前所有消息队列的当前待写入偏移量 java keyBuilder.setLength(0); keyBuilder.append(msgInner.getTopic()); keyBuilder.append('-'); keyBuilder.append(msgInner.getQueueId()); String key = keyBuilder.toString(); Long queueOffset = CommitLog.this.topicQueueTable.get(key); if (null == queueOffset) { queueOffset = 0L; CommitLog.this.topicQueueTable.put(key, queueOffset); } Step9:根据消息体的长度、主题的长度、属性的长度结合消息存储格式计算消息的总长度。 ```java protected static int calMsgLength(int sysFlag, int bodyLength, int topicLength, int propertiesLength) {

final int msgLen =4   //TOTALSIZE:该消息条目总长度,4字节+ 4 //MAGICCODE:魔数,4字节。固定值0xdaa320a7+ 4 //BODYCRC:消息体crc校验码,4字节+ 4 //QUEUEID:消息消息队列ID,4字节+ 4 //FLAG:消息FLAG,RMQ不做处理,供应用程序使用,默认4字节+ 8 //QUEUEOFFSET:消息在消息消费队列的偏移量,8字节+ 8 //PHYSICALOFFSET:消息在CommitLog文件中偏移量,8字节+ 4 //SYSFLAG:消息系统Flag,例如是否压缩,是否是事务消息等,4字节+ 8 //BORNTIMESTAMP:消息生产者调用消息发送API的时间戳,8字节+ bornhostLength //BORNHOST:发送者IP,端口,8字节+ 8 //STORETIMESTAMP:消息存储时间戳,8字节+ storehostAddressLength //STOREHOSTADDRESS:BrokerIP和端口,8字节+ 4 //RECONSUMETIMES:消息重试次数,4字节+ 8 //Prepared Transaction Offset:事务消息物理偏移量,8字节+ 4 + (bodyLength > 0 ? bodyLength : 0) //BODY:消息体长度,4字节//TOPIC:主题存储长度,1字节+ 1 + topicLength//propertiesLength:消息属性,长度为PropertiesLength中存储的值+ 2 + (propertiesLength > 0 ? propertiesLength : 0)+ 0;return msgLen;}

``` 上述表明CommitLog条目是不定长的,每一个条目的长度存储在前4个字节中。

Step10:如果消息长度 + END_FILE_MIN_BLANK_LENGTH 大于 CommitLog文件的空闲空间,则返回AppendMessageStatus.END_OF_FILE,Broker会创建一个新的CommitLog文件来存储该消息。

从这里可以看出,每个CommitLog文件最少会空闲8字节,高4字节存储当前文件剩余空间,低四字节存储魔数:CommitLog.BLANK_MAGIC_CODE。 ```java final long beginTimeMills = CommitLog.this.defaultMessageStore.now(); // Write messages to the queue buffer byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);

AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId, msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills); Step11:将消息内容存储到ByteBuffer中,然后创建AppendMessageResult。把消息存储在MappedFile对应的内存映射Buffer中,并没有刷写到磁盘。 java // Return code private AppendMessageStatus status; // Where to start writing private long wroteOffset; // Write Bytes private int wroteBytes; // Message ID private String msgId; // Message storage timestamp private long storeTimestamp; // Consume queue's offset(step by one) private long logicsOffset; private long pagecacheRT = 0; private int msgNum = 1; //批量发送消息条数

/*** When write a message to the commit log, returns code*/
public enum AppendMessageStatus {PUT_OK,END_OF_FILE, //超过文件大小MESSAGE_SIZE_EXCEEDED, //消息长度超过最大允许长度PROPERTIES_SIZE_EXCEEDED, //消息属性超过最大允许长度UNKNOWN_ERROR,
}

``` Step12:更新消息队列逻辑偏移量

Step13:处理完消息追加逻辑后将释放putMessageLock锁

Step14:DefaultAppendMessageCallback#doAppend只是将消息追加在内存中,需要根据同步刷盘还是异步刷盘方式,将内存中的数据持久化道磁盘,然后执行HA主从同步复制。

存储文件组织与内存映射 RMQ通过使用内存映射文件来提高IO访问性能,无论是CommitLog、ConsumeQueue还是IndexFile,单个文件都被设计成固定长度,如果一个文件写满后再创建一个新文件,文件名就是第一条消息的偏移量。

RocketMQ使用MappedFile、MappedFileQueue来封装存储文件,一个MappedFileQueue可以包含多个MappedFile。

Broker收到消息之后的处理流程:简单来说,roker通过Netty网络服务器获取到条消息,接着就会把这条消息写入到一个Commitlog文件里去,一个Broker机器上就只有一个CommitLog文件,所有Topic的消息都会写入到一个文件里去。

然后还会以异步的方式把消息写入到ConsumeQueue文件里去,因为一个Topic有多个MessageQueue,任何条消息都是写入一个MessageQueue的,那个MessaqeQueue其实就是对应了一个ConsumeQueue文件 所以一条写入MessageQueue的消息,必然会异步进入对应的ConsumeQueue文件。

同时还会异步把消息写入一个ndexFile里,在里面主要就是把每条消息的key和消息在CommitLog中的offset偏移量做一个索引,这样后续如果要根据消息key从CommitLog文件里查询消息,就可以根据IndexFile的索引来了。

接着我们来一步一步分析一下这里写入这几个文件的一个流程。

首先Broker收到一个消息之后,必然是先写入CommitLog文件的,那么这个CommitLog文件在磁盘上的目录结构大致如何呢? 看下面 CommitLog文件的存储目录是在SROCKETMQ HOME/store/commitlog下的,里面会有很多的CommitLog文件,每个文件默认是1GB大小,一个文件写满了就创建一个新的文件,文件名的话,就是文件中的第一个偏移量,如人面所示。文件名如果不足20位的话,就用0来补齐就可以了。 000000000000000000000 000000000003052631924 在把消息写入CommitLog文件的时候,会申请 个putMessageLock锁 也就是说,在Broker上写入消息到CommitLog文件的时候,都是事行的,不会让你并发的写入,并发写入文件必然会有数据错乱的问题,下面是源码片段 java protected final PutMessageLock putMessageLock; putMessageLock .lock()

接着其实会对消息做出一通处理,包括设置消息的存储时间、创建全局唯一的消息D、计算消息的总长度,然后会走段很关键的源码,把消息写入到MappedFile里去,这个其实我们之前还讲解过里面的黑科技,看下面的源码。

java if (messageExt instanceof MessapeExtBrokerInner) { result = cb.doAppend(this.getFileFromoffset(), byteBuffer, this.filesize - currentpos, (MessapeExtBrokerInner) messageExt); }

上面源码片段中,其实最关键的是cb.doAppend这行代码,这行代码其实是把消息追加到MappedFile映射的一块内存里去,并没有直接刷入磁盘中.

至于具体什么时候才会把内存里的数据刷入磁盘,其实要看我们配置的刷盘策略,另外就是不管是同步刷盘还是异步刷盘,假设你配置了主从同步,一旦你写入完消息到CommitLog之后,接下来都会进行主从同步复制的。

Broker收到一条消息之后,其实就会直接把消息写入到Commitlod里去,但是他写入刚开始仅仅是写入到MappedFile映射的一块内存里去,后续是根据刷盘策略去决定是否立即把数据从内存刷入磁盘的.

这个消息写入CommitLog之后,然后消息是如何进入ConsumeQueue和IndexFile的。 实际上,Broker启动的时候会开启一个线程,ReputMessageService,他会把CommitLog更新事件转发出去,然后让任务处理器去更新ConsumeQueue和IndexFile.

我们看下面的源码片段,在DefaultMessageStore的start方法里,在里面就是启动了这个ReputMessageService线程。 这个DefaultMessageStore的start方法就是在Broker启动的时候调用的,所以相当于是Broker启动就会启动这个线程.

image.png

下面我们看这个ReputlMessageService线程的运行逻辑,源码片段如下所示

image.png

也就是说,在这个线程里,每隔1毫秒,就会把最近写入CommitLog的消息进行一次转发,转发到ConsumeQueue和ndexFile里去,通过的是doReput0方法来实现的,我们再看doReput方法里的实现逻辑,先看下面源码片段.

image.png

这段代码意思非常的清晰明了,就是从commitLog中去获取到一DispatchRequest拿到了一份需要进行转发的消息,也就是从CommitLog中读取的。

接着他就会通过下面的代码,调用doDispatch0方法去把消息进行转发,一个是转发到ConsumeQueue里去,一个是转发到IndexFile里去 大家看下面的源码片段,里面走了CommitLogDispatcher的循环

image.png

实际上F常来说这个CommitLoaDispatcher的实现类有两,分别是CommitLoaDispatcherBuildConsumeQueue和CommitLoaDispatcherBuildlindex,他们俩分别会负责把消息转发到ConsumeQueue和IndexFile。

接着我们看一下ConsumeQueueDispatche的源码实现逻辑,其实非常的简单,就是找到当前Topic的messageQueueld对应的一个ConsumeQueue文件 个MessageQueue会对应多个ConsumeQueue文件,找到一个即可,然后消息写入其中

image.png

再来看看indexFile的写入逻辑,其实也很简单,无非就是在ndexFile里去构建对应的索引罢了,如下面的源码片段

image.png

因此到这里为止,我想大家基本就看明白了,当我们把消息写入到CommitLog之后,有一个后台线程每隔1毫秒就会去拉取CommitLog中最新更新的一批消息,然后分别转发到ConsumeQueue和ndexFile里去,这就是他底层的实现原理

ConsumerQueue 消息消费队列是专门为消息订阅构建的索引文件,提高根据主题与消息队列检索消息的速度。

IndexFile主要用于根据Message Key和Unique Key检索消息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/19880.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

云原生(第六篇)k8s-kubeadmin部署

master&#xff08;2C/4G&#xff0c;cpu核心数要求大于2&#xff09; 192.168.169.10 docker、kubeadm、kubelet、kubectl、flannel node01&#xff08;2C/2G&#xff09; 192.168.169.30 docker、kubeadm、kubelet、kubect…

如何解决git中拉取或提交代码出现的ssl证书问题?

问题描述 执行命令的时候&#xff0c;出现"…certificate problem…"报错&#xff0c;一般在执行"git push“ (推送分支) 或者 “git clone”(克隆仓库)时出现&#xff0c;原因时因为SSL安全验证问题&#xff0c;不能获取到本地的的证书。那么如何解决这个问题…

SpringBoot 如何使用 MockMvc 进行 Web 集成测试

SpringBoot 如何使用 MockMvc 进行 Web 集成测试 介绍 SpringBoot 是一个流行的 Java Web 开发框架&#xff0c;它提供了一些强大的工具和库&#xff0c;使得开发 Web 应用程序变得更加容易。其中之一是 MockMvc&#xff0c;它提供了一种测试 SpringBoot Web 应用程序的方式&…

Microsoft Remote Desktop for mac安装教程

适用于Mac的Microsoft远程桌面测试版&#xff01;Microsoft Remote Desktop Beta for Mac是一种远程工具&#xff0c;允许用户从Mac远程访问基于Windows的计算机。使用此工具&#xff0c;用户可以随时随地使用Mac连接到远程桌面、应用程序和资源。 Microsoft Remote Desktop B…

2023年Arm最新处理器架构分析——X4、A720和A520

1、引言 上一篇文章我们介绍了Arm的Cortex-X1至Cortex-X3系列处理器&#xff0c;2023年的5月底&#xff0c;Arm如期发布了新一年的处理器架构&#xff0c;分别为超级大核心Cortex-X4&#xff0c;大核心A720和小核心A520。在智能手机行业&#xff0c;Arm始终保持每年一迭代的处…

【GCD+MST】ABC210 E

这道题告诉我们&#xff0c;一道题一定要去手摸样例&#xff0c;多造几个数据&#xff0c;然后找思路 很多时候&#xff0c;题目看错了&#xff0c;码完发现思路错了&#xff0c;调半天调不出来&#xff0c;思路一直在旧框架打转&#xff0c;这些情况都是不去考察实际情况导致…

超速Python编程:利用缓存加速你的应用程序

引言 在软件开发中&#xff0c;缓存是一种常用的技术&#xff0c;用于提高系统性能和响应速度。Python提供了多种缓存技术和库&#xff0c;使我们能够轻松地实现缓存功能。本文将带您从入门到精通&#xff0c;逐步介绍Python中的缓存使用方法&#xff0c;并提供实例演示。 1.…

《MySQL》复合查询和连接

文章目录 查询单行子查询多行子查询合并查询 连接内连接外连接 点睛之笔&#xff1a;无论是多表还是单表&#xff0c;我们都可以认为只有一张表。 只要是表&#xff0c;就可以查询和连接成新表&#xff0c;所以select出来的结果都可以认为成一张表&#xff0c;既然是一张表&…

WAIC2023会后记

听了3天WAIC的会&#xff0c; 大开眼界&#xff0c;算是上了堂大课。 本次参会的目的是听听AI企业信息化的想法、理论和实践。以进一步探索可能的业务场景。三天的会结束后&#xff0c;留下深刻印象的有如下几点。 大模型当道 2023这次大会的主题成了大模型&#xff0c;谈的…

ElasticSearch学习笔记一——下载及安装

最近发现ES是个很重要的内容啊&#xff0c;各种大厂都会使用ES来做一些大范围的搜索之类的功能&#xff0c;所以今天我们也来学习一下。 首先我们要准备Java的环境&#xff0c;推荐版本8、11、14 ES官方的JDK兼容性列表(有些慢&#xff0c;需要耐心等待一下哈) 在我写文章时&…

iview-admin使用小结

首先在使用一个框架之前一定要完整的看一下相关文档&#xff0c;因为框架中会封装常用的功能&#xff0c;也会更加符合大众要求。在ui设计图上&#xff0c;可能实现某个功能设计图中给出的交互并不是很好&#xff0c;而在框架中有更好的组件可以实现&#xff0c;但因为没有看文…

Python中的迭代器

一、介绍 在Python中&#xff0c;迭代器是一种访问集合元素的方式&#xff0c;可以用于遍历数据集中的元素&#xff0c;而不需要事先知道集合的大小。迭代器可以被用于循环语句中&#xff0c;例如for循环&#xff0c;来遍历集合中的每个元素。 Python中的迭代器是一个实现了迭…