全网第一篇把Nacos配置中心服务端讲明白的

入口

getServerConfig对应:ConfigQueryRequestHandler�
image.png
getBatchServiceConfig对应:ConfigChangeBatchListenResponse�
image.png
admin对应:ConfigController
image.png

我们重点就要2个,一个是服务端如何完成客户端获取配置请求,一个是服务端更新配置,客户端如何更新, 也就是说ConfigQueryReustHandler和ConfigController

ConfigQueryRequestHandler

image.png

ConfigQueryRequestHandler.getContext

private ConfigQueryResponse getContext(ConfigQueryRequest configQueryRequest, RequestMeta meta, boolean notify)throws UnsupportedEncodingException {// todoString dataId = configQueryRequest.getDataId();String group = configQueryRequest.getGroup();String tenant = configQueryRequest.getTenant();String clientIp = meta.getClientIp();String tag = configQueryRequest.getTag();ConfigQueryResponse response = new ConfigQueryResponse();final String groupKey = GroupKey2.getKey(configQueryRequest.getDataId(), configQueryRequest.getGroup(), configQueryRequest.getTenant());String autoTag = configQueryRequest.getHeader(com.alibaba.nacos.api.common.Constants.VIPSERVER_TAG);String requestIpApp = meta.getLabels().get(CLIENT_APPNAME_HEADER);int lockResult = tryConfigReadLock(groupKey);boolean isBeta = false;boolean isSli = false;if (lockResult > 0) {//FileInputStream fis = null;try {String md5 = Constants.NULL;long lastModified = 0L;// todo 从缓存中获取CacheItem cacheItem = ConfigCacheService.getContentCache(groupKey);if (cacheItem != null) {if (cacheItem.isBeta()) {if (cacheItem.getIps4Beta().contains(clientIp)) {isBeta = true;}}String configType = cacheItem.getType();response.setContentType((null != configType) ? configType : "text");}File file = null;ConfigInfoBase configInfoBase = null;PrintWriter out = null;if (isBeta) {md5 = cacheItem.getMd54Beta();lastModified = cacheItem.getLastModifiedTs4Beta();if (PropertyUtil.isDirectRead()) {configInfoBase = persistService.findConfigInfo4Beta(dataId, group, tenant);} else {file = DiskUtil.targetBetaFile(dataId, group, tenant);}response.setBeta(true);} else {if (StringUtils.isBlank(tag)) {if (isUseTag(cacheItem, autoTag)) {if (cacheItem != null) {if (cacheItem.tagMd5 != null) {md5 = cacheItem.tagMd5.get(autoTag);}if (cacheItem.tagLastModifiedTs != null) {lastModified = cacheItem.tagLastModifiedTs.get(autoTag);}}if (PropertyUtil.isDirectRead()) {configInfoBase = persistService.findConfigInfo4Tag(dataId, group, tenant, autoTag);} else {file = DiskUtil.targetTagFile(dataId, group, tenant, autoTag);}response.setTag(URLEncoder.encode(autoTag, Constants.ENCODE));} else {md5 = cacheItem.getMd5();lastModified = cacheItem.getLastModifiedTs();// todo 是否是数据读if (PropertyUtil.isDirectRead()) {configInfoBase = persistService.findConfigInfo(dataId, group, tenant);} else {file = DiskUtil.targetFile(dataId, group, tenant);}if (configInfoBase == null && fileNotExist(file)) {// FIXME CacheItem// No longer exists. It is impossible to simply calculate the push delayed. Here, simply record it as - 1.ConfigTraceService.logPullEvent(dataId, group, tenant, requestIpApp, -1,ConfigTraceService.PULL_EVENT_NOTFOUND, -1, clientIp, false);// pullLog.info("[client-get] clientIp={}, {},// no data",// new Object[]{clientIp, groupKey});response.setErrorInfo(ConfigQueryResponse.CONFIG_NOT_FOUND, "config data not exist");return response;}}} else {if (cacheItem != null) {if (cacheItem.tagMd5 != null) {md5 = cacheItem.tagMd5.get(tag);}if (cacheItem.tagLastModifiedTs != null) {Long lm = cacheItem.tagLastModifiedTs.get(tag);if (lm != null) {lastModified = lm;}}}if (PropertyUtil.isDirectRead()) {configInfoBase = persistService.findConfigInfo4Tag(dataId, group, tenant, tag);} else {file = DiskUtil.targetTagFile(dataId, group, tenant, tag);}if (configInfoBase == null && fileNotExist(file)) {// FIXME CacheItem// No longer exists. It is impossible to simply calculate the push delayed. Here, simply record it as - 1.ConfigTraceService.logPullEvent(dataId, group, tenant, requestIpApp, -1,ConfigTraceService.PULL_EVENT_NOTFOUND, -1, clientIp, false);// pullLog.info("[client-get] clientIp={}, {},// no data",// new Object[]{clientIp, groupKey});response.setErrorInfo(ConfigQueryResponse.CONFIG_NOT_FOUND, "config data not exist");return response;}}}response.setMd5(md5);if (PropertyUtil.isDirectRead()) {response.setLastModified(lastModified);response.setContent(configInfoBase.getContent());response.setResultCode(ResponseCode.SUCCESS.getCode());} else {//read from fileString content = null;try {content = readFileContent(file);response.setContent(content);response.setLastModified(lastModified);response.setResultCode(ResponseCode.SUCCESS.getCode());} catch (IOException e) {response.setErrorInfo(ResponseCode.FAIL.getCode(), e.getMessage());return response;}}LogUtil.PULL_CHECK_LOG.warn("{}|{}|{}|{}", groupKey, clientIp, md5, TimeUtils.getCurrentTimeStr());final long delayed = System.currentTimeMillis() - lastModified;// TODO distinguish pull-get && push-get/*Otherwise, delayed cannot be used as the basis of push delay directly,because the delayed value of active get requests is very large.*/ConfigTraceService.logPullEvent(dataId, group, tenant, requestIpApp, lastModified,ConfigTraceService.PULL_EVENT_OK, notify ? delayed : -1, clientIp, notify);} finally {releaseConfigReadLock(groupKey);}} else if (lockResult == 0) {// FIXME CacheItem No longer exists. It is impossible to simply calculate the push delayed. Here, simply record it as - 1.ConfigTraceService.logPullEvent(dataId, group, tenant, requestIpApp, -1, ConfigTraceService.PULL_EVENT_NOTFOUND, -1,clientIp, notify);response.setErrorInfo(ConfigQueryResponse.CONFIG_NOT_FOUND, "config data not exist");} else {PULL_LOG.info("[client-get] clientIp={}, {}, get data during dump", clientIp, groupKey);response.setErrorInfo(ConfigQueryResponse.CONFIG_QUERY_CONFLICT,"requested file is being modified, please try later.");}return response;}

这一段代码很长,感觉写的也不是很好,很冗长,总结一下:

  1. 通过groupKey从ConfigCacheService.getContentCache(groupKey)获取cacheItem, 这里的cacheItem要等到pushConfig在看一下,具体用来做什么,只是做一些数据的暂存吗?
  2. 然后看了一下是否从数据读,如果是就从数据库读出配置,如果不是,就从文件中读取


PropertyUtil.isDirectRead

image.png

persistService.findConfigInfo

image.png

ConfigController

ConfigController.pushConfig

  @PostMapping@Secured(action = ActionTypes.WRITE, parser = ConfigResourceParser.class)public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response,@RequestParam(value = "dataId") String dataId, @RequestParam(value = "group") String group,@RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,@RequestParam(value = "content") String content, @RequestParam(value = "tag", required = false) String tag,@RequestParam(value = "appName", required = false) String appName,@RequestParam(value = "src_user", required = false) String srcUser,@RequestParam(value = "config_tags", required = false) String configTags,@RequestParam(value = "desc", required = false) String desc,@RequestParam(value = "use", required = false) String use,@RequestParam(value = "effect", required = false) String effect,@RequestParam(value = "type", required = false) String type,@RequestParam(value = "schema", required = false) String schema) throws NacosException {// todo 2.2以上这里有个插件思想学一下,基本上也是spi的思想final String srcIp = RequestUtil.getRemoteIp(request);final String requestIpApp = RequestUtil.getAppName(request);srcUser = RequestUtil.getSrcUserName(request);//check typeif (!ConfigType.isValidType(type)) {type = ConfigType.getDefaultType().getType();}// check tenant//todo  参数检查ParamUtils.checkTenant(tenant);ParamUtils.checkParam(dataId, group, "datumId", content);ParamUtils.checkParam(tag);Map<String, Object> configAdvanceInfo = new HashMap<String, Object>(10);MapUtil.putIfValNoNull(configAdvanceInfo, "config_tags", configTags);MapUtil.putIfValNoNull(configAdvanceInfo, "desc", desc);MapUtil.putIfValNoNull(configAdvanceInfo, "use", use);MapUtil.putIfValNoNull(configAdvanceInfo, "effect", effect);MapUtil.putIfValNoNull(configAdvanceInfo, "type", type);MapUtil.putIfValNoNull(configAdvanceInfo, "schema", schema);ParamUtils.checkParam(configAdvanceInfo);if (AggrWhitelist.isAggrDataId(dataId)) {LOGGER.warn("[aggr-conflict] {} attempt to publish single data, {}, {}", RequestUtil.getRemoteIp(request),dataId, group);throw new NacosException(NacosException.NO_RIGHT, "dataId:" + dataId + " is aggr");}final Timestamp time = TimeUtils.getCurrentTime();String betaIps = request.getHeader("betaIps");// todo // 构造配置信息ConfigInfo configInfo = new ConfigInfo(dataId, group, tenant, appName, content);configInfo.setType(type);if (StringUtils.isBlank(betaIps)) {if (StringUtils.isBlank(tag)) {persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, false);// todo 由AsyncNotifyService处理ConfigChangePublisher.notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));} else {persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, false);// todo 由AsyncNotifyService处理ConfigChangePublisher.notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));}} else {// beta publishpersistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, false);ConfigChangePublisher.notifyConfigChange(new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime()));}ConfigTraceService.logPersistenceEvent(dataId, group, tenant, requestIpApp, time.getTime(), InetUtils.getSelfIP(),ConfigTraceService.PERSISTENCE_EVENT_PUB, content);return true;}

一共做了这么几件事:

  1. persistsService.insertOrUpdate
  2. 发送ConfigDataChangeEvent,这个事件最终由AsyncNotifyService执行到

AsyncNotifyService.onEvent

image.png
这里做了这么几件事:

  1. memberManager.allMembers():获取到所有的服务节点,我这里因为是单机启动的,所以就一台

image.png

  1. 看对应的服务是否支持长链接,如果支持就放到rpcQueue中,如果不支持,就放到httpQueue,基本到这里就知道后面要做什么,肯定又是什么线程要消费这里的queue
  2. 将rpcQueue放到AsyncRpcTask里面去,然后用一个异步线程池调用它

其中ConfigExecutor:
image.png
image.png
不往下追了,就是一个简单的定时调度线程池,反正最终也是调度到AsyncRpcTask的run方法,这个更重要

其中AsyncRpcTask:
image.png
每一个AsyncRpcTask里面有一个队列,这个队列里面存放的是NotifySingleRpcTask:就是rpc通知任务

AsyncRpcTask.run

image.png
image.png
这里主要做了几件事:

  1. 从queue中获取通知任务,如果任务目标是当前节点,就直接调用dumpService.dump
  2. 如果是其他服务节点的话,判断是否支持长链接,如果支持就发送rpc请求,如果不支持就发送http请求

DumpService.dump

image.png
比较简单,将task放到TaskManager中,然后TaskManager异步执行

TaskManager

首先DumpService里面有一个TaskManager,这个类是继承NacosDelayTaskExecuteEngine�,这个类之前是详细分析过的,就是延迟执行的,核心逻辑就是有一个线程不断的从一个map中获取任务,
image.png
如果获取到任务,就看这个taskKey有没有对应的处理器,如果没有设置,那就找一个默认的,
image.png
刚好DumpService为TaskManager设置了一个默认的Processor,位置在DumpService的构造方法上
image.png

也就说说TaskManager.processTasks -> NacosDelayTaskExecuteEngine.processTasks ->DumpProcessor�.process方法

DumpProcessor.process

image.png
会调用到DumpConfigHandler

DumpConfigHandler.configDump

image.png
最终又调用到ConfigCacheService.dump

ConfigCacheService.dump

image.png

  1. 获取写锁
  2. 创建CacheItem
  3. 非单点 非数据库存储,需要保存到磁盘
  4. updateMD5:发送本地配置变更事件, 这个事件意义非凡,得好好看看

image.png
这里比较了一下md5值,如果不一样,就发送LocalDataChangeEvent事件,我们来看看这个事件又是谁处理的?PpcConfigChangeNotifier.onEvent(LocalDataChangeEvent)

RpcConfigChangeNotifier.onEvent

image.png
image.png
这里的大致逻辑就是:

  1. 判断groupKey对应有没有监听,也就是需不需要回调,如果有,会向客户端发送消息
  2. 通过clientId拿到客户端连接,然后就是往客户端发送Rpc ConfigChangeNofifyChangeRequest

image.png

之前我们在客户端源码里面讲过,在ensureRpcClient =>initRpcClientHandler里面会注册一个客户端处理服务端->客户端的handler
image.png
当客户端接受到notifyListenConfig,会往队列丢一个信号,客户端会马上执行,从服务端拉取最新的配置

ConfigClusterRpcClientProxy�.syncConfigChange

上面从3.4到3.8都是讲了当配置更新完毕,本机如何处理,那如果是其他Server或者客户端又该怎么处理那?
入口就是这个
image.png
也是是RpcClient发送请求

通过搜索ConfigChangeClusterSyncRequest�这个请求 找到具体的处理类是ConfigChangeClusterSyncRequestHandler�

ConfigChangeClusterSyncRequestHandler.handle

其他server通过这个handler.handle方法最终也是调用dumpService方法
image.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/454285.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

IT行业证书的获取与价值:提升职业竞争力的关键

目录 IT行业证书的价值和作用 1. Cisco&#xff08;思科&#xff09;认证&#xff08;如CCNA、CCNP、CCIE&#xff09;&#xff1a; 2. 微软认证&#xff08;如MCSA、MCSE、MCSD&#xff09;&#xff1a; 3. 计算机网络技术&#xff08;CompTIA Network、CompTIA Security&a…

Backtrader 文档学习- Observers

Backtrader 文档学习- Observers 1.概述 在backtrader中运行的策略主要处理数据源和指标。 数据源被加载到Cerebro实例中&#xff0c;并最终成为策略的一部分&#xff08;解析和提供实例的属性&#xff09;&#xff0c;而指标则由策略本身声明和管理。 到目前为止&#xff0c…

rsync-3.1.2下载编译安装运行同步

下载 https://rsync.samba.org/ftp/rsync/src/ 解压 -解压源码包tar -xvf rsync-3.1.2.tar.gz -重命名mv rsync-3.1.2 rsync -将软件安装到指定目录下./configure --prefi/usr -编译 make - 安装 make install 安装之后启动脚本在/usr/bin/ -启动脚本 (启动之前需要配置一下…

SpringBoot注解--04--01--注解@Mapper在IDEA中自动注入警告的解决方案

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 问题原因 解决方案方法1&#xff1a;为 Autowired 注解设置required false方法2&#xff1a;用 Resource 替换 Autowired方法3&#xff1a;在Mapper接口上加上Repo…

一次Kubernetes Pod内存异常导致的测试环境耗时异常问题排查过程

概述 在使用公司内部后台系统测试环境时发现一个请求加载慢的问题&#xff0c;简简单单的列表&#xff0c;查询MongoDB数据库&#xff0c;测试环境不过几百上千条数据而已&#xff0c;请求耗时居然高达5~6秒&#xff1a; 作为对比&#xff0c;生产环境的请求响应截图如下&…

CSRF:跨站请求伪造攻击

目录 什么是CSRF&#xff1f; DVWA中的CSRF low medium hight impossible 防御CSRF 1、验证码 2、referer校验 3、cookie的Samesite属性 4、Anti-CSRF-Token 什么是CSRF&#xff1f; CSRF全称为跨站请求伪造&#xff08;Cross-site request forgery&#xff09;&…

@所有人 您需要的 幻兽帕鲁服务器搭建教程 已上线

所有人 您需要的 幻兽帕鲁服务器搭建教程 已上线 幻兽帕鲁一键购买及部署体验购买及部署购买云服务器ECS部署幻兽帕鲁 创建账户并登录Steam其他操作更新服务器修改游戏参数其他操作释放资源 一直拖到今天才来写这篇幻兽帕鲁服务器搭建教程&#xff0c;确实是因为前段时间有事耽…

2024年美赛数学建模E题思路分析 - 财产保险的可持续性

# 1 赛题 问题E&#xff1a;财产保险的可持续性 极端天气事件正成为财产所有者和保险公司面临的危机。“近年来&#xff0c;世界已经遭受了1000多起极端天气事件造成的超过1万亿美元的损失”。[1]2022年&#xff0c;保险业的自然灾害索赔人数“比30年的平均水平增加了115%”。…

淘宝镜像到期如何切换镜像及如何安装淘宝镜像

淘宝镜像到期如何切换镜像及如何安装淘宝镜像 一、淘宝镜像到期如何切换新镜像二、第一次使用淘宝镜像如何配置镜像 一、淘宝镜像到期如何切换新镜像 清空缓存&#xff1a;npm cache clean --force切换镜像源&#xff1a;npm config set registry https://registry.npmmirror.…

003集—三调数据库添加三大类字段——arcgis

在国土管理日常统计工作中经常需要用到三大类数据&#xff08;农用地、建设用地、未利用地&#xff09;&#xff0c;而三调数据库中无三大类字段&#xff0c;因此需要手工录入三大类字段&#xff0c;并根据二级地类代码录入相关三大类名称。本代码可一键录入海量三大类名称统计…

【Flink入门修炼】1-1 为什么要学习 Flink?

流处理和批处理是什么&#xff1f; 什么是 Flink&#xff1f; 为什么要学习 Flink&#xff1f; Flink 有什么特点&#xff0c;能做什么&#xff1f; 本文将为你解答以上问题。 一、批处理和流处理 早些年&#xff0c;大数据处理还主要为批处理&#xff0c;一般按天或小时定时处…

【Redis】深入理解 Redis 常用数据类型源码及底层实现(3.详解String数据结构)

【Redis】深入理解 Redis 常用数据类型源码及底层实现&#xff08;1.结构与源码概述&#xff09;-CSDN博客 【Redis】深入理解 Redis 常用数据类型源码及底层实现(2.版本区别dictEntry & redisObject详解)-CSDN博客 紧接着前两篇的总体介绍&#xff0c;从这篇开始&#x…