RocketMQ系统性学习-RocketMQ原理分析之NameServer 路由注册机制、生产者的发送消息流程

文章目录

      • NameServer 路由注册机制
      • 生产者的发送消息流程

NameServer 路由注册机制

在 Broker 启动时,通过 BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister()); 向 NameServer 中注册自己

那么 NameServer 中,注册 Broker 信息的入口在: DefaultRequestProcessor # processRequest

  • 判断请求码,如果是 Broker 注册,则进行注册 Broker 信息

    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {if (ctx != null) {log.debug("receive request, {} {} {}",request.getCode(),RemotingHelper.parseChannelRemoteAddr(ctx.channel()),request);}switch (request.getCode()) {// ... 省略// 如果是 Broker 注册case RequestCode.REGISTER_BROKER:return this.registerBroker(ctx, request);// ... 省略}
    }
    

  • this.registerBroker 真正开始注册 Broker 信息

    在注册信息之前,会先使用 crc32 来检验消息的正确性(安全检查)

    在这里插入图片描述

    之后会调用 this.namesrvController.getRouteInfoManager().registerBroker() 来注册 Broker 的信息,这个 Broker 的信息是 BrokerController 启动时通过 Netty 发送过来的

    通过 getRouteInfoManager 获取 RouteInfoManager,在该类中注册 Broker 信息,那么 RouteInfoManager 肯定是管理了 Broker 的信息

    可以点进去 RouteInfoManager,可以发现其中管理了很多路由的信息

    在这里插入图片描述

    其中 brokerLiveTable 存储的是存活的 Broker 列表,那么可以查看该变量的引用链,来判断 Nameserver 在哪里进行心跳扫描

    在这里插入图片描述

    可以看到在 scanNotActiveBroker 方法中,会将 brokerLiveTable 中不活跃的 Broker 给剔除掉

    在这里插入图片描述

生产者的发送消息流程

下面会将整体的一个发送消息的流程图片先展示出来,再通过代码进行一步一步梳理:

在这里插入图片描述

既然要看生产者的发送消息流程,就先通过方法的调用作为入口,一步一步探究流程:

在这里插入图片描述

那么通过这个 send 方法点进去,入口为:DefaultMQProducer # send(Message msg) 方法,从该方法点击进入,调用链如下:

在这里插入图片描述

如果你在看源码的话,可以从上边的调用链一步一步点击,最后发送消息的逻辑就在 this.sendDefaultImpl 方法中展开

  1. 首先,会先根据 Topic 获取对应的路由信息,表示该 Topic 需要向哪个 MessageQueue 中进行发送,这个路由信息会先从本地缓存中取,如果没有取到,会向 NameServer 发送请求来获取 Topic 的路由信息
  2. 设置消息发送失败的 重试次数 ,同步情况下重试次数为预设次数 +1,异步情况下默认重试次数为 1
  3. 接下来就根据 重试次数 循环发送消息,为 Topic 选择要发送的队列 MessageQueue 进行消息发送

选择队列之后,就进入到发送消息的核心逻辑:this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);

  1. 在该方法中,先通过队列 MessageQueue 找到对应的 brokerAddr
  2. 之后,会尝试对消息进行压缩
  3. 判断是否存在一些需要对消息进行 禁止发送前置拦截 的钩子函数,进行一些消息的拦截处理
  4. 判断通信模式:ASYNC、ONEWAY、SYNC,将消息以对应的方式发送出去,这里以同步 SYNC 为例

如果是同步的话,会通过 this.mQClientFactory.getMQClientAPIImpl().sendMessage() 方法将消息发送出去,接下来又是层层的调用,最后真正通过 Netty 将消息发送出去的地方在 NettyRemotingClient # invokeSync() 的方法中

在这个方法中,还会对消息进行前置拦截和后置拦截,为开发者的使用提供了很多的扩展点,在这里就 真正通过 Netty 将消息发送出去了

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/286498.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

功能点估算的常规流程

功能点估算流程在软件项目管理中起着重要的作用,它可以帮助项目团队更好地理解项目的需求和目标,从而提高项目的成功率和效率。如果功能点估算未按流程进行,可能会导致项目估算不准确,估算时间超出预期等问题。 因此功能点估算的常…

Authorized users only. All activities may be monitored and reported.

“Authorized users only. All activities may be monitored and reported.”是SSH隧道建立成功的提示,如图1所示。 图1 建立SSH隧道(成功提示) 如果SSH隧道建立失败,会在这个提示下显示“Permission denied”、“Connection clo…

微信私域管理工具如何帮助企业提升销售业绩?

现如今,微信已经从社交通讯软件,慢慢被默认为常规办公软件,工作沟通、业务洽谈、网络会议等都在微信上进行,完全变成职场首选的社交工具。 但受限于微信平台,许多公司在微信私域营销方面面临诸多挑战。 微信私域管理工…

【vSphere | PowerCLI】使用 PowerCLI 连接 vCenter 查看 VM 故障排错

这里写目录标题 1. 连接vCenter Server2. 客户机操作系统内提供网络配置信息3. 创建VM4. 迁移VM5.故障排错连接 VC 报错: Error: Invalid server certificate解决方法 参考资料 1. 连接vCenter Server PS C:\Users\Administrator> Connect-VIServer 192.168.1.1…

史上最细,老鸟软件测试-接口测试总结,看这篇就够了

目录:导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结(尾部小惊喜) 前言 后端接口都测试什…

工作纪实38-排查cpu彪高

昨天晚上上线了一个服务,第二天发现CPU持续飙高到70~90%,触发平台的自动扩容,后定位出问题后降低到3% 怀疑部分代码使用的线程在持续工作没有释放(死循环)进入机器,使用top -H 找出系统中使用C…

D36|背包问题

从图中我们可以看出背包问题主要涉及01背包、完全背包、多重背包和分组背包。 01背包问题 1.暴力解法是一个回溯问题 2.动态规划方法涉及二维dp数组和一维dp数组解法,二维dp数组是1维dp数组的基础 二维dp数组解法: 首先考虑动态规划五部曲 1&#…

鸿蒙-arkTs:访问控制授权申请

module.json5文件中 requestPermissions 进行配置(值为数组,可配置多个) ohos.permission.INTERNET {"name": "ohos.permission.INTERNET" }

解除Java反复编译的困扰方法,优化开发效率

在Java开发过程中,反复编译是一个常见的问题,特别是在大型项目或者需要频繁修改代码的情况下。每次修改代码后都需要重新编译整个项目,这样耗费了大量的时间和资源,降低了开发效率。为了解决这个问题,我们可以采取以下…

EPICS asynPortDriver使用示例

在文本中,将展示如何将EPICS asyn模块和其他库联用,从而实现对arm单板机上GPIO口的控制。 在本例中使用到的硬件是: 在程序中需要厂家提供的wringPi库,才能通过C语言库函数调用实现对其GPIO的控制。 以下是这个单板机GPIO的管脚…

手动搭建koa+ts项目框架(swagger文档篇)

文章目录 一、安装依赖二、直接使用json文件生成三、根据对应api注释生成新建swagger.ts文件新建./routes/users.ts文件入口文件引入对应数据如有启发,可点赞收藏哟~ 一、安装依赖 swagger-jsdoc 读取您的JSDoc带注释的源代码并生成OpenAPI (Swagger) 规范koa2-swa…

意大利语文章翻译成中文怎么收费?

随着全球化的步伐加快,不同语言之间的交流日益频繁,其中意大利语翻译成中文的需求逐渐增多。那么,意大利语文章翻译成中文该如何收费呢?又在哪里能找到专业的意大利语翻译呢? 翻译是将一种语言文字转化为另一种语言文字…