Dubbo 3.x源码(15)—Dubbo服务发布导出源码(4)

基于Dubbo 3.1,详细介绍了Dubbo服务的发布与引用的源码。

此前我们学习了Dubbo 3.x源码(14)—Dubbo服务发布导出源码(3),也就是Dubbo服务导出的核心方法doExportUrl的上半部分源码,现在我们继续学习,服务导出的核心方法doExportUrl的下半部分源码,也就是具体的远程服务协议导出。

  1. Dubbo 3.x源码(11)—Dubbo服务的发布与引用的入口
  2. Dubbo 3.x源码(12)—Dubbo服务发布导出源码(1)
  3. Dubbo 3.x源码(13)—Dubbo服务发布导出源码(2)
  4. Dubbo 3.x源码(14)—Dubbo服务发布导出源码(3)
  5. Dubbo 3.x源码(15)—Dubbo服务发布导出源码(4)

文章目录

  • 1 RegistryProtocol应用级远程服务导出协议
    • 1.1 getRegistryUrl获取注册中心url
    • 1.2 doLocalExport本地导出服务
  • 2 InterfaceCompatibleRegistryProtocol接口级远程服务导出协议
    • 2.1 getRegistryUrl获取注册中心url
  • 3 DubboProtocol Dubbo协议导出
    • 3.1 openServer开启服务器
      • 3.1.1 createServer创建服务器
        • 3.1.1.1 Exchangers#bind创建交换器
        • 3.1.1.2 NettyTransporter#bind创建NettyServer
        • 3.1.1.3 doOpen初始化并启动netty服务器
  • 4 总结

1 RegistryProtocol应用级远程服务导出协议

应用级服务远程导出协议以service-discovery-registry开头,其对应的Protocol实现就是RegistryProtocol。

  1. 这里的Invoker中的url实际上是注册中心的url地址,例如service-discovery-registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?REGISTRY_CLUSTER=registry1&application=demo-provider&application.version=1&dubbo=2.0.2&pid=48272&registry=zookeeper&timeout=20001&timestamp=1666601882084
  2. 真正的服务导出url被保存到attributes属性中,key为export,value例如dubbo://127.0.0.1:20880/org.apache.dubbo.demo.GreetingService?anyhost=true&application=demo-provider&application.version=1&background=false&bind.ip=127.0.0.1&bind.port=20880&delay=5000&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=greeting&interface=org.apache.dubbo.demo.GreetingService&methods=hello&pid=48272&revision=1.0.0&service-name-mapping=true&side=provider&timeout=5000&timestamp=1666601909853&version=1.0.0

这个方法内容很多,大概步骤为:

  1. 创建OverrideListener存入overrideListeners集合,用于监听zk服务端configurators配置目录变更。若监听到有变更,则会内部的originInvoker进行reExport,即重新导出。
  2. 获取真实服务提供者url,调用doLocalExport方法进行服务导出,得到Exporter
  3. 基于Dubbo SPI机制根据注册中心url加载具体的注册中心操作类,service-discovery-registry对应着ServiceDiscoveryRegistry
  4. 调用register方法向远程注册中心注册服务提供者url。
  5. Exporter包装为一个DestroyableExporter返回。
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {//获取注册中心url,实际上就是Invoker的url属性//service-discovery-registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?REGISTRY_CLUSTER=registry1&application=demo-provider&application.version=1&dubbo=2.0.2&pid=48272&registry=zookeeper&timeout=20001&timestamp=1666601882084URL registryUrl = getRegistryUrl(originInvoker);// url to export locally 获取服务提供者导出url,实际上是Invoker的url属性内部的export属性//dubbo://127.0.0.1:20880/org.apache.dubbo.demo.GreetingService?anyhost=true&application=demo-provider&application.version=1&background=false&bind.ip=127.0.0.1&bind.port=20880&delay=5000&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=greeting&interface=org.apache.dubbo.demo.GreetingService&methods=hello&pid=48272&revision=1.0.0&service-name-mapping=true&side=provider&timeout=5000&timestamp=1666601909853&version=1.0.0URL providerUrl = getProviderUrl(originInvoker);// Subscribe the override data// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call//  the same service. Because the subscribed is cached key with the name of the service, it causes the//  subscription information to cover.//根据服务提供者url获取订阅的url,将协议改为provider,添加category、configurators、check三个url参数,默认值false//provider://10.253.45.126:20880/org.apache.dubbo.demo.GreetingService?anyhost=true&application=demo-provider&application.version=1&background=false&bind.ip=10.253.45.126&bind.port=20880&category=configurators&check=false&delay=5000&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=greeting&interface=org.apache.dubbo.demo.GreetingService&methods=hello&pid=49884&revision=1.0.0&service-name-mapping=true&side=provider&timeout=5000&timestamp=1666602580965&version=1.0.0final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);//创建OverrideListener这个url监听器final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);//获取ProviderConfigurationListener内部的overrideListenersMap<URL, NotifyListener> overrideListeners = getProviderConfigurationListener(providerUrl).getOverrideListeners();//将OverrideListener加入到overrideListeners,key为注册中心urloverrideListeners.put(registryUrl, overrideSubscribeListener);providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);//export invoker/** 1 根据真正providerUrl的协议来导出url*/final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);// url to registry//基于Dubbo SPI机制根据注册中心url加载具体的注册中心操作类,service-discovery-registry对应着ServiceDiscoveryRegistryfinal Registry registry = getRegistry(registryUrl);//获取需要注册的服务url//dubbo://192.168.31.84:20880/org.apache.dubbo.demo.GreetingService?anyhost=true&application=demo-provider&application.version=1&background=false&delay=5000&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=greeting&interface=org.apache.dubbo.demo.GreetingService&methods=hello&pid=10920&revision=1.0.0&service-name-mapping=true&side=provider&timeout=5000&timestamp=1666621305141&version=1.0.0final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);// decide if we need to delay publish (provider itself and registry should both need to register)//决定我们是否需要延迟发布(提供者本身和注册中心都需要注册)boolean register = providerUrl.getParameter(REGISTER_KEY, true) && registryUrl.getParameter(REGISTER_KEY, true);/** 2 向注册中心注册服务*/if (register) {register(registry, registeredProviderUrl);}// register stated url on provider model//注册标准urlregisterStatedUrl(registryUrl, registeredProviderUrl, register);//设置注册中心url和订阅urlexporter.setRegisterUrl(registeredProviderUrl);exporter.setSubscribeUrl(overrideSubscribeUrl);//如果不支持服务发现if (!registry.isServiceDiscovery()) {// Deprecated! Subscribe to override rules in 2.6.x or before.//订阅符合条件的注册数据,并在注册数据发生更改时自动推送。新版本不再需要了registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);}//通知exporter,实际上是通知RegistryProtocolListenernotifyExport(exporter);//根据exporter构建为一个新的DestroyableExporter返回return new DestroyableExporter<>(exporter);
}

1.1 getRegistryUrl获取注册中心url

该方法并不是真正的注册中心协议,直接返回原服务发现协议url,例如:service-discovery-registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?REGISTRY_CLUSTER=registry1&application=demo-provider&application.version=1&dubbo=2.0.2&pid=48272&registry=zookeeper&timeout=20001&timestamp=1666601882084

/*** RegistryProtocol的方法*/
protected URL getRegistryUrl(Invoker<?> originInvoker) {//直接返回原服务发现协议urlreturn originInvoker.getUrl();
}

1.2 doLocalExport本地导出服务

该方法根据指定的服务url进行服务导出。可以看到,该方法内部,将会根据真正的服务提供者url创建一个新的Invoker,然后调用Protocol#export方法导出为exporter,最后会存入bounds缓存中。这里所谓的localExport,并不是此前说的injvm协议的导出,而是区别于远程注册中心协议的其他协议的导出,例如dubbo协议导出。

这里的protocol同样是Protocol的自适应扩展实现,即Protocol$Adaptive,也就是说会根据传入的url中的protocol选择对应的Protocol SPI实现类,而默认实现就是dubbo协议,即DubboProtocol。

/*** RegistryProtocol的方法* <p>* 根据真正providerUrl的协议来导出url** @param originInvoker 可执行对象* @param providerUrl   providerUrl的协议url*/
@SuppressWarnings("unchecked")
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {//获取缓存key,去除dynamic、enabled参数  dubbo://10.253.45.126:20880/org.apache.dubbo.demo.GreetingService?anyhost=true&application=demo-provider&application.version=1&background=false&bind.ip=10.253.45.126&bind.port=20880&delay=5000&deprecated=false&dubbo=2.0.2&generic=false&group=greeting&interface=org.apache.dubbo.demo.GreetingService&methods=hello&pid=56001&revision=1.0.0&service-name-mapping=true&side=provider&timeout=5000&timestamp=1666605432012&version=1.0.0String key = getCacheKey(originInvoker);//将exporter存入bounds缓存中return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {//新建一个invokerInvoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);//这里的这里的protocol是Protocol的自适应扩展实现,即Protocol$Adaptive//也就是说会根据传入的url中的protocol选择对应的Protocol SPI实现类,而默认实现就是dubbo协议,即DubboProtocol。return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);});
}

2 InterfaceCompatibleRegistryProtocol接口级远程服务导出协议

接口级服务远程导出协议以registry开头,其对应的Protocol实现就是InterfaceCompatibleRegistryProtocol。

实际上InterfaceCompatibleRegistryProtocol继承了RegistryProtocol,大部分代码都是一样的,例如export导出协议的方法,仅仅是一些细微的代码不一致。我们来看看它在export过程中的特有的方法。
image.png

2.1 getRegistryUrl获取注册中心url

该方法获取注册中心url,如果是RegistryProtocol协议,则直接返回originInvoker的url,并不会还原,而InterfaceCompatibleRegistryProtocol则会进行处理、还原。

获取url中的registry属性,也就是真实的注册中心协议,例如zookeeper,默认dubbo,然后替换掉registry协议并返回。

/*** 获取注册中心url* @param originInvoker* @return*/
@Override
protected URL getRegistryUrl(Invoker<?> originInvoker) {//获取注册中心url,例如://registry://xxx.xxx.xxx.xxx:2181/org.apache.dubbo.registry.RegistryService?REGISTRY_CLUSTER=registry1&application=demo-provider&application.version=1&dubbo=2.0.2&pid=38947&registry=zookeeper&timeout=20001&timestamp=1666681960810URL registryUrl = originInvoker.getUrl();//如果协议是registry,即是接口级远程注册协议if (REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) {//获取url中的registry属性,也就是真实的注册中心协议,例如zookeeper,默认dubboString protocol = registryUrl.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY);//使用真实的注册中心协议替换registry协议,并且删除url中的registry参数registryUrl = registryUrl.setProtocol(protocol).removeParameter(REGISTRY_KEY);}//替换后的协议//zookeeper://xxx.xxx.xxx.xxx:2181/org.apache.dubbo.registry.RegistryService?REGISTRY_CLUSTER=registry1&application=demo-provider&application.version=1&dubbo=2.0.2&pid=38947&timeout=20001&timestamp=1666681960810return registryUrl;
}

3 DubboProtocol Dubbo协议导出

上面讲到RegistryProtocol内部的doLocalExport方法会对真实协议进行导出,假设我们的Service服务协议采用dubbo协议,export方法我们在此前就说过了,会经过层层的包装,Dubbo协议的导出也不例外。区别是DubboProtocol还会进行网络通信处理。

这里我们直接看最底层DubboProtocol的export方法,它的大概步骤为:

  1. 根据服务提供者url构建服务key,例如:org.apache.dubbo.demo.DemoService:20880。
  2. 根据invoker,key构建一个DubboExporter,然后将key - invoker,存入到exporterMap这个缓存中,后续调用时,将会从exporterMap找到Exporter,然后找到Invoker进行调用
  3. 导出用于调度事件的存根服务。
  4. 调用openServer方法,开启服务提供者端服务器,监听端口,这样就能接收consumer的远程调用请求
  5. 调用optimizeSerialization方法,优化序列化。
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {checkDestroyed();//获取服务提供者url//例如: dubbo://192.168.31.84:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=demo-provider&application.version=1&background=false&bind.ip=192.168.31.84&bind.port=20880&delay=5000&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello,sayHelloAsync&pid=20496&service-name-mapping=true&side=provider&timeout=3000&timestamp=1666619245421URL url = invoker.getUrl();/** 1 导出服务*/// export service.//构建服务key,例如  org.apache.dubbo.demo.DemoService:20880String key = serviceKey(url);//根据invoker,key构建一个DubboExporter//然后将key - invoker,存入到exporterMap这个缓存中,后续调用时,将会从exporterMap找到Exporter,然后找到Invoker进行调用。DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);//export a stub service for dispatching event//导出用于调度事件的存根服务boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);boolean isCallbackService = url.getParameter(IS_CALLBACK_SERVICE, false);if (isStubSupportEvent && !isCallbackService) {String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);if (stubServiceMethods == null || stubServiceMethods.length() == 0) {if (logger.isWarnEnabled()) {logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +"], has set stub proxy support event ,but no stub methods founded."));}}}/** 2 开启服务提供者端服务器*/openServer(url);/** 3 优化序列化*/optimizeSerialization(url);return exporter;
}

3.1 openServer开启服务器

开启服务提供者端服务器,监听端口,这样就能接收consumer的远程调用请求。

从服务缓存serverMap获取协议服务ProtocolServer,serverMap缓存key为url的address,因此同服务器同端口的多个Dubbo Service将会使用同一个key,即一般情况下,一个服务实例中的所有服务接口使用同一个服务器。

如果没有服务器那么调用createServer创建,否则调用reset重置,重置实际上就是更新Server的一些配置信息以及url参数信息。

/*** DubboProtocol的方法* <p>* 开启服务端* @param url 服务提供者url*/
private void openServer(URL url) {checkDestroyed();// find server.//获取服务器key,就是url的address、,例如:192.168.31.84:20880String key = url.getAddress();// client can export a service which only for server to invokeboolean isServer = url.getParameter(IS_SERVER_KEY, true);if (isServer) {//从服务map缓存获取协议服务ProtocolServer,同服务器同端口的多个Dubbo Service将会使用同一个keyProtocolServer server = serverMap.get(key);if (server == null) {//加锁synchronized (this) {//双重检测server = serverMap.get(key);if (server == null) {//创建一个ProtocolServer存入进去serverMap.put(key, createServer(url));return;}}}// server supports reset, use together with override//如果已经有了ProtocolServer,那么重置server.reset(url);}
}

3.1.1 createServer创建服务器

该方法创建生产者网络通信服务器,即dubbo提供者端网络服务,默认采用netty作为底层网络通信库。

该方法的核心就是Exchangers#bind方法,该方法默认通过HeaderExchanger创建HeaderExchangeServer,HeaderExchangeServer内部包含基于NettyTransporter创建的NettyServer,NettyServer内部包含url和ChannelHandler,创建NettyServer时构造器内部会调用doOpen方法绑定端口并启动netty服务器,而ChannelHandler的包含关系为:ChannelHandlerDispatcher -> DecodeHandler -> HeaderExchangeHandler -> requestHandler(ExchangeHandlerAdapter)。

/*** DubboProtocol的方法* 创建服务器** @param url 服务提供者url* @return 协议服务器*/
private ProtocolServer createServer(URL url) {//添加url参数url = URLBuilder.from(url)// send readonly event when server closes, it's enabled by default 当服务器关闭时发送只读事件,默认情况下是启用的.addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())// enable heartbeat by default  默认启用心跳.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT)).addParameter(CODEC_KEY, DubboCodec.NAME) //编解码器 dubbo.build();//获取指定的网络传输器,dubbo还提供了netty、grizzly、mina三种,默认dubboString transporter = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);//如果没有该传输器的扩展,那么抛出异常,目前仅提供了NettyTransporterif (StringUtils.isNotEmpty(transporter) && !url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).hasExtension(transporter)) {throw new RpcException("Unsupported server type: " + transporter + ", url: " + url);}//创建ExchangeServer交换器,启动NettyServerExchangeServer server;try {//默认通过HeaderExchanger创建HeaderExchangeServer//HeaderExchangeServer内部包含基于NettyTransporter创建的NettyServer//NettyServer内部包含url和ChannelHandler,ChannelHandlerDispatcher -> DecodeHandler -> HeaderExchangeHandler -> requestHandler(ExchangeHandlerAdapter)server = Exchangers.bind(url, requestHandler);} catch (RemotingException e) {throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);}transporter = url.getParameter(CLIENT_KEY);if (StringUtils.isNotEmpty(transporter) && !url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).hasExtension(transporter)) {throw new RpcException("Unsupported client type: " + transporter);}//包装为DubboProtocolServerDubboProtocolServer protocolServer = new DubboProtocolServer(server);//加载服务器配置,主要是配置dubbo.service.shutdown.wait属性,即服务器关闭等待超时时间,默认10000msloadServerProperties(protocolServer);return protocolServer;
}
3.1.1.1 Exchangers#bind创建交换器

该方法内部基于Dubbo SPI获取Exchanger,默认HeaderExchanger,然后调用HeaderExchanger#bind方法。

/*** Exchangers的方法** 创建交换服务器* @param url 服务提供者url* @param handler  请求处理器*/
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {if (url == null) {throw new IllegalArgumentException("url == null");}if (handler == null) {throw new IllegalArgumentException("handler == null");}//添加url属性codec,值为exchangeurl = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");//基于Dubbo SPI获取Exchanger,默认HeaderExchanger,然后调用HeaderExchanger#bind方法return getExchanger(url).bind(url, handler);
}

HeaderExchanger#bind方法中,首先对handler进行包装:DecodeHandler -> HeaderExchangeHandler -> requestHandler。

  1. DecodeHandler用于负责内部的dubbo协议的请求解码。
  2. HeaderExchangeHandler用于完成请求响应的映射。
  3. 用于nettyHandler真正处理请求。

随后调用Transporters#bind方法绑定并启动底层远程网络通信服务器,返回RemotingServer。Transporter是Dubbo对网络传输层的抽象接口,Exchanger依赖于Transporter。
**最后基于RemotingServer构建HeaderExchangeServer返回。

@Override
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {//包装handler:DecodeHandler -> HeaderExchangeHandler -> handler//调用Transporters#bind方法绑定并启动底层远程网络通信服务器,返回RemotingServer//基于RemotingServer构建HeaderExchangeServer返回return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}

Transporters#bind方法将会在handler的最外层继续包装一层ChannelHandlerDispatcher,它所有的 ChannelHandler 接口实现都会调用其中每个 ChannelHandler 元素的相应方法。随后基于Dubbo SPI机制获取Transporter的实现,并调用bind方法完成绑定,目前仅NettyTransporter,基于netty4。

public static RemotingServer bind(URL url, ChannelHandler... handlers) throws RemotingException {if (url == null) {throw new IllegalArgumentException("url == null");}if (handlers == null || handlers.length == 0) {throw new IllegalArgumentException("handlers == null");}//继续包装一层ChannelHandlerDispatcherChannelHandler handler;if (handlers.length == 1) {handler = handlers[0];} else {handler = new ChannelHandlerDispatcher(handlers);}//基于Dubbo SPI机制获取Transporter的实现,并调用bind方法完成绑定return getTransporter(url).bind(url, handler);
}
3.1.1.2 NettyTransporter#bind创建NettyServer

该方法很简单,就是根据url和handler创建一个NettyServer实例,在NettyServer的构造器中,会调用doOpen()开启服务,创建ServerBootstrap,设置EventLoopGroup,编配ChannelHandlerPipeline,最终调用bind()绑定本地端口。

@Override
public RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException {//基于url和handler创建NettyServerreturn new NettyServer(url, handler);
}

NettyServer的构造器如下,将会调用父类构造器启动服务。

public NettyServer(URL url, ChannelHandler handler) throws RemotingException {// you can customize name and type of client thread pool by THREAD_NAME_KEY and THREAD_POOL_KEY in CommonConstants.// the handler will be wrapped: MultiMessageHandler->HeartbeatHandler->handler//可通过CommonConstants中的THREAD_NAME_KEY和THREAD_POOL_KEY自定义客户端线程池的名称和类型//继续包装handler: MultiMessageHandler->HeartbeatHandler->handlersuper(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME), ChannelHandlers.wrap(handler, url));// read config before destroy//获取服务器关闭等待超时时间,默认10000msserverShutdownTimeoutMills = ConfigurationUtils.getServerShutdownTimeout(getUrl().getOrDefaultModuleModel());
}

AbstractServer的构造器如下,将会获取绑定的ip和端口以及其他参数,然后调用doOpen方法真正的开启netty服务。最后还会为当前url构建线程池并存入executors。

public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {super(url, handler);executorRepository = url.getOrDefaultApplicationModel().getExtensionLoader(ExecutorRepository.class).getDefaultExtension();localAddress = getUrl().toInetSocketAddress();//获取绑定的ipString bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());//获取绑定的端口int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());//如果url有anyhost=true参数或者ip是本地ip,那么设置绑定ip为0.0.0.0if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {bindIp = ANYHOST_VALUE;}//构建绑定hostbindAddress = new InetSocketAddress(bindIp, bindPort);//获取accepts参数,即最大可接受链接数,默认值0this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS);try {/** 开启netty服务器*/doOpen();if (logger.isInfoEnabled()) {logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());}} catch (Throwable t) {throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()+ " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);}//为当前url构建线程池并存入executorsexecutors.add(executorRepository.createExecutorIfAbsent(url));
}
3.1.1.3 doOpen初始化并启动netty服务器

该方法用于初始化并启动netty服务器,是非常标准的netty服务端启动代码。创建ServerBootstrap,设置bossGroup和workerGroup,编配ChannelHandler,最终调用bind()绑定本地端口。至此成功启动netty服务端,可以开始监听网络请求了。

/*** NettyServer的方法** 初始化并启动netty服务器*/
@Override
protected void doOpen() throws Throwable {//创建ServerBootstrap,说明这是一个netty服务端bootstrap = new ServerBootstrap();//创建bossGroupbossGroup = createBossGroup();//创建workerGroupworkerGroup = createWorkerGroup();//创建NettyServerHandlerfinal NettyServerHandler nettyServerHandler = createNettyServerHandler();//获取 worker channelchannels = nettyServerHandler.getChannels();//初始化ServerBootstrapinitServerBootstrap(nettyServerHandler);// bind//绑定ip:port,并且生成一个ChannelFuture,启动服务器,现在可以开始监听网络请求了ChannelFuture channelFuture = bootstrap.bind(getBindAddress());//让当前线程同步等待Netty server的close事件channelFuture.syncUninterruptibly();channel = channelFuture.channel();}protected EventLoopGroup createBossGroup() {//默认1个线程,线程名NettyServerBossreturn NettyEventLoopFactory.eventLoopGroup(1, EVENT_LOOP_BOSS_POOL_NAME);
}protected EventLoopGroup createWorkerGroup() {//默认Math.min(Runtime.getRuntime().availableProcessors() + 1, 32)个线程,可通过iothreads参数指定//线程名NettyServerWorkerreturn NettyEventLoopFactory.eventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),EVENT_LOOP_WORKER_POOL_NAME);
}protected NettyServerHandler createNettyServerHandler() {//创建NettyServerHandler,当前NettyServer对象本身也是一个ChannelHandler实例,其received方法委托给创建实例时传递的内部的handler处理return new NettyServerHandler(getUrl(), this);
}protected void initServerBootstrap(NettyServerHandler nettyServerHandler) {//从url参数keep.alive获取是否保持连接,默认falseboolean keepalive = getUrl().getParameter(KEEP_ALIVE_KEY, Boolean.FALSE);//配置线程组bootstrap.group(bossGroup, workerGroup)//IO模型.channel(NettyEventLoopFactory.serverSocketChannelClass())//设置Socket 参数.option(ChannelOption.SO_REUSEADDR, Boolean.TRUE).childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE).childOption(ChannelOption.SO_KEEPALIVE, keepalive).childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)//设置处理器.childHandler(new ChannelInitializer<SocketChannel>() {//用于添加ServerSocketChannel对应的 Handle@Overrideprotected void initChannel(SocketChannel ch) throws Exception {// FIXME: should we use getTimeout()?int idleTimeout = UrlUtils.getIdleTimeout(getUrl());NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {ch.pipeline().addLast("negotiation", new SslServerTlsHandler(getUrl()));}//自定义客户端消息的业务处理逻辑Handlerch.pipeline()//解码.addLast("decoder", adapter.getDecoder())//编码.addLast("encoder", adapter.getEncoder())//心跳检测.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))//最后是此前创建的nettyServerHandler.addLast("handler", nettyServerHandler);}});
}

4 总结

对于远程导出就比较复杂,包括接口级注册中心的registry对应InterfaceCompatibleRegistryProtocol,应用级注册中心的service-discovery-registry对应RegistryProtocol。大概步骤如下:

  1. 需要从注册中心url的attributes属性中,获取真实的服务导出url,然后调用doLocalExport方法进行服务导出,该方法内部实际上就是重复前面的Protocol$Adaptive#export的过程。
    1. 此时,将会调用真实协议对应的Protocol实现,例如dubbo协议对应着DubboProtocol,而在这些协议的export方法中,除了构建Exportor加入exporterMap缓存之外,还会调用openServer方法,开启一个服务提供者端服务器,监听端口,这样就能接收consumer的远程调用请求。
    2. 同ip同端口(同一个dubbo服务端)的Dubbo应用中,多个Dubbo Service将会使用同一个服务器,即只有在第一次调用openServer的时候才会创建服务器。ip就是服务器的ip,端口就是20880端口。
    3. 创建服务器的时候,默认使用netty作为底层通信库,即创建一个netty服务端。
  2. 然后,就是通过Registry#register方法向远程注册中心注册服务提供者url,这部分的源码我们下回分解!

另外,其实我们发现,凡事涉及到需要依靠网络通信的框架,无论是RPC框架还是各种MQ框架等等,底层网络通信几乎都是依靠的Netty来支持。所以,虽然我们实际开发可能不会主动接触Netty,但是我们确实是一直都在使用它。

Netty作为一个的非常底层的网络通信框架,它在我们Java工程师的进阶过程中是非常重要的一个坎,直白的说,对于高级、资深Java工程师的面试特别有帮助,后续我将会免费的、开源的、具有深度的解析Netty的各种流程源码,欢迎大家关注!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/317446.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

多技术融合在生态系统服务功能社会价值评估中的应用及论文写作、拓展分析

生态系统服务是人类从自然界中获得的直接或间接惠益&#xff0c;可分为供给服务、文化服务、调节服务和支持服务4类&#xff0c;对提升人类福祉具有重大意义&#xff0c;且被视为连接社会与生态系统的桥梁。自从启动千年生态系统评估项目&#xff08;Millennium Ecosystem Asse…

DolphinScheduler实际应用

前言 最近公司新启动了一个项目&#xff0c;然后领导想用一下新技术&#xff0c;并且为公司提供多个大数据调度解决方案&#xff0c;我呢就根据领导要求调研了下当前的开源调度工具&#xff0c;最终决定采用DolphinScheduler&#xff0c; 因此研究了一下DolphinScheduler &…

Vue3+Three.js星空球体

结果图 threejs比例不变自适应 首先是设置相机和render&#xff0c;要注意的就是相机要加上aspect不然如果页面不是正方形看到的样式就会失调。 // ---------相机设置------------- // 实例化一个透视投影相机对象 const camera new THREE.PerspectiveCamera(); //相机在Thr…

Modbus转Profinet解决方案,轻松搭建工业通信“桥梁”

在工业自动化领域&#xff0c;Modbus和Profinet是两种常见的通信协议。由于许多现有的工业设备使用的是Modbus协议&#xff0c;而现代化的工业系统通常采用Profinet&#xff0c;所以将Modbus转换为Profinet成为了解决方案的一个重要需求。 Modbus转Profinet解决方案具体包括以下…

Minio集群部署(docker版本)

先在/etc/hosts中添加虚拟域名 {ip} minio1 {ip} minio2 执行docker命令 docker run -it -d --name minio-01 --restartalways --nethost \ -e "MINIO_ROOT_USER{用户名}" \ -e "MINIO_ROOT_PASSWORD{密码}" \ -v /data/docker/minio/update:/data1 #…

CMake入门教程【基础篇】CMake+Visual Studio2022构建C++项目

文章目录 1.概述2.Visual Studio 2022简介3.安装Visual Studio 20224.安装CMake5.创建CMake项目6. 构建项目 1.概述 CMake和Visual Studio 2022结合 在现代软件开发中&#xff0c;CMake和Visual Studio 2022的结合提供了一个强大的环境&#xff0c;用于构建和管理各种规模的C项…

ITSS服务工程师vs ITSS服务经理:哪个职位更适合你?

✨在信息技术服务领域&#xff0c;ITSS服务工程师和ITSS服务经理是两个极具吸引力的职位。但它们各自的特点和要求是什么&#xff1f;哪个更适合你的职业规划和个人兴趣&#xff1f;接下来&#xff0c;我们将为你详细解读这两个职位的区别&#xff0c;帮助你做出明智的选择&…

环境准备-VMware安装

照顾到很多人不是很会环境搭建,我这里会将搭建的步骤讲的细致点 第一步,VMware下载。目的是通过VMware搭建Linux服务器,因为大家大部分还是Windows的电脑,我们先下载虚拟机搭建一个Linux系统的服务器 下载完成之后,点击安装,如下: 点击“下一步” 勾选“我接受许可协议…

【解决|三方工具】导入 XChart 后提示丢失关于 TMPPro 工具引用

开发平台&#xff1a;Unity 2021 版本 插件版本&#xff1a;XChart 3.0&#xff1a;官方文档 - https://github.com/XCharts-Team/XCharts   问题描述 导入 XChart 插件至 Unity 中出现 目录&#xff1a;Component、Theme 等提示丢失 TMPPro&#xff08;TextMeshPro 工具&…

【数据结构和算法】寻找数组的中心下标

其他系列文章导航 Java基础合集数据结构与算法合集 设计模式合集 多线程合集 分布式合集 ES合集 文章目录 其他系列文章导航 文章目录 前言 一、题目描述 二、题解 2.1 前缀和的解题模板 2.1.1 最长递增子序列长度 2.1.2 寻找数组中第 k 大的元素 2.1.3 最长公共子序列…

什么是Maven ??? (以及关于依赖,中央仓库,国内源)

文章目录 什么是 Maven创建第一个 Maven 项目依赖管理Maven 的仓库Maven 如何设置国内源 什么是 Maven Maven &#xff1a;用于构建和管理任何基于java的项目的工具。**说白了就是管理 Java项目 的工具。**我们希望我们已经创建了一些东西&#xff0c;可以使Java开发人员的日常…

paddle v4 hubserving 部署

环境准备&#xff1a;https://github.com/PaddlePaddle/PaddleOCR/tree/release/2.7/deploy/hubserving#24-%E5%90%AF%E5%8A%A8%E6%9C%8D%E5%8A%A1 服务器启动命令 hub serving start -c deploy/hubserving/ocr_system/config.json客户端请求 python tools/test_hubserving.…