RocketMQ 5.x broker注册到Nameserve源码分析

这里是weihubeats,觉得文章不错可以关注公众号小奏技术,文章首发。拒绝营销号,拒绝标题党

RocketMQ版本

  • 5.1.0

背景

入口

这里源码入口我们就从broker启动开始查看吧,然后慢慢到NameServer

由于不知道具体代码在哪,所以我们就漫无目的的找找看吧

想了下算了还是直接搜索registerBroker试试

我们很快在BrokerControllerstart()方法找到了

这里是有区分brokerProxy是否隔离,然后执行不同的方法
但是核心还是registerBrokerAll方法

实际我们通过查看registerBrokerAll方法的时候发现,如果执行了topic相关的更新操作,也会触发重新注册broker,这里也正常,因为要更新NameServer的路由元数据

实际在执行完方法

this.registerBrokerAll(true, false, true);

下面又马上启动了一个定时任务用于注册brokerNameServer

默认30s执行一次

可配置最大时间为60s一次

registerBrokerAll


public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {// 组装Topic配置 即我们的topics.jsonTopicConfigAndMappingSerializeWrapper topicConfigWrapper = new TopicConfigAndMappingSerializeWrapper();topicConfigWrapper.setDataVersion(this.getTopicConfigManager().getDataVersion());topicConfigWrapper.setTopicConfigTable(this.getTopicConfigManager().getTopicConfigTable());topicConfigWrapper.setTopicQueueMappingInfoMap(this.getTopicQueueMappingManager().getTopicQueueMappingTable().entrySet().stream().map(entry -> new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), TopicQueueMappingDetail.cloneAsMappingInfo(entry.getValue()))).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));// 组装完成// 检查broker的权限如果不拥有可读、可写权限if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())|| !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>();for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {// 将所有topic的权限替换为broker的权限TopicConfig tmp =new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),topicConfig.getPerm() & this.brokerConfig.getBrokerPermission(), topicConfig.getTopicSysFlag());topicConfigTable.put(topicConfig.getTopicName(), tmp);}topicConfigWrapper.setTopicConfigTable(topicConfigTable);}// 强制注册或者需要注册if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),this.getBrokerAddr(),this.brokerConfig.getBrokerName(),this.brokerConfig.getBrokerId(),this.brokerConfig.getRegisterBrokerTimeoutMills(),this.brokerConfig.isInBrokerContainer())) {// 实际的注册逻辑doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);}}

上面的代码基本上有注册了,但是我们还是整理一下逻辑

  1. 获取topics.json的配置文件的topic信息组装成要发送到Nameserve的TopicConfigAndMappingSerializeWrapper
  2. 如果broker的权限没有可读可写,就将topic的所有权限设置为broker的权限,但是这里不会去更新topics.json配置文件
  3. 判断broker是否需要注册
  4. 注册

是否需要注册broker

实际的逻辑是在

List<Boolean> changeList = brokerOuterAPI.needRegister(clusterName, brokerAddr, brokerName, brokerId, topicConfigWrapper, timeoutMills, isInBrokerContainer);

我们进去看看

可以看到就是将broker的自身一些信息发送到NameServer查询是否需要注册

我们通过请求状态码QUERY_DATA_VERSION看看NameServer的处理逻辑

  • org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#queryBrokerTopicConfig

这里主要是判断brokertopic信息是否发生了变化,如果发生了则返回true

可以看到这里的注册肯定是针对后续的一些更新,我们第一次启动注册肯定是强制注册不执行这里的逻辑

false主要还是topic更新相关的请求,回去对比是否需要重新注册。

现在我们还是回到主流程,看看注册的处理逻辑

注册doRegisterBrokerAll

  • org.apache.rocketmq.broker.BrokerController#doRegisterBrokerAll

实际的逻辑被封装在BrokerOuterAPIregisterBrokerAll方法

这里的代码虽然看着很长,但是实际代码逻辑很简单
主要是组装一个RegisterBrokerRequestHeader对象,然后发送到NameServer,其中还做了一个crc32数据校验

这里还有一个编码技巧,使用了CountDownLatch并发的向多个NameServer注册,提升性能

我们进入

RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);

方法看看

里面是很标准的网络请求代码我们直接通过状态码

public static final int REGISTER_BROKER = 103;

查看NameServer那边的处理逻辑

NameServer如何处理broker的注册请求

org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#registerBroker

注册代码看着挺长的,我们重点分析下

  1. crc32数据校验
  2. V3_0_11之前的版本做兼容处理
  3. 核心注册方法封装在org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#registerBroker(java.lang.String, java.lang.String, java.lang.String, long, java.lang.String, java.lang.String, java.lang.Long, java.lang.Boolean, org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper, java.util.List<java.lang.String>, io.netty.channel.Channel) 里面


4. 判断是否允许读取kv配置中顺序消息topic配置

可以看到核心逻辑在3,所以我们进入到这个方法看看

代码有点长,我们来慢慢分析

public RegisterBrokerResult registerBroker(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId,final String haServerAddr,final String zoneName,final Long timeoutMillis,final Boolean enableActingMaster,final TopicConfigSerializeWrapper topicConfigWrapper,final List<String> filterServerList,final Channel channel) {RegisterBrokerResult result = new RegisterBrokerResult();try {//加锁this.lock.writeLock().lockInterruptibly();//init or update the cluster infoSet<String> brokerNames = ConcurrentHashMapUtils.computeIfAbsent((ConcurrentHashMap<String, Set<String>>) this.clusterAddrTable, clusterName, k -> new HashSet<>());brokerNames.add(brokerName);// 默认不是第一次注册boolean registerFirst = false;// 获取broker信息如果为空则表示为第一次注册BrokerData brokerData = this.brokerAddrTable.get(brokerName);if (null == brokerData) {registerFirst = true;// 组装broker信息,放入brokerAddrTable中brokerData = new BrokerData(clusterName, brokerName, new HashMap<>());this.brokerAddrTable.put(brokerName, brokerData);}boolean isOldVersionBroker = enableActingMaster == null;brokerData.setEnableActingMaster(!isOldVersionBroker && enableActingMaster);brokerData.setZoneName(zoneName);Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();boolean isMinBrokerIdChanged = false;long prevMinBrokerId = 0;if (!brokerAddrsMap.isEmpty()) {prevMinBrokerId = Collections.min(brokerAddrsMap.keySet());}if (brokerId < prevMinBrokerId) {isMinBrokerIdChanged = true;}// 如果ip 端口相同但是 brokerId不同则删除重复的//Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>//The same IP:PORT must only have one record in brokerAddrTablebrokerAddrsMap.entrySet().removeIf(item -> null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey());//If Local brokerId stateVersion bigger than the registering one,String oldBrokerAddr = brokerAddrsMap.get(brokerId);if (null != oldBrokerAddr && !oldBrokerAddr.equals(brokerAddr)) {BrokerLiveInfo oldBrokerInfo = brokerLiveTable.get(new BrokerAddrInfo(clusterName, oldBrokerAddr));if (null != oldBrokerInfo) {long oldStateVersion = oldBrokerInfo.getDataVersion().getStateVersion();long newStateVersion = topicConfigWrapper.getDataVersion().getStateVersion();if (oldStateVersion > newStateVersion) {log.warn("Registered Broker conflicts with the existed one, just ignore.: Cluster:{}, BrokerName:{}, BrokerId:{}, " +"Old BrokerAddr:{}, Old Version:{}, New BrokerAddr:{}, New Version:{}.",clusterName, brokerName, brokerId, oldBrokerAddr, oldStateVersion, brokerAddr, newStateVersion);//Remove the rejected brokerAddr from brokerLiveTable.brokerLiveTable.remove(new BrokerAddrInfo(clusterName, brokerAddr));return result;}}}if (!brokerAddrsMap.containsKey(brokerId) && topicConfigWrapper.getTopicConfigTable().size() == 1) {log.warn("Can't register topicConfigWrapper={} because broker[{}]={} has not registered.",topicConfigWrapper.getTopicConfigTable(), brokerId, brokerAddr);return null;}String oldAddr = brokerAddrsMap.put(brokerId, brokerAddr);registerFirst = registerFirst || (StringUtils.isEmpty(oldAddr));// 如果brokerId为0则为masterboolean isMaster = MixAll.MASTER_ID == brokerId;boolean isPrimeSlave = !isOldVersionBroker && !isMaster&& brokerId == Collections.min(brokerAddrsMap.keySet());// 如果 topics.config不为空并且为master,后面这个isPrimeSlave不知道是干嘛的if (null != topicConfigWrapper && (isMaster || isPrimeSlave)) {ConcurrentMap<String, TopicConfig> tcTable =topicConfigWrapper.getTopicConfigTable();if (tcTable != null) {for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {if (registerFirst || this.isTopicConfigChanged(clusterName, brokerAddr,topicConfigWrapper.getDataVersion(), brokerName,entry.getValue().getTopicName())) {final TopicConfig topicConfig = entry.getValue();if (isPrimeSlave) {// Wipe write perm for prime slavetopicConfig.setPerm(topicConfig.getPerm() & (~PermName.PERM_WRITE));}// 创建更新实际的消费queuethis.createAndUpdateQueueData(brokerName, topicConfig);}}}if (this.isBrokerTopicConfigChanged(clusterName, brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) {TopicConfigAndMappingSerializeWrapper mappingSerializeWrapper = TopicConfigAndMappingSerializeWrapper.from(topicConfigWrapper);Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = mappingSerializeWrapper.getTopicQueueMappingInfoMap();//the topicQueueMappingInfoMap should never be null, but can be emptyfor (Map.Entry<String, TopicQueueMappingInfo> entry : topicQueueMappingInfoMap.entrySet()) {if (!topicQueueMappingInfoTable.containsKey(entry.getKey())) {topicQueueMappingInfoTable.put(entry.getKey(), new HashMap<>());}//Note asset brokerName equal entry.getValue().getBname()//here use the mappingDetail.bnametopicQueueMappingInfoTable.get(entry.getKey()).put(entry.getValue().getBname(), entry.getValue());}}}// 构建broker信息放入brokerLiveTableBrokerAddrInfo brokerAddrInfo = new BrokerAddrInfo(clusterName, brokerAddr);BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddrInfo,new BrokerLiveInfo(System.currentTimeMillis(),timeoutMillis == null ? DEFAULT_BROKER_CHANNEL_EXPIRED_TIME : timeoutMillis,topicConfigWrapper == null ? new DataVersion() : topicConfigWrapper.getDataVersion(),channel,haServerAddr));if (null == prevBrokerLiveInfo) {log.info("new broker registered, {} HAService: {}", brokerAddrInfo, haServerAddr);}//filterServerList没用过if (filterServerList != null) {if (filterServerList.isEmpty()) {this.filterServerTable.remove(brokerAddrInfo);} else {this.filterServerTable.put(brokerAddrInfo, filterServerList);}}if (MixAll.MASTER_ID != brokerId) {String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);if (masterAddr != null) {BrokerAddrInfo masterAddrInfo = new BrokerAddrInfo(clusterName, masterAddr);BrokerLiveInfo masterLiveInfo = this.brokerLiveTable.get(masterAddrInfo);if (masterLiveInfo != null) {result.setHaServerAddr(masterLiveInfo.getHaServerAddr());result.setMasterAddr(masterAddr);}}}if (isMinBrokerIdChanged && namesrvConfig.isNotifyMinBrokerIdChanged()) {notifyMinBrokerIdChanged(brokerAddrsMap, null,this.brokerLiveTable.get(brokerAddrInfo).getHaServerAddr());}} catch (Exception e) {log.error("registerBroker Exception", e);} finally {this.lock.writeLock().unlock();}return result;}

首先我们看看第一次注册的参数

然后可以看到整体的代码虽然很长,实际的逻辑还是比较简单的

  1. 组装broker信息,放入brokerAddrTable中
  2. 创建或者更新queueData数据,也就是Map<String/* topic */, Map<String, QueueData>> topicQueueTable
  3. 更新Map<String/* topic */, Map<String/*brokerName*/, TopicQueueMappingInfo>> topicQueueMappingInfoTable
  4. 更新broker的存活信息,即Map<BrokerAddrInfo/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;

可以看到主要是还是在Nameserve对broker的一些元数据做维护,比如brokertopic信息、queue信息、broker的存活信息

总结

总的来说就是broker启动后会向所有的Nameserver注册自己的相关元数据信息,然后定时发送心跳。如果执行修改topic相关的信息,也会同时更新broker和`Nameserver·上面的元数据信息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/57672.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

IntelliJ IDEA 2021/2022关闭双击shift全局搜索

IDEA左上角 File-->Settings 找到Navigate -->Search Everywhere &#xff0c;右键添加快捷键。 OK --> Apply应用

Flutter 让软键盘不再自动弹起

1、问题说明&#xff1a; 在开发中&#xff0c;经常遇到这种事&#xff0c;一个页面有输入框&#xff0c;点击输入框后&#xff0c;会弹起软键盘&#xff0c;同时输入框会聚焦&#xff0c;手动收起软键盘后&#xff0c;点击另一个按钮前往下一个页面或者显示一个弹窗&#xff0…

力扣62.不同路径(动态规划)

/*** 一个机器人位于一个 m x n 网格的左上角 &#xff08;起始点在下图中标记为 “Start” &#xff09;。* 机器人每次只能向下或者向右移动一步。机器人试图达到网格的右下角&#xff08;在下图中标记为 “Finish” &#xff09;。* 问总共有多少条不同的路径&#xff1f; *…

十分钟掌握使用 SolidJS 构建全栈 CRUD 应用程序

我们可以开始讨论 SolidJS&#xff0c;说它比React更好&#xff0c;但没有必要做这种比较。SolidJS只是众多前端框架之一&#xff0c;旨在在Web上快速创建数据驱动。那么&#xff0c;我们为什么要突出这个新孩子呢&#xff1f; 首先&#xff0c;我们不能忽视SolidJS不使用虚拟…

GWJDN-400型2MHZ自动平衡高温介电温谱仪

GWJDN-400型2MHZ自动平衡高温介电温谱仪 GWJDN-400型2MHZ自动平衡高温介电温谱仪 关键词&#xff1a;介电常数&#xff0c;高温介电&#xff0c;自动平衡 主要功能&#xff1a; 材料介电常数测试仪 半导体材料的介电常数、导电率和C-V特性液晶材料:液晶单元的介电常数、弹性…

24届近5年同济大学自动化考研院校分析

今天给大家带来的是同济大学控制考研分析 满满干货&#xff5e;还不快快点赞收藏 一、同济大学 学校简介 同济大学历史悠久、声誉卓著&#xff0c;是中国最早的国立大学之一&#xff0c;是教育部直属并与上海市共建的全国重点大学。经过115年的发展&#xff0c;同济大学已经…

128.【Maven】

Maven仓库 (一)、Maven 简介1.传统项目管理的缺点2.Maven是什么3.Maven的作用 (二)、Maven 的下载与安装1.下载与认识目录2.配置Maven的全局环境 (三)、Maven 的基础概念1.Maven 仓库(1).仓库分类 2. Maven 坐标3.Maven 本地仓库配置(1).改变默认的仓库地址(2).改变远程仓库地址…

怎么合并多个视频?简单视频合并方法分享

合并多个视频可以将它们组合成一个更长的视频&#xff0c;这对于需要播放多个短视频的情况非常有用。此外&#xff0c;合并视频还可以使视频编辑过程更加高效&#xff0c;因为不必将多个独立的视频文件分别处理。最后&#xff0c;合并视频可以减少文件数量&#xff0c;从而使整…

python-opencv对极几何 StereoRectify

OpenCV如何正确使用stereoRectify函数 函数介绍 用于双目相机的立体校正环节中&#xff0c;这里只谈谈这个函数怎么使用&#xff0c;参数具体指哪些函数参数 随便去网上一搜或者看官方手册就能得到参数信息&#xff0c;但是&#xff01;&#xff01;相对关系非常容易出错&…

【SQL应知应会】表分区(四)• Oracle版

欢迎来到爱书不爱输的程序猿的博客, 本博客致力于知识分享&#xff0c;与更多的人进行学习交流 本文收录于SQL应知应会专栏,本专栏主要用于记录对于数据库的一些学习&#xff0c;有基础也有进阶&#xff0c;有MySQL也有Oracle 分区表 • Oracle版 前言一、分区表1.什么是表分区…

Web安全——Burp Suite基础上

Burp Suite基础 一、Burp Suite安装和环境配置如何命令行启动Burp Suite 二、Burp Suite代理和浏览器设置FireFox设置 三、如何使用Burp Suite代理1、Burp Proxy基本使用2、数据拦截与控制3、可选项配置Options客户端请求消息拦截服务器端返回消息拦截服务器返回消息修改正则表…

【Windbg】通过网络调试windows内核

环境 windows版本&#xff1a;win10_x64 1901 windbg版本&#xff1a;1.2306.12001.0 HOST 1、windbg软件设置。 点击菜单文件&#xff0c;然后如下图操作。 2、等待连接。 ************* Waiting for Debugger Extensions Gallery to Initialize **************>>&…