RocketMQ系统性学习-RocketMQ原理分析之Broker接收消息的处理流程

Broker接收消息的处理流程?

既然要分析 Broker 接收消息,那么如何找到 Broker 接收消息并进行处理的程序入口呢?

那么消息既然是从生产者开始发送,消息是有单条消息和批量消息之分的,那么消息肯定是有一个标识,当 Broker 接收到消息之后,肯定是需要通过判断消息的标识来区分单条消息和批量消息,那么只需要找到发送消息的标识,再全局搜索,就可以找到这个标识在哪里被处理,被处理的地方一定就是 Broker 接收消息处理的位置了!

那么还是先找到发送消息的位置:DefaultMQProducer # send(Message msg) ,通过层层调用(这里在生产者发送消息流程中讲了)到达了 DefaultMQProducerImpl # this.sendKernelImpl()

在这个方法中就调用到了 MQ 客户端的发送消息的方法 this.mQClientFactory.getMQClientAPIImpl().sendMessage()

在这里真正的通过 Netty 去发送消息到 Broker 中去:

  1. 通过判断消息的类型构造一个 RemotiongCommand 类型的 request 参数

    这里有 4 个构造 request 参数的方法,如下图会走到第三个方法中,那么这里的请求标识为 RequestCode.SEND_MESSAGE_V2

    在这里插入图片描述

  2. this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request) 方法中通过 Netty 将消息发送出去,那么这个方法需要传入一个 request 参数

在上边构造了 request 并且通过 Netty 发送出去,request 的标识为 RequestCode.SEND_MESSAGE_V2 ,那么我们只需要找到处理该标识的 request 的位置,那就是 Broker 处理消息的位置,在 IDEA 中通过 Ctrl+Shift+F 全局搜索这个标识即可:

在这里插入图片描述

可以发现有三个进行 case 判断的地方:

  • 第一个在 PlainAccessResource 类中
  • 第二个在 SendMessageActivity 类中
  • 第三个在 SendMessageRequestHeader 类中

这里第三个 case 判断的地方就是 Broker 处理消息的位置(可以在三个 case 中都 debug,看断点走到哪里就知道了)

那么我们就在第三个 case 判断的位置打上断点

在这里插入图片描述

接下来启动 NameServer,再以 Debug 的方式启动 Broker,再启动生产者,根据调用堆栈信息来找到 Broker 处理消息的整个调用链:

在这里插入图片描述

根据这个堆栈信息,可以发现,调用链是从 NettyServerHandler 的 channelRead0 转移过来的,那么也就是再 NettyServerHandler 这个 Netty 的服务端接收到消息并进行处理,那么我们就在这个堆栈信息中找 Broker 是在哪里对消息进行处理了呢?

就是在 SendMessageProcessor # processRequest 方法中(也就是堆栈顶第3个方法),在这个方法中:

  1. 通过 parseRequestHeader(request) 先对请求头进行解码,也就是根据请求头 RequestCode.SEND_MESSAGE_V2 的类型做一些相应的处理
  2. 接下来通过 buildMsgContext(ctx, requestHeader, request) 创建消息的上下文对象
  3. this.executeSendMessageHookBefore(sendMessageContext) 执行一些消息发送前的钩子(扩展点)
  4. 核心:this.sendMessage() 真正去发送消息

那么在 this.sendMessage() 中就是真正发送消息的逻辑了:

  1. 首先是 preSend(ctx, request, requestHeader) 进行预发送,这里其实就是对发送的消息进行一些检查(Topic 是否合法?Topic 是否与系统默认 Topic 冲突?Topic 的一些配置是否存在?等等信息)

  2. 如果 queueIdInt < 0 是 true 的话,表明生产者没有指定要发送到哪个队列,那么就通过 99999999 % 队列个数 来选择一个队列发送

  3. 将超过最大重试次数的消息发送到 DLQ 死信队列中去

    if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig, oriProps)) {return response;
    }
    
  4. 接下来判断 Broker 是否开启了 异步模式,如果开启的话,通过 asyncPutMessage() 处理

    如果没有开启 异步模式,通过 putMessage() 处理,这里其实还是调用了 asyncPutMessage(),只不过通过 get() 阻塞等待结果(复用代码)

那么在发送消息的时候,无论是否异步,都会进入到 DefaultMessageStore # asyncPutMessage() 方法中,我们就点进去看看进行了哪些处理:

  1. 执行一些钩子函数,作为扩展点:putMessageHook.executeBeforePutMessage(msg)

  2. 提交文件的写请求:CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg)

    在这个写文件的方法中,主要做一些文件的写操作,以及将文件写入到磁盘中

    1. 获取文件对象:this.mappedFileQueue.getLastMappedFile()
    2. 追加写文件的操作: mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext)
    3. 最后进行刷盘以及高可用的一些处理:handleDiskFlushAndHA(putMessageResult, msg, needAckNums, needHandleHA)
  3. 打印写文件消耗的时间 this.getSystemClock().now() - beginTime

那么 Broker 总体的接收消息的处理流程就是上边将的这么多了,当然还有一些边边角角的内容没有细说,先了解整体的处理流程,不要提前去学习太多的细节!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/288935.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LEFT JOIN

通過中間表説明 biz_email_sent table1 biz_email table2 biz_email_sent_address 中間表 LEFT JOIN 是 JOIN 左邊的記錄(biz_email_sent id52)全部查出&#xff0c;比如52 的記錄全部查出。 即使中間表se.sa_email_id 在 table2中找不到&#xff0c…

管理类联考——数学——真题篇——按知识分类——代数——数列

【等差数列 ⟹ \Longrightarrow ⟹ 通项公式&#xff1a; a n a 1 ( n − 1 ) d a m ( n − m ) d n d a 1 − d A n B a_n a_1(n-1)d a_m(n-m)dnda_1-dAnB an​a1​(n−1)dam​(n−m)dnda1​−dAnB ⟹ \Longrightarrow ⟹ A d &#xff0c; B a 1 − d Ad&#x…

MySQL的增删改查(进阶)--下

3. 新增 插入查询结果 在一张表中插入另一张表的查询结果 语法为&#xff1a; INSERT INTO table_name [(column [, column ...])] SELECT ...该语句是组合技&#xff1a;把插入语句和查询语句结合到一起了—以查询结果&#xff0c;来作为插入的值。即把表一查询出来的结果集合…

MobileNet-V2实现遥感土地利用图像识别

今天我们分享MobileNet V2实现遥感影像土地利用的图像分类。 数据集 本次使用的数据集是UC Merced Land-Use Dataset。UC Merced Land-Use Dataset 是一个用于研究的 21 级土地利用图像遥感数据集&#xff0c;均提取自 USGS National Map Urban Area Imagery&#xff08;美国地…

bp神经网络对csv文件或者xlsx文件进行数据预测

1.input(1:m,:)‘含义 矩阵A第一列的转置矩阵。(x,y)表示二维矩阵第x行第y列位置的元素&#xff0c;x为:则表示所有的行。因此&#xff0c;A(:,1)就表示A的第1列的所有元素&#xff0c;这是一个列向量。 所以这里input(1:m,:)表示1到m行&#xff0c;所有列&#xff0c;而后面…

玩转 Scrapy 框架 (一):Scrapy 框架介绍及使用入门

目录 一、Scrapy 框架介绍二、Scrapy 入门 一、Scrapy 框架介绍 简介&#xff1a; Scrapy 是一个基于 Python 开发的爬虫框架&#xff0c;可以说它是当前 Python 爬虫生态中最流行的爬虫框架&#xff0c;该框架提供了非常多爬虫的相关组件&#xff0c;架构清晰&#xff0c;可扩…

轮滑加盟培训机构管理系统源码开发方案

一、项目背景与目标 &#xff08;一&#xff09;项目背景 随着轮滑运动的普及和市场需求的增加&#xff0c;轮滑加盟培训机构逐渐兴起。这些机构面临着学员管理、课程排班、教师管理等多方面的挑战。为了提高管理效率和服务质量&#xff0c;需要开发一套专门针对轮滑加盟培训…

Java经典面试题:冒泡算法的使用

Hi i,m JinXiang ⭐ 前言 ⭐ 本篇文章主要介绍Java经典面试题&#xff1a;冒泡算法的使用以及部分理论知识 &#x1f349;欢迎点赞 &#x1f44d; 收藏 ⭐留言评论 &#x1f4dd;私信必回哟&#x1f601; &#x1f349;博主收将持续更新学习记录获&#xff0c;友友们有任何问题…

Jenkins 执行远程脚本的插件—SSH2 Easy

SSH2 Easy 是什么&#xff1f; SSH2 Easy 是一个 Jenkins 插件&#xff0c;它用于在 Jenkins 构建过程中通过 SSH2 协议与远程服务器进行交互。通过该插件&#xff0c;用户可以在 Jenkins 的构建过程中执行远程命令、上传或下载文件、管理远程服务器等操作。 以下是 SSH2 Eas…

程序员的20大Git面试问题及答案

文章目录 1.什么是Git&#xff1f;2.Git 工作流程3.在 Git 中提交的命令是什么&#xff1f;4.什么是 Git 中的“裸存储库”&#xff1f;5.Git 是用什么语言编写的&#xff1f;6.在Git中&#xff0c;你如何还原已经 push 并公开的提交&#xff1f;7.git pull 和 git fetch 有什么…

循环栅栏:CyclicBarrier

CyclicBarrier可以理解为循环栅栏&#xff0c;栅栏就是一种障碍物&#xff0c; 比如通常在私人宅院的周围就可以围上一圈栅栏&#xff0c;阻止闲杂人等入内。 这里当然就是用来阻止线程继续执行&#xff0c;要求线程在栅栏外等待。 前面的Cyclic意为循环&#xff0c;也就是说这…

模拟信号和数字信号的区别

模拟和数字信号是携带信息的信号类型。两种信号之间的主要区别在于模拟信号具有连续电信号&#xff0c;而数字信号具有非连续电信号。 模拟信号和数字信号之间的差异可以通过不同类型波的例子来观察。 什么是模拟信号(Analog Signals)&#xff1f; 许多系统使用模拟信号来传输…