RocketMQ源码 Broker-SubscriptionGroupManager 订阅组管理组件源码分析

前言

SubscriptionGroupManager 继承了ConfigManager配置管理组件,拥有将内存数据持久化到磁盘文件subscriptionGroup.json的能力。它主要负责维护所有消费组在内存中的订阅数据。


源码版本:4.9.3

源码架构图

核心数据结构

主要的数据结构比较简单,维护了Map<消费组名称, 订阅组配置>的映射关系。

// 订阅组管理组件
public class SubscriptionGroupManager extends ConfigManager {private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);// Map<消费组名称,订阅组配置>private final ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable =new ConcurrentHashMap<String, SubscriptionGroupConfig>(1024);// 内存数据版本号private final DataVersion dataVersion = new DataVersion();
}

深入看下SubscriptionGroupConfig 的数据结构。

public class SubscriptionGroupConfig {// 消费组名称private String groupName;// 是否开启消费private boolean consumeEnable = true;// 是否允许消费最早消息private boolean consumeFromMinEnable = true;// 是否允许广播消费private boolean consumeBroadcastEnable = true;// 重试队列数private int retryQueueNums = 1;// 重试最大次数private int retryMaxTimes = 16;// brokerIdprivate long brokerId = MixAll.MASTER_ID;// 当产生慢消费时,选择第几个brokerprivate long whichBrokerWhenConsumeSlowly = 1;// 是否通知消费者ids变化private boolean notifyConsumerIdsChangedEnable = true;
}

核心数据行为

数据行为主要都是对上面提到的数据结构的维护,代码 + 注释如下:

// 订阅组管理组件
public class SubscriptionGroupManager extends ConfigManager {public SubscriptionGroupManager() {this.init();}public SubscriptionGroupManager(BrokerController brokerController) {this.brokerController = brokerController;this.init();}private void init() {{// 初始化系统消费组SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();subscriptionGroupConfig.setGroupName(MixAll.TOOLS_CONSUMER_GROUP);this.subscriptionGroupTable.put(MixAll.TOOLS_CONSUMER_GROUP, subscriptionGroupConfig);}{// 初始化过滤服务消费组SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();subscriptionGroupConfig.setGroupName(MixAll.FILTERSRV_CONSUMER_GROUP);this.subscriptionGroupTable.put(MixAll.FILTERSRV_CONSUMER_GROUP, subscriptionGroupConfig);}{// 初始化自测消费组SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();subscriptionGroupConfig.setGroupName(MixAll.SELF_TEST_CONSUMER_GROUP);this.subscriptionGroupTable.put(MixAll.SELF_TEST_CONSUMER_GROUP, subscriptionGroupConfig);}{// 初始化http代理消费组SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();subscriptionGroupConfig.setGroupName(MixAll.ONS_HTTP_PROXY_GROUP);subscriptionGroupConfig.setConsumeBroadcastEnable(true);this.subscriptionGroupTable.put(MixAll.ONS_HTTP_PROXY_GROUP, subscriptionGroupConfig);}{// 初始化ONS_API_PULL消费组SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_PULL_GROUP);subscriptionGroupConfig.setConsumeBroadcastEnable(true); // 激活广播模式this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_PULL_GROUP, subscriptionGroupConfig);}{// 初始化ONS_API_PERMISSION消费组SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_PERMISSION_GROUP);subscriptionGroupConfig.setConsumeBroadcastEnable(true);this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_PERMISSION_GROUP, subscriptionGroupConfig);}{// 初始化ONS_API_OWNER消费组SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_OWNER_GROUP);subscriptionGroupConfig.setConsumeBroadcastEnable(true);this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_OWNER_GROUP, subscriptionGroupConfig);}}// 更新订阅配置,且更新内存数据版本号public void updateSubscriptionGroupConfig(final SubscriptionGroupConfig config) {SubscriptionGroupConfig old = this.subscriptionGroupTable.put(config.getGroupName(), config);if (old != null) {log.info("update subscription group config, old: {} new: {}", old, config);} else {log.info("create new subscription group, {}", config);}this.dataVersion.nextVersion();this.persist();}// 失效消费组public void disableConsume(final String groupName) {SubscriptionGroupConfig old = this.subscriptionGroupTable.get(groupName);if (old != null) {old.setConsumeEnable(false);this.dataVersion.nextVersion();}}// 查找指定消费组的订阅配置public SubscriptionGroupConfig findSubscriptionGroupConfig(final String group) {SubscriptionGroupConfig subscriptionGroupConfig = this.subscriptionGroupTable.get(group);if (null == subscriptionGroupConfig) {if (brokerController.getBrokerConfig().isAutoCreateSubscriptionGroup() || MixAll.isSysConsumerGroup(group)) {subscriptionGroupConfig = new SubscriptionGroupConfig();subscriptionGroupConfig.setGroupName(group);SubscriptionGroupConfig preConfig = this.subscriptionGroupTable.putIfAbsent(group, subscriptionGroupConfig);if (null == preConfig) {log.info("auto create a subscription group, {}", subscriptionGroupConfig.toString());}this.dataVersion.nextVersion();this.persist();}}return subscriptionGroupConfig;}// 将内存数据结构编码成字符串@Overridepublic String encode() {return this.encode(false);}// 获取配置文件路径@Overridepublic String configFilePath() {return BrokerPathConfigHelper.getSubscriptionGroupPath(this.brokerController.getMessageStoreConfig().getStorePathRootDir());}// 从字符串中恢复数据,写回内存数据结构@Overridepublic void decode(String jsonString) {if (jsonString != null) {SubscriptionGroupManager obj = RemotingSerializable.fromJson(jsonString, SubscriptionGroupManager.class);if (obj != null) {this.subscriptionGroupTable.putAll(obj.subscriptionGroupTable);this.dataVersion.assignNewOne(obj.dataVersion);this.printLoadDataWhenFirstBoot(obj);}}}// 将内存数据结构编码成字符串public String encode(final boolean prettyFormat) {return RemotingSerializable.toJson(this, prettyFormat);}// 当第一次启动时,打印加载数据时的日志private void printLoadDataWhenFirstBoot(final SubscriptionGroupManager sgm) {Iterator<Entry<String, SubscriptionGroupConfig>> it = sgm.getSubscriptionGroupTable().entrySet().iterator();while (it.hasNext()) {Entry<String, SubscriptionGroupConfig> next = it.next();log.info("load exist subscription group, {}", next.getValue().toString());}}public ConcurrentMap<String, SubscriptionGroupConfig> getSubscriptionGroupTable() {return subscriptionGroupTable;}public DataVersion getDataVersion() {return dataVersion;}// 删除指定消费组的订阅配置public void deleteSubscriptionGroupConfig(final String groupName) {SubscriptionGroupConfig old = this.subscriptionGroupTable.remove(groupName);if (old != null) {log.info("delete subscription group OK, subscription group:{}", old);this.dataVersion.nextVersion();this.persist();} else {log.warn("delete subscription group failed, subscription groupName: {} not exist", groupName);}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/279123.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

微信小程序使用camera扫码获取相机权限

确保用户隐私指引已经明确使用相机功能 “mp-weixin”: "permission": {"scope.camera": {"desc": "需要使用相机功能&#xff0c;请授权"}}wx.authorize({scope: scope.camera,success(res) {console.log(res, 用户成功授权)// 用户…

基于虚拟机下的win7系统安装简记

文章目录 安装系统激活win7提示系统保留分区未分配驱动器问题使用win7 Active激活系统根据dns分配的ip地址将网络改为固定ip&#xff0c;然后关闭防火墙&#xff0c;即可完成虚拟机与宿主机互通 安装系统 在虚拟机中找到自己下载win7镜像文件&#xff0c;配置完成后一路next即…

Docker 部署 Lobe Chat 服务

拉取最新版本的 Lobe Chat 镜像&#xff1a; $ sudo docker pull lobehub/lobe-chat:latest使用以下命令来运行 Lobe Chat 容器: $ sudo docker run -d --name lobe-chat -p 10084:3210 -e OPENAI_API_KEYsk-xxxx -e OPENAI_PROXY_URLhttps://api.openai.com/v1 -e ACCESS_CO…

Github仓库远程操作——简单版

Github远程操作 github仓库简单的远程操作&#xff0c;更多复杂的功能请参考github官方文档 标题 Github远程操作添加公钥到githubGithub仓库远程操作 远程操作之前&#xff0c;先添加本地的公钥到github 添加公钥到github 创建本地ssh公私钥&#xff1a;使用powershell或者gi…

微服务项目部署

启动rabbitmq \RabbitMQ\rabbitmq_server-3.8.2\sbin 找到你的安装路径 找到\sbin路径下执行这些命令即可 rabbitmqctl status //查看当前状态 rabbitmq-plugins enable rabbitmq_management //开启Web插件 rabbitmq-server start //启动服务 rabbitmq-server stop //停止服务…

Linux的重定向

Linux中的重定向是将程序的输入流或输出流从默认的位置改变到指定的位置。可以使用特殊的符号来实现重定向操作。&#xff08;文中command代表命令&#xff09; &#xff08;1&#xff09;重定向命令列表 命令 说明 command > file …

首发卡密引流系统源码

程序特色&#xff1a; 支持个人和企业小程序广告获取卡密。 支持短视频点赞和关注获取卡密。 搭建教程&#xff1a; 环境要求&#xff1a;Nginx、MySQL 5.6、PHP 5.6 步骤&#xff1a; 将压缩包解压至网站根目录。 打开域名/install&#xff0c;按照提示填写数据库信息进行…

腾讯云Linux云服务器禁Ping设置

腾讯云Linux服务器默认是允许ping包的&#xff0c;但是在一些情况下为了安全考虑起见&#xff0c;我们都会把服务器设置为禁ping的模式。 1、首先检查Linux服务器当前是否禁ping 执行命令&#xff1a; cat /proc/sys/net/ipv4/icmp_echo_ignore_all 备注&#xff1a; 0----代…

【Vue】日期格式化(全局)

系列文章 【Vue】vue增加导航标签 本文链接&#xff1a;https://blog.csdn.net/youcheng_ge/article/details/134965353 【Vue】Element开发笔记 本文链接&#xff1a;https://blog.csdn.net/youcheng_ge/article/details/133947977 【Vue】vue&#xff0c;在Windows IIS平台…

Hudi 在 vivo 湖仓一体的落地实践

作者&#xff1a;vivo 互联网大数据团队 - Xu Yu 在增效降本的大背景下&#xff0c;vivo大数据基础团队引入Hudi组件为公司业务部门湖仓加速的场景进行赋能。主要应用在流批同源、实时链路优化及宽表拼接等业务场景。 一、Hudi 基础能力及相关概念介绍 1.1 流批同源能力 与H…

二维差分详解

前言 上一期我们分享了一维差分的使用方法&#xff0c;这一期我们将接着上期的内容带大家了解二位差分的使用方法&#xff0c;话不多说&#xff0c;LET’S GO!&#xff08;上一期链接&#xff09; 二维差分 二维差分我们可以用于对矩阵区间进行多次操作的题。 二维差分我们还…

spring boot 实现直播聊天室

spring boot 实现直播聊天室 技术方案: spring bootwebsocketrabbitmq 使用 rabbitmq 提高系统吞吐量 引入依赖 <dependencies><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.42&…