前言
MQAdminImpl MQ管理组件提供了大量对mq进行管理的工具,其中一个就是创建Topic。它内部实现是通过 mqClient工具从 NameServer拉取当前 Topic对应的路由元数据信息,解析遍历和当前topic有关的 broker高可用分组集合,找到分组中的 master主节点,然后依次对 broker主节点发送创建topic请求。broker 高可用分组的主节点收到请求后,将 topic元数据存储在 TopicConfigManager元数据管理组件中,然后上报当前 broker节点 Topic元数据信息到所有 NameServer节点(统一管理路由数据)。
源码版本:4.9.3
源码架构图
源码分析
创建 topic源码入口
org.apache.rocketmq.client.impl.MQAdminImpl#createTopic(java.lang.String, java.lang.String, int, int) 创建 Topic源码入口,通过 mqClient客户端,从 NameServer获取当前 topic key对应的路由元数据,然后遍历 Broker分组元数据列表,对 broker分组中的主节点,发起创建 topic元数据请求。
// 创建Topicpublic void createTopic(String key,String newTopic, // topic名称int queueNum, // 队列数目int topicSysFlag // 系统标志) throws MQClientException {try {Validators.checkTopic(newTopic);Validators.isSystemTopic(newTopic);// 从NameServer获取路由数据TopicRouteData topicRouteData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(key, timeoutMillis);List<BrokerData> brokerDataList = topicRouteData.getBrokerDatas();if (brokerDataList != null && !brokerDataList.isEmpty()) {Collections.sort(brokerDataList);boolean createOKAtLeastOnce = false;MQClientException exception = null;StringBuilder orderTopicString = new StringBuilder();// 遍历获取到的broker分组元数据列表for (BrokerData brokerData : brokerDataList) {// 找到master节点地址String addr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);if (addr != null) {// 封装topic元数据配置TopicConfig topicConfig = new TopicConfig(newTopic);topicConfig.setReadQueueNums(queueNum);topicConfig.setWriteQueueNums(queueNum);topicConfig.setTopicSysFlag(topicSysFlag);boolean createOK = false;for (int i = 0; i < 5; i++) {try {// 往每个broker分组主节点,注册topic元数据this.mQClientFactory.getMQClientAPIImpl().createTopic(addr, key, topicConfig, timeoutMillis);createOK = true;createOKAtLeastOnce = true;break;} catch (Exception e) {if (4 == i) {exception = new MQClientException("create topic to broker exception", e);}}}if (createOK) {orderTopicString.append(brokerData.getBrokerName());orderTopicString.append(":");orderTopicString.append(queueNum);orderTopicString.append(";");}}}if (exception != null && !createOKAtLeastOnce) {throw exception;}} else {throw new MQClientException("Not found broker, maybe key is wrong", null);}} catch (Exception e) {throw new MQClientException("create new topic failed", e);}}
org.apache.rocketmq.client.impl.MQClientAPIImpl#createTopic 是向 broker分组中的主节点发送创建 topic请求的源码:通过 netty网络通信客户端,以同步阻塞的方式发送创建/更新 topic的请求。
// 向指定Broker发送创建Topic创建请求public void createTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig,final long timeoutMillis)throws RemotingException, MQBrokerException, InterruptedException, MQClientException {// 封装创建Topic请求头CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();requestHeader.setTopic(topicConfig.getTopicName());requestHeader.setDefaultTopic(defaultTopic);requestHeader.setReadQueueNums(topicConfig.getReadQueueNums());requestHeader.setWriteQueueNums(topicConfig.getWriteQueueNums());requestHeader.setPerm(topicConfig.getPerm());requestHeader.setTopicFilterType(topicConfig.getTopicFilterType().name());requestHeader.setTopicSysFlag(topicConfig.getTopicSysFlag());requestHeader.setOrder(topicConfig.isOrder());// 请求处理命令对应的请求code为 UPDATE_AND_CREATE_TOPICRemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader);// 发送同步阻塞请求RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),request, timeoutMillis);assert response != null;switch (response.getCode()) {case ResponseCode.SUCCESS: {return;}default:break;}throw new MQClientException(response.getCode(), response.getRemark());}
Broker 接收创建 topic请求
org.apache.rocketmq.broker.processor.AdminBrokerProcessor#processRequest
- 解码请求头;
- 提取、封装 topic元数据信息;
- 将元数据信息存储到本节点的 TopicConfigManager topic元数据管理组件;
- 上报 topic元数据信息到所有 NameServer节点。
@Overridepublic RemotingCommand processRequest(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {switch (request.getCode()) {// 更新 创建 topiccase RequestCode.UPDATE_AND_CREATE_TOPIC:return this.updateAndCreateTopic(ctx, request);case RequestCode.DELETE_TOPIC_IN_BROKER:return this.deleteTopic(ctx, request);}
private synchronized RemotingCommand updateAndCreateTopic(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {final RemotingCommand response = RemotingCommand.createResponseCommand(null);// 解码请求头final CreateTopicRequestHeader requestHeader =(CreateTopicRequestHeader) request.decodeCommandCustomHeader(CreateTopicRequestHeader.class);log.info("updateAndCreateTopic called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));// 提取topic名称String topic = requestHeader.getTopic();if (!TopicValidator.validateTopic(topic, response)) {return response;}if (TopicValidator.isSystemTopic(topic, response)) {return response;}// 封装topic元数据TopicConfig topicConfig = new TopicConfig(topic);// 每个broker节点都有默认数量的读队列和写队列topicConfig.setReadQueueNums(requestHeader.getReadQueueNums());topicConfig.setWriteQueueNums(requestHeader.getWriteQueueNums());topicConfig.setTopicFilterType(requestHeader.getTopicFilterTypeEnum());topicConfig.setPerm(requestHeader.getPerm());topicConfig.setTopicSysFlag(requestHeader.getTopicSysFlag() == null ? 0 : requestHeader.getTopicSysFlag());// 将topic元数据写入 topic元数据管理组件(注意:此处即创建topic),就是将topic元数据放入一个Map数据结构this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);// 注册topic元数据信息到 Namesrvthis.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion());response.setCode(ResponseCode.SUCCESS);return response;}
Topic元数据管理组件更新topic
org.apache.rocketmq.broker.topic.TopicConfigManager#updateTopicConfig
- 更新元数据信息到内存元数据映射表;
- 更新元数据信息版本号;
- 将元数据信息持久化到磁盘文件。
// 更新topic元数据public void updateTopicConfig(final TopicConfig topicConfig) {TopicConfig old = this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);if (old != null) {log.info("update topic config, old:[{}] new:[{}]", old, topicConfig);} else {log.info("create new topic [{}]", topicConfig);}// 更新数据版本号this.dataVersion.nextVersion();// 持久化元数据到磁盘this.persist();}
上报 topic元数据信息到所有 NameServer节点
org.apache.rocketmq.broker.BrokerController#registerIncrementBrokerData
public synchronized void registerIncrementBrokerData(TopicConfig topicConfig, DataVersion dataVersion) {TopicConfig registerTopicConfig = topicConfig;if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())|| !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {registerTopicConfig =new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),this.brokerConfig.getBrokerPermission());}ConcurrentMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();topicConfigTable.put(topicConfig.getTopicName(), registerTopicConfig);TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();topicConfigSerializeWrapper.setDataVersion(dataVersion);topicConfigSerializeWrapper.setTopicConfigTable(topicConfigTable);// 注册Broker信息到所有NameServerdoRegisterBrokerAll(true, false, topicConfigSerializeWrapper);}
// 注册topic元数据到所有nameServer节点private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,TopicConfigSerializeWrapper topicConfigWrapper) {List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(this.brokerConfig.getBrokerClusterName(), // 集群名称this.getBrokerAddr(), // broker地址this.brokerConfig.getBrokerName(), // broker节点所在分组this.brokerConfig.getBrokerId(), // broker节点IDthis.getHAServerAddr(), // HA服务地址topicConfigWrapper, // topic元数据this.filterServerManager.buildNewFilterServerList(), // filter服务地址列表oneway, // 是否是oneway单向发送this.brokerConfig.getRegisterBrokerTimeoutMills(), // 注册超时时间this.brokerConfig.isCompressedRegister()); // 是否压缩注册信息if (registerBrokerResultList.size() > 0) {RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);if (registerBrokerResult != null) {if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());}this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());if (checkOrderConfig) {this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());}}}}