Broker消息存储机制
RocketMQ首先将消息数据写入操作系统PageCache,然后定时将数据刷入磁盘。接下来主要分析RocketMQ是如何接收发送消息请求并将消息写入PageCache的,整个过程如图
Commit目录下有多个CommitLog文件,其实CommitLog只有一个文件,
为了方便保存和读写,被切分为多个子文件,所有的子文件通过其保存的
第一个和最后一个消息的物理位点进行连接。Broker按照时间和物理的offset顺序写CommitLog文件,每次写的时候需要加锁
1.Broker接收客户端发送消息的请求并做预处理
SendMessageProcessor.processRequest()方法会自动被调用接收、解析客户端请求为消息实例。
该方法执行分为四个过程:解析请求参数、执行发送处理前的Hook、调用保存方法存储消息、执行发送处理后的Hook
随着RocketMQ版本的迭代更新,通信层的协议也出现了不兼容的变化,比如解析请求需要根据不同的客户端请求协议版本做不同处理
2.Broker存储前预处理消息
预处理方法为SendMessageProcessor.sendMessage()
Netty是异步执行的,也就是说,请求发送到Broker被处理后,返回结果时,在客户端的处理线程已经不再时发送亲贵的线程,那么客户端如何确定返回结果对应哪个请求呢?很简单,我们可以通过返回标志来判断。
其次,做一系列存储前发送请求的数据检查,比如死信消息处理、Broker是否拒绝事务消息处理、消息基本检查等。消息基本检查方法为AbstractSendMessageProcessor.msgCheck():该方法的主要功能如下:
a.校验Broker是否配置可写
b.校验Topic名字是否为默认值
c.校验Topic配置是否存在
d.校验queueId与读写队列数是否匹配
e.校验Broker是否支持事务消息(msgCheck之后进行的校验)
3.执行DefaultMessageStore.putMessage()方法进行消息校验和存储模块检查
在真正保存消息前,会对消息数据做基本检查、对存储服务做可用性检查、对Broker做是否Slave的检查等
总结如下:
a.校验存储模块是否已经关闭
b.校验Broker是否是Slave
c.校验存储模块运行标记
d.校验Topic长度
e.校验扩展信息的长度
f.校验操作系统Page Cache是否繁忙
begin:CommitLog加锁开始时间,写CommitLog成功后,该值为0
diff:当前时间和CommitLog持有锁时间的差值
如果isOSPageCacheBusy()方法返回true,则表示当前有消息正在写入CommitLog,并且持有锁的时间超过设置的阈值
4.执行CommitLog.putMessage()方法,后面版本中将默认异步保存
存储消息的核心处理过程如下:
a.设置消息保存时间为当前时间戳,设置消息完整性校验码CRC(循环冗余码)
b.延迟消息处理.如果发送的消息是延迟消息,这里会单独设置延迟消息的
数据字段,比如修改Topic为延迟消息特有的Topic–SCHEDULE_TOPIC_XXX,并且备份原来的Topic和queueId,以便延迟消息在投递后被消费者消费
c.获取最后一个CommitLog文件实例MappedFile。锁住该MappedFile.默认为自旋锁,也可以通过useReetrantLockWhenPutMessage进行配置、修改和使用ReentrantLock
d:校验最后一个MappedFile,如果结果为空或已写满,则新创建一个MappedFile返回
e:调用MappedFile.appendMEssage()方法,将消息写入MappedFile
根据消息是单个消息还是批量消息来调用AppendMessageCallback.doAppend()方法,
并将消息写入PageCache,该方法的功能包含以下几点:
1.查找即将写入的消息物理Offset
2.事务消息单独处理。这里主要处理Prepared类型和Rollback类型的消息,设置消息queueOffset为0
3.序列化消息,并将序列化结果保存到ByteBuffer中(文件内存映射的PageCache或Direct Memory,简称DM).特别地,如果将输盘设置为异步刷盘,那么当transientStorePoolEnable=true时,会先写入DM,
DM中地数据再异步写入文件内存映射地PageCache中,因为消费者始终时从PageCache中读取消息消费的,所以这个机制也称为"读写分离"
4.更新消息所在Queue的位点
以上代码中,CommitLog.this.TopicQueueTable类型是HashMap<String/* topic-queueid /, Long/ offset */>,
CommitLog.this.TopicQueueTable的key是Topic名字与消息所在的Queue的QueueId的构成,value是消息位点值
在消息存储完成后,会处理刷盘逻辑和主从同步逻辑,分别调用(有些版本是handleDiskFlush()方法和handleHA()方法)
CommitLog.submitFlushRequest()和submitReplicaRequest()
在Broker处理发送消息时,由于处理器SendMessageProcessor本身是一个线程池服务,所以涉及了快速失败逻辑,方便在高峰时自我保护。实现代码在BrokerFastFailure.cleanExpiredRequest()方法中在BrokerController启动BrokerFailure服务时,会启动一个定时任务处理快速失败的的异常
从以上代码可以看到,每间隔10ms会执行一次cleanExpiredRequest()方法,清理一些非法过期的请求。
第一种,系统繁忙时发送消息请求快速失败处理。
当操作系统PageCache繁忙时,会将发送消息请求从发送消息请求线程池工作队列中取出来,直接返回SYSTEM_BUSY。如果此种情况持续发生说明系统已经不堪重负,需要增加系统资源或者扩容来减轻当前Broker的压力
第二种,发送请求超时处理
第三种,拉取消息请求超时处理
第二种和第三种的代码逻辑与第一种代码逻辑的处理类似,如果出现了,说明请求在线程池的工作队列中排队时间超过预期配置的时间,那么增加排队等待时间即可。如果请求持续超时,说明系统可能达到瓶颈,那么需要增加系统资源或者扩容