Nacos 配置中心源码 | 京东物流技术团队

客户端

入口

在引入配置中心 maven 依赖的 jar 文件中找到 spring-cloud-starter-alibaba-nacos-config-2.2.5.RELEASE.jar!/META-INF/spring.factories,在该配置文件找到 NacosConfigBootstrapConfiguration 配置类,该类是 nacos 配置中心的入口类,类中注册了三个 bean。

NacosConfigProperties:属性配置类,对应配置文件中 spring.cloud.nacos.config 前缀的属性。

NacosConfigManager:管理 NacosConfigProperties 和 ConfigService。

NacosPropertySourceLocator:加载配置中心配置信息。

NacosConfigManager

在 NacosConfigManager 构造方法中,调用了 createConfigService 方法,该方法通过工厂类调用 ConfigService 实现类的构造方法创建 ConfigService 实例。

在 ConfigService 的实现类 NacosConfigService 的构造方法中会初始化 this.agent = new MetricsHttpAgent(new ServerHttpAgent(properties));,该 agent 是用来像服务端发送请求的代理。

ServerHttpAgent 类中 NacosRestTemplate 属性是发送远程调用的工具类,会调用 HttpMethod.GET 方法调用服务端 rest 请求。

在回到 NacosConfigService#NacosConfigService 的方法中 this.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties); 该属性是客户端工作线程类,在类的内部有两个线程池:

1. 只有一个线程的线程池 this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory() 用来执行定时任务,每隔 10ms 执行一次 checkConfigInfo(); 方法,按照每 3000 个配置项为一批次捞取待轮询的 cacheData 实例,将其包装成为一个 LongPollingTask 提交进入第二个线程池 executorService 处理。

2.线程数等于处理器个数的线程池,用来执行 ClientWorker.LongPollingRunnable#LongPollingRunnable#run,cacheMap 中缓存着需要刷新的配置,将 cacheMap 中数量以 3000 分一个组,分别创建一个 LongPollingRunnable 用来监听配置更新,在 LongPollingRunnable#run 方法中调用checkLocalConfig(cacheData); 检查本地的配置,容错的处理;调用 checkUpdateDataIds(cacheDatas, inInitializingCacheList); 方法是向 nacos 服务端 发送一个长连接超时事件30s,返回有更新的dataids;调用 getServerConfig(dataId, group, tenant, 3000L); 方法是根据返回有变化的dataids调用服务端配置中心接口获取配置属性,并更新本地快照;调用checkListenerMd5();方式,对有变化的配置添加监听处理;最后继续调用executorService.execute(this); 方法轮询处理。

CacheData#checkListenerMd5

在listener.receiveConfigInfo(contentTmp); 方法中会调用到 AbstractSharedListener#receiveConfigInfo 方法,会发布 RefreshEvent 事件。

对应的事件监听器为:RefreshEventListener, Spring Cloud 实现的,在该监听器里更新配置和刷新容器中标记了 @RefreshScope 的配置,在 onApplicationEvent 方法中监听2个事件,ApplicationReadyEvent(spring boot 事件,表示 application 应该初始化完成)、RefreshEvent。

RefreshEvent:this.handle((RefreshEvent)event);处理该事件,用来刷新容器中标记了 @RefreshScope注解的配置,org.springframework.cloud.context.refresh.ContextRefresher#refresh

refreshEnvironment();中 extract(this.context.getEnvironment().getPropertySources()) 抽取除系统变量外的其他变量;addConfigFilesToEnvironment();把原有的 environment里面的参数放到一个新建的 spring context 容器下重新加载,完事之后关闭新容器,这里就是获取参数的新值;

changes(before,extract(this.context.getEnvironment().getPropertySources())) 获取新的参数值,并和之前得进行比较找出改变得参数值。

this.context.publishEvent(new EnvironmentChangeEvent(this.context, keys)); 发布环境变更事件,并带上改变得参数值。

回到 ContextRefresher#refresh 方法,看下 this.scope.refreshAll(); 刷新标记@RefreshScope注解的 bean。

super.destroy(); 方法,清楚 scope 里面的缓存,下次就会重新从 BeanFactory 获取一个新的实例会使用新的配置。

this.context.publishEvent(new RefreshScopeRefreshedEvent()); 方法发布事件。

服务端

DumpService

DumpService 类是一个抽象类负责从存储中查询配置保存到磁盘上,它有两个子类,EmbeddedDumpService嵌入式存储(DERBY)、ExternalDumpService扩展数存储。

ExternalDumpService 实现类的 init 方法上 @PostConstruct 注解,在 spring 构建 bean 的过程中会执行带有 @PostConstruct 的初始化方法。

调用到抽象父类 DumpService#dumpOperate 的方法,调用到 dumpConfigInfo 方法,dumpConfigInfo 方法会判断是全量更新,还是追加更新。

如果 isAllDump 为true 会走全量更新,会进行判断是否有快速更新配置、是否存在心跳检查文件、最后检查时间是否小于 6小时,上述判断都满足就不走全量更新,否则走全量更新。

dumpAllProcessor.process(new DumpAllTask());将数据库中的所有 configInfo 配置信息查询出来,写入服务器端磁盘缓存。

persistService.findConfigMaxId(); 查询数据库中最大的主键,用于分页处理。

persistService.findAllConfigInfoFragment(lastMaxId, PAGE_SIZE); 从数据库中分页查询数据,每次查询1000条。

ConfigCacheService.dump(cf.getDataId(), cf.getGroup(), cf.getTenant(), cf.getContent(), cf.getLastModified(),cf.getType()); 写入磁盘

保存到文件中

updateMd5(groupKey, md5, lastModifiedTs); 缓存配置信息的 MD5 到内存中,并发布 LocalDataChangeEvent 事件。

事件监听器会在 NotifyCenter.registerSubscriber 调用。

获取配置

HttpMethod.GET /nacos/v1/cs/configs 获取服务端配置接口,ConfigController#getConfig。

在getConfig 中调用了 inner.doGetConfig(request, response, dataId, group, tenant, tag, clientIp);

在 doGetConfig 方法中会调用 DiskUtil.targetBetaFile(dataId, group, tenant);方法,从本地磁盘上获取,不是从 mysql 中拉取,如果直接修改 mysql数据不会生效的,需要发布 ConfigDataChangeEvent 事件,触发更新。

监听配置

HttpMethod.POST 请求调用 /nacos/v1/cs/configs/listener 轮询接口调用长连接。

longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize); 长连接轮询处理。

SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500); 最多处理29.5s 需要保留0.5s来响应客户端,避免超时。

MD5Util.compareMd5(req, rsp, clientMd5Map); 比较客户端的 md5 与当前服务端的是否一致,不一致返回到 changedGroups。

有不一致数据直接响应 generateResponse(req, rsp, changedGroups);

线程池执行长连接任务 ConfigExecutor.executeLongPolling。

LongPollingService.ClientLongPolling#run 长轮询。

ConfigExecutor.scheduleLongPolling 延迟 29.5s 执行

延迟执行先删除队列中自己的任务 allSubs.remove(ClientLongPolling.this);

allSubs.add(this); 添加到队列

inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());

MD5Util.compareMd5(request, response, clientMd5Map); 和当前配置比较,返回有变更的配置

nacos 管理端变更配置

HttpMethod.POST /nacos/v1/cs/configs

persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, true); 持节化信息到数据库。

回到 ConfigController#publishConfig 看下 ConfigChangePublisher.notifyConfigChange 方法,触发 ConfigDataChangeEvent 事件。

ConfigDataChangeEvent 事件监听。

ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, queue)); 同步其他节点。

还有 LongPollingService 初始化的时候订阅了 LocalDataChangeEvent 事件,也会监听到。

ConfigExecutor.executeLongPolling(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));

看下 LongPollingService.DataChangeTask#run,push 模式, 遍历 allSubs 把变化的 key 响应客户端。clientSub.sendResponse(Arrays.asList(groupKey));

作者:京东物流 张士欣

来源:京东云开发者社区 自猿其说Tech 转载请注明来源

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/265453.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

linux 14网站架构 编译安装mysql数据库

目录 LNMP网站架构下载源码包mysql 下载位置 mysql 安装1.1、清理安装环境:1.2、创建mysql用户1.3、从官网下载tar包1.4、安装编译工具1.5、解压1.6、编译安装编译安装三部曲1.7、初始化初始化,只需要初始化一次1.8、启动mysql1.9、登录mysql1.10、systemctl启动方式…

Kafka事务是怎么实现的?Kafka事务消息原理详解

目录 一、Kafka事务性消息1.1 介绍Kafka事务性消息1.2 事务性消息的应用场景1.3 Kafka事务性消息的优势 二、Kafka事务性消息的使用2.1 配置Kafka以支持事务性消息生产者配置消费者配置 2.2 生产者:发送事务性消息创建Kafka生产者开始事务发送消息提交或中止事务 2.…

CESM笔记——component活动状态+compset前缀解析+B1850,BHIST区别

时隔一年没写CSDN笔记了,一些CESM的知识点我都快忘了。诶,主要是在国外办公室的网屏蔽了好多国内的网络,CSDN登不上,回家又不想干活。。。好吧,好多借口。。。 昨天师弟问我一些问题,想想要不可以水一篇小…

31条PCB设计布线技巧:

大家在做PCB设计时,都会发现布线这个环节必不可少,而且布线的合理性,也决定了PCB的美观度和其生产成本的高低,同时还能体现出电路性能和散热性能的好坏,以及是否可以让器件的性能达到最优等。 本篇内容,将…

向量数据库:AIGC时代的必备基础工具

今天分享的AIGC系列深度研究报告:《向量数据库:AIGC时代的必备基础工具》。 (报告出品方:广发证券) 报告共计:47页 点击添加图片描述(最多60个字)编辑 一、向量数据库为 AI 大模型…

jmeter 压测需要的部分配置

修改jmeter 目录的bin目录下的jmeter.properties文件 解除KeepAlive设置 修改接口的高级中的实现和超时 解除httpclient4.retrycount前的注释符并将0修改为1 即修改为:httpclient4.retrycount1 解除httpclient4.idletimeout前的注释符并修改为合适间隔 即修改为…

【离散数学】——期末刷题题库(等价关系和偏序关系)

🎃个人专栏: 🐬 算法设计与分析:算法设计与分析_IT闫的博客-CSDN博客 🐳Java基础:Java基础_IT闫的博客-CSDN博客 🐋c语言:c语言_IT闫的博客-CSDN博客 🐟MySQL&#xff1a…

Vue.js With no Tool-Chain

准备 https://vuejs.org/guide/quick-start.html 代码编辑器:VSCode 浏览器开发工具(用于调试):Vue.js devtools rkun1LAPTOP-TUS5FU0D MINGW64 /c/code $ mkdir VUE-STARTrkun1LAPTOP-TUS5FU0D MINGW64 /c/code $ cd VUE-STAR…

记录 | ubuntu上安装fzf

在 ubuntu 上采用命令行安装 fzf 的方式行不通 指的是采用下面的方式行不通: sudo apt install fzf # 行不通 sudo snap install fzf --classic # 行不通正确的安装方式是: ● 到 fzf 的 git 仓库:https://github.com/junegunn/fzf/re…

字符串String

字符串字面量 let s "hello";变量s属于字符串字面量,它们属于硬编码进程序的字符串值,属于不可变的类型。但并不是所有字符串的值都能够在编写代码时确定。 String类型 String类型会在堆上分配到自己需要的存储空间,所以它能够…

Java项目-瑞吉外卖Day3

填充公共字段: 目的:由于某些属性,例如createdTime这些需要填充的字段会在多个地方出现,所以考虑使用公共字段自动填充的办法减少重复代码。 在对应属性上加入TableField注解。通过fill字段表明策略,是插入/更新的时候…

numpy数据读取保存及速度测试

目录 数据保存及读取 速度比对测试 数据保存及读取 代码示例: # 导入必要的库 import numpy as np # 生成测试数据 arr_disk np.arange(8) # 打印生成能的数据 print(arr_disk) # numpy保存数据到本地 np.save("arr_disk", arr_disk) # 加载本地数据…