RocketMQ源码 创建Topic流程源码分析

前言

MQAdminImpl MQ管理组件提供了大量对mq进行管理的工具,其中一个就是创建Topic。它内部实现是通过 mqClient工具从 NameServer拉取当前 Topic对应的路由元数据信息,解析遍历和当前topic有关的 broker高可用分组集合,找到分组中的 master主节点,然后依次对 broker主节点发送创建topic请求。broker 高可用分组的主节点收到请求后,将 topic元数据存储在 TopicConfigManager元数据管理组件中,然后上报当前 broker节点 Topic元数据信息到所有 NameServer节点(统一管理路由数据)。

源码版本:4.9.3

源码架构图

源码分析

创建 topic源码入口

org.apache.rocketmq.client.impl.MQAdminImpl#createTopic(java.lang.String, java.lang.String, int, int) 创建 Topic源码入口,通过 mqClient客户端,从 NameServer获取当前 topic key对应的路由元数据,然后遍历 Broker分组元数据列表,对 broker分组中的主节点,发起创建 topic元数据请求。

    // 创建Topicpublic void createTopic(String key,String newTopic, // topic名称int queueNum,  // 队列数目int topicSysFlag // 系统标志) throws MQClientException {try {Validators.checkTopic(newTopic);Validators.isSystemTopic(newTopic);// 从NameServer获取路由数据TopicRouteData topicRouteData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(key, timeoutMillis);List<BrokerData> brokerDataList = topicRouteData.getBrokerDatas();if (brokerDataList != null && !brokerDataList.isEmpty()) {Collections.sort(brokerDataList);boolean createOKAtLeastOnce = false;MQClientException exception = null;StringBuilder orderTopicString = new StringBuilder();// 遍历获取到的broker分组元数据列表for (BrokerData brokerData : brokerDataList) {// 找到master节点地址String addr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);if (addr != null) {// 封装topic元数据配置TopicConfig topicConfig = new TopicConfig(newTopic);topicConfig.setReadQueueNums(queueNum);topicConfig.setWriteQueueNums(queueNum);topicConfig.setTopicSysFlag(topicSysFlag);boolean createOK = false;for (int i = 0; i < 5; i++) {try {// 往每个broker分组主节点,注册topic元数据this.mQClientFactory.getMQClientAPIImpl().createTopic(addr, key, topicConfig, timeoutMillis);createOK = true;createOKAtLeastOnce = true;break;} catch (Exception e) {if (4 == i) {exception = new MQClientException("create topic to broker exception", e);}}}if (createOK) {orderTopicString.append(brokerData.getBrokerName());orderTopicString.append(":");orderTopicString.append(queueNum);orderTopicString.append(";");}}}if (exception != null && !createOKAtLeastOnce) {throw exception;}} else {throw new MQClientException("Not found broker, maybe key is wrong", null);}} catch (Exception e) {throw new MQClientException("create new topic failed", e);}}

org.apache.rocketmq.client.impl.MQClientAPIImpl#createTopic  向 broker分组中的主节点发送创建 topic请求的源码,通过 netty网络通信服务器客户端,发送同步阻塞 创建、更新topic的请求。

    // 向指定Broker发送创建Topic创建请求public void createTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig,final long timeoutMillis)throws RemotingException, MQBrokerException, InterruptedException, MQClientException {// 封装创建Topic请求头CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();requestHeader.setTopic(topicConfig.getTopicName());requestHeader.setDefaultTopic(defaultTopic);requestHeader.setReadQueueNums(topicConfig.getReadQueueNums());requestHeader.setWriteQueueNums(topicConfig.getWriteQueueNums());requestHeader.setPerm(topicConfig.getPerm());requestHeader.setTopicFilterType(topicConfig.getTopicFilterType().name());requestHeader.setTopicSysFlag(topicConfig.getTopicSysFlag());requestHeader.setOrder(topicConfig.isOrder());// 请求处理命令对应的请求code为  UPDATE_AND_CREATE_TOPICRemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader);// 发送同步阻塞请求RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),request, timeoutMillis);assert response != null;switch (response.getCode()) {case ResponseCode.SUCCESS: {return;}default:break;}throw new MQClientException(response.getCode(), response.getRemark());}

Borker 接受创建 topic请求

org.apache.rocketmq.broker.processor.AdminBrokerProcessor#processRequest 

  1. 解码请求头;
  2. 提取、封装 topic元数据信息;
  3. 将元数据信息存储到本节点的 TopicConfigManager topic元数据管理组件;
  4. 上报 topic元数据信息到所有 NameServer节点。
    @Overridepublic RemotingCommand processRequest(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {switch (request.getCode()) {// 更新 创建 topiccase RequestCode.UPDATE_AND_CREATE_TOPIC:return this.updateAndCreateTopic(ctx, request);case RequestCode.DELETE_TOPIC_IN_BROKER:return this.deleteTopic(ctx, request);}
    private synchronized RemotingCommand updateAndCreateTopic(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {final RemotingCommand response = RemotingCommand.createResponseCommand(null);// 解码请求头final CreateTopicRequestHeader requestHeader =(CreateTopicRequestHeader) request.decodeCommandCustomHeader(CreateTopicRequestHeader.class);log.info("updateAndCreateTopic called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));// 提取topic名称String topic = requestHeader.getTopic();if (!TopicValidator.validateTopic(topic, response)) {return response;}if (TopicValidator.isSystemTopic(topic, response)) {return response;}// 封装topic元数据TopicConfig topicConfig = new TopicConfig(topic);// 每个broker节点都有默认数量的读队列和写队列topicConfig.setReadQueueNums(requestHeader.getReadQueueNums());topicConfig.setWriteQueueNums(requestHeader.getWriteQueueNums());topicConfig.setTopicFilterType(requestHeader.getTopicFilterTypeEnum());topicConfig.setPerm(requestHeader.getPerm());topicConfig.setTopicSysFlag(requestHeader.getTopicSysFlag() == null ? 0 : requestHeader.getTopicSysFlag());// 将topic元数据写入 topic元数据管理组件(注意:此处即创建topic),就是将topic元数据放入一个Map数据结构this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);// 注册topic元数据信息到 Namesrvthis.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion());response.setCode(ResponseCode.SUCCESS);return response;}

Topic元数据管理组件更新topic

org.apache.rocketmq.broker.topic.TopicConfigManager#updateTopicConfig

  1. 更新元数据信息到内存元数据映射表;
  2. 更新元数据信息版本号;
  3. 将元数据信息持久化到磁盘文件;
   // 更新topic元数据public void updateTopicConfig(final TopicConfig topicConfig) {TopicConfig old = this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);if (old != null) {log.info("update topic config, old:[{}] new:[{}]", old, topicConfig);} else {log.info("create new topic [{}]", topicConfig);}// 更新数据版本号this.dataVersion.nextVersion();// 持久化元数据到磁盘this.persist();}

上报 topic元数据信息到所有 NameServer节点

org.apache.rocketmq.broker.BrokerController#registerIncrementBrokerData

public synchronized void registerIncrementBrokerData(TopicConfig topicConfig, DataVersion dataVersion) {TopicConfig registerTopicConfig = topicConfig;if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())|| !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {registerTopicConfig =new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),this.brokerConfig.getBrokerPermission());}ConcurrentMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();topicConfigTable.put(topicConfig.getTopicName(), registerTopicConfig);TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();topicConfigSerializeWrapper.setDataVersion(dataVersion);topicConfigSerializeWrapper.setTopicConfigTable(topicConfigTable);// 注册Broker信息到所有NameServerdoRegisterBrokerAll(true, false, topicConfigSerializeWrapper);}
    // 注册topic元数据到所有nameServer节点private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,TopicConfigSerializeWrapper topicConfigWrapper) {List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(this.brokerConfig.getBrokerClusterName(), // 集群名称this.getBrokerAddr(), // broker地址this.brokerConfig.getBrokerName(), // broker节点所在分组this.brokerConfig.getBrokerId(), // broker节点IDthis.getHAServerAddr(), // HA服务地址topicConfigWrapper, // topic元数据this.filterServerManager.buildNewFilterServerList(), // filter服务地址列表oneway, // 是否是oneway单向发送this.brokerConfig.getRegisterBrokerTimeoutMills(), // 注册超时时间this.brokerConfig.isCompressedRegister()); // 是否压缩注册信息if (registerBrokerResultList.size() > 0) {RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);if (registerBrokerResult != null) {if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());}this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());if (checkOrderConfig) {this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());}}}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/318690.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

老胡的周刊(第122期)

老胡的信息周刊[1]&#xff0c;记录这周我看到的有价值的信息&#xff0c;主要针对计算机领域&#xff0c;内容主题极大程度被我个人喜好主导。这个项目核心目的在于记录让自己有印象的信息做一个留存以及共享。 &#x1f3af; 项目 movie-web[2] 开源可自部署的简约在线电影搜…

JAVA中对登录进行IP限制

一、获取登录用户的网络IP public String getIpAddress(HttpServletRequest request) {String ipAddress request.getHeader("x-forwarded-for");if (ipAddress null || ipAddress.length() 0 || "unknown".equalsIgnoreCase(ipAddress)) {ipAddress …

jdk和IDEA教育版下载和安装详解

前言 研究生专业是通信系统,为了寻找实习于是在研二时期学习java。但是在学习java的过程中没有进行系统总结,很多知识点或者一些细节已经忘记。由于工作找的是某行软件中心的软件开发。准备在毕业前对java知识进行系统性学习。本专栏将从零基础开始,从最简单的jdk和IDEA下载…

李沐机器学习系列3---深度学习计算

1 层和块 1.1 定义块 用class表示层&#xff0c;并只需要实现构造函数和前向传播函数 class MLP(nn.Module):# 用模型参数声明层。这里&#xff0c;我们声明两个全连接的层def __init__(self):# 调用MLP的父类Module的构造函数来执行必要的初始化。# 这样&#xff0c;在类实…

delete后,指针还能使用?!

int *bnew int(10);delete b;*b5;qDebug()<<*b; 结果&#xff1a;5 delete释放后的指针为什么还可以用-CSDN社区 delete后&#xff0c;系统只是把指针指向的堆空间回收&#xff0c; 但是没有将这个指针变量的值赋值为nullptr&#xff0c; 指针还是指向原来的堆空间&#…

Java学习苦旅(十六)——List

本篇博客将详细讲解Java中的List。 文章目录 预备知识——初识泛型泛型的引入泛型小结 预备知识——包装类基本数据类型和包装类直接对应关系装包与拆包 ArrayList简介ArrayList使用ArrayList的构造ArrayList常见操作ArrayList遍历 结尾 预备知识——初识泛型 泛型的引入 我…

vmware安装龙蜥操作系统

vmware安装龙蜥操作系统 1、下载龙蜥操作系统 8.8 镜像文件2、安装龙蜥操作系统 8.83、配置龙蜥操作系统 8.83.1、配置静态IP地址 和 dns3.2、查看磁盘分区3.3、查看系统版本 1、下载龙蜥操作系统 8.8 镜像文件 这里选择 2023年2月发布的 8.8 版本 官方下载链接 https://mirro…

Cytoscape3.8安装下载及安装教程

Cytoscape3.8下载链接&#xff1a;https://docs.qq.com/doc/DUmhZQ1lqTWhuSXJC 1.选中下载好的安装包右键选择“解压到 Cytoscape3.8.0”文件夹 2.打开解压好的”Cytoscape3.8.0“文件夹 3.选中“Cytoscape_3_8_0_windows_64bit.exe“右键以管理员身份运行 4.点击”Download“&…

MP3音乐播放器搜索引擎-在线搜索MP3歌曲实现(一)

首先添加网络模块和播放模块 下载文件&#xff0c;获取响应&#xff0c;错误处理,加上可以进行网络访问 要加上头文件#include<QNetworkAccessManager> 上面头文件发送请求后返回的响应类用下边的头文件 #include<QNetworkReply> 添加多媒体播放列表#include&…

四川天蝶电子商务有限公司带货服务可信吗?

四川天蝶电子商务有限公司&#xff0c;一个充满活力和创新精神的企业&#xff0c;近年来在抖音带货服务领域取得了令人瞩目的成绩。作为一家致力于提供全方位电子商务解决方案的企业&#xff0c;天蝶公司紧跟时代潮流&#xff0c;积极布局抖音电商市场&#xff0c;为商家提供了…

MidTool图文创作-GPT-4与DALL·E 3的结合

GPT-4与DALLE 3的结合 GPT-4是由OpenAI开发的最新一代语言预测模型&#xff0c;它在前代模型的基础上进行了大幅度的改进&#xff0c;不仅在文本生成的连贯性、准确性上有了显著提升&#xff0c;还在理解复杂语境和执行多步骤指令方面表现出了更高的能力。而DALLE 3则是一个创…

python+pytest接口自动化 — 参数关联

什么是参数关联&#xff1f; 参数关联&#xff0c;也叫接口关联&#xff0c;即接口之间存在参数的联系或依赖。在完成某一功能业务时&#xff0c;有时需要按顺序请求多个接口&#xff0c;此时在某些接口之间可能会存在关联关系。 比如&#xff1a;B接口的某个或某些请求参数是…