这里是weihubeats,觉得文章不错可以关注公众号小奏技术,文章首发。拒绝营销号,拒绝标题党
RocketMQ版本
- 5.1.0
背景
入口
这里源码入口我们就从broker
启动开始查看吧,然后慢慢到NameServer
由于不知道具体代码在哪,所以我们就漫无目的的找找看吧
想了下算了还是直接搜索registerBroker
试试
我们很快在BrokerController
的start()
方法找到了
这里是有区分broker
和Proxy
是否隔离,然后执行不同的方法
但是核心还是registerBrokerAll
方法
实际我们通过查看registerBrokerAll
方法的时候发现,如果执行了topic
相关的更新操作,也会触发重新注册broker
,这里也正常,因为要更新NameServer
的路由元数据
实际在执行完方法
this.registerBrokerAll(true, false, true);
下面又马上启动了一个定时任务用于注册broker
到NameServer
默认30s执行一次
可配置最大时间为60s一次
registerBrokerAll
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {// 组装Topic配置 即我们的topics.jsonTopicConfigAndMappingSerializeWrapper topicConfigWrapper = new TopicConfigAndMappingSerializeWrapper();topicConfigWrapper.setDataVersion(this.getTopicConfigManager().getDataVersion());topicConfigWrapper.setTopicConfigTable(this.getTopicConfigManager().getTopicConfigTable());topicConfigWrapper.setTopicQueueMappingInfoMap(this.getTopicQueueMappingManager().getTopicQueueMappingTable().entrySet().stream().map(entry -> new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), TopicQueueMappingDetail.cloneAsMappingInfo(entry.getValue()))).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));// 组装完成// 检查broker的权限如果不拥有可读、可写权限if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())|| !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>();for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {// 将所有topic的权限替换为broker的权限TopicConfig tmp =new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),topicConfig.getPerm() & this.brokerConfig.getBrokerPermission(), topicConfig.getTopicSysFlag());topicConfigTable.put(topicConfig.getTopicName(), tmp);}topicConfigWrapper.setTopicConfigTable(topicConfigTable);}// 强制注册或者需要注册if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),this.getBrokerAddr(),this.brokerConfig.getBrokerName(),this.brokerConfig.getBrokerId(),this.brokerConfig.getRegisterBrokerTimeoutMills(),this.brokerConfig.isInBrokerContainer())) {// 实际的注册逻辑doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);}}
上面的代码基本上有注册了,但是我们还是整理一下逻辑
- 获取
topics.json
的配置文件的topic信息组装成要发送到Nameserve的TopicConfigAndMappingSerializeWrapper
类 - 如果
broker
的权限没有可读可写,就将topic
的所有权限设置为broker
的权限,但是这里不会去更新topics.json
配置文件 - 判断
broker
是否需要注册 - 注册
是否需要注册broker
实际的逻辑是在
List<Boolean> changeList = brokerOuterAPI.needRegister(clusterName, brokerAddr, brokerName, brokerId, topicConfigWrapper, timeoutMills, isInBrokerContainer);
我们进去看看
可以看到就是将broker
的自身一些信息发送到NameServer
查询是否需要注册
我们通过请求状态码QUERY_DATA_VERSION
看看NameServer
的处理逻辑
org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#queryBrokerTopicConfig
这里主要是判断broker
的topic
信息是否发生了变化,如果发生了则返回true
可以看到这里的注册肯定是针对后续的一些更新,我们第一次启动注册肯定是强制注册不执行这里的逻辑
为false
主要还是topic
更新相关的请求,回去对比是否需要重新注册。
现在我们还是回到主流程,看看注册的处理逻辑
注册doRegisterBrokerAll
org.apache.rocketmq.broker.BrokerController#doRegisterBrokerAll
实际的逻辑被封装在BrokerOuterAPI
的registerBrokerAll
方法
这里的代码虽然看着很长,但是实际代码逻辑很简单
主要是组装一个RegisterBrokerRequestHeader
对象,然后发送到NameServer
,其中还做了一个crc32
数据校验
这里还有一个编码技巧,使用了CountDownLatch
并发的向多个NameServer
注册,提升性能
我们进入
RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
方法看看
里面是很标准的网络请求代码我们直接通过状态码
public static final int REGISTER_BROKER = 103;
查看NameServer
那边的处理逻辑
NameServer如何处理broker的注册请求
org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#registerBroker
注册代码看着挺长的,我们重点分析下
- crc32数据校验
- 对
V3_0_11
之前的版本做兼容处理 - 核心注册方法封装在
org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#registerBroker(java.lang.String, java.lang.String, java.lang.String, long, java.lang.String, java.lang.String, java.lang.Long, java.lang.Boolean, org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper, java.util.List<java.lang.String>, io.netty.channel.Channel)
里面
4. 判断是否允许读取kv配置中顺序消息topic配置
可以看到核心逻辑在3,所以我们进入到这个方法看看
代码有点长,我们来慢慢分析
public RegisterBrokerResult registerBroker(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId,final String haServerAddr,final String zoneName,final Long timeoutMillis,final Boolean enableActingMaster,final TopicConfigSerializeWrapper topicConfigWrapper,final List<String> filterServerList,final Channel channel) {RegisterBrokerResult result = new RegisterBrokerResult();try {//加锁this.lock.writeLock().lockInterruptibly();//init or update the cluster infoSet<String> brokerNames = ConcurrentHashMapUtils.computeIfAbsent((ConcurrentHashMap<String, Set<String>>) this.clusterAddrTable, clusterName, k -> new HashSet<>());brokerNames.add(brokerName);// 默认不是第一次注册boolean registerFirst = false;// 获取broker信息如果为空则表示为第一次注册BrokerData brokerData = this.brokerAddrTable.get(brokerName);if (null == brokerData) {registerFirst = true;// 组装broker信息,放入brokerAddrTable中brokerData = new BrokerData(clusterName, brokerName, new HashMap<>());this.brokerAddrTable.put(brokerName, brokerData);}boolean isOldVersionBroker = enableActingMaster == null;brokerData.setEnableActingMaster(!isOldVersionBroker && enableActingMaster);brokerData.setZoneName(zoneName);Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();boolean isMinBrokerIdChanged = false;long prevMinBrokerId = 0;if (!brokerAddrsMap.isEmpty()) {prevMinBrokerId = Collections.min(brokerAddrsMap.keySet());}if (brokerId < prevMinBrokerId) {isMinBrokerIdChanged = true;}// 如果ip 端口相同但是 brokerId不同则删除重复的//Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>//The same IP:PORT must only have one record in brokerAddrTablebrokerAddrsMap.entrySet().removeIf(item -> null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey());//If Local brokerId stateVersion bigger than the registering one,String oldBrokerAddr = brokerAddrsMap.get(brokerId);if (null != oldBrokerAddr && !oldBrokerAddr.equals(brokerAddr)) {BrokerLiveInfo oldBrokerInfo = brokerLiveTable.get(new BrokerAddrInfo(clusterName, oldBrokerAddr));if (null != oldBrokerInfo) {long oldStateVersion = oldBrokerInfo.getDataVersion().getStateVersion();long newStateVersion = topicConfigWrapper.getDataVersion().getStateVersion();if (oldStateVersion > newStateVersion) {log.warn("Registered Broker conflicts with the existed one, just ignore.: Cluster:{}, BrokerName:{}, BrokerId:{}, " +"Old BrokerAddr:{}, Old Version:{}, New BrokerAddr:{}, New Version:{}.",clusterName, brokerName, brokerId, oldBrokerAddr, oldStateVersion, brokerAddr, newStateVersion);//Remove the rejected brokerAddr from brokerLiveTable.brokerLiveTable.remove(new BrokerAddrInfo(clusterName, brokerAddr));return result;}}}if (!brokerAddrsMap.containsKey(brokerId) && topicConfigWrapper.getTopicConfigTable().size() == 1) {log.warn("Can't register topicConfigWrapper={} because broker[{}]={} has not registered.",topicConfigWrapper.getTopicConfigTable(), brokerId, brokerAddr);return null;}String oldAddr = brokerAddrsMap.put(brokerId, brokerAddr);registerFirst = registerFirst || (StringUtils.isEmpty(oldAddr));// 如果brokerId为0则为masterboolean isMaster = MixAll.MASTER_ID == brokerId;boolean isPrimeSlave = !isOldVersionBroker && !isMaster&& brokerId == Collections.min(brokerAddrsMap.keySet());// 如果 topics.config不为空并且为master,后面这个isPrimeSlave不知道是干嘛的if (null != topicConfigWrapper && (isMaster || isPrimeSlave)) {ConcurrentMap<String, TopicConfig> tcTable =topicConfigWrapper.getTopicConfigTable();if (tcTable != null) {for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {if (registerFirst || this.isTopicConfigChanged(clusterName, brokerAddr,topicConfigWrapper.getDataVersion(), brokerName,entry.getValue().getTopicName())) {final TopicConfig topicConfig = entry.getValue();if (isPrimeSlave) {// Wipe write perm for prime slavetopicConfig.setPerm(topicConfig.getPerm() & (~PermName.PERM_WRITE));}// 创建更新实际的消费queuethis.createAndUpdateQueueData(brokerName, topicConfig);}}}if (this.isBrokerTopicConfigChanged(clusterName, brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) {TopicConfigAndMappingSerializeWrapper mappingSerializeWrapper = TopicConfigAndMappingSerializeWrapper.from(topicConfigWrapper);Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = mappingSerializeWrapper.getTopicQueueMappingInfoMap();//the topicQueueMappingInfoMap should never be null, but can be emptyfor (Map.Entry<String, TopicQueueMappingInfo> entry : topicQueueMappingInfoMap.entrySet()) {if (!topicQueueMappingInfoTable.containsKey(entry.getKey())) {topicQueueMappingInfoTable.put(entry.getKey(), new HashMap<>());}//Note asset brokerName equal entry.getValue().getBname()//here use the mappingDetail.bnametopicQueueMappingInfoTable.get(entry.getKey()).put(entry.getValue().getBname(), entry.getValue());}}}// 构建broker信息放入brokerLiveTableBrokerAddrInfo brokerAddrInfo = new BrokerAddrInfo(clusterName, brokerAddr);BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddrInfo,new BrokerLiveInfo(System.currentTimeMillis(),timeoutMillis == null ? DEFAULT_BROKER_CHANNEL_EXPIRED_TIME : timeoutMillis,topicConfigWrapper == null ? new DataVersion() : topicConfigWrapper.getDataVersion(),channel,haServerAddr));if (null == prevBrokerLiveInfo) {log.info("new broker registered, {} HAService: {}", brokerAddrInfo, haServerAddr);}//filterServerList没用过if (filterServerList != null) {if (filterServerList.isEmpty()) {this.filterServerTable.remove(brokerAddrInfo);} else {this.filterServerTable.put(brokerAddrInfo, filterServerList);}}if (MixAll.MASTER_ID != brokerId) {String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);if (masterAddr != null) {BrokerAddrInfo masterAddrInfo = new BrokerAddrInfo(clusterName, masterAddr);BrokerLiveInfo masterLiveInfo = this.brokerLiveTable.get(masterAddrInfo);if (masterLiveInfo != null) {result.setHaServerAddr(masterLiveInfo.getHaServerAddr());result.setMasterAddr(masterAddr);}}}if (isMinBrokerIdChanged && namesrvConfig.isNotifyMinBrokerIdChanged()) {notifyMinBrokerIdChanged(brokerAddrsMap, null,this.brokerLiveTable.get(brokerAddrInfo).getHaServerAddr());}} catch (Exception e) {log.error("registerBroker Exception", e);} finally {this.lock.writeLock().unlock();}return result;}
首先我们看看第一次注册的参数
然后可以看到整体的代码虽然很长,实际的逻辑还是比较简单的
- 组装broker信息,放入brokerAddrTable中
- 创建或者更新queueData数据,也就是
Map<String/* topic */, Map<String, QueueData>> topicQueueTable
- 更新
Map<String/* topic */, Map<String/*brokerName*/, TopicQueueMappingInfo>> topicQueueMappingInfoTable
- 更新broker的存活信息,即
Map<BrokerAddrInfo/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
可以看到主要是还是在Nameserve对broker的一些元数据做维护,比如broker
的topic
信息、queue
信息、broker
的存活信息
总结
总的来说就是broker
启动后会向所有的Nameserver
注册自己的相关元数据信息,然后定时发送心跳。如果执行修改topic
相关的信息,也会同时更新broker
和`Nameserver·上面的元数据信息