1 前言
上节我们准备了源码以及环境的运行,其中我们启动的时候,会首先启动 NameServer,那么这节我们先看下组件 NameServer,看看它主要是干什么的,在整个生产消费的链路中充当了什么角色,发挥着什么作用。
2 NameServer
RocketMQ路由管理、 服务注册及服务发现的机制, NameServer是整个 RocketMQ 的“大脑” 。分布式服务 SOA架 构体系中会有服务注册中心,分布式服务SOA的注册中心主要提供服务调用的解析服务, 指引服务调用方(消费者)找到“远方”的服务提供者,完成网络通信,那么RocketMQ 的 路由中心存储的是什么数据呢?作为一款高性能的消息中间件,如何避免NameServer的单点故障,提供高可用性,我们往下看。
2.1 NameServer 架构设计
消息中间件的设计思路一般基于主题的订阅发布机制消息生产者(Producer)发 送某一主题的消息到消息服务器,消息服务器负责该消息的持久化存储,消息消费者 (Consumer)订阅感兴趣的主题,消息服务器根据订阅信息(路由信息)将消息推送到消费 者(PUSH模式)或者消息消费者主动向消息服务器拉取消息(PULL模式),从而实现消息 生产者与消息消费者解调。 为了避免消息服务器的单点故障导致的整个系统瘫痪,通常会部署多台消息服务器共同承担消息的存储。 那消息生产者如何知道消息要发往哪台消息服务器呢?如果某一台消息服务器若机了,那么生产者如何在不重启服务的情况下感知呢?NameServer 就是为了解决上述问题而设计的。
RocketMQ 的逻辑部署图如下:
Broker 消息服务器在启动时向所有NameServer 注册(这个我们看过是通过线程池 + 门闩锁CountDownLauch实现),消息生产者(Producer)在发送消息之前先从NameServer 获取 Broker 服务器地址列表,然后根据负载算法从列表中选择一 台消息服务器进行消息发送(也就是说生产者在发送的时候就选择了某个Broker)。 NameServer与每台 Broker服务器保持长连接,并间隔 5s 检测Broker 是否存活,如果检测到 Broker宕机, 则从路由注册表中将其移除。 但是路由变化不会马上通知消息生产者,为什么要这样设计呢?这是为了降低NameServer实现的复杂性,在消息发送端提供容错机制来保证消息发送的高可用性,那么发送失败了怎么办,这个我们在看消息发送的时候再细看(这里埋个引子发送失败会重试)。
NameServer 本身的高可用可通过部署多台 NameServer服务器来实现,但彼此之间 互不通信,也就是NameServer服务器之间在某一时刻的数据并不会完全相同,但这对消息发送不会造成任何影响,这也是RocketMQ NameServer设计的一个亮点, RocketMQ NameServer 设计追求简单高效。
2.2 NameServer 启动流程
从源码的角度窥探一下Names巳rver启动流程,重点关注NameServer相关启动参数。
NameServer 启动类:org.apache.rocketmq.namesrv.NamesrvStartup。
// main 启动入口 public static void main(String[] args) {main0(args);controllerManagerMain(); }
我们看看 main0:
public static NamesrvController main0(String[] args) {try {// 解析命令行参数和配置文件 parseCommandlineAndConfigFile(args);// 创建并启动 NameServer控制器NamesrvController controller = createAndStartNamesrvController();return controller;} catch (Throwable e) {e.printStackTrace();// 异常直接退出System.exit(-1);}return null; }
两个步骤:
(1)解析命令行以及配置文件(也就是启动前的配置初始化工作)
(2)创建并启动 NamesrcController(NameServerController 实例为NameSerer 核心控制器,别跟 SpringMVC 里的 Controller 搞混= =)
我们简单看下解析配置:
/*** 解析命令行参数和配置文件* 该方法首先设置Remoting框架的版本属性,然后解析命令行参数,接着加载配置文件(如果有提供)* 最后,根据命令行参数和配置文件初始化相关的配置对象** @param args 命令行参数数组* @throws Exception 如果解析命令行参数或加载配置文件时发生错误,则抛出异常*/ public static void parseCommandlineAndConfigFile(String[] args) throws Exception {// 设置Remoting框架的版本属性 rocketmq.remoting.version System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));// 构建命令行参数选项Options options = ServerUtil.buildCommandlineOptions(new Options());// 解析命令行参数CommandLine commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new DefaultParser());// 如果解析失败,退出程序if (null == commandLine) {System.exit(-1);return;}// 初始化配置对象namesrvConfig = new NamesrvConfig();nettyServerConfig = new NettyServerConfig();nettyClientConfig = new NettyClientConfig();// 设置Netty服务器的监听端口 也就是默认的 NameServer 端口是 9876nettyServerConfig.setListenPort(9876);// 如果命令行参数中包含配置文件选项,加载配置文件if (commandLine.hasOption('c')) {String file = commandLine.getOptionValue('c');if (file != null) {// 读取并解析配置文件InputStream in = new BufferedInputStream(Files.newInputStream(Paths.get(file)));properties = new Properties();properties.load(in);// 将配置属性设置到配置对象中 MixAll.properties2Object(properties, namesrvConfig);MixAll.properties2Object(properties, nettyServerConfig);MixAll.properties2Object(properties, nettyClientConfig);// 如果配置中启用了控制器功能,初始化并配置控制器if (namesrvConfig.isEnableControllerInNamesrv()) {controllerConfig = new ControllerConfig();MixAll.properties2Object(properties, controllerConfig);}// 设置配置文件路径 namesrvConfig.setConfigStorePath(file);// 确认配置文件加载成功并关闭输入流System.out.printf("load config properties file OK, %s%n", file);in.close();}}// 将命令行参数设置到配置对象中 MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);// 如果命令行参数中包含打印配置信息的选项,打印配置信息并退出程序if (commandLine.hasOption('p')) {MixAll.printObjectProperties(logConsole, namesrvConfig);MixAll.printObjectProperties(logConsole, nettyServerConfig);MixAll.printObjectProperties(logConsole, nettyClientConfig);if (namesrvConfig.isEnableControllerInNamesrv()) {MixAll.printObjectProperties(logConsole, controllerConfig);}System.exit(0);}// 如果未设置RocketMQ的安装路径,提示用户设置环境变量并退出程序// 也就是环境变量中要有 ROCKETMQ_HOME 这就是我们上节刚开始启动 NameServer 报错的原因位置if (null == namesrvConfig.getRocketmqHome()) {System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);System.exit(-2);}// 打印最终的配置信息 MixAll.printObjectProperties(log, namesrvConfig);MixAll.printObjectProperties(log, nettyServerConfig); }
从代码我们可以知道先创建 NamesrvConfig ( NameServer业务参数)、 NettyServerConfig ( NameServer 网络参数), 然后在解析启动时把指定的配置文件或启动命令中的选项 值,填充到namesrvConfig、nettyServerConfig 对象。
我们看看 NamesrvConfig 属性:
// rocketmq 主目录,可以通过-Drocketmq.home.dir=path 或通过设置环境变量ROCKETMQ_HOME来配置RocketMQ 的主目录 private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV)); // NameServer 存储 KV 配置属性的持久化路径 private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json"; // NameServer 默认配置文件路径 private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties"; private String productEnvName = "center"; private boolean clusterTest = false; // 是否支持顺序消息,默认是不支持 private boolean orderMessageEnable = false; private boolean returnOrderTopicConfigToBroker = true; /*** Indicates the nums of thread to handle client requests, like GET_ROUTEINTO_BY_TOPIC.*/ private int clientRequestThreadPoolNums = 8; /*** Indicates the nums of thread to handle broker or operation requests, like REGISTER_BROKER.*/ private int defaultThreadPoolNums = 16; /*** Indicates the capacity of queue to hold client requests.*/ private int clientRequestThreadPoolQueueCapacity = 50000; /*** Indicates the capacity of queue to hold broker or operation requests.*/ private int defaultThreadPoolQueueCapacity = 10000; /*** Interval of periodic scanning for non-active broker; 扫描 Broker 是否存活的间隔时间 5 秒*/ private long scanNotActiveBrokerInterval = 5 * 1000; private int unRegisterBrokerQueueCapacity = 3000; /*** Support acting master or not.** The slave can be an acting master when master node is down to support following operations:* 1. support lock/unlock message queue operation.* 2. support searchOffset, query maxOffset/minOffset operation.* 3. support query earliest msg store time.*/ private boolean supportActingMaster = false; private volatile boolean enableAllTopicList = true; private volatile boolean enableTopicList = true; private volatile boolean notifyMinBrokerIdChanged = false; /*** Is startup the controller in this name-srv*/ private boolean enableControllerInNamesrv = false; private volatile boolean needWaitForService = false; private int waitSecondsForService = 45; /*** If enable this flag, the topics that don't exist in broker registration payload will be deleted from name server.** WARNING:* 1. Enable this flag and "enableSingleTopicRegister" of broker config meanwhile to avoid losing topic route info unexpectedly.* 2. This flag does not support static topic currently.*/ private boolean deleteTopicWithBrokerRegistration = false;
再看看 NettyServerConfig 属性:
// NameServer 默认绑定地址 private String bindAddress = "0.0.0.0"; // NameServer 监昕端口,该值默认会被初始化为 9876 private int listenPort = 0; // Netty 业务线程池线程个数 private int serverWorkerThreads = 8; // Netty public 任务线程池线程个数, Netty 网络设计,根据业务类型会创建不同的线程池,比如处理消息发送、消息消费、心跳检测等。如果该业务类型(RequestCode)未注册线程池, 则由 public线程池执行 private int serverCallbackExecutorThreads = 0; // IO 线程池线程个数,主要是 NameServer、Broker 端解析请求、返回相应的线程个数,这类线程主要是处理网络请求的,解析请求包, 然后转发到各个业务线程池完成具体的业务操作,然后将结果再返回调用方 private int serverSelectorThreads = 3; // send oneway 消息请求井发度(Broker 端参数) private int serverOnewaySemaphoreValue = 256; // 异步消息发送最大并发度(Broker 端参数) private int serverAsyncSemaphoreValue = 64; // 网络连接最大空闲时间,默认 120s。如果连接空闲时间超过该参数设置的值,连接将被关闭 private int serverChannelMaxIdleTimeSeconds = 120; // 网络 socket 发送缓存区大小, 默认 64k private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize; // 网络 socket 接收缓存区大小,默认 64k private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize; /*** 高水位标记,当Channel的待写入数据量达到此值时,Netty会自动关闭Channel的写操作,* 需要用户手动调用Channel的flush方法来刷新缓冲区以继续写入数据。* 这有助于防止应用程序过度缓冲数据,导致内存使用过多。*/ private int writeBufferHighWaterMark = NettySystemConfig.writeBufferHighWaterMark; /*** 低水位标记,当Channel的待写入数据量减少到此值时,Netty会自动重新打开Channel的写操作,* 允许数据再次被写入缓冲区。* 这有助于在数据量减少到一个合理水平时恢复写操作,保证数据传输的流畅。*/ private int writeBufferLowWaterMark = NettySystemConfig.writeBufferLowWaterMark; // 同时处理的连接请求的最大数量 默认1024 private int serverSocketBacklog = NettySystemConfig.socketBacklog; // ByteBuffer 是否开启缓存, 建议开启 private boolean serverPooledByteBufAllocatorEnable = true; // 是否启用 Epoll IO 模型, Linux 环境建议开启 private boolean useEpollNativeSelector = false;
然后我们看看创建以及启动 NamesrvController:
public static NamesrvController createAndStartNamesrvController() throws Exception {// 创建 NameServer 控制器实例NamesrvController controller = createNamesrvController();// 启动 NameServer控制器 start(controller);// 获取 Netty服务器配置NettyServerConfig serverConfig = controller.getNettyServerConfig();// 格式化输出String tip = String.format("The Name Server boot success. serializeType=%s, address %s:%d", RemotingCommand.getSerializeTypeConfigInThisServer(), serverConfig.getBindAddress(), serverConfig.getListenPort());// 记录启动日志 log.info(tip);// 控制台输出启动信息System.out.printf("%s%n", tip);// 返回创建并启动的 NameServer控制器实例return controller; }
格式化输出的信息,就是我们上节启动 NameServer 后输出的信息:
我们这里主要看下启动 start:
public static NamesrvController start(final NamesrvController controller) throws Exception {// 检查传入的 NamesrvController 实例是否为null,如果是,则抛出异常if (null == controller) {throw new IllegalArgumentException("NamesrvController is null");}// 初始化 NamesrvController,如果初始化失败,则关闭控制器并退出程序boolean initResult = controller.initialize();if (!initResult) {controller.shutdown();System.exit(-3);}// 注册关闭钩子,确保在程序退出时优雅地关闭 NamesrvControllerRuntime.getRuntime().addShutdownHook(new ShutdownHookThread(log, (Callable<Void>) () -> {controller.shutdown();return null;}));// 启动NamesrvController controller.start();// 返回启动后的 NamesrvController 实例return controller; }
可以看到启动主要做了三件事情:
(1)初始化
(2)注册关闭钩子函数
(3)启动
先看下 NamesrvController 的初始化方法 initialize:
public boolean initialize() {// 加载系统配置,这是系统运行所必需的配置信息 loadConfig();// 初始化网络组件,为网络通信做准备 initiateNetworkComponents();// 初始化线程池,用于管理和执行系统中的各种任务 initiateThreadExecutors();// 注册处理器,这些处理器将处理系统中的各种请求和任务 registerProcessor();// 启动定时服务,比如检查 Broker 的存活状态 startScheduleService();// 初始化SSL上下文,为安全通信做准备 initiateSslContext();// 初始化RPC钩子,用于在远程过程调用时执行特定操作 initiateRpcHooks();// 返回return true; }
我们这里看下启动的调度任务 startScheduleService:
private void startScheduleService() {// 定时扫描不活跃的Broker并移除 默认每隔 5秒this.scanExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker,5, this.namesrvConfig.getScanNotActiveBrokerInterval(), TimeUnit.MILLISECONDS);// 周期性地打印所有配置信息 默认每隔 10分钟this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.kvConfigManager::printAllPeriodically,1, 10, TimeUnit.MINUTES);// 定期打印水位信息 队列大小等信息 默认每隔 1秒this.scheduledExecutorService.scheduleAtFixedRate(() -> {try {NamesrvController.this.printWaterMark();} catch (Throwable e) {LOGGER.error("printWaterMark error.", e);}}, 10, 1, TimeUnit.SECONDS); }
最后我们看下 NamesrvController 的启动 start:
public void start() throws Exception {// 启动服务端this.remotingServer.start();// 这里不会走 因为看前边端口会被设置为 9876 if (0 == nettyServerConfig.getListenPort()) {nettyServerConfig.setListenPort(this.remotingServer.localListenPort());}this.remotingClient.updateNameServerAddressList(Collections.singletonList(NetworkUtil.getLocalAddress()+ ":" + nettyServerConfig.getListenPort()));// 启动客户端this.remotingClient.start();// 如果文件监视服务已初始化,则启动该服务// initialize 初始化的时候, initiateSslContext 初始化 ssl的时候,会初始化 fileWatchService // 监听的文件列表是 {TlsSystemConfig.tlsServerCertPath, TlsSystemConfig.tlsServerKeyPath, TlsSystemConfig.tlsServerTrustCertPath}// tls.server.certPath tls.server.keyPath tls.server.trustCertPath// 都是 ssl 认证相关的 也就是当你开启了 ssl 会监听证书的变化if (this.fileWatchService != null) {this.fileWatchService.start();}// 启动路由信息管理器this.routeInfoManager.start(); }
最后我们看下服务端的启动 this.remotingServer.start():
public void start() {// 创建一个DefaultEventExecutorGroup实例,用于处理连接和请求this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyServerConfig.getServerWorkerThreads(),new ThreadFactoryImpl("NettyServerCodecThread_"));// 准备共享的处理器,这些处理器可以在多个通道中共享 prepareSharableHandlers();// 配置服务器的引导程序serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector).channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024).option(ChannelOption.SO_REUSEADDR, true).childOption(ChannelOption.SO_KEEPALIVE, false).childOption(ChannelOption.TCP_NODELAY, true).localAddress(new InetSocketAddress(this.nettyServerConfig.getBindAddress(),this.nettyServerConfig.getListenPort())).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) {configChannel(ch);}});// 添加自定义配置,如果有的话 addCustomConfig(serverBootstrap);try {// 尝试绑定服务器到指定的端口,并等待操作完成 这里就是绑定我们的 9876端口ChannelFuture sync = serverBootstrap.bind().sync();InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();// 如果配置的端口为0,则使用分配的端口if (0 == nettyServerConfig.getListenPort()) {this.nettyServerConfig.setListenPort(addr.getPort());}// 记录服务器启动信息log.info("RemotingServer started, listening {}:{}", this.nettyServerConfig.getBindAddress(),this.nettyServerConfig.getListenPort());// 将服务器实例添加到服务器表中this.remotingServerTable.put(this.nettyServerConfig.getListenPort(), this);} catch (Exception e) {// 如果绑定失败,抛出一个异常throw new IllegalStateException(String.format("Failed to bind to %s:%d", nettyServerConfig.getBindAddress(),nettyServerConfig.getListenPort()), e);}// 如果存在通道事件监听器,则启动Netty事件执行器if (this.channelEventListener != null) {this.nettyEventExecutor.start();}// 创建并启动一个定时任务,定期扫描响应表TimerTask timerScanResponseTable = new TimerTask() {@Overridepublic void run(Timeout timeout) {try {NettyRemotingServer.this.scanResponseTable();} catch (Throwable e) {log.error("scanResponseTable exception", e);} finally {timer.newTimeout(this, 1000, TimeUnit.MILLISECONDS);}}};this.timer.newTimeout(timerScanResponseTable, 1000 * 3, TimeUnit.MILLISECONDS);// 定期执行任务,打印远程代码分布scheduledExecutorService.scheduleWithFixedDelay(() -> {try {NettyRemotingServer.this.printRemotingCodeDistribution();} catch (Throwable e) {TRAFFIC_LOGGER.error("NettyRemotingServer print remoting code distribution exception", e);}}, 1, 1, TimeUnit.SECONDS); }
好啦,到这里我们的启动过程就看的差不多了,核心是 NamesrvController 控制器,看完我们最起码要知道的是,默认的端口是 9876,并且会有一个调度任务是每隔 5秒 扫描 Broker 的状态,不存活的直接移除。
public void scanNotActiveBroker() {try {// 开始扫描不活跃的 Broker log.info("start scanNotActiveBroker");// 遍历BrokerLiveTable中的每个Broker信息for (Entry<BrokerAddrInfo, BrokerLiveInfo> next : this.brokerLiveTable.entrySet()) {// 获取Broker最后一次更新时间long last = next.getValue().getLastUpdateTimestamp();// 获取Broker的心跳超时时间 默认是 120秒 DEFAULT_BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;long timeoutMillis = next.getValue().getHeartbeatTimeoutMillis();// 判断Broker是否超时 也就是超过120秒没有更新的话 就认为不存活 则进行 destory 剔除if ((last + timeoutMillis) < System.currentTimeMillis()) {// 关闭Broker的通信通道 RemotingHelper.closeChannel(next.getValue().getChannel());// 记录Broker通道过期警告日志log.warn("The broker channel expired, {} {}ms", next.getKey(), timeoutMillis);// 调用Broker通道销毁后的处理方法this.onChannelDestroy(next.getKey());}}} catch (Exception e) {// 记录扫描不活跃Broker时遇到的异常log.error("scanNotActiveBroker exception", e);} }
2.3 NameServer 路由注册、故障剔除
NameServer 主要作用是为消息生产者和消息消费者提供关于主题Topic 的路由信息, 那么NameServer 需要存储路由的基础信息,还要能够管理Broker节点,包括路由注册、 路由删除等功能。
2.3.1 路由元信息
NameServer 路由实现类: org.apache.rocketmq.namesrv.routeinfo.RoutelnfoManager, 在了解路由注册之前,我们首先看一下 NameServer 到底存储哪些信息。
// Broker 默认超时时间 120秒 private final static long DEFAULT_BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2; // Topic 消息队列路由信息,消息发送时根据路由表进行负载均衡 private final Map<String/* topic */, Map<String, QueueData>> topicQueueTable; // Broker 基础信息, 包含 brokerName、 所属集群名称、 主备 Broker地址 private final Map<String/* brokerName */, BrokerData> brokerAddrTable; // Broker 集群信息,存储集群中所有 Broker 名称 private final Map<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable; // Broker 状态信息。 NameServer 每次收到心跳包时会替换该信息 private final Map<BrokerAddrInfo/* brokerAddr */, BrokerLiveInfo> brokerLiveTable; // Broker 上的 FilterServer 列表,用于类模式消息过滤 private final Map<BrokerAddrInfo/* brokerAddr */, List<String>/* Filter Server */> filterServerTable; // Topic 的队列信息映射 private final Map<String/* topic */, Map<String/*brokerName*/, TopicQueueMappingInfo>> topicQueueMappingInfoTable; private final BatchUnregistrationService unRegisterService; // 所属配置 private final NamesrvController namesrvController; private final NamesrvConfig namesrvConfig;
RocketMQ 基于订阅发布机制, 一个Topic 拥有多个消息队列,一个Broker为每一主题默认创建4个读队列4个写队列。 多个Broker组成一个集群, BrokerName 由相同的多台 Broker 组成Master-Slave 架构, brokerId 为 0 代表 Master, 大于 0 表示 Slave。 BrokerLivelnfo 中 的 lastUpdateTimestamp 存储上次收到 Broker 心跳包的时间
QueueData、 BrokerData、 BrokerLiveinfo 类图信息图下:
比如RocketMQ2 主 2 从部署图如下:
对应运行时数据结构如下:
2.3.2 路由注册
RocketMQ 路由注册是通过 Broker 与 NameServer 的心跳功能实现的。 Broker启动时向集群中所有的NameServer发送心跳语句,每隔 30s 向集群中所有NameServer发送心跳包, NameServer 收到 Broker 心跳包时会更新 brokerLiveTable 缓存中 BrokerLivelnfo 的 l astUpdateTimestamp ,然后 NameServer 每隔 5s 扫描 brokerLiveTable,如果连续 120s 没有收到心跳包, NameServer将移除该Broker 的路由信息同时关闭 Socket连接。
2.3.2.1 Broker 发送心跳包
Broker 发送心跳包的核心代码如下:
// BrokerController#start public void start() throws Exception {...scheduledFutures.add(this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {@Overridepublic void run0() {try {if (System.currentTimeMillis() < shouldStartTime) {BrokerController.LOG.info("Register to namesrv after {}", shouldStartTime);return;}if (isIsolated) {BrokerController.LOG.info("Skip register for broker is isolated");return;}// 注册BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());} catch (Throwable e) {BrokerController.LOG.error("registerBrokerAll Exception", e);}}// 延迟10秒启动 // brokerConfig.getRegisterNameServerPeriod() 默认 30秒 private int registerNameServerPeriod = 1000 * 30; // 也就是每隔 30秒执行一次}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS));... }
最后落点是在 BrokerOuterAPI#registerBrokerAll:
// BrokerOuterAPI#registerBrokerAll public List<RegisterBrokerResult> registerBrokerAll(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId,final String haServerAddr,final TopicConfigSerializeWrapper topicConfigWrapper,final List<String> filterServerList,final boolean oneway,final int timeoutMills,final boolean enableActingMaster,final boolean compressed,final Long heartbeatTimeoutMillis,final BrokerIdentity brokerIdentity) {final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();// 获取所有的 NameServer 信息List<String> nameServerAddressList = this.remotingClient.getAvailableNameSrvList();if (nameServerAddressList != null && nameServerAddressList.size() > 0) {final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();requestHeader.setBrokerAddr(brokerAddr);requestHeader.setBrokerId(brokerId);requestHeader.setBrokerName(brokerName);requestHeader.setClusterName(clusterName);requestHeader.setHaServerAddr(haServerAddr);requestHeader.setEnableActingMaster(enableActingMaster);requestHeader.setCompressed(false);if (heartbeatTimeoutMillis != null) {requestHeader.setHeartbeatTimeoutMillis(heartbeatTimeoutMillis);}RegisterBrokerBody requestBody = new RegisterBrokerBody();requestBody.setTopicConfigSerializeWrapper(TopicConfigAndMappingSerializeWrapper.from(topicConfigWrapper));requestBody.setFilterServerList(filterServerList);final byte[] body = requestBody.encode(compressed);final int bodyCrc32 = UtilAll.crc32(body);requestHeader.setBodyCrc32(bodyCrc32);// 计数器锁final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());// 扔到线程池中注册for (final String namesrvAddr : nameServerAddressList) {brokerOuterExecutor.execute(new AbstractBrokerRunnable(brokerIdentity) {@Overridepublic void run0() {try {RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);if (result != null) {registerBrokerResultList.add(result);}LOGGER.info("Registering current broker to name server completed. TargetHost={}", namesrvAddr);} catch (Exception e) {LOGGER.error("Failed to register current broker to name server. TargetHost={}", namesrvAddr, e);} finally {countDownLatch.countDown();}}});}try {if (!countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS)) {LOGGER.warn("Registration to one or more name servers does NOT complete within deadline. Timeout threshold: {}ms", timeoutMills);}} catch (InterruptedException ignore) {}}return registerBrokerResultList; }
该方法主要是遍历NameServer列表, Broker 消息服务器依次向 NameServer发送心跳包。
发送心跳包具体逻辑,首先封装请求包头(Header):
brokerAddr: broker 地址
broker Id: brokerld,O:Master:,大于 0: Slave
brokerName: broker 名称
clusterName: 集群名称
haServerAddr: master 地址,初次请求时该值为空, slave 向 Nameserver 注册后返回
requestBody:filterServerList。 消息过滤服务器列表;topicConfigWrapper。 主题配置, topicConfigWrapper 内部封装的是 TopicConfigManager 中的 topicConfigTable,内部存储的是 Broker启动时默认的一些 Topic, MixAll. SELF_TEST_ TOPIC 、 MixAll.DEFAULT_TOPIC ( AutoCreateTopic Enable=true )., MixAll.BENCHMARK_TOPIC 、 MixAll.OFFSET_MOVED_EVENT、 BrokerConfig#brokerClusterName、 BrokerConfig#brokerName。 Broker 中 Topic 默认存储在${Rocket_Home}/store/confg/topic. json 中。
RocketMQ 网络传输基于 Netty, 具体网络实现细节本书不会过细去剖析,在这里介绍 一下网络跟踪方法: 每一个请求, RocketMQ 都会定义一个RequestCode,然后在服务端会 对应相应的网络处理器(processor包中), 只需整库搜索 RequestCode 即可找到相应的处理 逻辑。
2.3.2.2 NameServer 处理心跳包
org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor 网络处理器解析请求类 型, 如果请求类型为RequestCode.REGISTER_BROKER,则请求最终转发到 RoutelnfoManager#registerBroker。
public RegisterBrokerResult registerBroker(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId,final String haServerAddr,final String zoneName,final Long timeoutMillis,final Boolean enableActingMaster,final TopicConfigSerializeWrapper topicConfigWrapper,final List<String> filterServerList,final Channel channel) {RegisterBrokerResult result = new RegisterBrokerResult();try {// 加锁 这个是不是有点问题 lock 是不是应该写在 try 上边?this.lock.writeLock().lockInterruptibly();//init or update the cluster infoSet<String> brokerNames = ConcurrentHashMapUtils.computeIfAbsent((ConcurrentHashMap<String, Set<String>>) this.clusterAddrTable, clusterName, k -> new HashSet<>());brokerNames.add(brokerName);boolean registerFirst = false;// 获取当前的 broker 信息BrokerData brokerData = this.brokerAddrTable.get(brokerName);if (null == brokerData) {registerFirst = true;brokerData = new BrokerData(clusterName, brokerName, new HashMap<>());// 更新this.brokerAddrTable.put(brokerName, brokerData);}// 具体的主从的逻辑 我就没看了哈} catch (Exception e) {log.error("registerBroker Exception", e);} finally {this.lock.writeLock().unlock();}return result; }
路由注册需要加写锁,防止并发修改RoutelnfoManager 中的路由表。 首先判断 Broker 所属集群是否存在, 如果不存在则创建,然后将broker名加入到集群Broker集合中。
BrokerLivelnfo,存活 Broker 信息表, BrokeLivelnfo 是执行路由删除的重要依据。
2.3.3 路由删除
上面看到Broker每隔 30s 向 NameServer发送一个心跳包,心跳包中包含 Broker Id 、 Broker 地址、 Broker 名称、 Broker 所属集群名称、 Broker 关联的 FilterServer 列表。 但是如果Broker若机, NameServer无法收到心跳包,此时NameServer如何来剔除这些失效的Broker 呢? Name Server 会每隔 5s 扫描 brokerLiveTable 状态表,如果 BrokerLive 的 lastUpdateTimestamp 的时间戳距当前时间超过 120s,则认为 Broker失效,移除该 Broker, 关闭与Broker连接,并同时更新topicQueueTable、 brokerAddrTable、 brokerLive Table、 filterServerTable。
RocktMQ 有两个触发点来触发路由删除:
(1)NameServer 定时扫描 brokerLiveTable 检测上次心跳包与当前系统时间的时间差, 如果时间戳大于 120s,则需要移除该Broker信息 这个我们上边看过了。
(2)Broker 在正常被关闭的情况下,会执行unrRgisterBroker指令。
由于不管是何种方式触发的路由删除,路由删除的方法都是一样的,就是从topic QueueTable、 brokerAddrTable、 brokerLiveTable、 filterServerTable 删除与该 Broker 相关的信息。
2.3.4 路由发现
RocketMQ 路由发现是非实时的,当 Topic路由 出现变化后, NameServer不主动推送给客户端, 而 是由客户端定时拉取主题最新的路由。 根据主题名 称拉取路由信息的命令编码为: GET_ROUTEINTO_BY_TOPIC。RocketMQ 路由结果如图 2-6 所示。
orderTopicConf :顺序消息配置内容,来自于 kvConfig
List<QueueData> queueDatas: topic 队列元数据
List<BrokerData> brokerDatas : topic 分布的 broker 元数据
HashMap<String/* brokerAddress*/ ,List<String>/*filterServer*/>filterServerTable: broker 上过滤服务器地址列表
NameServer 路由发现实现类:ClientRequestProcessor#getRoutelnfoByTopic
public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {// 创建一个响应命令对象,用于后续填充响应信息final RemotingCommand response = RemotingCommand.createResponseCommand(null);// 解析请求命令的自定义头,获取特定的请求参数final GetRouteInfoRequestHeader requestHeader =(GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);// 检查名称服务器是否准备就绪boolean namesrvReady = needCheckNamesrvReady.get() && System.currentTimeMillis() - startupTimeMillis >= TimeUnit.SECONDS.toMillis(namesrvController.getNamesrvConfig().getWaitSecondsForService());// 如果名称服务器未准备就绪且配置了等待服务,则返回错误响应if (namesrvController.getNamesrvConfig().isNeedWaitForService() && !namesrvReady) {log.warn("name server not ready. request code {} ", request.getCode());response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("name server not ready");return response;}// 从路由信息管理器中获取指定主题的路由数据TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());// 如果找到了主题的路由数据,则进行后续处理if (topicRouteData != null) {...// 根据请求版本和是否只接受标准JSON来决定如何序列化主题路由数据byte[] content;Boolean standardJsonOnly = requestHeader.getAcceptStandardJsonOnly();if (request.getVersion() >= MQVersion.Version.V4_9_4.ordinal() || null != standardJsonOnly && standardJsonOnly) {content = topicRouteData.encode(SerializerFeature.BrowserCompatible,SerializerFeature.QuoteFieldNames, SerializerFeature.SkipTransientField,SerializerFeature.MapSortField);} else {content = topicRouteData.encode();}// 填充响应命令的主体,设置成功响应码,并返回响应 response.setBody(content);response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;}// 如果没有找到主题的路由信息,则返回错误响应 response.setCode(ResponseCode.TOPIC_NOT_EXIST);response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()+ FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));return response; }
主要就是调用 RouterlnfoManager 的方法,从路由表 topicQueueTable、 brokerAddrTable、 filterServerTable 中分别填充 TopicRouteData 中的 List<QueueData>、 List<BrokerData>和 filterServer 地址表。如果找到主题对应的路由信息并且该主题为顺序消息,则从NameServer KVconfig 中获取关于顺序消息相关的配置填充路由信息。如果找不到路由信息CODE则使用TOPIC NOT_EXISTS,表示没有找到对应的路由。
3 小结
本章主要介绍了NameServer路由功能,包含路由元数据、路由注册与发现机制。 为了 加强对本章的理解,路由发现机制,大致关系如下:
还有我们起码要知道的是路由元信息是存放在 NamesrvConntroller 核心控制器里的 RouteInfoManager类里的几个集合中,Broker路由注册是BrokerOutApi里的 registerBroker 方法,路由心跳是相互的。NamesrvController 里的定时任务每隔5秒看看 Broker列表里是否都存活以及Broker启动的时候启动定时任务每隔 30秒 更新一下自己的元信息保持存活。发送消息的时候通过 ClientRequestProcessor里的getRoutelnfoByTopic方法获取某个 Topic 的路由信息,知道这些关键类的关系哈,有理解不对的地方还请指正哈。