33.RocketMQ之Broker启动源码


highlight: arduino-light

Broker启动流程:BrokerStartup#main

java  public static void main(String[] args) {        //手动指定了 nameServer        start(createBrokerController(args));   }

java public static BrokerController start(BrokerController controller) {        try {            //启动brokerController            controller.start();            return controller;       } catch (Throwable e) {            e.printStackTrace();            System.exit(-1);       }        return null;   }

1.创建brokercontroller

```java public static BrokerController createBrokerController(String[] args) {            System.setProperty(RemotingCommand.REMOTINGVERSIONKEY,                           Integer.toString(MQVersion.CURRENTVERSION)); ​        if (null == System.getProperty         (NettySystemConfig.COMROCKETMQREMOTINGSOCKETSNDBUFSIZE)){            NettySystemConfig.socketSndbufSize = 131072;       } ​        if (null == System.getProperty         (NettySystemConfig.COMROCKETMQREMOTINGSOCKETRCVBUFSIZE)){            NettySystemConfig.socketRcvbufSize = 131072;       } ​        try {            //PackageConflictDetect.detectFastjson();            Options options = ServerUtil.buildCommandlineOptions(new Options());            commandLine = ServerUtil               .parseCmdLine("mqbroker",                                          args,                                          buildCommandlineOptions(options),                new PosixParser());            if (null == commandLine) {                System.exit(-1);           } ​            final BrokerConfig brokerConfig = new BrokerConfig();            final NettyServerConfig nettyServerConfig = new NettyServerConfig();            final NettyClientConfig nettyClientConfig = new NettyClientConfig(); ​            nettyClientConfig.setUseTLS               (Boolean.parseBoolean(System.getProperty(TLSENABLE,                String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));            nettyServerConfig.setListenPort(10911);            final MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); ​            if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {                int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;                messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);           }

           //-c 解析配置文件填充配置类            if (commandLine.hasOption('c')) {                      String file = commandLine.getOptionValue('c');                if (file != null) {                    configFile = file;                    InputStream in = new BufferedInputStream                       (new FileInputStream(file));                    properties = new Properties();                    properties.load(in); ​                    properties2SystemEnv(properties);                    MixAll.properties2Object(properties, brokerConfig);                    MixAll.properties2Object(properties, nettyServerConfig);                    MixAll.properties2Object(properties, nettyClientConfig);                    MixAll.properties2Object(properties, messageStoreConfig); ​                    BrokerPathConfigHelper.setBrokerConfigPath(file);                    in.close();               }           } ​            MixAll.properties2Object               (ServerUtil.commandLine2Properties(commandLine), brokerConfig); //获取Broker的安装地址            if (null == brokerConfig.getRocketmqHome()) {                System.exit(-2);           } //获取配置中的nameServer的地址            String namesrvAddr = brokerConfig.getNamesrvAddr();            if (null != namesrvAddr) {                try {                    String[] addrArray = namesrvAddr.split(";");                    for (String addr : addrArray) {                        RemotingUtil.string2SocketAddress(addr);                   }               } catch (Exception e) {                    System.out.printf(                    System.exit(-3);               }           } //判断broker的主从角色            switch (messageStoreConfig.getBrokerRole()) {                case ASYNCMASTER:                case SYNCMASTER:                    //MixAll.MASTERID = 0 代表是master                    brokerConfig.setBrokerId(MixAll.MASTERID);                    break;                case SLAVE:                    //SLAVEID不能小于等于0                    if (brokerConfig.getBrokerId() <= 0) {                        System.out.printf("Slave's brokerId must be > 0");                        System.exit(-3);                   }                    break;                default:                    break;           } ​            messageStoreConfig.setHaListenPort               (nettyServerConfig.getListenPort() + 1);            LoggerContext lc = (LoggerContext)               LoggerFactory.getILoggerFactory();            JoranConfigurator configurator = new JoranConfigurator();            configurator.setContext(lc);            lc.reset();            configurator.doConfigure               (brokerConfig.getRocketmqHome() + "/conf/logbackbroker.xml"); ​            if (commandLine.hasOption('p')) {                InternalLogger console = InternalLoggerFactory                   .getLogger(LoggerName.BROKERCONSOLENAME);                MixAll.printObjectProperties(console, brokerConfig);                MixAll.printObjectProperties(console, nettyServerConfig);                MixAll.printObjectProperties(console, nettyClientConfig);                MixAll.printObjectProperties(console, messageStoreConfig);                System.exit(0);           } else if (commandLine.hasOption('m')) {                InternalLogger console = InternalLoggerFactory               .getLogger(LoggerName.BROKERCONSOLENAME);                MixAll.printObjectProperties(console, brokerConfig, true);                MixAll.printObjectProperties(console, nettyServerConfig, true);                MixAll.printObjectProperties(console, nettyClientConfig, true);                MixAll.printObjectProperties(console, messageStoreConfig, true);                System.exit(0);           } ​            log = InternalLoggerFactory               .getLogger(LoggerName.BROKERLOGGER_NAME);            MixAll.printObjectProperties(log, brokerConfig);            MixAll.printObjectProperties(log, nettyServerConfig);            MixAll.printObjectProperties(log, nettyClientConfig);            MixAll.printObjectProperties(log, messageStoreConfig); ​            final BrokerController controller = new BrokerController(                brokerConfig,                nettyServerConfig,                nettyClientConfig,                messageStoreConfig);                        controller.getConfiguration().registerConfig(properties); //初始化brokerController            boolean initResult = controller.initialize();            if (!initResult) {                controller.shutdown();                System.exit(-3);           } ​            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {                private volatile boolean hasShutdown = false;                private AtomicInteger shutdownTimes = new AtomicInteger(0); ​                @Override                public void run() {                    synchronized (this) {                        log.info("Shutdown hook was invoked, {}");                        if (!this.hasShutdown) {                            this.hasShutdown = true;                            long beginTime = System.currentTimeMillis();                            controller.shutdown();                            long consumingTimeTotal =                                System.currentTimeMillis() - beginTime;                            log.info                               ("Shutdown hook over, consuming total time(ms): {}");                       }                   }               }           }, "ShutdownHook"));            return controller;       } catch (Throwable e) {            e.printStackTrace();            System.exit(-1);       }        return null;   } ```

```java public BrokerController( final BrokerConfig brokerConfig, final NettyServerConfig nettyServerConfig, final NettyClientConfig nettyClientConfig, final MessageStoreConfig messageStoreConfig) {

this.brokerConfig = brokerConfig;this.nettyServerConfig = nettyServerConfig;this.nettyClientConfig = nettyClientConfig;this.messageStoreConfig = messageStoreConfig;this.consumerOffsetManager = new ConsumerOffsetManager(this);this.topicConfigManager = new TopicConfigManager(this);//处理拉取消息请求的线程this.pullMessageProcessor = new PullMessageProcessor(this);//挂住拉取消息请求的线程this.pullRequestHoldService = new PullRequestHoldService(this);//监听器 有消息来了就可以通过 pullRequestHoldService 返回响应结果this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService);this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);this.consumerFilterManager = new ConsumerFilterManager(this);this.producerManager = new ProducerManager();this.clientHousekeepingService = new ClientHousekeepingService(this);this.broker2Client = new Broker2Client(this);this.subscriptionGroupManager = new SubscriptionGroupManager(this);this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);this.filterServerManager = new FilterServerManager(this);this.slaveSynchronize = new SlaveSynchronize(this);this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity());this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());//心跳this.heartbeatThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());this.endTransactionThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getEndTransactionPoolQueueCapacity());this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName());this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));this.brokerFastFailure = new BrokerFastFailure(this);this.configuration = new Configuration(log,BrokerPathConfigHelper.getBrokerConfigPath(),this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig);
}

```

创建配置类

初始化配置主要任务是根据 properties 文件以及命令行参数值,创建了以下配置类:

•nettyServerConfig:封装了作为消息队列服务器的配置信息

•nettyClientConfig:封装了作为NameServer客户端配置信息

•brokerConfig:封装了 Broker 配置信息

•messageStoreConfig:封装了 RocketMQ 存储系统的配置信息

2.初始化Controller

```java public boolean initialize() throws CloneNotSupportedException { boolean result = this.topicConfigManager.load(); /* 主题配置加载 这一步主要是加载 topics.json 文件 并解析生成 TopicConfigSerializerWrapper 对象 并 set 进 topicConfigTable 中。 / result = result && this.consumerOffsetManager.load(); / 消费者订阅组加载: 这一步主要是加载 consumerOffset.json 文件 并解析生成 ConsumerOffsetManager 对象,并替换 offsetTable 成员值。 / result = result && this.subscriptionGroupManager.load(); / 消费者过滤管理加载: 这一步主要是加载 consumerFilter.json 文件,并解析生成 ConsumerFilterManager 对象 */ result = result && this.consumerFilterManager.load();

if (result) {try {/***messageStore 消息存储初始化:这一步主要是创建了 DefaultMessageStore 对象这是 Broker 消息寸处的核心实现创建该对象时也会启动很多相关服务线程,用于管理 store 的存储。***/this.messageStore =new DefaultMessageStore(this.messageStoreConfig,       this.brokerStatsManager,this.messageArrivingListener,this.brokerConfig);if (messageStoreConfig.isEnableDLegerCommitLog()) {DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);((DLedgerCommitLog)((DefaultMessageStore)messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);}this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);//load pluginMessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager,                                        messageArrivingListener, brokerConfig);this.messageStore = MessageStoreFactory.build(context, this.messageStore);this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));} catch (IOException e) {result = false;log.error("Failed to initialize", e);}}/***messageStore加载1.延迟消息加载:加载 delayOffset.json 文件解析生成DelayOffsetSerializerWrapper,并加入offsetTable中2.commitLog加载MappfileQueue映射文件队列加载,加载定义的storePath目录文件3.consumeQueue加载***/result = result && this.messageStore.load();if (result) {/***创建nettyRemotingServer:根据前面初始化好的nettyConfig创建远程通讯服务根据brokerConfig初始化各种线程池:1.初始化发送消息线程池2.初始化拉取消息线程池3.初始化broker管理线程池4.初始化client管理线程池5.初始化消费者管理线程池把这些线程池注册到nettyRemotingServer中***/this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);NettyServerConfig fastConfig = (NettyServerConfig)  this.nettyServerConfig.clone();fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);//发消息线程池this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(//Math.min(Runtime.getRuntime().availableProcessors(), 4);//默认是核心数和4 取最小的那个this.brokerConfig.getSendMessageThreadPoolNums(),this.brokerConfig.getSendMessageThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.sendThreadPoolQueue,new ThreadFactoryImpl("SendMessageThread_"));//拉消息线程池this.pullMessageExecutor = //16 + Runtime.getRuntime().availableProcessors() * 2;new BrokerFixedThreadPoolExecutor(                          this.brokerConfig.getPullMessageThreadPoolNums(),               this.brokerConfig.getPullMessageThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.pullThreadPoolQueue,new ThreadFactoryImpl("PullMessageThread_"));//查询消息线程池this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(this.brokerConfig.getQueryMessageThreadPoolNums(),this.brokerConfig.getQueryMessageThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.queryThreadPoolQueue,new ThreadFactoryImpl("QueryMessageThread_"));//broker线程池this.adminBrokerExecutor =Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl("AdminBrokerThread_"));//管理客户端this.clientManageExecutor = new ThreadPoolExecutor(                                 this.brokerConfig.getClientManageThreadPoolNums(),                               this.brokerConfig.getClientManageThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.clientManagerThreadPoolQueue,new ThreadFactoryImpl("ClientManageThread_"));//心跳this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(this.brokerConfig.getHeartbeatThreadPoolNums(),this.brokerConfig.getHeartbeatThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.heartbeatThreadPoolQueue,new ThreadFactoryImpl("HeartbeatThread_", true));//事务消息线程池this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor(this.brokerConfig.getEndTransactionThreadPoolNums(),this.brokerConfig.getEndTransactionThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.endTransactionThreadPoolQueue,new ThreadFactoryImpl("EndTransactionThread_"));this.consumerManageExecutor = Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl("ConsumerManageThread_"));this.registerProcessor();final long initialDelay = UtilAll.computNextMorningTimeMillis()- System.currentTimeMillis();final long period = 1000 * 60 * 60 * 24;//开启定时记录 Broker 的状态(消息拉取时间总和、消息发送总和等)this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.getBrokerStats().record();} catch (Throwable e) {log.error("schedule record error.", e);}}}, initialDelay, period, TimeUnit.MILLISECONDS);//每5s持久化一次消息消费进度到consumerOffse.json文件 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.consumerOffsetManager.persist();} catch (Throwable e) {log.error("schedule persist consumerOffset error.", e);}}}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(),TimeUnit.MILLISECONDS);//消息过滤持久化,定时向 consumerFilter.json 文件写入消费者过滤器信息this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.consumerFilterManager.persist();} catch (Throwable e) {log.error("schedule persist consumer filter error.", e);}}}, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);//定时禁用消费慢的消费者以保护 Broker//可以设置 disableConsumeIfConsumerReadSlowly 属性,默认 falsethis.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.protectBroker();} catch (Throwable e) {log.error("protectBroker error.", e);}}}, 3, 3, TimeUnit.MINUTES);//定时打印 Send、Pull、Query、Transaction 信息this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.printWaterMark();} catch (Throwable e) {log.error("printWaterMark error.", e);}}}, 10, 1, TimeUnit.SECONDS);//定时打印已存储在提交日志中但尚未调度到消费队列的字节数this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {log.info("dispatch behind commit log {} bytes");} catch (Throwable e) {log.error("schedule dispatchBehindBytes error.", e);}}}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);//定时获取 namserver 地址if (this.brokerConfig.getNamesrvAddr() != null) {this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());} else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.brokerOuterAPI.fetchNameServerAddr();} catch (Throwable e) {log.error("ScheduledTask fetchNameServerAddr exception", e);}}}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);}if (!messageStoreConfig.isEnableDLegerCommitLog()) {//如果是从服务器://定时从主服务器获取 TopicConfig、ConsumerOffset、DelayOffset、//SubscriptionGroupConfig 等信息if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6){this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());this.updateMasterHAServerAddrPeriodically = false;} else {this.updateMasterHAServerAddrPeriodically = true;}//如果是主服务器:定时打印从服务器落后的字节数} else {this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {BrokerController.this.printMasterAndSlaveDiff();} catch (Throwable e) {log.error("schedule printMasterAndSlaveDiff error.", e);}}}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);}}if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {// Register a listener to reload SslContexttry {fileWatchService = new FileWatchService(new String[] {TlsSystemConfig.tlsServerCertPath,TlsSystemConfig.tlsServerKeyPath,TlsSystemConfig.tlsServerTrustCertPath},new FileWatchService.Listener() {boolean certChanged, keyChanged = false;@Overridepublic void onChanged(String path) {if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {log.info("The trust certificate changed," +"reload the sslcontext"); reloadServerSslContext();}if (path.equals(TlsSystemConfig.tlsServerCertPath)) {certChanged = true;}if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {keyChanged = true;}if (certChanged && keyChanged) {log.info("The certificate and private key changed," +"reload the ssl context");certChanged = keyChanged = false;reloadServerSslContext();}}private void reloadServerSslContext() {((NettyRemotingServer) remotingServer).loadSslContext();((NettyRemotingServer) fastRemotingServer).loadSslContext();}});} catch (Exception e) {log.warn("FileWatchService created error, " +"can't load the certificate dynamically");}}initialTransaction();initialAcl();initialRpcHooks();}return result;
}

```

创建加载消息存储服务

messageStore

创建NettyRemotingServer

初始化定时任务

在线程池注册完后,就会开启各种定时任务

3.启动brokercontroller

```java public void start() throws Exception { /* messageStore启动会启动启动各类线程服务: 1)启动刷盘任务线程 2)启动commitLog线程
3)启动存储存储统计服务线程storeStateService
4)启动延迟定时消息服务线程 5)启动消息分发到各种Consumer queue服务线程reputMessageService
6)启动HA主从同步线程
/ if (this.messageStore != null) { this.messageStore.start(); } / 启动netty服务: remotingServer启动:启动远程通讯服务 / if (this.remotingServer != null) { this.remotingServer.start(); } / 启动netty服务: fastRemotingServer启动:启动远程通讯服务 */ if (this.fastRemotingServer != null) { this.fastRemotingServer.start(); }

if (this.fileWatchService != null) {this.fileWatchService.start();}/***启动netty服务:broker对外API启动:启动client远程通讯服务***/if (this.brokerOuterAPI != null) {this.brokerOuterAPI.start();}/***pullRequestHolderService使拉取消息保持长轮询任务启动***/if (this.pullRequestHoldService != null) {this.pullRequestHoldService.start();}/***ClientHouseKeepingService线程定时清除不活动链接任务启动***/if (this.clientHousekeepingService != null) {this.clientHousekeepingService.start();}/***过滤服务器任务启动***/if (this.filterServerManager != null) {this.filterServerManager.start();}if (!messageStoreConfig.isEnableDLegerCommitLog()) {startProcessorByHa(messageStoreConfig.getBrokerRole());handleSlaveSynchronize(messageStoreConfig.getBrokerRole());}/***注册Broker信息到所有的NameServer 这里的all指的是NameServer***/this.registerBrokerAll(true, false, true);/***每隔30s上报Broker信息到NameServer***/this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {//进入registerBrokerAllBrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());} catch (Throwable e) {log.error("registerBrokerAll Exception", e);}}            /***第一次延迟10秒启动 因为在前面已经调用了一次registerBrokerAllprivate int registerNameServerPeriod = 1000 * 30;先和10秒取大,再和60秒取小 发现最后得到的结果就是30这个虽然可以配置 但是rocket已经限制了这个间隔的最大值和最小值 在 10 - 60秒之间***/}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);if (this.brokerStatsManager != null) {this.brokerStatsManager.start();}if (this.brokerFastFailure != null) {this.brokerFastFailure.start();}
}

```

启动brokercontroller

1.路由注册

Broker发送心跳包

image.png

RocketMQ路由注册是通过Broker与NameServer的心跳功能实现的。Broker启动时向集群中所有的NameServer发送心跳信息,Broker每隔30s向集群中所有NameServer发送心跳包也就是4次,NameServer收到心跳包时会更新brokerLiveTable缓存中BrokerLiveInfo的lastUpdataTimeStamp信息,然后NameServer每隔10s扫描brokerLiveTable,如果连续120S没有收到心跳包,NameServer将移除Broker的路由信息同时关闭Socket连接。

代码:BrokerController#start

java public void start() throws Exception { //忽略一大堆校验 //注册Broker信息 this.registerBrokerAll(true, false, true); //每隔30s上报Broker信息到NameServer this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { //进入registerBrokerAll //false 发送的不是单向消息 BrokerController.this .registerBrokerAll(true, false, brokerConfig.isForceRegister()); } catch (Throwable e) { log.error("registerBrokerAll Exception", e); } } //第一次延迟10秒 //private int registerNameServerPeriod = 1000 * 30; //30秒是根据和10秒取大,60秒取小比较获得到的 //仔细想的话 rocket已经限制了这个间隔的最大值和最小值 是不是? }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS); }

BrokerController#registerBrokerAll

这里的all指的是所有的nameServer,也就是需要往所有的NS上报自己的信息。

```java /* topicQueueTable:Topic消息队列路由信息,消息发送时根据路由表进行负载均衡

brokerAddrTable:Broker基础信息,包括brokerName、所属集群名称、主备Broker地址

clusterAddrTable:Broker集群信息,存储集群中所有Broker名称

brokerLiveTable:Broker状态信息,NameServer每次收到心跳包是会替换该信息

filterServerTable:Broker上的FilterServer列表,用于类模式消息过滤。

*/

//false 发送的不是单向消息 public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) { TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper(); if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission()) || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) { ConcurrentHashMap topicConfigTable = new ConcurrentHashMap (); for (TopicConfig topicConfig :
topicConfigWrapper.getTopicConfigTable().values()) { TopicConfig tmp =new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(), this.brokerConfig.getBrokerPermission());

topicConfigTable.put(topicConfig.getTopicName(), tmp);}topicConfigWrapper.setTopicConfigTable(topicConfigTable);}if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),this.getBrokerAddr(),this.brokerConfig.getBrokerName(),this.brokerConfig.getBrokerId(),this.brokerConfig.getRegisterBrokerTimeoutMills())) {//doRegisterBrokerAlldoRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);}
}

```

BrokerOuterAPI#doRegisterBrokerAll

```java private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway, TopicConfigSerializeWrapper topicConfigWrapper) { //获得nameServer地址信息 nameServer地址信息是从broker的config文件加载得到的 //this.brokerOuterAPI.updateNameServerAddressList // (this.brokerConfig.getNamesrvAddr()); List nameServerAddressList = this.remotingClient.getNameServerAddressList(); //遍历所有nameserver列表 if (nameServerAddressList != null && nameServerAddressList.size() > 0) {

//封装请求头
final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
requestHeader.setBrokerAddr(brokerAddr);
requestHeader.setBrokerId(brokerId);
requestHeader.setBrokerName(brokerName);
requestHeader.setClusterName(clusterName);
requestHeader.setHaServerAddr(haServerAddr);
requestHeader.setCompressed(compressed);
//封装请求体
RegisterBrokerBody requestBody = new RegisterBrokerBody();
requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
requestBody.setFilterServerList(filterServerList);
final byte[] body = requestBody.encode(compressed);
final int bodyCrc32 = UtilAll.crc32(body);
requestHeader.setBodyCrc32(bodyCrc32);final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());//for循环遍历所有的nameServer地址
for (final String namesrvAddr : nameServerAddressList) {brokerOuterExecutor.execute(new Runnable() {@Overridepublic void run() {try {//向NameServer注册RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);if (result != null) {registerBrokerResultList.add(result);}log.info("register broker to nameServer {} OK");} catch (Exception e) {log.warn("registerBroker Exception");} finally {countDownLatch.countDown();}}});
}try {countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}

} ```

代码:BrokerOutAPI#registerBroker

java if (oneway) {//oneway=false try { //oneway值为false,表示单向通信,Broker不关心NameServer的返回,也不会触发任何回调函数。 // true 表示双向通信,Broker关心NameServer的返回,会触发回调函数。 this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills); } catch (RemotingTooMuchRequestException e) { // Ignore } return null; } //实际是走的这里 RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);

2.NameServer处理心跳包,更新路由信息

image.png

org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor网路处理类解析请求类型,如果请求类型是为REGISTER_BROKER,则将请求转发到RouteInfoManager#regiesterBroker

DefaultRequestProcessor#processRequest

java //判断是注册Broker信息 case RequestCode.REGISTER_BROKER: Version brokerVersion = MQVersion.value2Version(request.getVersion()); if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) { return this.registerBrokerWithFilterServer(ctx, request); } else { //注册Broker信息 return this.registerBroker(ctx, request); }

DefaultRequestProcessor#registerBroker

java RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker( requestHeader.getClusterName(), requestHeader.getBrokerAddr(), requestHeader.getBrokerName(), requestHeader.getBrokerId(), requestHeader.getHaServerAddr(), topicConfigWrapper, null, ctx.channel() );

代码:RouteInfoManager#registerBroker

维护路由信息

java //加锁 this.lock.writeLock().lockInterruptibly(); //维护clusterAddrTable Set<String> brokerNames = this.clusterAddrTable.get(clusterName); if (null == brokerNames) { brokerNames = new HashSet<String>(); this.clusterAddrTable.put(clusterName, brokerNames); } brokerNames.add(brokerName);

java //维护brokerAddrTable BrokerData brokerData = this.brokerAddrTable.get(brokerName); //第一次注册,则创建brokerData if (null == brokerData) { registerFirst = true; brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>()); this.brokerAddrTable.put(brokerName, brokerData); } //非第一次注册,更新Broker Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs(); Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator(); while (it.hasNext()) { Entry<Long, String> item = it.next(); if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) { it.remove(); } } String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr); registerFirst = registerFirst || (null == oldAddr);

java //维护topicQueueTable if (null != topicConfigWrapper && MixAll.MASTER_ID == brokerId) { if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) { ConcurrentMap<String, TopicConfig> tcTable = topicConfigWrapper.getTopicConfigTable(); if (tcTable != null) { for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) { this.createAndUpdateQueueData(brokerName, entry.getValue()); } } } }

代码:RouteInfoManager#createAndUpdateQueueData

image.png

java private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) { //创建QueueData QueueData就是topicQueueTable对应的List中的元素 QueueData queueData = new QueueData(); queueData.setBrokerName(brokerName); queueData.setWriteQueueNums(topicConfig.getWriteQueueNums()); queueData.setReadQueueNums(topicConfig.getReadQueueNums()); queueData.setPerm(topicConfig.getPerm()); queueData.setTopicSynFlag(topicConfig.getTopicSysFlag()); //获得topicQueueTable中队列集合 List<QueueData> queueDataList = this.topicQueueTable.get(topicConfig.getTopicName()); //topicQueueTable为空,则直接添加queueData到队列集合 if (null == queueDataList) { queueDataList = new LinkedList<QueueData>(); queueDataList.add(queueData); this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList); log.info("new topic registered, {} {}", topicConfig.getTopicName(), queueData); } else { //判断是否是新的队列 boolean addNewOne = true; Iterator<QueueData> it = queueDataList.iterator(); while (it.hasNext()) { QueueData qd = it.next(); //如果brokerName相同,代表不是新的队列 if (qd.getBrokerName().equals(brokerName)) { if (qd.equals(queueData)) { addNewOne = false; } else { log.info("topic changed, {} OLD: {} NEW: {}", topicConfig.getTopicName(), qd, queueData); it.remove(); } } } //如果是新的队列,则添加队列到queueDataList if (addNewOne) { queueDataList.add(queueData); } } }

java //维护brokerLiveTable BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,new BrokerLiveInfo( System.currentTimeMillis(), topicConfigWrapper.getDataVersion(), channel, haServerAddr));

```java //维护filterServerList if (filterServerList != null) { if (filterServerList.isEmpty()) { this.filterServerTable.remove(brokerAddr); } else { this.filterServerTable.put(brokerAddr, filterServerList); } }

if (MixAll.MASTERID != brokerId) { String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTERID); if (masterAddr != null) { BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr); if (brokerLiveInfo != null) { result.setHaServerAddr(brokerLiveInfo.getHaServerAddr()); result.setMasterAddr(masterAddr); } } } ```

3.NameServer 路由删除

Broker每隔30s向NameServer发送一个心跳包,心跳包包含BrokerId,Broker地址,Broker名称,Broker所属集群名称、Broker关联的FilterServer列表。但是如果Broker宕机,NameServer无法收到心跳包,此时NameServer如何来剔除这些失效的Broker呢?NameServer会每隔10s扫描brokerLiveTable状态表,如果BrokerLive的lastUpdateTimestamp的时间戳距当前时间超过120s,则认为Broker失效,移除该Broker,关闭与Broker连接,同时更新topicQueueTable、brokerAddrTable、brokerLiveTable、filterServerTable。

RocketMQ有两个触发点来删除路由信息

NameServer定期扫描

  • NameServer定期扫描brokerLiveTable检测上次心跳包与当前系统的时间差,如果时间超过120s,则需要移除broker。

Broker正常关闭

  • Broker在正常关闭的情况下,会执行unregisterBroker指令

这两种方式路由删除的方法都是一样的,就是从相关路由表中删除与该broker相关的信息。

image.png

代码:NamesrvController#initialize

```java //每隔10s扫描一次为活跃Broker this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}

}, 5, 10, TimeUnit.SECONDS); ```

代码:RouteInfoManager#scanNotActiveBroker

java public void scanNotActiveBroker() { //获得brokerLiveTable Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator(); //遍历brokerLiveTable while (it.hasNext()) { Entry<String, BrokerLiveInfo> next = it.next(); long last = next.getValue().getLastUpdateTimestamp(); //如果收到心跳包的时间距当时时间是否超过120s if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) { //关闭连接 RemotingUtil.closeChannel(next.getValue().getChannel()); //移除broker it.remove(); //维护路由表 this.onChannelDestroy(next.getKey(), next.getValue().getChannel()); } } }

代码:RouteInfoManager#onChannelDestroy

java //申请写锁,根据brokerAddress从brokerLiveTable和filterServerTable移除 this.lock.writeLock().lockInterruptibly(); this.brokerLiveTable.remove(brokerAddrFound); this.filterServerTable.remove(brokerAddrFound);

java //维护brokerAddrTable String brokerNameFound = null; boolean removeBrokerName = false; Iterator<Entry<String, BrokerData>> itBrokerAddrTable =this.brokerAddrTable.entrySet().iterator(); //遍历brokerAddrTable while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) { BrokerData brokerData = itBrokerAddrTable.next().getValue(); //遍历broker地址 Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator(); while (it.hasNext()) { Entry<Long, String> entry = it.next(); Long brokerId = entry.getKey(); String brokerAddr = entry.getValue(); //根据broker地址移除brokerAddr if (brokerAddr.equals(brokerAddrFound)) { brokerNameFound = brokerData.getBrokerName(); it.remove(); log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed", brokerId, brokerAddr); break; } } //如果当前主题只包含待移除的broker,则移除该topic if (brokerData.getBrokerAddrs().isEmpty()) { removeBrokerName = true; itBrokerAddrTable.remove(); log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed", brokerData.getBrokerName()); } }

```java //维护clusterAddrTable if (brokerNameFound != null && removeBrokerName) { Iterator >> it = this.clusterAddrTable.entrySet().iterator(); //遍历clusterAddrTable while (it.hasNext()) { Entry > entry = it.next(); //获得集群名称 String clusterName = entry.getKey(); //获得集群中brokerName集合 Set brokerNames = entry.getValue(); //从brokerNames中移除brokerNameFound boolean removed = brokerNames.remove(brokerNameFound); if (removed) { log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed", brokerNameFound, clusterName);

if (brokerNames.isEmpty()) {log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",clusterName);//如果集群中不包含任何broker,则移除该集群it.remove();}break;}
}

} ```

java //维护topicQueueTable队列 if (removeBrokerName) {    //遍历topicQueueTable    Iterator<Entry<String, List<QueueData>>> itTopicQueueTable =        this.topicQueueTable.entrySet().iterator();    while (itTopicQueueTable.hasNext()) {        Entry<String, List<QueueData>> entry = itTopicQueueTable.next();        //主题名称        String topic = entry.getKey();        //队列集合        List<QueueData> queueDataList = entry.getValue(); //遍历该主题队列        Iterator<QueueData> itQueueData = queueDataList.iterator();        while (itQueueData.hasNext()) {            //从队列中移除为活跃broker信息            QueueData queueData = itQueueData.next();            if (queueData.getBrokerName().equals(brokerNameFound)) {                itQueueData.remove();                log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed",                    topic, queueData);           }       } //如果该topic的队列为空,则移除该topic        if (queueDataList.isEmpty()) {            itTopicQueueTable.remove();            log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed",                topic);       }   } }

java //释放写锁 finally {    this.lock.writeLock().unlock(); }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/14457.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

计算机网络 - http协议 与 https协议(2)

前言 本篇介绍了构造http请求的的五种方式&#xff0c;简单的使用postman构造http请求&#xff0c;进一步了解https, 学习https的加密过程&#xff0c;了解对称密钥与非对称密钥对于加密是如何进行的&#xff0c;如有错误&#xff0c;请在评论区指正&#xff0c;让我们一起交流…

数据结构算法题——链表

leetcode-2.两数之和 leetcode-2.两数之和 给你两个 非空 的链表&#xff0c;表示两个非负的整数。它们每位数字都是按照 逆序 的方式存储的&#xff0c;并且每个节点只能存储 一位 数字。 请你将两个数相加&#xff0c;并以相同形式返回一个表示和的链表。 你可以假设除了数…

Linux调优–I/O 调度器

Linux 的 I/O 调度器是一个以块式 I/O 访问存储卷的进程&#xff0c;有时也叫磁盘调度器。Linux I/O 调度器的工作机制是控制块设备的请求队列&#xff1a;确定队列中哪些 I/O 的优先级更高以及何时下发 I/O 到块设备&#xff0c;以此来减少磁盘寻道时间&#xff0c;从而提高系…

【PCIE】hot-reset和link disable

Hot reset 规则 如果上游伪端口&#xff08;Pseudo Port&#xff09;的任何一个通道连续接收到两个带有热复位位设置为1b、禁用链路位和回环位设置为0b的TS1有序集合&#xff0c;并且两个伪端口上的任何一个通道&#xff08;接收到TS1有序集合&#xff09;要么收到EIOS&#xf…

通俗易懂生成对抗网络GAN原理(二)

生成对抗网络&#xff08;Generative Adversarial Network, GAN&#xff09;的原理 学习李宏毅机器学习课程总结。 前面学习了GAN的直观的介绍&#xff0c;现在学习GAN的基本理论。现在我们来学习GAN背后的理论。 引言 假设x是一张图片&#xff08;一个高维向量&#xff09;…

DevOps系列文章 之 SnakeYAML解析与序列化YAML

1、简述 如何使用SnakeYAML库将YAML文档转换为Java对象&#xff0c;以及JAVA对象如何序列化为YAML文档。 在DevOps平台系统中是基础的能力支持&#xff0c;不管是spring boot 的配置还是K8S 资源清单yaml 2、项目设置 要在项目中使用SnakeYAML&#xff0c;需要添加Maven依赖…

百度网盘删除“我的应用数据”文件夹

方法一&#xff1a;电脑端 工具链接&#xff0c; BaiduPCS-Go-3.6.8-windows-86.zip - 蓝奏云 电脑端下载解压运行&#xff0c;弹出浏览器窗口和命令行&#xff0c;在浏览器中输入百度网盘账号密码&#xff0c;登录。 之后会需要输入验证码&#xff0c;之后使用手机号或者邮…

恒生电子探路金融大模型

‍数据智能产业创新服务媒体 ——聚焦数智 改变商业 近日&#xff0c;恒生电子和旗下子公司恒生聚源正式发布基于大语言模型技术打造的数智金融新品&#xff1a;金融智能助手光子和全新升级的智能投研平台WarrenQ。此外&#xff0c;恒生电子金融行业大模型LightGPT也首次对外亮…

Ctfshow web入门 nodejs篇 web334-web344

CTFshow NodeJs web334 前言&#xff1a;做原型链污染&#xff0c;入门的话个人建议先看看P神的文章&#xff0c;只看前四部分就行了。 深入理解 JavaScript Prototype 污染攻击 | 离别歌 (leavesongs.com) 然后也得有一点js基础&#xff0c;不用太多&#xff0c;要不然看起…

web安全php基础_搭建php环境

首先打开phpstudy的网站栏点击创建网站&#xff0c;新建一个网站&#xff08;域名随便输反正是局域网&#xff09;然后点击确认 如下&#xff0c;网站便创建好了 打开浏览器输入刚刚创建网站时输入的域名&#xff0c;即可看见我们的网站 然后网站好了&#xff0c;就可以新建项…

Vue和React的区别?

目录 共同点 1. 数据驱动视图 2. 组件化 3. Virtual DOM 不同点 1. 核心思想不同 2. 组件写法差异 3. diff算法不同 4. 响应式原理不同 5. 其他不同点 首先找到 Vue 和 React 的共性&#xff0c;它们被用于解决什么问题&#xff0c; 然后再挖掘各自独特的个性、设计原…

基于WebSocket的简易聊天室的基本实现梳理

一&#xff0c;前言 目前在很多网站为了实现推送技术所用的技术都是 Ajax 轮询。轮询是在特定的的时间间隔&#xff08;如每1秒&#xff09;&#xff0c;由浏览器对服务器发出HTTP请求&#xff0c;然后由服务器返回最新的数据给客户端的浏览器。HTTP 协议是一种无状态的、无连…