消息中间件之RocketMQ源码分析(十三)

Broker消息存储机制

RocketMQ首先将消息数据写入操作系统PageCache,然后定时将数据刷入磁盘。接下来主要分析RocketMQ是如何接收发送消息请求并将消息写入PageCache的,整个过程如图
在这里插入图片描述

Commit目录下有多个CommitLog文件,其实CommitLog只有一个文件,
为了方便保存和读写,被切分为多个子文件,所有的子文件通过其保存的
第一个和最后一个消息的物理位点进行连接。Broker按照时间和物理的offset顺序写CommitLog文件,每次写的时候需要加锁
在这里插入图片描述

1.Broker接收客户端发送消息的请求并做预处理

SendMessageProcessor.processRequest()方法会自动被调用接收、解析客户端请求为消息实例。
该方法执行分为四个过程:解析请求参数、执行发送处理前的Hook、调用保存方法存储消息、执行发送处理后的Hook
随着RocketMQ版本的迭代更新,通信层的协议也出现了不兼容的变化,比如解析请求需要根据不同的客户端请求协议版本做不同处理
在这里插入图片描述
在这里插入图片描述

2.Broker存储前预处理消息

预处理方法为SendMessageProcessor.sendMessage()
Netty是异步执行的,也就是说,请求发送到Broker被处理后,返回结果时,在客户端的处理线程已经不再时发送亲贵的线程,那么客户端如何确定返回结果对应哪个请求呢?很简单,我们可以通过返回标志来判断。
其次,做一系列存储前发送请求的数据检查,比如死信消息处理、Broker是否拒绝事务消息处理、消息基本检查等。消息基本检查方法为AbstractSendMessageProcessor.msgCheck():该方法的主要功能如下:
a.校验Broker是否配置可写
b.校验Topic名字是否为默认值
c.校验Topic配置是否存在
d.校验queueId与读写队列数是否匹配
e.校验Broker是否支持事务消息(msgCheck之后进行的校验)
在这里插入图片描述
在这里插入图片描述

3.执行DefaultMessageStore.putMessage()方法进行消息校验和存储模块检查

在真正保存消息前,会对消息数据做基本检查、对存储服务做可用性检查、对Broker做是否Slave的检查等
总结如下:
a.校验存储模块是否已经关闭
b.校验Broker是否是Slave
c.校验存储模块运行标记
d.校验Topic长度
e.校验扩展信息的长度
f.校验操作系统Page Cache是否繁忙
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
begin:CommitLog加锁开始时间,写CommitLog成功后,该值为0
diff:当前时间和CommitLog持有锁时间的差值
如果isOSPageCacheBusy()方法返回true,则表示当前有消息正在写入CommitLog,并且持有锁的时间超过设置的阈值

4.执行CommitLog.putMessage()方法,后面版本中将默认异步保存

存储消息的核心处理过程如下:
a.设置消息保存时间为当前时间戳,设置消息完整性校验码CRC(循环冗余码)
b.延迟消息处理.如果发送的消息是延迟消息,这里会单独设置延迟消息的
数据字段,比如修改Topic为延迟消息特有的Topic–SCHEDULE_TOPIC_XXX,并且备份原来的Topic和queueId,以便延迟消息在投递后被消费者消费
c.获取最后一个CommitLog文件实例MappedFile。锁住该MappedFile.默认为自旋锁,也可以通过useReetrantLockWhenPutMessage进行配置、修改和使用ReentrantLock
d:校验最后一个MappedFile,如果结果为空或已写满,则新创建一个MappedFile返回
e:调用MappedFile.appendMEssage()方法,将消息写入MappedFile
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
根据消息是单个消息还是批量消息来调用AppendMessageCallback.doAppend()方法,
并将消息写入PageCache,该方法的功能包含以下几点:
1.查找即将写入的消息物理Offset
2.事务消息单独处理。这里主要处理Prepared类型和Rollback类型的消息,设置消息queueOffset为0
3.序列化消息,并将序列化结果保存到ByteBuffer中(文件内存映射的PageCache或Direct Memory,简称DM).特别地,如果将输盘设置为异步刷盘,那么当transientStorePoolEnable=true时,会先写入DM,
DM中地数据再异步写入文件内存映射地PageCache中,因为消费者始终时从PageCache中读取消息消费的,所以这个机制也称为"读写分离"
4.更新消息所在Queue的位点
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
以上代码中,CommitLog.this.TopicQueueTable类型是HashMap<String/* topic-queueid /, Long/ offset */>,
CommitLog.this.TopicQueueTable的key是Topic名字与消息所在的Queue的QueueId的构成,value是消息位点值
在这里插入图片描述
在消息存储完成后,会处理刷盘逻辑和主从同步逻辑,分别调用(有些版本是handleDiskFlush()方法和handleHA()方法)
CommitLog.submitFlushRequest()和submitReplicaRequest()
在Broker处理发送消息时,由于处理器SendMessageProcessor本身是一个线程池服务,所以涉及了快速失败逻辑,方便在高峰时自我保护。实现代码在BrokerFastFailure.cleanExpiredRequest()方法中在BrokerController启动BrokerFailure服务时,会启动一个定时任务处理快速失败的的异常
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
从以上代码可以看到,每间隔10ms会执行一次cleanExpiredRequest()方法,清理一些非法过期的请求。
第一种,系统繁忙时发送消息请求快速失败处理。
当操作系统PageCache繁忙时,会将发送消息请求从发送消息请求线程池工作队列中取出来,直接返回SYSTEM_BUSY。如果此种情况持续发生说明系统已经不堪重负,需要增加系统资源或者扩容来减轻当前Broker的压力
第二种,发送请求超时处理
第三种,拉取消息请求超时处理
第二种和第三种的代码逻辑与第一种代码逻辑的处理类似,如果出现了,说明请求在线程池的工作队列中排队时间超过预期配置的时间,那么增加排队等待时间即可。如果请求持续超时,说明系统可能达到瓶颈,那么需要增加系统资源或者扩容
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/484661.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

并发编程(2)基础篇-管程

4 共享模型之管程 本章内容 共享问题synchronized线程安全分析Monitorwait/notify线程状态转换活跃性Lock 4.1 共享带来的问题 4.1.1 小故事 老王&#xff08;操作系统&#xff09;有一个功能强大的算盘&#xff08;CPU&#xff09;&#xff0c;现在想把它租出去&#xff…

01 Linux简介

Linux背景 发展史 linux从哪来的&#xff1f;怎么发展的&#xff1f;得从UNIX说起 1968年&#xff0c;一些来自通用电气公司、贝尔实验室和麻省理工学院的研究人员开发了一个名叫Multics的特殊操作系统。Multics在多任务文件管理和用户连接中综合了许多新概念1969-1970年&am…

【GPTs分享】每日GPTs分享之Canva

简介 Canva&#xff0c;旨在帮助用户通过Canva的用户友好设计平台释放用户的创造力。无论用户是想设计海报、社交媒体帖子还是商业名片&#xff0c;Canva都在这里协助用户将创意转化为现实。 主要功能 设计生成&#xff1a;根据用户的描述和创意需求&#xff0c;生成定制的设…

基于springboot+vue的教学资源库系统(前后端分离)

博主主页&#xff1a;猫头鹰源码 博主简介&#xff1a;Java领域优质创作者、CSDN博客专家、阿里云专家博主、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战&#xff0c;欢迎高校老师\讲师\同行交流合作 ​主要内容&#xff1a;毕业设计(Javaweb项目|小程序|Pyt…

超酷的可视化python库Altair

Altair是基于Vega-Lite的Python下的声明式统计可视化库。Altair是一个 Python统计可视化库。与Matplotlib 和Seaborn相比&#xff0c;Altair 更注重统计特征。Altair凭借其强大而简洁的可视化语法&#xff0c;可帮助你快速构建各种可视化效果。 Altair源码&#xff1a; https:/…

智能分析网关V4助力打造“AI+视频监管”明厨亮灶智能监管平台

一、背景分析 随着人们对食品安全和卫生的关注度不断提高&#xff0c;餐饮业的后厨卫生问题成为了社会热点。餐饮业作为人们日常生活中的重要组成部分&#xff0c;其后厨卫生状况直接关系到消费者的健康。由于生产流程复杂&#xff0c;传统的监管方式往往难以做到全面覆盖&…

Java Z 垃圾收集器 (ZGC):彻底改变内存管理

欢迎来到百战百胜&#xff01;我们致力于为广大IT从业者、学生和爱好者提供全面、实用的资源和服务。加入我们的聊天群&#xff0c;这里有专业大佬为你提供有价值的建议和指导&#xff01; 微信搜索&#xff1a;IT开DD那点小事 更多访问&#xff1a;www.besthub.tech Z 垃圾收集…

Leetcode155(设计最小栈)

例题&#xff1a; 分析&#xff1a; 题目要求我们必须在常数时间内检索到最小元素。 我们可以使用两个栈&#xff08;A、B&#xff09;来实现&#xff0c;A栈用来正常存储数据、弹出数据&#xff0c; B栈用于存储A栈中的最小元素&#xff0c;如下图&#xff1a; 刚开始&#…

为什么做测试既要懂开发又要懂产品?这3点看完,你就懂了!

本篇讨论的是什么呢&#xff1f;何谓一个真正的测试&#xff1f; 纯粹是个人的理解&#xff0c;仅供参考。 ● 论一个真正的软件测试工程师 ● 自动化在项目中的应用 ● 性能专项在项目中的应用 半个产品、半个开发 有人觉得这个标题有点讽刺&#xff0c;真正的测试&…

Socket通信---Python发送数据给C++程序

0. Problems 很多时候实现某种功能&#xff0c;需要在不同进程间发送数据&#xff0c;目前有几种主流的方法&#xff0c;如 让python和C/C程序互相发送数据&#xff0c;其实有几种方法&#xff1a; 共享内存共享文件Socket通信 在这里只提供Socket通信的例程&#xff0c;共享…

【JavaScript】如何自定义事件并触发

前言 有些教程中说使用 Event.initEvent() 创建事件&#xff0c;但是此方法已弃用&#xff0c;所以下文使用new Event()。 mdn官方文档中&#xff0c;明确说明Event.initEvent()弃用。 建议使用Event()构造函数&#xff0c;这里以chrome为例&#xff0c;兼容请很强&#xff0c;…

[Git] 配置Access Token 解决Github 认证弹窗

[Git] 配置Access Token 解决Github 认证弹窗 1. 前言2. 解决2.1 申请Personal Access Token2.2. 配置Token2.3. 授权激活Token 博主热门文章推荐&#xff1a; 1. 前言 最近从bitbucket切换到了Github Enterprise, 刚使用几次发现 每次操作 都有弹窗认证&#xff0c; 虽然手动点…