【RocketMQ】路由中心 NameServer

news/2025/3/17 4:15:06/文章来源:https://www.cnblogs.com/kukuxjx/p/18503628

1  前言

上节我们准备了源码以及环境的运行,其中我们启动的时候,会首先启动 NameServer,那么这节我们先看下组件 NameServer,看看它主要是干什么的,在整个生产消费的链路中充当了什么角色,发挥着什么作用。

2  NameServer

RocketMQ路由管理、 服务注册及服务发现的机制, NameServer是整个 RocketMQ 的“大脑” 。分布式服务 SOA架 构体系中会有服务注册中心,分布式服务SOA的注册中心主要提供服务调用的解析服务, 指引服务调用方(消费者)找到“远方”的服务提供者,完成网络通信,那么RocketMQ 的 路由中心存储的是什么数据呢?作为一款高性能的消息中间件,如何避免NameServer的单点故障,提供高可用性,我们往下看。

2.1  NameServer 架构设计

消息中间件的设计思路一般基于主题的订阅发布机制消息生产者(Producer)发 送某一主题的消息到消息服务器,消息服务器负责该消息的持久化存储,消息消费者 (Consumer)订阅感兴趣的主题,消息服务器根据订阅信息(路由信息)将消息推送到消费 者(PUSH模式)或者消息消费者主动向消息服务器拉取消息(PULL模式),从而实现消息 生产者与消息消费者解调。 为了避免消息服务器的单点故障导致的整个系统瘫痪,通常会部署多台消息服务器共同承担消息的存储。 那消息生产者如何知道消息要发往哪台消息服务器呢?如果某一台消息服务器若机了,那么生产者如何在不重启服务的情况下感知呢?NameServer 就是为了解决上述问题而设计的。

RocketMQ 的逻辑部署图如下:

Broker 消息服务器在启动时向所有NameServer 注册(这个我们看过是通过线程池 + 门闩锁CountDownLauch实现),消息生产者(Producer)在发送消息之前先从NameServer 获取 Broker 服务器地址列表,然后根据负载算法从列表中选择一 台消息服务器进行消息发送(也就是说生产者在发送的时候就选择了某个Broker)。 NameServer与每台 Broker服务器保持长连接,并间隔 5s 检测Broker 是否存活,如果检测到 Broker宕机, 则从路由注册表中将其移除。 但是路由变化不会马上通知消息生产者,为什么要这样设计呢?这是为了降低NameServer实现的复杂性,在消息发送端提供容错机制来保证消息发送的高可用性,那么发送失败了怎么办,这个我们在看消息发送的时候再细看(这里埋个引子发送失败会重试)。

NameServer 本身的高可用可通过部署多台 NameServer服务器来实现,但彼此之间 互不通信,也就是NameServer服务器之间在某一时刻的数据并不会完全相同,但这对消息发送不会造成任何影响,这也是RocketMQ NameServer设计的一个亮点, RocketMQ NameServer 设计追求简单高效。

2.2  NameServer 启动流程

从源码的角度窥探一下Names巳rver启动流程,重点关注NameServer相关启动参数。

NameServer 启动类:org.apache.rocketmq.namesrv.NamesrvStartup。

// main 启动入口
public static void main(String[] args) {main0(args);controllerManagerMain();
}

我们看看 main0:

public static NamesrvController main0(String[] args) {try {// 解析命令行参数和配置文件
        parseCommandlineAndConfigFile(args);// 创建并启动 NameServer控制器NamesrvController controller = createAndStartNamesrvController();return controller;} catch (Throwable e) {e.printStackTrace();// 异常直接退出System.exit(-1);}return null;
}

两个步骤:

(1)解析命令行以及配置文件(也就是启动前的配置初始化工作)

(2)创建并启动 NamesrcController(NameServerController 实例为NameSerer 核心控制器,别跟 SpringMVC 里的 Controller 搞混= =)

我们简单看下解析配置:

/*** 解析命令行参数和配置文件* 该方法首先设置Remoting框架的版本属性,然后解析命令行参数,接着加载配置文件(如果有提供)* 最后,根据命令行参数和配置文件初始化相关的配置对象** @param args 命令行参数数组* @throws Exception 如果解析命令行参数或加载配置文件时发生错误,则抛出异常*/
public static void parseCommandlineAndConfigFile(String[] args) throws Exception {// 设置Remoting框架的版本属性 rocketmq.remoting.version
    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));// 构建命令行参数选项Options options = ServerUtil.buildCommandlineOptions(new Options());// 解析命令行参数CommandLine commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new DefaultParser());// 如果解析失败,退出程序if (null == commandLine) {System.exit(-1);return;}// 初始化配置对象namesrvConfig = new NamesrvConfig();nettyServerConfig = new NettyServerConfig();nettyClientConfig = new NettyClientConfig();// 设置Netty服务器的监听端口  也就是默认的 NameServer 端口是 9876nettyServerConfig.setListenPort(9876);// 如果命令行参数中包含配置文件选项,加载配置文件if (commandLine.hasOption('c')) {String file = commandLine.getOptionValue('c');if (file != null) {// 读取并解析配置文件InputStream in = new BufferedInputStream(Files.newInputStream(Paths.get(file)));properties = new Properties();properties.load(in);// 将配置属性设置到配置对象中
            MixAll.properties2Object(properties, namesrvConfig);MixAll.properties2Object(properties, nettyServerConfig);MixAll.properties2Object(properties, nettyClientConfig);// 如果配置中启用了控制器功能,初始化并配置控制器if (namesrvConfig.isEnableControllerInNamesrv()) {controllerConfig = new ControllerConfig();MixAll.properties2Object(properties, controllerConfig);}// 设置配置文件路径
            namesrvConfig.setConfigStorePath(file);// 确认配置文件加载成功并关闭输入流System.out.printf("load config properties file OK, %s%n", file);in.close();}}// 将命令行参数设置到配置对象中
    MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);// 如果命令行参数中包含打印配置信息的选项,打印配置信息并退出程序if (commandLine.hasOption('p')) {MixAll.printObjectProperties(logConsole, namesrvConfig);MixAll.printObjectProperties(logConsole, nettyServerConfig);MixAll.printObjectProperties(logConsole, nettyClientConfig);if (namesrvConfig.isEnableControllerInNamesrv()) {MixAll.printObjectProperties(logConsole, controllerConfig);}System.exit(0);}// 如果未设置RocketMQ的安装路径,提示用户设置环境变量并退出程序// 也就是环境变量中要有 ROCKETMQ_HOME 这就是我们上节刚开始启动 NameServer 报错的原因位置if (null == namesrvConfig.getRocketmqHome()) {System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);System.exit(-2);}// 打印最终的配置信息
    MixAll.printObjectProperties(log, namesrvConfig);MixAll.printObjectProperties(log, nettyServerConfig);
}

从代码我们可以知道先创建 NamesrvConfig ( NameServer业务参数)、 NettyServerConfig ( NameServer 网络参数), 然后在解析启动时把指定的配置文件或启动命令中的选项 值,填充到namesrvConfig、nettyServerConfig 对象。

我们看看 NamesrvConfig  属性:

// rocketmq 主目录,可以通过-Drocketmq.home.dir=path 或通过设置环境变量ROCKETMQ_HOME来配置RocketMQ 的主目录
private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
// NameServer 存储 KV 配置属性的持久化路径
private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
// NameServer 默认配置文件路径
private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
private String productEnvName = "center";
private boolean clusterTest = false;
// 是否支持顺序消息,默认是不支持
private boolean orderMessageEnable = false;
private boolean returnOrderTopicConfigToBroker = true;
/*** Indicates the nums of thread to handle client requests, like GET_ROUTEINTO_BY_TOPIC.*/
private int clientRequestThreadPoolNums = 8;
/*** Indicates the nums of thread to handle broker or operation requests, like REGISTER_BROKER.*/
private int defaultThreadPoolNums = 16;
/*** Indicates the capacity of queue to hold client requests.*/
private int clientRequestThreadPoolQueueCapacity = 50000;
/*** Indicates the capacity of queue to hold broker or operation requests.*/
private int defaultThreadPoolQueueCapacity = 10000;
/*** Interval of periodic scanning for non-active broker; 扫描 Broker 是否存活的间隔时间 5 秒*/
private long scanNotActiveBrokerInterval = 5 * 1000;
private int unRegisterBrokerQueueCapacity = 3000;
/*** Support acting master or not.** The slave can be an acting master when master node is down to support following operations:* 1. support lock/unlock message queue operation.* 2. support searchOffset, query maxOffset/minOffset operation.* 3. support query earliest msg store time.*/
private boolean supportActingMaster = false;
private volatile boolean enableAllTopicList = true;
private volatile boolean enableTopicList = true;
private volatile boolean notifyMinBrokerIdChanged = false;
/*** Is startup the controller in this name-srv*/
private boolean enableControllerInNamesrv = false;
private volatile boolean needWaitForService = false;
private int waitSecondsForService = 45;
/*** If enable this flag, the topics that don't exist in broker registration payload will be deleted from name server.** WARNING:* 1. Enable this flag and "enableSingleTopicRegister" of broker config meanwhile to avoid losing topic route info unexpectedly.* 2. This flag does not support static topic currently.*/
private boolean deleteTopicWithBrokerRegistration = false;

再看看 NettyServerConfig  属性:

// NameServer 默认绑定地址
private String bindAddress = "0.0.0.0";
// NameServer 监昕端口,该值默认会被初始化为 9876
private int listenPort = 0;
// Netty 业务线程池线程个数
private int serverWorkerThreads = 8;
// Netty public 任务线程池线程个数, Netty 网络设计,根据业务类型会创建不同的线程池,比如处理消息发送、消息消费、心跳检测等。如果该业务类型(RequestCode)未注册线程池, 则由 public线程池执行
private int serverCallbackExecutorThreads = 0;
//  IO 线程池线程个数,主要是 NameServer、Broker 端解析请求、返回相应的线程个数,这类线程主要是处理网络请求的,解析请求包, 然后转发到各个业务线程池完成具体的业务操作,然后将结果再返回调用方
private int serverSelectorThreads = 3;
// send oneway 消息请求井发度(Broker 端参数)
private int serverOnewaySemaphoreValue = 256;
// 异步消息发送最大并发度(Broker 端参数)
private int serverAsyncSemaphoreValue = 64;
// 网络连接最大空闲时间,默认 120s。如果连接空闲时间超过该参数设置的值,连接将被关闭
private int serverChannelMaxIdleTimeSeconds = 120;
// 网络 socket 发送缓存区大小, 默认 64k
private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
// 网络 socket 接收缓存区大小,默认 64k
private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
/*** 高水位标记,当Channel的待写入数据量达到此值时,Netty会自动关闭Channel的写操作,* 需要用户手动调用Channel的flush方法来刷新缓冲区以继续写入数据。* 这有助于防止应用程序过度缓冲数据,导致内存使用过多。*/
private int writeBufferHighWaterMark = NettySystemConfig.writeBufferHighWaterMark;
/*** 低水位标记,当Channel的待写入数据量减少到此值时,Netty会自动重新打开Channel的写操作,* 允许数据再次被写入缓冲区。* 这有助于在数据量减少到一个合理水平时恢复写操作,保证数据传输的流畅。*/
private int writeBufferLowWaterMark = NettySystemConfig.writeBufferLowWaterMark;
// 同时处理的连接请求的最大数量 默认1024
private int serverSocketBacklog = NettySystemConfig.socketBacklog;
// ByteBuffer 是否开启缓存, 建议开启
private boolean serverPooledByteBufAllocatorEnable = true;
// 是否启用 Epoll IO 模型, Linux 环境建议开启
private boolean useEpollNativeSelector = false;

然后我们看看创建以及启动 NamesrvController:

public static NamesrvController createAndStartNamesrvController() throws Exception {// 创建 NameServer 控制器实例NamesrvController controller = createNamesrvController();// 启动 NameServer控制器
    start(controller);// 获取 Netty服务器配置NettyServerConfig serverConfig = controller.getNettyServerConfig();// 格式化输出String tip = String.format("The Name Server boot success. serializeType=%s, address %s:%d", RemotingCommand.getSerializeTypeConfigInThisServer(), serverConfig.getBindAddress(), serverConfig.getListenPort());// 记录启动日志
    log.info(tip);// 控制台输出启动信息System.out.printf("%s%n", tip);// 返回创建并启动的 NameServer控制器实例return controller;
}

格式化输出的信息,就是我们上节启动 NameServer 后输出的信息:

我们这里主要看下启动 start:

public static NamesrvController start(final NamesrvController controller) throws Exception {// 检查传入的 NamesrvController 实例是否为null,如果是,则抛出异常if (null == controller) {throw new IllegalArgumentException("NamesrvController is null");}// 初始化 NamesrvController,如果初始化失败,则关闭控制器并退出程序boolean initResult = controller.initialize();if (!initResult) {controller.shutdown();System.exit(-3);}// 注册关闭钩子,确保在程序退出时优雅地关闭 NamesrvControllerRuntime.getRuntime().addShutdownHook(new ShutdownHookThread(log, (Callable<Void>) () -> {controller.shutdown();return null;}));// 启动NamesrvController
    controller.start();// 返回启动后的 NamesrvController 实例return controller;
}

可以看到启动主要做了三件事情:

(1)初始化

(2)注册关闭钩子函数

(3)启动

先看下 NamesrvController 的初始化方法 initialize:

public boolean initialize() {// 加载系统配置,这是系统运行所必需的配置信息
    loadConfig();// 初始化网络组件,为网络通信做准备
    initiateNetworkComponents();// 初始化线程池,用于管理和执行系统中的各种任务
    initiateThreadExecutors();// 注册处理器,这些处理器将处理系统中的各种请求和任务
    registerProcessor();// 启动定时服务,比如检查 Broker 的存活状态
    startScheduleService();// 初始化SSL上下文,为安全通信做准备
    initiateSslContext();// 初始化RPC钩子,用于在远程过程调用时执行特定操作
    initiateRpcHooks();// 返回return true;
}

我们这里看下启动的调度任务 startScheduleService:

private void startScheduleService() {// 定时扫描不活跃的Broker并移除 默认每隔 5秒this.scanExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker,5, this.namesrvConfig.getScanNotActiveBrokerInterval(), TimeUnit.MILLISECONDS);// 周期性地打印所有配置信息 默认每隔 10分钟this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.kvConfigManager::printAllPeriodically,1, 10, TimeUnit.MINUTES);// 定期打印水位信息 队列大小等信息 默认每隔 1秒this.scheduledExecutorService.scheduleAtFixedRate(() -> {try {NamesrvController.this.printWaterMark();} catch (Throwable e) {LOGGER.error("printWaterMark error.", e);}}, 10, 1, TimeUnit.SECONDS);
}

最后我们看下 NamesrvController 的启动 start:

public void start() throws Exception {// 启动服务端this.remotingServer.start();// 这里不会走 因为看前边端口会被设置为 9876 if (0 == nettyServerConfig.getListenPort()) {nettyServerConfig.setListenPort(this.remotingServer.localListenPort());}this.remotingClient.updateNameServerAddressList(Collections.singletonList(NetworkUtil.getLocalAddress()+ ":" + nettyServerConfig.getListenPort()));// 启动客户端this.remotingClient.start();// 如果文件监视服务已初始化,则启动该服务// initialize 初始化的时候, initiateSslContext 初始化 ssl的时候,会初始化 fileWatchService // 监听的文件列表是 {TlsSystemConfig.tlsServerCertPath, TlsSystemConfig.tlsServerKeyPath, TlsSystemConfig.tlsServerTrustCertPath}// tls.server.certPath tls.server.keyPath tls.server.trustCertPath// 都是 ssl 认证相关的 也就是当你开启了 ssl 会监听证书的变化if (this.fileWatchService != null) {this.fileWatchService.start();}// 启动路由信息管理器this.routeInfoManager.start();
}

最后我们看下服务端的启动 this.remotingServer.start():

public void start() {// 创建一个DefaultEventExecutorGroup实例,用于处理连接和请求this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyServerConfig.getServerWorkerThreads(),new ThreadFactoryImpl("NettyServerCodecThread_"));// 准备共享的处理器,这些处理器可以在多个通道中共享
    prepareSharableHandlers();// 配置服务器的引导程序serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector).channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024).option(ChannelOption.SO_REUSEADDR, true).childOption(ChannelOption.SO_KEEPALIVE, false).childOption(ChannelOption.TCP_NODELAY, true).localAddress(new InetSocketAddress(this.nettyServerConfig.getBindAddress(),this.nettyServerConfig.getListenPort())).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) {configChannel(ch);}});// 添加自定义配置,如果有的话
    addCustomConfig(serverBootstrap);try {// 尝试绑定服务器到指定的端口,并等待操作完成  这里就是绑定我们的 9876端口ChannelFuture sync = serverBootstrap.bind().sync();InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();// 如果配置的端口为0,则使用分配的端口if (0 == nettyServerConfig.getListenPort()) {this.nettyServerConfig.setListenPort(addr.getPort());}// 记录服务器启动信息log.info("RemotingServer started, listening {}:{}", this.nettyServerConfig.getBindAddress(),this.nettyServerConfig.getListenPort());// 将服务器实例添加到服务器表中this.remotingServerTable.put(this.nettyServerConfig.getListenPort(), this);} catch (Exception e) {// 如果绑定失败,抛出一个异常throw new IllegalStateException(String.format("Failed to bind to %s:%d", nettyServerConfig.getBindAddress(),nettyServerConfig.getListenPort()), e);}// 如果存在通道事件监听器,则启动Netty事件执行器if (this.channelEventListener != null) {this.nettyEventExecutor.start();}// 创建并启动一个定时任务,定期扫描响应表TimerTask timerScanResponseTable = new TimerTask() {@Overridepublic void run(Timeout timeout) {try {NettyRemotingServer.this.scanResponseTable();} catch (Throwable e) {log.error("scanResponseTable exception", e);} finally {timer.newTimeout(this, 1000, TimeUnit.MILLISECONDS);}}};this.timer.newTimeout(timerScanResponseTable, 1000 * 3, TimeUnit.MILLISECONDS);// 定期执行任务,打印远程代码分布scheduledExecutorService.scheduleWithFixedDelay(() -> {try {NettyRemotingServer.this.printRemotingCodeDistribution();} catch (Throwable e) {TRAFFIC_LOGGER.error("NettyRemotingServer print remoting code distribution exception", e);}}, 1, 1, TimeUnit.SECONDS);
}

好啦,到这里我们的启动过程就看的差不多了,核心是 NamesrvController 控制器,看完我们最起码要知道的是,默认的端口是 9876,并且会有一个调度任务是每隔 5秒 扫描 Broker 的状态,不存活的直接移除。

public void scanNotActiveBroker() {try {// 开始扫描不活跃的 Broker log.info("start scanNotActiveBroker");// 遍历BrokerLiveTable中的每个Broker信息for (Entry<BrokerAddrInfo, BrokerLiveInfo> next : this.brokerLiveTable.entrySet()) {// 获取Broker最后一次更新时间long last = next.getValue().getLastUpdateTimestamp();// 获取Broker的心跳超时时间 默认是 120秒 DEFAULT_BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;long timeoutMillis = next.getValue().getHeartbeatTimeoutMillis();// 判断Broker是否超时 也就是超过120秒没有更新的话 就认为不存活 则进行 destory 剔除if ((last + timeoutMillis) < System.currentTimeMillis()) {// 关闭Broker的通信通道
                RemotingHelper.closeChannel(next.getValue().getChannel());// 记录Broker通道过期警告日志log.warn("The broker channel expired, {} {}ms", next.getKey(), timeoutMillis);// 调用Broker通道销毁后的处理方法this.onChannelDestroy(next.getKey());}}} catch (Exception e) {// 记录扫描不活跃Broker时遇到的异常log.error("scanNotActiveBroker exception", e);}
}

2.3  NameServer 路由注册、故障剔除

NameServer 主要作用是为消息生产者和消息消费者提供关于主题Topic 的路由信息, 那么NameServer 需要存储路由的基础信息,还要能够管理Broker节点,包括路由注册、 路由删除等功能。

2.3.1  路由元信息

NameServer 路由实现类: org.apache.rocketmq.namesrv.routeinfo.RoutelnfoManager, 在了解路由注册之前,我们首先看一下 NameServer 到底存储哪些信息。

// Broker 默认超时时间 120秒
private final static long DEFAULT_BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
// Topic 消息队列路由信息,消息发送时根据路由表进行负载均衡
private final Map<String/* topic */, Map<String, QueueData>> topicQueueTable;
// Broker 基础信息, 包含 brokerName、 所属集群名称、 主备 Broker地址
private final Map<String/* brokerName */, BrokerData> brokerAddrTable;
// Broker 集群信息,存储集群中所有 Broker 名称
private final Map<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
// Broker 状态信息。 NameServer 每次收到心跳包时会替换该信息
private final Map<BrokerAddrInfo/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
// Broker 上的 FilterServer 列表,用于类模式消息过滤
private final Map<BrokerAddrInfo/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
// Topic 的队列信息映射
private final Map<String/* topic */, Map<String/*brokerName*/, TopicQueueMappingInfo>> topicQueueMappingInfoTable;
private final BatchUnregistrationService unRegisterService;
// 所属配置
private final NamesrvController namesrvController;
private final NamesrvConfig namesrvConfig;

RocketMQ 基于订阅发布机制, 一个Topic 拥有多个消息队列,一个Broker为每一主题默认创建4个读队列4个写队列。 多个Broker组成一个集群, BrokerName 由相同的多台 Broker 组成Master-Slave 架构, brokerId 为 0 代表 Master, 大于 0 表示 Slave。 BrokerLivelnfo 中 的 lastUpdateTimestamp 存储上次收到 Broker 心跳包的时间

QueueData、 BrokerData、 BrokerLiveinfo 类图信息图下:

比如RocketMQ2 主 2 从部署图如下:

对应运行时数据结构如下:

2.3.2  路由注册

RocketMQ 路由注册是通过 Broker 与 NameServer 的心跳功能实现的。 Broker启动时向集群中所有的NameServer发送心跳语句,每隔 30s 向集群中所有NameServer发送心跳包, NameServer 收到 Broker 心跳包时会更新 brokerLiveTable 缓存中 BrokerLivelnfo 的 l astUpdateTimestamp ,然后 NameServer 每隔 5s 扫描 brokerLiveTable,如果连续 120s 没有收到心跳包, NameServer将移除该Broker 的路由信息同时关闭 Socket连接。

2.3.2.1  Broker 发送心跳包

Broker 发送心跳包的核心代码如下:

// BrokerController#start
public void start() throws Exception {...scheduledFutures.add(this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {@Overridepublic void run0() {try {if (System.currentTimeMillis() < shouldStartTime) {BrokerController.LOG.info("Register to namesrv after {}", shouldStartTime);return;}if (isIsolated) {BrokerController.LOG.info("Skip register for broker is isolated");return;}// 注册BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());} catch (Throwable e) {BrokerController.LOG.error("registerBrokerAll Exception", e);}}// 延迟10秒启动 // brokerConfig.getRegisterNameServerPeriod() 默认 30秒 private int registerNameServerPeriod = 1000 * 30; // 也就是每隔 30秒执行一次}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS));...
}

最后落点是在 BrokerOuterAPI#registerBrokerAll:

// BrokerOuterAPI#registerBrokerAll
public List<RegisterBrokerResult> registerBrokerAll(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId,final String haServerAddr,final TopicConfigSerializeWrapper topicConfigWrapper,final List<String> filterServerList,final boolean oneway,final int timeoutMills,final boolean enableActingMaster,final boolean compressed,final Long heartbeatTimeoutMillis,final BrokerIdentity brokerIdentity) {final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();// 获取所有的 NameServer 信息List<String> nameServerAddressList = this.remotingClient.getAvailableNameSrvList();if (nameServerAddressList != null && nameServerAddressList.size() > 0) {final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();requestHeader.setBrokerAddr(brokerAddr);requestHeader.setBrokerId(brokerId);requestHeader.setBrokerName(brokerName);requestHeader.setClusterName(clusterName);requestHeader.setHaServerAddr(haServerAddr);requestHeader.setEnableActingMaster(enableActingMaster);requestHeader.setCompressed(false);if (heartbeatTimeoutMillis != null) {requestHeader.setHeartbeatTimeoutMillis(heartbeatTimeoutMillis);}RegisterBrokerBody requestBody = new RegisterBrokerBody();requestBody.setTopicConfigSerializeWrapper(TopicConfigAndMappingSerializeWrapper.from(topicConfigWrapper));requestBody.setFilterServerList(filterServerList);final byte[] body = requestBody.encode(compressed);final int bodyCrc32 = UtilAll.crc32(body);requestHeader.setBodyCrc32(bodyCrc32);// 计数器锁final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());// 扔到线程池中注册for (final String namesrvAddr : nameServerAddressList) {brokerOuterExecutor.execute(new AbstractBrokerRunnable(brokerIdentity) {@Overridepublic void run0() {try {RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);if (result != null) {registerBrokerResultList.add(result);}LOGGER.info("Registering current broker to name server completed. TargetHost={}", namesrvAddr);} catch (Exception e) {LOGGER.error("Failed to register current broker to name server. TargetHost={}", namesrvAddr, e);} finally {countDownLatch.countDown();}}});}try {if (!countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS)) {LOGGER.warn("Registration to one or more name servers does NOT complete within deadline. Timeout threshold: {}ms", timeoutMills);}} catch (InterruptedException ignore) {}}return registerBrokerResultList;
}

该方法主要是遍历NameServer列表, Broker 消息服务器依次向 NameServer发送心跳包。

发送心跳包具体逻辑,首先封装请求包头(Header):

brokerAddr: broker 地址

broker Id: brokerld,O:Master:,大于 0: Slave

brokerName: broker 名称

clusterName: 集群名称

haServerAddr: master 地址,初次请求时该值为空, slave 向 Nameserver 注册后返回

requestBody:filterServerList。 消息过滤服务器列表;topicConfigWrapper。 主题配置, topicConfigWrapper 内部封装的是 TopicConfigManager 中的 topicConfigTable,内部存储的是 Broker启动时默认的一些 Topic, MixAll. SELF_TEST_ TOPIC 、 MixAll.DEFAULT_TOPIC ( AutoCreateTopic Enable=true )., MixAll.BENCHMARK_TOPIC 、 MixAll.OFFSET_MOVED_EVENT、 BrokerConfig#brokerClusterName、 BrokerConfig#brokerName。 Broker 中 Topic 默认存储在${Rocket_Home}/store/confg/topic. json 中。

RocketMQ 网络传输基于 Netty, 具体网络实现细节本书不会过细去剖析,在这里介绍 一下网络跟踪方法: 每一个请求, RocketMQ 都会定义一个RequestCode,然后在服务端会 对应相应的网络处理器(processor包中), 只需整库搜索 RequestCode 即可找到相应的处理 逻辑。

2.3.2.2  NameServer 处理心跳包

org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor 网络处理器解析请求类 型, 如果请求类型为RequestCode.REGISTER_BROKER,则请求最终转发到 RoutelnfoManager#registerBroker。

public RegisterBrokerResult registerBroker(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId,final String haServerAddr,final String zoneName,final Long timeoutMillis,final Boolean enableActingMaster,final TopicConfigSerializeWrapper topicConfigWrapper,final List<String> filterServerList,final Channel channel) {RegisterBrokerResult result = new RegisterBrokerResult();try {// 加锁 这个是不是有点问题  lock 是不是应该写在 try 上边?this.lock.writeLock().lockInterruptibly();//init or update the cluster infoSet<String> brokerNames = ConcurrentHashMapUtils.computeIfAbsent((ConcurrentHashMap<String, Set<String>>) this.clusterAddrTable, clusterName, k -> new HashSet<>());brokerNames.add(brokerName);boolean registerFirst = false;// 获取当前的 broker 信息BrokerData brokerData = this.brokerAddrTable.get(brokerName);if (null == brokerData) {registerFirst = true;brokerData = new BrokerData(clusterName, brokerName, new HashMap<>());// 更新this.brokerAddrTable.put(brokerName, brokerData);}// 具体的主从的逻辑 我就没看了哈} catch (Exception e) {log.error("registerBroker Exception", e);} finally {this.lock.writeLock().unlock();}return result;
}

路由注册需要加写锁,防止并发修改RoutelnfoManager 中的路由表。 首先判断 Broker 所属集群是否存在, 如果不存在则创建,然后将broker名加入到集群Broker集合中。

BrokerLivelnfo,存活 Broker 信息表, BrokeLivelnfo 是执行路由删除的重要依据。

2.3.3  路由删除

上面看到Broker每隔 30s 向 NameServer发送一个心跳包,心跳包中包含 Broker Id 、 Broker 地址、 Broker 名称、 Broker 所属集群名称、 Broker 关联的 FilterServer 列表。 但是如果Broker若机, NameServer无法收到心跳包,此时NameServer如何来剔除这些失效的Broker 呢? Name Server 会每隔 5s 扫描 brokerLiveTable 状态表,如果 BrokerLive 的 lastUpdateTimestamp 的时间戳距当前时间超过 120s,则认为 Broker失效,移除该 Broker, 关闭与Broker连接,并同时更新topicQueueTable、 brokerAddrTable、 brokerLive Table、 filterServerTable。

RocktMQ 有两个触发点来触发路由删除:

(1)NameServer 定时扫描 brokerLiveTable 检测上次心跳包与当前系统时间的时间差, 如果时间戳大于 120s,则需要移除该Broker信息 这个我们上边看过了。

(2)Broker 在正常被关闭的情况下,会执行unrRgisterBroker指令。

由于不管是何种方式触发的路由删除,路由删除的方法都是一样的,就是从topic QueueTable、 brokerAddrTable、 brokerLiveTable、 filterServerTable 删除与该 Broker 相关的信息。

2.3.4  路由发现

RocketMQ 路由发现是非实时的,当 Topic路由 出现变化后, NameServer不主动推送给客户端, 而 是由客户端定时拉取主题最新的路由。 根据主题名 称拉取路由信息的命令编码为: GET_ROUTEINTO_BY_TOPIC。RocketMQ 路由结果如图 2-6 所示。

orderTopicConf :顺序消息配置内容,来自于 kvConfig

List<QueueData> queueDatas: topic 队列元数据

List<BrokerData> brokerDatas : topic 分布的 broker 元数据

HashMap<String/* brokerAddress*/ ,List<String>/*filterServer*/>filterServerTable: broker 上过滤服务器地址列表

NameServer 路由发现实现类:ClientRequestProcessor#getRoutelnfoByTopic

public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {// 创建一个响应命令对象,用于后续填充响应信息final RemotingCommand response = RemotingCommand.createResponseCommand(null);// 解析请求命令的自定义头,获取特定的请求参数final GetRouteInfoRequestHeader requestHeader =(GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);// 检查名称服务器是否准备就绪boolean namesrvReady = needCheckNamesrvReady.get() && System.currentTimeMillis() - startupTimeMillis >= TimeUnit.SECONDS.toMillis(namesrvController.getNamesrvConfig().getWaitSecondsForService());// 如果名称服务器未准备就绪且配置了等待服务,则返回错误响应if (namesrvController.getNamesrvConfig().isNeedWaitForService() && !namesrvReady) {log.warn("name server not ready. request code {} ", request.getCode());response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("name server not ready");return response;}// 从路由信息管理器中获取指定主题的路由数据TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());// 如果找到了主题的路由数据,则进行后续处理if (topicRouteData != null) {...// 根据请求版本和是否只接受标准JSON来决定如何序列化主题路由数据byte[] content;Boolean standardJsonOnly = requestHeader.getAcceptStandardJsonOnly();if (request.getVersion() >= MQVersion.Version.V4_9_4.ordinal() || null != standardJsonOnly && standardJsonOnly) {content = topicRouteData.encode(SerializerFeature.BrowserCompatible,SerializerFeature.QuoteFieldNames, SerializerFeature.SkipTransientField,SerializerFeature.MapSortField);} else {content = topicRouteData.encode();}// 填充响应命令的主体,设置成功响应码,并返回响应
        response.setBody(content);response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;}// 如果没有找到主题的路由信息,则返回错误响应
    response.setCode(ResponseCode.TOPIC_NOT_EXIST);response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()+ FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));return response;
}

主要就是调用 RouterlnfoManager 的方法,从路由表 topicQueueTable、 brokerAddrTable、 filterServerTable 中分别填充 TopicRouteData 中的 List<QueueData>、 List<BrokerData>和 filterServer 地址表。如果找到主题对应的路由信息并且该主题为顺序消息,则从NameServer KVconfig 中获取关于顺序消息相关的配置填充路由信息。如果找不到路由信息CODE则使用TOPIC NOT_EXISTS,表示没有找到对应的路由。

3  小结

本章主要介绍了NameServer路由功能,包含路由元数据、路由注册与发现机制。 为了 加强对本章的理解,路由发现机制,大致关系如下:

还有我们起码要知道的是路由元信息是存放在 NamesrvConntroller 核心控制器里的 RouteInfoManager类里的几个集合中,Broker路由注册是BrokerOutApi里的 registerBroker 方法,路由心跳是相互的。NamesrvController 里的定时任务每隔5秒看看 Broker列表里是否都存活以及Broker启动的时候启动定时任务每隔 30秒 更新一下自己的元信息保持存活。发送消息的时候通过 ClientRequestProcessor里的getRoutelnfoByTopic方法获取某个 Topic 的路由信息,知道这些关键类的关系哈,有理解不对的地方还请指正哈。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/822286.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CUDA编程学习 (1)——CUDA C介绍

这篇文章是关于CUDA编程的基础介绍,主要聚焦于CUDA C的概念与内存管理。文章首先阐明了CUDA的异构计算模型,区分了CPU(host)和GPU(device)的角色,并介绍了它们之间的数据移动和通信机制。接着,详细说明了CUDA程序的执行流程,包括内存分配、数据传输和核函数的调用。1.…

博图SCL均值计算

这一篇学习笔记在新浪博客记录过,这里再记录一次。 工作中有时候会需要做一些均值计算,比如计算某个测量值近一分钟的均值,近一小时均值,近一天的均值。今天在家休息,试着做一下分钟均值,按照每秒一个数据,比如现在时刻10:07:10,那么计算从10:06:11到10:07:10这个时间段…

CUDA编程学习 (2)——CUDA并行性模型

这篇文章深入探讨了CUDA编程中的并行性模型,重点介绍了基于kernel的单指令多数据(SPMD)编程。首先,通过向量加法示例,展示了CUDA内核函数的编写与调用,并解释了__global__、__device__和__host__的不同用法。接着,文章扩展到多维网格配置,演示如何处理图像数据,如RGB转灰…

SLF4J 中的适配器模式

什么是适配器模式 适配器模式中,适配器包装不兼容指定接口的对象,来实现不同兼容指定接口。 SLF4J 中的适配器模式 SLF4J 是一个日志门面系统,其中提供了统一的 Logger 等接口,许多框架都会面向 SLF4J 打印日志,这样就不会和具体的日志框架耦合在一起,框架使用者也就能够…

Pbootcms留言“提交成功”的提示语怎么修改

要在 PbootCMS 中修改留言“提交成功”的提示语,可以按照以下步骤操作:定位文件:打开 apps/home/controller/MessageController.php 文件。查找代码段:在文件中找到大约第 103 行的代码段,该段代码如下:if ($this->model->addMessage($data)) {session(lastsub, ti…

Protues中51单片机按键无法复位(已解决)

前言 昨晚用 Protues 搭建了 51 的最小系统电路,在实物中好用的复位电路,到仿真里不能正常复位了。 51 单片机是高电平复位,所以在运行时 RST 引脚应该是低电平,但在仿真中 RST 引脚一直保持高电平,导致按下按键也不能复位单片机。解决方法 我在网上搜索的解决方法一共有两…

南昌航空大学-22207316-涂高杰-JAVA第一次blog作业

一.前言 本学期新增JAVA的面向程序设计课程,为增加学生编写能力开始了本学期的PTA作业,以及接下来我将根据我的实际情况总结前三次PTA题目集中最后一题并讲诉自己对Java的学习心得。从这三次PTA作业中学习到的了对ArrayLis、Vector等自动增长的数组的使用方法,学习到了许多Ja…

安装网站出现404 not found如何解决?

遇到404 Not Found错误时,可以尝试以下几个步骤来解决问题:检查URL:确认输入的网址是否正确,包括大小写和拼写。 检查是否有遗漏或多余的字符。清除浏览器缓存:有时候旧的缓存数据会导致页面加载错误,清除缓存后重新尝试访问。刷新页面:使用F5键或点击浏览器的刷新按钮重…

pbootcms模板上线推广百度竞价后打不开网站出现404错误

PbootCMS V3.2.5 版本中为了增强安全性或优化URL结构,加入了对URL参数的严格判断。当URL中包含?但不符合特定条件(如/?tag=、/?page=、/?ext_)时,系统会自动返回404错误页面。这种做法虽然有助于防止一些非法请求,但也可能导致合法的请求被误判为无效,特别是对于那些…

分布式数据库TDSQL搭建

TDSQL介绍TDSQL是腾讯基于MySQL/Mariadb社区版本打造的一款金融级分布式数据库集群方案,目前腾讯主推TDSQL MySQL版。TDSQL MySQL版具备强一致高可用、全球部署架构、分布式水平扩展、高性能、企业级安全等特性,同时提供智能 DBA、自动化运营、监控告警等配套设施,为客户提供…