全网第一篇把Nacos配置中心客户端讲明白的

入口

我们依旧拿ConfigExample作为入口

public class ConfigExample {public static void main(String[] args) throws NacosException, InterruptedException {String serverAddr = "localhost";String dataId = "test";String group = "DEFAULT_GROUP";Properties properties = new Properties();properties.put("serverAddr", serverAddr);ConfigService configService = NacosFactory.createConfigService(properties);String content = configService.getConfig(dataId, group, 5000);System.out.println(content);configService.addListener(dataId, group, new Listener() {@Overridepublic void receiveConfigInfo(String configInfo) {System.out.println("receive:" + configInfo);}@Overridepublic Executor getExecutor() {return null;}});Thread.sleep(300000);boolean isPublishOk = configService.publishConfig(dataId, group, "content");System.out.println(isPublishOk);Thread.sleep(3000);content = configService.getConfig(dataId, group, 5000);System.out.println(content);//        boolean isRemoveOk = configService.removeConfig(dataId, group);
//        System.out.println(isRemoveOk);
//        Thread.sleep(3000);
//
//        content = configService.getConfig(dataId, group, 5000);
//        System.out.println(content);Thread.sleep(300000);}
}

NacosFactory.createConfigService

套路和之前差不多, 加速过
image.png
image.png
会走到NacosConfigService的构造方法里面
image.png

NacosConfigService.getConfig

image.png

   private String getConfigInner(String tenant, String dataId, String group, long timeoutMs) throws NacosException {group = blank2defaultGroup(group);// todo 检查参数ParamUtils.checkKeyParam(dataId, group);ConfigResponse cr = new ConfigResponse();// todo // 设置配置信息cr.setDataId(dataId);cr.setTenant(tenant);cr.setGroup(group);// use local config first// todo 这里有个失败转移的配置。如果能读到失败转移的配置信息,则直接返回了。原因的话英文注释写的很清楚了// 优先使用失败转移,设计的目的是当server挂后,又需要修改配置,就可以读本地目录String content = LocalConfigInfoProcessor.getFailover(worker.getAgentName(), dataId, group, tenant);if (content != null) {LOGGER.warn("[{}] [get-config] get failover ok, dataId={}, group={}, tenant={}, config={}",worker.getAgentName(), dataId, group, tenant, ContentUtils.truncateContent(content));cr.setContent(content);String encryptedDataKey = LocalEncryptedDataKeyProcessor.getEncryptDataKeyFailover(agent.getName(), dataId, group, tenant);cr.setEncryptedDataKey(encryptedDataKey);configFilterChainManager.doFilter(null, cr);content = cr.getContent();return content;}try {// todo 通过客户端远程拉取配置信息ConfigResponse response = worker.getServerConfig(dataId, group, tenant, timeoutMs, false);cr.setContent(response.getContent());cr.setEncryptedDataKey(response.getEncryptedDataKey());configFilterChainManager.doFilter(null, cr);content = cr.getContent();return content;} catch (NacosException ioe) {if (NacosException.NO_RIGHT == ioe.getErrCode()) {throw ioe;}LOGGER.warn("[{}] [get-config] get from server error, dataId={}, group={}, tenant={}, msg={}",worker.getAgentName(), dataId, group, tenant, ioe.toString());}LOGGER.warn("[{}] [get-config] get snapshot ok, dataId={}, group={}, tenant={}, config={}",worker.getAgentName(), dataId, group, tenant, ContentUtils.truncateContent(content));// todo     // 非鉴权失败的异常的,可以从本地快照中获取配置,如果有的话content = LocalConfigInfoProcessor.getSnapshot(worker.getAgentName(), dataId, group, tenant);cr.setContent(content);String encryptedDataKey = LocalEncryptedDataKeyProcessor.getEncryptDataKeyFailover(agent.getName(), dataId, group, tenant);cr.setEncryptedDataKey(encryptedDataKey);configFilterChainManager.doFilter(null, cr);content = cr.getContent();return content;}

总结一下做了几件事:

  1. 支持故障转移从本地读取配置
  2. 正常情况下从server获取配置
  3. 非鉴权失败的异常,可以从本地快照中获取配置

ClientWorker.getServiceConfig

image.png
这里的agent会被ClientWorker里面内部类的ConfigRpcTransportClient�继承,并且重写,也就是说最终会调用到ConfigRpcTransportClient.queryConfig方法

image.png

通过GrpcSdkClient往server发送请求,获取配置

NacosConfigService.pushConfig

这个比较简单,和上面逻辑类似
最终也是调用ClientWorker.publishConfig -> agent.pushConfig(实际为ClientWorker的内部类ConfigRpcTransportClient)
image.png

NacosConfigService.addListener

image.png
image.png
image.png
这里一共做了几件事:

  1. 创建CacheData,这里有一个很重要的cacheMap.size()/ParamUtil.getPerTaskConfigSize(默认是是3000),也就说对cache进行一个分组,比如size为1/3000,2/3000,这里的taskId永远为0,后面在定时任务调度,批量往server端请求的时候会用到
  2. 往cache里面放listener
  3. 设置syncWithServer为false
  4. agent.notifyListenConfig:ConfigRpcTransportClient.notifyListenConfig 这个比较重要

image.png
它会往这个阻塞队列里面放一个Object,为什么要放呢?那肯定有地方要取,ClientWorker在启动的时候,会有一个定时任务不断从这个阻塞队列中取,如果取到就执行

ClientWorker

NacosConfigService在创建的同时会创建ClientWorker, ClientWorker其实就是它的打手😄
image.png
image.png
这个agent又是ClientWorker的打手,当调用到agent.start的时候,最终会调用到ClientWorker的内部类image.png的startInternal方法,

ClientWorker#ConfigRpcTransportClient.startInternal方法

image.png
这个方法不断在从ListenExecutebell获取,如果说一直获取不到,就超时,就进入executeConfigListen, 结合前面的notifyListenConfig其实就是给这里一个信号,触发executeConfigListen执行

ClientWorker#ConfigRpcTransportClient.executeConfigListen

 public void executeConfigListen() {// todo 存放含有listen的cacheDataMap<String, List<CacheData>> listenCachesMap = new HashMap<String, List<CacheData>>(16);// todo 存放不含邮listen的cacheDataMap<String, List<CacheData>> removeListenCachesMap = new HashMap<String, List<CacheData>>(16);long now = System.currentTimeMillis();// todo // 当前时间减去上次全量同步时间是否大于5分钟boolean needAllSync = now - lastAllSyncTime >= ALL_SYNC_INTERNAL;for (CacheData cache : cacheMap.get().values()) {synchronized (cache) {// todo !!!!!!!这里,一般不会走这里,不要被误导了,我也是debug多次才发现//check local listeners consistent.if (cache.isSyncWithServer()) {// todo // 一致则检查md5值,若md5值和上一个不一样,则说明变动了,需要通知监听器cache.checkListenerMd5();// todo // 是否到全量同步时间了,未到则直接跳过if (!needAllSync) {continue;}}if (!CollectionUtils.isEmpty(cache.getListeners())) {// todo 如果有监听器并且缓存数据并非使用本地的,则把这些缓存数据加入到需要监听的列表listenCachesMap中//get listen  configif (!cache.isUseLocalConfigInfo()) {List<CacheData> cacheDatas = listenCachesMap.get(String.valueOf(cache.getTaskId()));if (cacheDatas == null) {cacheDatas = new LinkedList<CacheData>();listenCachesMap.put(String.valueOf(cache.getTaskId()), cacheDatas);}cacheDatas.add(cache);}} else if (CollectionUtils.isEmpty(cache.getListeners())) {// todo 即删除, 放入removeListenCachesMapif (!cache.isUseLocalConfigInfo()) {List<CacheData> cacheDatas = removeListenCachesMap.get(String.valueOf(cache.getTaskId()));if (cacheDatas == null) {cacheDatas = new LinkedList<CacheData>();removeListenCachesMap.put(String.valueOf(cache.getTaskId()), cacheDatas);}cacheDatas.add(cache);}}}}// todo  此时,如果需要和服务端数据同步,则listenCachesMap和removeListenCachesMap存放了本地数据,需要和服务端对比boolean hasChangedKeys = false;if (!listenCachesMap.isEmpty()) {for (Map.Entry<String, List<CacheData>> entry : listenCachesMap.entrySet()) {String taskId = entry.getKey();Map<String, Long> timestampMap = new HashMap<>(listenCachesMap.size() * 2);List<CacheData> listenCaches = entry.getValue();for (CacheData cacheData : listenCaches) {timestampMap.put(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant),cacheData.getLastModifiedTs().longValue());}// todo 构建新增数据的请求参数,此请求用于远程和本地对比,发现变动了会进行通知ConfigBatchListenRequest configChangeListenRequest = buildConfigRequest(listenCaches);// todo // 配置需要新增或更新监听数据configChangeListenRequest.setListen(true);try {// todo // 获取一个rpc的客户端RpcClient rpcClient = ensureRpcClient(taskId);ConfigChangeBatchListenResponse configChangeBatchListenResponse = (ConfigChangeBatchListenResponse) requestProxy(rpcClient, configChangeListenRequest);if (configChangeBatchListenResponse != null && configChangeBatchListenResponse.isSuccess()) {Set<String> changeKeys = new HashSet<String>();//handle changed keys,notify listenerif (!CollectionUtils.isEmpty(configChangeBatchListenResponse.getChangedConfigs())) {hasChangedKeys = true;for (ConfigChangeBatchListenResponse.ConfigContext changeConfig : configChangeBatchListenResponse.getChangedConfigs()) {String changeKey = GroupKey.getKeyTenant(changeConfig.getDataId(), changeConfig.getGroup(),changeConfig.getTenant());changeKeys.add(changeKey);boolean isInitializing = cacheMap.get().get(changeKey).isInitializing();// todo  刷新配置并通知变动refreshContentAndCheck(changeKey, !isInitializing);}}//handler content configsfor (CacheData cacheData : listenCaches) {String groupKey = GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.getTenant());if (!changeKeys.contains(groupKey)) {//sync:cache data md5 = server md5 && cache data md5 = all listeners md5.synchronized (cacheData) {if (!cacheData.getListeners().isEmpty()) {Long previousTimesStamp = timestampMap.get(groupKey);if (previousTimesStamp != null) {if (!cacheData.getLastModifiedTs().compareAndSet(previousTimesStamp,System.currentTimeMillis())) {continue;}}// todo  缓存数据没有变动,设置为和服务器同步cacheData.setSyncWithServer(true);}}}cacheData.setInitializing(false);}}} catch (Exception e) {LOGGER.error("Async listen config change error ", e);try {Thread.sleep(50L);} catch (InterruptedException interruptedException) {//ignore}}}}// todo     // 需要删除的数据不为空if (!removeListenCachesMap.isEmpty()) {for (Map.Entry<String, List<CacheData>> entry : removeListenCachesMap.entrySet()) {String taskId = entry.getKey();List<CacheData> removeListenCaches = entry.getValue();ConfigBatchListenRequest configChangeListenRequest = buildConfigRequest(removeListenCaches);// todo // 配置需要删除configChangeListenRequest.setListen(false);try {// 获取rpc客户端RpcClient rpcClient = ensureRpcClient(taskId);// todo 通知服务端移除数据boolean removeSuccess = unListenConfigChange(rpcClient, configChangeListenRequest);if (removeSuccess) {for (CacheData cacheData : removeListenCaches) {synchronized (cacheData) {// todo  // 移除缓存if (cacheData.getListeners().isEmpty()) {ClientWorker.this.removeCache(cacheData.dataId, cacheData.group, cacheData.tenant);}}}}} catch (Exception e) {LOGGER.error("async remove listen config change error ", e);}try {Thread.sleep(50L);} catch (InterruptedException interruptedException) {//ignore}}}if (needAllSync) {//todo  更新同步时间lastAllSyncTime = now;}//If has changed keys,notify re sync md5.if (hasChangedKeys) {// todo // 服务端告知了有数据变动,则需要再同步一次notifyListenConfig();}}

这段代码极其的长,现在我们来总结一一下具体做了哪些事:

  1. 设置两个监听器cacheMap,一个是带监听器的,一个是不带的

  2. 开始遍历cacheMap集合,忽略什么cache.isSyncWithServer,debug的时候走不到这里,看源码抓住核心流程,将有监听器的放到listenCachesMap(注意,这里有一个分组操作,拿到cache的taskId, 将cache的taskId和 相同taskId的cache组成一个Map:<taskId, {cacheData, cacheData}>),将没有监听器的放到removeListenCacheMap中image.png

  3. 如果listenCachesMap不为空,然后遍历listenCachesMap,

    1. 构造批量配置查询请求
    2. 获取一个RPC的客户端
    3. 发起RPC请求,查询这一个批taskId对应的cacheData发生变化了没,如果有返回值,就会走到refreshContextAndCheck 刷新配置并通知
  4. refreshContentAndCheck�:通过cacheData拿到的dataId、group、tenant 通过getServerConfig调用服务端拿到这个dataId对应的配置

ClientWorker#refreshContentAndCheck

image.png
将请求回来的content、configType、encryptedDataKey都设置到cacheData中,接下来调用cacheData.checkListenerMD5()

另外注意一下cacheData.setContent:会同时设置上md5
image.png

CacheData.checkListenerMD5

image.png
listeners(ManagerListernerWrap)就是我们刚开始创建CacheData设置上的listener上面包装了一层,在创建listener的时候,会把CacheData的content、md5、还有我们创建listener都放到里面,所以这里才会判断当前CacheData里面的md5和listener里面md5是不是一样的,如果不是,就需要通知到listener
image.png

CacheData.safeNotifyListener

 private void safeNotifyListener(final String dataId, final String group, final String content, final String type,final String md5, final String encryptedDataKey, final ManagerListenerWrap listenerWrap) {final Listener listener = listenerWrap.listener;if (listenerWrap.inNotifying) {LOGGER.warn("[{}] [notify-currentSkip] dataId={}, group={}, md5={}, listener={}, listener is not finish yet,will try next time.",name, dataId, group, md5, listener);return;}// todo 定义一个通知任务Runnable job = new Runnable() {@Overridepublic void run() {long start = System.currentTimeMillis();ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader();ClassLoader appClassLoader = listener.getClass().getClassLoader();try {// todo 拓展点,像spring cloud alibaba就用到了,创建了NacosContextRefresherif (listener instanceof AbstractSharedListener) {AbstractSharedListener adapter = (AbstractSharedListener) listener;adapter.fillContext(dataId, group);LOGGER.info("[{}] [notify-context] dataId={}, group={}, md5={}", name, dataId, group, md5);}// Before executing the callback, set the thread classloader to the classloader of// the specific webapp to avoid exceptions or misuses when calling the spi interface in// the callback method (this problem occurs only in multi-application deployment).Thread.currentThread().setContextClassLoader(appClassLoader);ConfigResponse cr = new ConfigResponse();cr.setDataId(dataId);cr.setGroup(group);cr.setContent(content);cr.setEncryptedDataKey(encryptedDataKey);configFilterChainManager.doFilter(null, cr);String contentTmp = cr.getContent();listenerWrap.inNotifying = true;// todo !!!!最终回调通知,就是这里listener.receiveConfigInfo(contentTmp);// compare lastContent and content// todo   扩展点,告知配置内容的变动if (listener instanceof AbstractConfigChangeListener) {Map data = ConfigChangeHandler.getInstance().parseChangeData(listenerWrap.lastContent, content, type);ConfigChangeEvent event = new ConfigChangeEvent(data);((AbstractConfigChangeListener) listener).receiveConfigChange(event);listenerWrap.lastContent = content;}// 赋予最新的md5listenerWrap.lastCallMd5 = md5;LOGGER.info("[{}] [notify-ok] dataId={}, group={}, md5={}, listener={} ,cost={} millis.", name,dataId, group, md5, listener, (System.currentTimeMillis() - start));} catch (NacosException ex) {LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} errCode={} errMsg={}",name, dataId, group, md5, listener, ex.getErrCode(), ex.getErrMsg());} catch (Throwable t) {LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} tx={}", name, dataId,group, md5, listener, t.getCause());} finally {listenerWrap.inNotifying = false;Thread.currentThread().setContextClassLoader(myClassLoader);}}};final long startNotify = System.currentTimeMillis();try {// todo // 监听器配置了异步执行器,就用配置的执行if (null != listener.getExecutor()) {listener.getExecutor().execute(job);} else {try {//todo  内部线程池执行INTERNAL_NOTIFIER.submit(job);} catch (RejectedExecutionException rejectedExecutionException) {LOGGER.warn("[{}] [notify-blocked] dataId={}, group={}, md5={}, listener={}, no available internal notifier,will sync notifier ",name, dataId, group, md5, listener);job.run();} catch (Throwable throwable) {LOGGER.error("[{}] [notify-blocked] dataId={}, group={}, md5={}, listener={}, submit internal async task fail,throwable= ",name, dataId, group, md5, listener, throwable);job.run();}}} catch (Throwable t) {LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} throwable={}", name, dataId,group, md5, listener, t.getCause());}final long finishNotify = System.currentTimeMillis();LOGGER.info("[{}] [notify-listener] time cost={}ms in ClientWorker, dataId={}, group={}, md5={}, listener={} ",name, (finishNotify - startNotify), dataId, group, md5, listener);}

�这块代码也很长,但是比较简单:

  1. 创建一个任务,判断是否是某种类型listener,如果是AbstractSharedListener,就回调到它的方法
  2. 回调到我们正常的listener方法,比如listener.receiverConfigInfo
  3. 判断是否是AbstractConfigChange Listener,如果是,就回调
  4. 看这个listener有没有配置异步执行器Executor,如果有就用它执行,如果没有,就用内部的线程池执行

ClientWorker#ConfigRpcTransportClient�#ensureRpcClient

�到上面为止,其实客户端的主流程已经比较请求,但是在executeConfigListen方法中有一个小方法ensureRpcClient我们就简单的一笔带过,实际上在后续的与服务端请求交互比较有用,我们还是再看一下
image.png
image.png
image.png
简单总结一下:

  1. 通过RpcClientFactory创建了一个GrpcSDKClient,这个之前Nacos服务注册的时候也会创建,所以比较熟悉
  2. 初始化网络请求处理:在这里注册了服务端调用客户端的处理方法, 注意不是客户端请求,而是服务端接受客户端的请求,因为Grpc是可以双向请求的,这个最重要的就是notifyListenConfig,😄是不是非常熟悉,如果我们服务端改动了配置,客户端从这里就可以得到通知,然后往listenExecutebell.offer(bellItem)发送一个信号,客户端就立马开始执行executeConfigListen
  3. rpcClient.start:这个没什么好说的,服务注册那里说过

image.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/454832.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

react 之 useInperativeHandle

useInperativeHandle是通过ref暴露子组件中的方法 1.场景说明-直接调用子组件内部的方法 import { forwardRef, useImperativeHandle, useRef } from "react"// 子组件const Son forwardRef((props, ref) > {// 实现聚焦逻辑const inputRef useRef(null)const …

超多制作模板的姓氏头像生成器微信小程序源码

超多制作模板的姓氏头像生成器微信小程序源码&#xff0c;这是一款姓氏头像制作小工具&#xff0c;内含丰富多样的模板提供制作。 以前的基本是固定位置生成&#xff0c;这款制作支持拖拽调整位置&#xff0c;自定义颜色&#xff0c;阴影等等。

[VulnHub靶机渗透] MHZ_CXF: C1F

&#x1f36c; 博主介绍&#x1f468;‍&#x1f393; 博主介绍&#xff1a;大家好&#xff0c;我是 hacker-routing &#xff0c;很高兴认识大家~ ✨主攻领域&#xff1a;【渗透领域】【应急响应】 【python】 【VulnHub靶场复现】【面试分析】 &#x1f389;点赞➕评论➕收藏…

“SharpDocx” C#项目中用于创建 Word 文档的轻量级模板引擎

简介&#xff1a; SharpDocx是一个轻量级的模板引擎&#xff0c;用于创建Word文档。它允许开发者基于视图生成Word文档&#xff0c;这个视图本身就是一个Word文档&#xff0c;可以根据需要设置简单或复杂的布局。 以下是一些主要特点&#xff1a; 模板引擎类似Razor&#xf…

简单实践 spring clound 使用openfeign

1.概要 这是在前面工程基础上的一个变更。 前工程&#xff1a;检查实验 spring cloud nacos nacos-server-2.3.0-CSDN博客 2 代码 2.1 引入依赖 <dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-open…

润唇膏市场分析:预计2029年将达到13亿美元

近年来&#xff0c;随着中国居民人均可支配收入持续增长&#xff0c;“颜值经济”的崛起&#xff0c;中国居民对化妆品的消费将持续上升。唇部是人体肌肤中最薄最娇嫩的部分之一&#xff0c;如果护理不当&#xff0c;极其容易产生唇纹、唇部黑色素沉着、干燥起皮等问题。因此对…

SpringSecurity(17)——OAuth2令牌管理策略

刷新令牌策略 注意&#xff1a;刷新令牌只有在授权码模式和密码模式中才有&#xff0c;对应的指定这两种模式时&#xff0c;在类型上加上refresh_token <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-se…

【优先级队列(大顶堆 小顶堆)】【遍历哈希表键值对】Leetcode 347 前K个高频元素

【优先级队列&#xff08;大顶堆 小顶堆&#xff09;】【排序】Leetcode 347 前K个高频元素 1.不同排序法归纳2.大顶堆和小顶堆3.PriorityQueue操作4.PriorityQueue的升序&#xff08;默认&#xff09;与降序5.问题解决&#xff1a;找前K个最大的元素 &#xff1a;踢走最小的&…

Mixed Content: The page at ‘xxx‘ was loaded over HTTPS, but requested an insecure XMLHttpRequest end

Mixed Content: The page at xxx was loaded over HTTPS, but requested an insecure XMLHttpRequest end 报错信息报错的原因出现的问题解决办法 报错信息 Mixed Content: The page at xxx was loaded over HTTPS, but requested an insecure XMLHttpRequest endpoint xxx. Th…

挑战杯 opencv 图像识别 指纹识别 - python

0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; 基于机器视觉的指纹识别系统 &#x1f947;学长这里给一个题目综合评分(每项满分5分) 难度系数&#xff1a;3分工作量&#xff1a;3分创新点&#xff1a;4分 该项目较为新颖&#xff0c;适…

功能测试+自动化测试代码覆盖率统计

Jacoco 是一个开源的覆盖率工具。Jacoco 可以嵌入到 Ant 、Maven 中&#xff0c;并提供了 EclEmma Eclipse 插件,也可以使用 Java Agent 技术监控 Java 程序。很多第三方的工具提供了对 Jacoco 的集成&#xff0c;如 sonar、Jenkins、IDEA。 Jacoco 包含了多种尺度的覆盖率计数…

研究表明:论文被大V宣传后,引用次数暴涨2~3倍!

随着AI领域的迅猛发展&#xff0c;学术成果的传播方式发生了显著转变。 期刊审稿周期长&#xff0c;当你还在和审稿人battle时&#xff0c;方法先过时了。而会议虽然没有期刊长&#xff0c;但也有几个月的时间差&#xff0c;为了保护成果的创新性并扩大影响力&#xff0c;很多…