全网最细RocketMQ源码一:NameSrv

一、入口

在这里插入图片描述
NameServer的启动源码在NameStartup,现在开始debug之旅

二、createNamesrcController

public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));//PackageConflictDetect.detectFastjson();Options options = ServerUtil.buildCommandlineOptions(new Options());// 启动时的参数信息 有commandLine 管理了。commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());if (null == commandLine) {System.exit(-1);return null;}// namesrv 配置。final NamesrvConfig namesrvConfig = new NamesrvConfig();// netty 服务器配置。final NettyServerConfig nettyServerConfig = new NettyServerConfig();// namesrv服务器 监听端口 修改为9876nettyServerConfig.setListenPort(9876);if (commandLine.hasOption('c')) {// 读取 -c 选项的值String file = commandLine.getOptionValue('c');if (file != null) {// 读取 config 文件数据 到 properties 内InputStream in = new BufferedInputStream(new FileInputStream(file));properties = new Properties();properties.load(in);// 如果 config 配置文件 内的配置 涉及到 namesrvConfig 或者 nettyServerConfig 的字段,那么进行复写。MixAll.properties2Object(properties, namesrvConfig);MixAll.properties2Object(properties, nettyServerConfig);// 将读取的 配置文件 路径 保存 到 字段。namesrvConfig.setConfigStorePath(file);System.out.printf("load config properties file OK, %s%n", file);in.close();}}if (commandLine.hasOption('p')) {InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);MixAll.printObjectProperties(console, namesrvConfig);MixAll.printObjectProperties(console, nettyServerConfig);System.exit(0);}// 将启动时 命令行 设置的kv 复写到 namesrvConfig内。MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);if (null == namesrvConfig.getRocketmqHome()) {System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);System.exit(-2);}// 创建日志对象。LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();JoranConfigurator configurator = new JoranConfigurator();configurator.setContext(lc);lc.reset();configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);MixAll.printObjectProperties(log, namesrvConfig);MixAll.printObjectProperties(log, nettyServerConfig);// 创建 控制器// 参数1:namesrvConfig// 参数2:网络层配置 nettyServerConfigfinal NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);// remember all configs to prevent discardcontroller.getConfiguration().registerConfig(properties);return controller;}

主要做了几件事:

  1. 创建了NamesrvConfig
  2. 创建了nettyServerConfig
  3. 创建了NamesrvController

NamesrvController详解:

public class NamesrvController {private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);private final NamesrvConfig namesrvConfig;private final NettyServerConfig nettyServerConfig;// 调度线程池,执行定时任务,两件事:1. 检查存活的broker状态  2. 打印配置private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("NSScheduledThread"));// 管理kv配置。private final KVConfigManager kvConfigManager;// 管理路由信息的对象,重要。private final RouteInfoManager routeInfoManager;// 网络层封装对象,重要。private RemotingServer remotingServer;// ChannelEventListener ,用于监听channel 状态,当channel状态 发生改变时 close idle... 会向 事件队列发起事件,事件最终由 该service处理。private BrokerHousekeepingService brokerHousekeepingService;// 业务线程池,netty 线程 主要任务是 解析报文 将 报文 解析成 RemotingCommand 对象,然后 就将 该对象 交给 业务 线程池 再继续处理。private ExecutorService remotingExecutor;private Configuration configuration;private FileWatchService fileWatchService;// 参数1:namesrvConfig// 参数2:网络层配置 nettyServerConfigpublic NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {this.namesrvConfig = namesrvConfig;this.nettyServerConfig = nettyServerConfig;this.kvConfigManager = new KVConfigManager(this);this.routeInfoManager = new RouteInfoManager();this.brokerHousekeepingService = new BrokerHousekeepingService(this);this.configuration = new Configuration(log,this.namesrvConfig, this.nettyServerConfig);this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");}}

start(NamesrvController)

  public static NamesrvController start(final NamesrvController controller) throws Exception {if (null == controller) {throw new IllegalArgumentException("NamesrvController is null");}// 初始化方法..boolean initResult = controller.initialize();if (!initResult) {controller.shutdown();System.exit(-3);}// JVM HOOK ,平滑关闭的逻辑。 当JVM 被关闭时,主动调用 controller.shutdown() 方法,让服务器平滑关机。Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {@Overridepublic Void call() throws Exception {controller.shutdown();return null;}}));// 启动服务器。controller.start();return controller;}

主要做了几件事:

  1. controller初始化
   public boolean initialize() {// 加载本地kv配置this.kvConfigManager.load();// 创建网络服务器对象this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);// 创建业务线程池,默认线程数 8this.remotingExecutor =Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));// 注册协议处理器(缺省协议处理器)this.registerProcessor();// 定时任务1:每10秒钟检查 broker 存活状态,将idle状态的 broker 移除。this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {NamesrvController.this.routeInfoManager.scanNotActiveBroker();}}, 5, 10, TimeUnit.SECONDS);// 定时任务2:每10分钟 打印一遍 kv 配置。this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {NamesrvController.this.kvConfigManager.printAllPeriodically();}}, 1, 10, TimeUnit.MINUTES);}
  1. controller启动
    在这里插入图片描述
    会调用到NettyRemotingServer.start方法
 public void start() {// 当向channel pipeline 添加 handler 时 指定了 group 时,网络事件传播到 当前handler时,事件处理 由 分配给 handler 的线程执行。this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyServerConfig.getServerWorkerThreads(),new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());}});// 创建共享的 处理器 handlerprepareSharableHandlers();ServerBootstrap childHandler =// 配置服务端 启动对象// 配置工作组 boss 和 worker 组this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)// 设置服务端 ServerSocketChannel类型.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)// 设置服务端ch选项.option(ChannelOption.SO_BACKLOG, 1024).option(ChannelOption.SO_REUSEADDR, true).option(ChannelOption.SO_KEEPALIVE, false)// 客户端ch选项.childOption(ChannelOption.TCP_NODELAY, true).childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize()).childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())// 设置服务器端口.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))//.childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {// 初始化 客户端ch pipeline 的逻辑ch.pipeline().addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler).addLast(defaultEventExecutorGroup,encoder,new NettyDecoder(),new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),connectionManageHandler,serverHandler);}});if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {// 客户端开启 内存池,使用的内存池是  PooledByteBufAllocator.DEFAULTchildHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);}try {// 服务器 绑定端口。ChannelFuture sync = this.serverBootstrap.bind().sync();InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();// 将服务器成功绑定的端口号 赋值给 字段 port。this.port = addr.getPort();} catch (InterruptedException e1) {throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);}// housekeepingService 不为空,则创建 网络异常事件 处理器if (this.channelEventListener != null) {this.nettyEventExecutor.start();}// 提交定时任务,每一秒 执行一次。// 扫描 responseTable 表,将过期的 responseFuture 移除。this.timer.scheduleAtFixedRate(new TimerTask() {@Overridepublic void run() {try {NettyRemotingServer.this.scanResponseTable();} catch (Throwable e) {log.error("scanResponseTable exception", e);}}}, 1000 * 3, 1000);}


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/340418.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Github全球第一的免费waf防火墙雷池社区版的语义分析检测算法

传统规则防护&#xff0c;在当下为什么失灵&#xff1f; 当下&#xff0c;Web 应用防火墙大多采用规则匹配方式来识别和阻断攻击流量&#xff0c;但由于 Web 攻击成本低、方式复杂多样、高危漏洞不定期爆发等原因&#xff0c;管理者们在安全运维工作中不得不持续调整防护规则&a…

[C#]winform部署PaddleOCRV3推理模型

【官方框架地址】 https://github.com/PaddlePaddle/PaddleOCR.git 【算法介绍】 PaddleOCR是由百度公司推出的一款开源光学字符识别&#xff08;OCR&#xff09;工具&#xff0c;它基于深度学习框架PaddlePaddle开发。这款工具提供了一整套端到端的文字检测和识别解决方案&a…

企业必知的加速FTP传输解决方案

FTP是一种用于在网络上进行文件传输的协议&#xff0c;广泛应用于文件共享、数据备份、远程访问等场景。然而&#xff0c;随着数据量的增加和网络环境的复杂化&#xff0c;FTP传输面临着速度慢、安全性低、稳定性差、网络拥塞等问题&#xff0c;这些问题严重影响了企业的工作效…

为什么基于树的模型在表格数据任务中比深度学习更优?

论文 | Why do tree-based models still outperform deep learning on tabular data? 代码 | https://github.com/LeoGrin/tabular-benchmark 虽然深度学习在计算机视觉、自然语言处理等领域取得了显著的成果&#xff0c;但在处理表格数据任务方面&#xff0c;深度学习模型的…

Fenwick Tree——树状数组

问题陈述&#xff1a; 你得到一个长度为 N 的数组为 a0,a1,a2……an-1。处理以下类型的查询&#xff0c;一共有 Q 次查询。 0 p x : ap⬅ap x 1 l r : 打印 ai ( il 到 ir-1 的 ai 之和) 约束&#xff1a; 1 ≤ N,Q ≤ 500000 0 ≤ ai,x ≤ 1e9 0 ≤ p < N 0 ≤ li <…

YOLOv8-Seg改进:轻量化改进 | 超越RepVGG!浙大阿里提出OREPA:在线卷积重参数化

🚀🚀🚀本文改进:OREPA在线卷积重参数化巧妙的和YOLOV8结合,并实现轻量化 🚀🚀🚀YOLOv8-seg创新专栏:http://t.csdnimg.cn/KLSdv 学姐带你学习YOLOv8,从入门到创新,轻轻松松搞定科研; 1)手把手教你如何训练YOLOv8-seg; 2)模型创新,提升分割性能; 3)独家…

解决:TypeError: ‘dict_keys’ object does not support indexing

解决&#xff1a;TypeError: ‘dict_keys’ object does not support indexing 文章目录 解决&#xff1a;TypeError: dict_keys object does not support indexing背景报错问题报错翻译报错位置代码报错原因解决方法方法一&#xff1a;方法二&#xff1a;方法三&#xff1a;今…

2023年度产品评选!人人都是产品经理携手boardmix博思白板联合呈现!

榜单内容概览 2023年度产品评选活动&#xff0c;由人人都是产品经理发起&#xff0c;汇聚了众多引领行业风向的优秀产品&#xff0c;涵盖技术创新、数字化服务、AI效率、运营增长等多领域。这些杰出的产品经过多轮专业评委的严格评审与用户投票的热烈参与&#xff0c;最终脱颖…

IntelliJ IDEA Java 连接 mysql 配置(附完整 demo)

下载 MySQL 驱动 从MySQL官网下载JDBC驱动的步骤如下&#xff1a; 1&#xff09;访问MySQL的官方网站&#xff1a;MySQL 2&#xff09;点击页面上方的"DOWNLOADS"菜单&#xff1b; 3&#xff09;在下载页面&#xff0c;找到"MySQL Community (GPL) Downloads…

uniapp 设置底部导航栏

uniapp 设置原生 tabBar 底部导航栏。 设置底部导航栏 一、创建页面&#xff0c;一定要在 pages.json 文件中注册。 二、在 pages.json 文件中&#xff0c;设置 tabBar 配置项。 pages.json 页面 {"pages": [...],"globalStyle": {...},"uniIdRout…

RT-Thread 中断管理

中断管理 什么是中断&#xff1f;简单的解释就是系统正在处理某一个正常事件&#xff0c;忽然被另一个需要马上处理的紧急事件打断&#xff0c;系统转而处理这个紧急事件&#xff0c;待处理完毕&#xff0c;再恢复运行刚才被打断的事件。 生活中&#xff0c;我们经常会遇到这…

【操作系统】优化MBR程序:让MBR调用显存吧

一.显存、显卡以及显示器的概述 显卡用于连接CPU和显示器&#xff0c;我们调用显示器时&#xff0c;其实就是利用显卡提供的IO接口间接地对显示器进行操作&#xff0c;所以显卡也称之为显示适配器。接下来我们将优化之前写的MBR程序&#xff08;参考&#xff1a;【操作系统】BI…