七、Nacos源码系列:Nacos服务发现

目录

一、服务发现

二、getServices():获取服务列表

2.1、获取服务列表

2.2、总结图

三、getInstances(serviceId):获取服务实例列表 

3.1、从缓存中获取服务信息

3.2、缓存为空,执行订阅服务

3.2.1、调度更新,往线程池中提交一个UpdateTask任务

3.2.2、订阅服务 

 3.2.3、处理服务信息

3.3、非订阅模式,通过grpc发送ServiceQueryRequest服务查询请求

3.4、筛选满足条件的实例 

3.5、总结图


一、服务发现

在discovery-provider项目的pom.xml中,我们引入了如下依赖:

<dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId><version>2.2.9.RELEASE</version>
</dependency>

SpringCloud很多功能都是基于SpringBoot项目的自动配置原理来扩展实现的,下面我们查看spring-cloud-starter-alibaba-nacos-discovery-2.2.9.RELEASE.jar包路径下的spring.factories的"org.springframework.boot.autoconfigure.EnableAutoConfiguration"自动装配类配置,如下图:

如上图,跟客户端服务发现有关的有两个自动配置类:NacosDiscoveryClientConfiguration和NacosDiscoveryAutoConfiguration。

相关源码如下:

@Configuration(proxyBeanMethods = false)
@ConditionalOnDiscoveryEnabled
@ConditionalOnBlockingDiscoveryEnabled
@ConditionalOnNacosDiscoveryEnabled
@AutoConfigureBefore({ SimpleDiscoveryClientAutoConfiguration.class,CommonsClientAutoConfiguration.class })
// 在NacosDiscoveryAutoConfiguration自动装配类执行完成后才执行
@AutoConfigureAfter(NacosDiscoveryAutoConfiguration.class)
public class NacosDiscoveryClientConfiguration {// 创建DiscoveryClient bean对象@Beanpublic DiscoveryClient nacosDiscoveryClient(NacosServiceDiscovery nacosServiceDiscovery) {return new NacosDiscoveryClient(nacosServiceDiscovery);}@Bean@ConditionalOnMissingBean@ConditionalOnProperty(value = "spring.cloud.nacos.discovery.watch.enabled",matchIfMissing = true)public NacosWatch nacosWatch(NacosServiceManager nacosServiceManager,NacosDiscoveryProperties nacosDiscoveryProperties) {return new NacosWatch(nacosServiceManager, nacosDiscoveryProperties);}}@Configuration(proxyBeanMethods = false)
// spring.cloud.discovery.enabled=true时才生效,缺省值为true
@ConditionalOnDiscoveryEnabled
// spring.cloud.nacos.discovery.enabled=true时才生效,缺省值为true
@ConditionalOnNacosDiscoveryEnabled
public class NacosDiscoveryAutoConfiguration {@Bean@ConditionalOnMissingBeanpublic NacosDiscoveryProperties nacosProperties() {// 匹配配置文件中以“spring.cloud.nacos.discovery”为前缀的那些属性,// 如namespace、username、password、serverAddr等属性return new NacosDiscoveryProperties();}// 创建NacosServiceDiscovery bean对象@Bean@ConditionalOnMissingBeanpublic NacosServiceDiscovery nacosServiceDiscovery(NacosDiscoveryProperties discoveryProperties,NacosServiceManager nacosServiceManager) {return new NacosServiceDiscovery(discoveryProperties, nacosServiceManager);}}

在NacosDiscoveryAutoConfiguration自动配置类中,创建了一个NacosServiceDiscovery的bean对象,然后在NacosDiscoveryClientConfiguration自动装配时,创建DiscoveryClient的bean对象,传入前面创建的NacosServiceDiscovery对象。

重点关注NacosDiscoveryClient这个类,NacosDiscoveryClient的源码如下:

public class NacosDiscoveryClient implements DiscoveryClient {private static final Logger log = LoggerFactory.getLogger(NacosDiscoveryClient.class);/*** Nacos Discovery Client Description.*/public static final String DESCRIPTION = "Spring Cloud Nacos Discovery Client";private NacosServiceDiscovery serviceDiscovery;public NacosDiscoveryClient(NacosServiceDiscovery nacosServiceDiscovery) {this.serviceDiscovery = nacosServiceDiscovery;}@Overridepublic String description() {return DESCRIPTION;}@Overridepublic List<ServiceInstance> getInstances(String serviceId) {try {return serviceDiscovery.getInstances(serviceId);}catch (Exception e) {throw new RuntimeException("Can not get hosts from nacos server. serviceId: " + serviceId, e);}}@Overridepublic List<String> getServices() {try {return serviceDiscovery.getServices();}catch (Exception e) {log.error("get service name from nacos server fail,", e);return Collections.emptyList();}}}

可以看到,NacosDiscoveryClient实现了SpringCloud的DiscoveryClient接口,重点是getInstances()和getServices()方法,而且都是由NacosServiceDiscovery类去实现。

public class NacosServiceDiscovery {// 跟配置文件中以“spring.cloud.nacos.discovery”前缀的属性配置对应上private NacosDiscoveryProperties discoveryProperties;// nacos服务管理器对象private NacosServiceManager nacosServiceManager;public NacosServiceDiscovery(NacosDiscoveryProperties discoveryProperties,NacosServiceManager nacosServiceManager) {this.discoveryProperties = discoveryProperties;this.nacosServiceManager = nacosServiceManager;}/*** 返回指定group和servic的所有实例*/public List<ServiceInstance> getInstances(String serviceId) throws NacosException {// 配置文件中配置的group组名String group = discoveryProperties.getGroup();// namingService(): 通过反射创建一个NacosNamingService对象// 最终会调用NacosNamingService#selectInstances()方法List<Instance> instances = namingService().selectInstances(serviceId, group,true);// 将Instance包装成NacosServiceInstance对象返回return hostToServiceInstanceList(instances, serviceId);}/*** 返回指定group的所有服务名称*/public List<String> getServices() throws NacosException {// 配置文件中配置的group组名String group = discoveryProperties.getGroup();// namingService(): 通过反射创建一个NacosNamingService对象// 最终会调用NamingGrpcClientProxy#getServiceList()方法ListView<String> services = namingService().getServicesOfServer(1,Integer.MAX_VALUE, group);// 返回所有服务名称return services.getData();}public static List<ServiceInstance> hostToServiceInstanceList(List<Instance> instances, String serviceId) {List<ServiceInstance> result = new ArrayList<>(instances.size());for (Instance instance : instances) {ServiceInstance serviceInstance = hostToServiceInstance(instance, serviceId);if (serviceInstance != null) {result.add(serviceInstance);}}return result;}public static ServiceInstance hostToServiceInstance(Instance instance,String serviceId) {if (instance == null || !instance.isEnabled() || !instance.isHealthy()) {return null;}NacosServiceInstance nacosServiceInstance = new NacosServiceInstance();nacosServiceInstance.setHost(instance.getIp());nacosServiceInstance.setPort(instance.getPort());nacosServiceInstance.setServiceId(serviceId);Map<String, String> metadata = new HashMap<>();metadata.put("nacos.instanceId", instance.getInstanceId());metadata.put("nacos.weight", instance.getWeight() + "");metadata.put("nacos.healthy", instance.isHealthy() + "");metadata.put("nacos.cluster", instance.getClusterName() + "");if (instance.getMetadata() != null) {metadata.putAll(instance.getMetadata());}metadata.put("nacos.ephemeral", String.valueOf(instance.isEphemeral()));nacosServiceInstance.setMetadata(metadata);if (metadata.containsKey("secure")) {boolean secure = Boolean.parseBoolean(metadata.get("secure"));nacosServiceInstance.setSecure(secure);}return nacosServiceInstance;}private NamingService namingService() {return nacosServiceManager.getNamingService();}}

接下来,我们分析前面介绍到的两个重要方法:getInstances(serviceId)和getServices()。

二、getServices():获取服务列表

2.1、获取服务列表

// namingService(): 通过反射创建一个NacosNamingService对象
// NamingFactory#createNamingService(java.util.Properties)
public static NamingService createNamingService(Properties properties) throws NacosException {try {Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.naming.NacosNamingService");Constructor constructor = driverImplClass.getConstructor(Properties.class);return (NamingService) constructor.newInstance(properties);} catch (Throwable e) {throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);}
}
//getServices()调用栈大体如下:namingService().getServicesOfServer(1, Integer.MAX_VALUE, group);NacosNamingService#getServicesOfServerclientProxy.getServiceList(pageNo, pageSize, groupName, selector)NamingClientProxyDelegate#getServiceListgrpcClientProxy.getServiceList(pageNo, pageSize, groupName, selector);// 最终会调用NamingGrpcClientProxy#getServiceList()方法
public ListView<String> getServiceList(int pageNo, int pageSize, String groupName, AbstractSelector selector)throws NacosException {// 构建ServiceListRequest请求(服务列表请求),指定命名空间ID、服务组名ServiceListRequest request = new ServiceListRequest(namespaceId, groupName, pageNo, pageSize);if (selector != null) {if (SelectorType.valueOf(selector.getType()) == SelectorType.label) {request.setSelector(JacksonUtils.toJson(selector));}}// 发送服务列表请求给Nacos服务端,接下来由服务端处理ServiceListResponse response = requestToServer(request, ServiceListResponse.class);// 组装返回值出去ListView<String> result = new ListView<>();result.setCount(response.getCount());result.setData(response.getServiceNames());return result;
}

接下来,我们看看服务端怎么处理这个服务列表请求的。通过对ServiceListRequest类引用的追踪,我们发现是在com.alibaba.nacos.naming.remote.rpc.handler.ServiceListRequestHandler#handle这个方法中对客户端提交的服务列表请求进行处理的。

// 处理客户端提交的服务列表请求
public ServiceListResponse handle(ServiceListRequest request, RequestMeta meta) throws NacosException {// ServiceManager.getInstance()通过单例返回一个ServiceManager对象/*** 获取指定命令空间下的所有服务,在ServiceManager中存在一个map保存着每个命名空间中的所有服务。* ConcurrentHashMap<String, Set<Service>> namespaceSingletonMaps = new ConcurrentHashMap<>(1 << 2)* key: namespaceId* value: Set<Service>* 注册实例的时候,就往这个map写入了数据** ServiceManager.getInstance().getSingletons(request.getNamespace())相当于执行:* namespaceSingletonMaps.getOrDefault(namespace, new HashSet<>(1))*/Collection<Service> serviceSet = ServiceManager.getInstance().getSingletons(request.getNamespace());// 构建响应结果ServiceListResponse result = ServiceListResponse.buildSuccessResponse(0, new LinkedList<>());if (!serviceSet.isEmpty()) {// 过滤指定分组的Service,添加groupServiceName,格式如:groupA@@serviceACollection<String> serviceNameSet = selectServiceWithGroupName(serviceSet, request.getGroupName());// 按分页裁剪serviceNameSetList<String> serviceNameList = ServiceUtil.pageServiceName(request.getPageNo(), request.getPageSize(), serviceNameSet);result.setCount(serviceNameSet.size());result.setServiceNames(serviceNameList);}return result;
}

从源码可以看出,Nacos服务端从ServiceManager管理器中的一个map(namespaceSingletonMaps)中拿出指定命名空间那些Service,并根据筛选条件过滤满足条件的Service,然后组装好groupServiceName(格式如:groupA@@serviceA)并返回。

2.2、总结图

三、getInstances(serviceId):获取服务实例列表 

// 调用栈如下:
// namingService().selectInstances(serviceId, group,true);// NamingService#selectInstances(serviceName, groupName, healthy, true)// NamingService#selectInstances(serviceName, groupName, new ArrayList<String>(), healthy, true)
// getInstances(serviceId)方法最终会调用NacosNamingService#selectInstances()获取实例信息。
public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy,boolean subscribe) throws NacosException {ServiceInfo serviceInfo;// 集群名称,使用逗号分隔String clusterString = StringUtils.join(clusters, ",");// 是否订阅,默认是订阅的if (subscribe) {/*** 1.从缓存中获取ServiceInfo* ConcurrentMap<String, ServiceInfo> serviceInfoMap* key:  groupName@@serviceName  或者  groupName@@serviceName@@clusterString* value: ServiceInfo*/// 示例:serviceName=discovery-provider   groupName=DEFAULT_GROUPserviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, clusterString);// 2.缓存为空,执行订阅服务if (null == serviceInfo) {serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);}} else {// 3.非订阅,通过grpc发送ServiceQueryRequest服务查询请求serviceInfo = clientProxy.queryInstancesOfService(serviceName, groupName, clusterString, 0, false);}// 4.筛选满足条件的实例return selectInstances(serviceInfo, healthy);
}

上述流程的基本逻辑为:

  • 如果是订阅模式,则直接从本地缓存获取服务信息(ServiceInfo),然后从中获取实例列表,这是因为订阅机制会自动同步服务器实例的变化到本地。如果本地缓存中没有,那说明是首次调用,则进行订阅,在订阅完成后会获得到服务信息。
  • 如果是非订阅模式,那就直接请求服务器端,获得服务信息。

3.1、从缓存中获取服务信息

public ServiceInfo getServiceInfo(final String serviceName, final String groupName, final String clusters) {NAMING_LOGGER.debug("failover-mode: {}", failoverReactor.isFailoverSwitch());// 组装服务名(带组名):groupName@@serviceName// 例如:DEFAULT_GROUP@@discovery-providerString groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);// 如果指定了集群,那么key还会加上"@@clusters"String key = ServiceInfo.getKey(groupedServiceName, clusters);if (failoverReactor.isFailoverSwitch()) {return failoverReactor.getService(key);}// ConcurrentMap<String, ServiceInfo> serviceInfoMap// 从缓存中获取服务信息return serviceInfoMap.get(key);
}

3.2、缓存为空,执行订阅服务

serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);// clientProxy在构造方法中初始化为:NamingClientProxyDelegate
this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, nacosClientProperties, changeNotifier);

实际上调用的是NamingClientProxyDelegate#subscribe()方法:

public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {NAMING_LOGGER.info("[SUBSCRIBE-SERVICE] service:{}, group:{}, clusters:{} ", serviceName, groupName, clusters);// 服务名称(带组名)  格式:groupName@@serviceNameString serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName);// 如果集群名称非空,key还需要拼接上集群名称String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters);// 调度更新,往线程池中提交一个UpdateTask任务serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);// 获取缓存中的服务信息ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);if (null == result || !isSubscribed(serviceName, groupName, clusters)) {// 缓存中不存在对应的服务信息 或者 SubscriberRedoData还未注册,则执行订阅result = grpcClientProxy.subscribe(serviceName, groupName, clusters);}// 处理服务信息:获取老的服务信息,将新的服务信息重新存入客户端缓存中,对比新的服务信息,如发生变更,则发布实例变更数据,并同步serviceInfo数据到本地文件serviceInfoHolder.processServiceInfo(result);return result;
}

主要逻辑有下面三个,分析如下。

3.2.1、调度更新,往线程池中提交一个UpdateTask任务

public void scheduleUpdateIfAbsent(String serviceName, String groupName, String clusters) {if (!asyncQuerySubscribeService) {return;}// 组装key   格式:groupName@@serviceName@@clustersString serviceKey = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);// Map<String, ScheduledFuture<?>> futureMap = new HashMap<>()// futureMap用于保存UpdateTask线程池任务的执行结果if (futureMap.get(serviceKey) != null) {return;}synchronized (futureMap) {// double check双重检查,如果非空,直接返回,也就是相同的groupName@@serviceName@@clusters,只会存在一个UpdateTask任务if (futureMap.get(serviceKey) != null) {return;}// 往线程池中添加一个更新任务// UpdateTask实现了Runnable接口,需要关注其run()方法ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, groupName, clusters));futureMap.put(serviceKey, future);}
}private synchronized ScheduledFuture<?> addTask(UpdateTask task) {// 延迟1s执行UpdateTaskreturn executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
}

可以看到,使用了一个map来保存线程池任务的响应,延迟1s执行调度更新任务。我们看下UpdateTask的源码:

public class UpdateTask implements Runnable {long lastRefTime = Long.MAX_VALUE;private boolean isCancel;private final String serviceName;private final String groupName;private final String clusters;private final String groupedServiceName;private final String serviceKey;/*** the fail situation. 1:can't connect to server 2:serviceInfo's hosts is empty*/private int failCount = 0;public UpdateTask(String serviceName, String groupName, String clusters) {this.serviceName = serviceName;this.groupName = groupName;this.clusters = clusters;this.groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);this.serviceKey = ServiceInfo.getKey(groupedServiceName, clusters);}@Overridepublic void run() {long delayTime = DEFAULT_DELAY;try {if (!changeNotifier.isSubscribed(groupName, serviceName, clusters) && !futureMap.containsKey(serviceKey)) {NAMING_LOGGER.info("update task is stopped, service:{}, clusters:{}", groupedServiceName, clusters);isCancel = true;return;}ServiceInfo serviceObj = serviceInfoHolder.getServiceInfoMap().get(serviceKey);if (serviceObj == null) {// 使用grpc向服务端发送ServiceQueryRequest服务查询请求serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);// 处理服务信息:获取老的服务信息,将新的服务信息重新存入客户端缓存中,对比新的服务信息,如发生变更,则发布实例变更数据,并同步serviceInfo数据到本地文件serviceInfoHolder.processServiceInfo(serviceObj);lastRefTime = serviceObj.getLastRefTime();return;}if (serviceObj.getLastRefTime() <= lastRefTime) {// 使用grpc向服务端发送ServiceQueryRequest服务查询请求serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);// 处理服务信息:获取老的服务信息,将新的服务信息重新存入客户端缓存中,对比新的服务信息,如发生变更,则发布实例变更数据,并同步serviceInfo数据到本地文件serviceInfoHolder.processServiceInfo(serviceObj);}lastRefTime = serviceObj.getLastRefTime();if (CollectionUtils.isEmpty(serviceObj.getHosts())) {// 记录失败次数incFailCount();return;}// TODO multiple time can be configured.delayTime = serviceObj.getCacheMillis() * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE;// 重置失败次数resetFailCount();} catch (NacosException e) {handleNacosException(e);} catch (Throwable e) {handleUnknownException(e);} finally {if (!isCancel) {// 注意:延时时间最长为60s,时长和失败次数相关(失败次数越大,延时时间越长)executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60),TimeUnit.MILLISECONDS);}}}private void handleNacosException(NacosException e) {incFailCount();int errorCode = e.getErrCode();if (NacosException.SERVER_ERROR == errorCode) {handleUnknownException(e);}NAMING_LOGGER.warn("Can't update serviceName: {}, reason: {}", groupedServiceName, e.getErrMsg());}private void handleUnknownException(Throwable throwable) {incFailCount();NAMING_LOGGER.warn("[NA] failed to update serviceName: {}", groupedServiceName, throwable);}private void incFailCount() {int limit = 6;if (failCount == limit) {return;}failCount++;}private void resetFailCount() {failCount = 0;}
}

run()方法主要逻辑就是使用grpc向服务端发送ServiceQueryRequest服务查询请求,然后处理服务信息,获取老的服务信息,将新的服务信息重新存入客户端缓存中,对比新的服务信息,如发生变更,则发布实例变更数据,并同步serviceInfo数据到本地文件。

这里有重试机制,最多重试6次,延时时间最长为60s,时长和失败次数相关(失败次数越大,延时时间越长)。

3.2.2、订阅服务 

public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {if (NAMING_LOGGER.isDebugEnabled()) {NAMING_LOGGER.debug("[GRPC-SUBSCRIBE] service:{}, group:{}, cluster:{} ", serviceName, groupName, clusters);}// 缓存SubscriberRedoData重做数据,定时使用redoData重新订阅,// 具体实现在RedoScheduledTask(由NamingGrpcRedoService定时调度),最终调用的也是NamingGrpcClientProxy#doSubscribe// 缓存重做数据保存在map中:private final ConcurrentMap<String, SubscriberRedoData> subscribes = new ConcurrentHashMap<>();redoService.cacheSubscriberForRedo(serviceName, groupName, clusters);// 使用grpc发送服务订阅请求return doSubscribe(serviceName, groupName, clusters);
}public void cacheSubscriberForRedo(String serviceName, String groupName, String cluster) {String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), cluster);SubscriberRedoData redoData = SubscriberRedoData.build(serviceName, groupName, cluster);// private final ConcurrentMap<String, SubscriberRedoData> subscribes = new ConcurrentHashMap<>();synchronized (subscribes) {subscribes.put(key, redoData);}
}

订阅服务首先会缓存SubscriberRedoData重做数据,实际上就是保存在一个map中,后续可以定时使用SubscriberRedoData重做数据来重新订阅,然后使用grpc发送服务订阅请求。

我们来看下如何订阅服务的。

public ServiceInfo doSubscribe(String serviceName, String groupName, String clusters) throws NacosException {// 构建一个SubscribeServiceRequest客户端订阅请求// 服务端处理代码: com.alibaba.nacos.naming.remote.rpc.handler.SubscribeServiceRequestHandler.handleSubscribeServiceRequest request = new SubscribeServiceRequest(namespaceId, groupName, serviceName, clusters,true);// grpc请求Nacos服务端进行订阅SubscribeServiceResponse response = requestToServer(request, SubscribeServiceResponse.class);// 标记SubscriberRedoData重做数据为已订阅redoService.subscriberRegistered(serviceName, groupName, clusters);return response.getServiceInfo();
}

通过grpc向Nacos服务端发起一个订阅请求,服务端真正的处理是在:com.alibaba.nacos.naming.remote.rpc.handler.SubscribeServiceRequestHandler#handle()方法。

public SubscribeServiceResponse handle(SubscribeServiceRequest request, RequestMeta meta) throws NacosException {String namespaceId = request.getNamespace();String serviceName = request.getServiceName();String groupName = request.getGroupName();String app = request.getHeader("app", "unknown");String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);// 构建一个Service服务,指定为临时实例Service service = Service.newService(namespaceId, groupName, serviceName, true);// 构建Subscriber订阅者对象Subscriber subscriber = new Subscriber(meta.getClientIp(), meta.getClientVersion(), app, meta.getClientIp(),namespaceId, groupedServiceName, 0, request.getClusters());// serviceStorage.getData(service): 从缓存中获取serviceInfo// metadataManager.getServiceMetadata(service).orElse(null): 从内存(map)获取ServiceMetadata// ServiceUtil.selectInstancesWithHealthyProtection(): 仅包含有保护机制的健康实例ServiceInfo serviceInfo = ServiceUtil.selectInstancesWithHealthyProtection(serviceStorage.getData(service),metadataManager.getServiceMetadata(service).orElse(null), subscriber.getCluster(), false,true, subscriber.getIp());if (request.isSubscribe()) {// 订阅服务clientOperationService.subscribeService(service, subscriber, meta.getConnectionId());NotifyCenter.publishEvent(new SubscribeServiceTraceEvent(System.currentTimeMillis(),meta.getClientIp(), service.getNamespace(), service.getGroup(), service.getName()));} else {// 取消订阅服务clientOperationService.unsubscribeService(service, subscriber, meta.getConnectionId());NotifyCenter.publishEvent(new UnsubscribeServiceTraceEvent(System.currentTimeMillis(),meta.getClientIp(), service.getNamespace(), service.getGroup(), service.getName()));}return new SubscribeServiceResponse(ResponseCode.SUCCESS.getCode(), "success", serviceInfo);
}

我们重点关注订阅服务的方法:

public void subscribeService(Service service, Subscriber subscriber, String clientId) {Service singleton = ServiceManager.getInstance().getSingletonIfExist(service).orElse(service);Client client = clientManager.getClient(clientId);if (!clientIsLegal(client, clientId)) {return;}// 添加到订阅者列表中,实际上就是保存在map中// 订阅者列表: protected final ConcurrentHashMap<Service, Subscriber> subscribers = new ConcurrentHashMap<>(16, 0.75f, 1);client.addServiceSubscriber(singleton, subscriber);client.setLastUpdatedTime();// 发布客户端订阅事件NotifyCenter.publishEvent(new ClientOperationEvent.ClientSubscribeServiceEvent(singleton, clientId));
}

将service添加到订阅者列表中,然后发布客户端订阅事件,这个在之前分析过,客户端订阅事件是在com.alibaba.nacos.naming.core.v2.index.ClientServiceIndexesManager#handleClientOperation进行处理,

核心逻辑就是将服务添加到ClientServiceIndexesManager的subscriberIndexes订阅者列表中:

private final ConcurrentMap<Service, Set<String>> subscriberIndexes = new ConcurrentHashMap<>();

 3.2.3、处理服务信息

public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {String serviceKey = serviceInfo.getKey();if (serviceKey == null) {return null;}// 获取老的服务ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());if (isEmptyOrErrorPush(serviceInfo)) {//empty or error push, just ignorereturn oldService;}// 重新存入客户端缓存中serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);// 对比下服务信息是否发生变更boolean changed = isChangedServiceInfo(oldService, serviceInfo);if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));}MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());if (changed) {NAMING_LOGGER.info("current ips:({}) service: {} -> {}", serviceInfo.ipCount(), serviceInfo.getKey(),JacksonUtils.toJson(serviceInfo.getHosts()));// 如果发生改变,发送实例变更事件,处理源码在:InstancesChangeNotifierNotifyCenter.publishEvent(new InstancesChangeEvent(notifierEventScope, serviceInfo.getName(), serviceInfo.getGroupName(),serviceInfo.getClusters(), serviceInfo.getHosts()));// 同步serviceInfo数据到本地文件DiskCache.write(serviceInfo, cacheDir);}return serviceInfo;
}

3.3、非订阅模式,通过grpc发送ServiceQueryRequest服务查询请求

真正执行的是com.alibaba.nacos.client.naming.remote.NamingClientProxyDelegate#queryInstancesOfService():

public ServiceInfo queryInstancesOfService(String serviceName, String groupName, String clusters, int udpPort,boolean healthyOnly) throws NacosException {return grpcClientProxy.queryInstancesOfService(serviceName, groupName, clusters, udpPort, healthyOnly);
}public ServiceInfo queryInstancesOfService(String serviceName, String groupName, String clusters, int udpPort,boolean healthyOnly) throws NacosException {// 构建服务查询请求// Nacos服务端处理是在:com.alibaba.nacos.naming.remote.rpc.handler.ServiceQueryRequestHandler.handleServiceQueryRequest request = new ServiceQueryRequest(namespaceId, serviceName, groupName);request.setCluster(clusters);request.setHealthyOnly(healthyOnly);request.setUdpPort(udpPort);// 通过grpc请求Nacos服务端处理QueryServiceResponse response = requestToServer(request, QueryServiceResponse.class);return response.getServiceInfo();
}

queryInstancesOfService()核心就是构建了一个服务查询请求,通过grpc请求Nacos服务端,接下来我们直接看服务端的处理代码,具体是在:com.alibaba.nacos.naming.remote.rpc.handler.ServiceQueryRequestHandler#handle。

public QueryServiceResponse handle(ServiceQueryRequest request, RequestMeta meta) throws NacosException {String namespaceId = request.getNamespace();String groupName = request.getGroupName();String serviceName = request.getServiceName();Service service = Service.newService(namespaceId, groupName, serviceName);String cluster = null == request.getCluster() ? "" : request.getCluster();boolean healthyOnly = request.isHealthyOnly();// 从缓存中获取serviceInfoServiceInfo result = serviceStorage.getData(service);// 从内存(map)获取ServiceMetadataServiceMetadata serviceMetadata = metadataManager.getServiceMetadata(service).orElse(null);// 获取有保护机制的健康实例result = ServiceUtil.selectInstancesWithHealthyProtection(result, serviceMetadata, cluster, healthyOnly, true,meta.getClientIp());return QueryServiceResponse.buildSuccessResponse(result);
}

3.4、筛选满足条件的实例 

private List<Instance> selectInstances(ServiceInfo serviceInfo, boolean healthy) {List<Instance> list;if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {return new ArrayList<>();}// 遍历所有实例,直接移除掉不满足条件的实例Iterator<Instance> iterator = list.iterator();while (iterator.hasNext()) {Instance instance = iterator.next();// 筛选出健康、启用、权重大于0的实例if (healthy != instance.isHealthy() || !instance.isEnabled() || instance.getWeight() <= 0) {iterator.remove();}}return list;
}

3.5、总结图

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/461714.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【跳槽须知】关于企业所签订的竞业协议你知道多少?

年后跳槽须知自己签订的合同中是否存在竞业协议&#xff0c;谨防协议造成经济损失 &#x1f413; 什么是竞业协议 竞业协议时用于保护自己的权益&#xff0c;在员工离职时决定是否启动的一种协议&#xff0c;避免一些掌握公司机密的一些重要岗位人才流入竞争对手的公司&#xf…

C语言辨析——声明int a[3][6], a[0][9]越界吗?

本文来源&#xff1a;声明int a[3][6], a[0][9]越界吗&#xff1f; 1. 问题 看下面的程序&#xff1a; #include <stdio.h> int main(void) {int a[3][6];for(int i0; i<3; i) {for(int j0; j<6; j){a[i][j] i * 6 j;}}printf("%d\n",a[0][9]);retu…

【大厂AI课学习笔记】【1.5 AI技术领域】(7)图像分割

今天学习到了图像分割。 这是我学习笔记的脑图。 图像分割&#xff0c;Image Segmentation&#xff0c;就是将数字图像分割为若干个图像子区域&#xff08;像素的集合&#xff0c;也被称为超像素&#xff09;&#xff0c;改变图像的表达方式&#xff0c;以更容易理解和分析。 …

伯克利研究院推出Ghostbuster用于检测由LLM代笔的文本

Ghostbuster的架构&#xff0c;用于检测人工智能生成文本的最先进的新方法 像 ChatGPT 这样的大型语言模型写得非常好&#xff0c;但事实上&#xff0c;它们已经成为一个棘手的问题。学生们已经开始使用这些模型代写作业&#xff0c;导致一些学校禁止 ChatGPT。此外&#xff0c…

【C语言】通过socket看系统调用过程

一、通过socket看系统调用过程 在Linux操作系统中&#xff0c;系统调用是用户空间与内核空间之间交互的一种方式。当一个应用程序需要执行操作系统级别的任务时&#xff0c;比如创建一个网络套接字&#xff08;socket&#xff09;&#xff0c;它必须通过系统调用请求内核来执行…

【服务器数据恢复】服务器RAID模块硬件损坏的数据恢复案例

服务器数据恢复环境&故障&#xff1a; 某品牌服务器中有一组由数块SAS硬盘组建的RAID5磁盘阵列&#xff0c;服务器操作系统是WINDOWS SERVER&#xff0c;服务器中存放企业数据&#xff0c;无数据库文件。 服务器出故障之前出现过几次意外断电的情况&#xff0c;服务器断电…

用HTML5实现灯笼效果

本文介绍了两种实现效果&#xff1a;一种使用画布&#xff08;canvas&#xff09;标签/元素&#xff0c;另一种不用画布&#xff08;canvas&#xff09;标签/元素主要使用CSS实现。 使用画布&#xff08;canvas&#xff09;标签/元素实现&#xff0c;下面&#xff0c;在画布上…

一键部署自动化运维工具spug

简介 Spug是面向中小型企业设计的轻量级无Agent的自动化运维平台&#xff0c;整合了主机管理、主机批量执行、主机在线终端、应用发布部署、在线任务计划、配置中心、监控、报警等一系列功能。 部署 1.创建目录 mkdir -p /opt/spug/{mysql,service,repos} 2.进入目录 cd /o…

Node.js之npm单独与批量升级依赖包的方式

Node.js之npm单独与批量升级依赖包的方式 文章目录 Node.js之npm单独与批量升级依赖包的方式npm查看与升级依赖包1. 单独安装或升级最新版本2. 查看依赖但不升级1. npm outdated2. npm update 3. 批量升级新版本4. npm-check-updates1. 全局安装2. ncu查看可升级的版本3. 升级依…

Leetcode 213 打家劫舍 II

题意理解&#xff1a; 你是一个专业的小偷&#xff0c;计划偷窃沿街的房屋&#xff0c;每间房内都藏有一定的现金。这个地方所有的房屋都 围成一圈 &#xff0c;这意味着第一个房屋和最后一个房屋是紧挨着的。同时&#xff0c;相邻的房屋装有相互连通的防盗系统&#xff0c;如果…

《PCI Express体系结构导读》随记 —— 第II篇 第4章 PCIe总线概述(10)

接前一篇文章&#xff1a;《PCI Express体系结构导读》随记 —— 第II篇 第4章 PCIe总线概述&#xff08;9&#xff09; 4.2 PCIe体系结构的组成部件 PCIe总线作为处理器系统的局部总线&#xff0c;其作用与PCI总线类似&#xff0c;主要目的是为了连接处理器系统中的外部设备&…

HiveSQL——用户中两人一定认识的组合数

注&#xff1a;参考文章&#xff1a; SQL之用户中两人一定认识的组合数--HQL面试题36【快手数仓面试题】_sql面试题-快手-CSDN博客文章浏览阅读1.2k次&#xff0c;点赞3次&#xff0c;收藏12次。目录0 需求分析1 数据准备2 数据分析3 小结0 需求分析设表名&#xff1a;table0现…