基于Dubbo 3.1,详细介绍了Dubbo服务的发布与引用的源码。
此前我们学习了Dubbo 3.x源码(14)—Dubbo服务发布导出源码(3),也就是Dubbo服务导出的核心方法doExportUrl的上半部分源码,现在我们继续学习,服务导出的核心方法doExportUrl的下半部分源码,也就是具体的远程服务协议导出。
- Dubbo 3.x源码(11)—Dubbo服务的发布与引用的入口
- Dubbo 3.x源码(12)—Dubbo服务发布导出源码(1)
- Dubbo 3.x源码(13)—Dubbo服务发布导出源码(2)
- Dubbo 3.x源码(14)—Dubbo服务发布导出源码(3)
- Dubbo 3.x源码(15)—Dubbo服务发布导出源码(4)
文章目录
- 1 RegistryProtocol应用级远程服务导出协议
- 1.1 getRegistryUrl获取注册中心url
- 1.2 doLocalExport本地导出服务
- 2 InterfaceCompatibleRegistryProtocol接口级远程服务导出协议
- 2.1 getRegistryUrl获取注册中心url
- 3 DubboProtocol Dubbo协议导出
- 3.1 openServer开启服务器
- 3.1.1 createServer创建服务器
- 3.1.1.1 Exchangers#bind创建交换器
- 3.1.1.2 NettyTransporter#bind创建NettyServer
- 3.1.1.3 doOpen初始化并启动netty服务器
- 4 总结
1 RegistryProtocol应用级远程服务导出协议
应用级服务远程导出协议以service-discovery-registry开头,其对应的Protocol实现就是RegistryProtocol。
- 这里的Invoker中的url实际上是注册中心的url地址,例如service-discovery-registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?REGISTRY_CLUSTER=registry1&application=demo-provider&application.version=1&dubbo=2.0.2&pid=48272®istry=zookeeper&timeout=20001×tamp=1666601882084
- 真正的服务导出url被保存到attributes属性中,key为export,value例如dubbo://127.0.0.1:20880/org.apache.dubbo.demo.GreetingService?anyhost=true&application=demo-provider&application.version=1&background=false&bind.ip=127.0.0.1&bind.port=20880&delay=5000&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=greeting&interface=org.apache.dubbo.demo.GreetingService&methods=hello&pid=48272&revision=1.0.0&service-name-mapping=true&side=provider&timeout=5000×tamp=1666601909853&version=1.0.0
这个方法内容很多,大概步骤为:
- 创建OverrideListener存入overrideListeners集合,用于监听zk服务端configurators配置目录变更。若监听到有变更,则会内部的originInvoker进行reExport,即重新导出。
- 获取真实服务提供者url,调用doLocalExport方法进行服务导出,得到Exporter。
- 基于Dubbo SPI机制根据注册中心url加载具体的注册中心操作类,service-discovery-registry对应着ServiceDiscoveryRegistry
- 调用register方法向远程注册中心注册服务提供者url。
- 将Exporter包装为一个DestroyableExporter返回。
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {//获取注册中心url,实际上就是Invoker的url属性//service-discovery-registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?REGISTRY_CLUSTER=registry1&application=demo-provider&application.version=1&dubbo=2.0.2&pid=48272®istry=zookeeper&timeout=20001×tamp=1666601882084URL registryUrl = getRegistryUrl(originInvoker);// url to export locally 获取服务提供者导出url,实际上是Invoker的url属性内部的export属性//dubbo://127.0.0.1:20880/org.apache.dubbo.demo.GreetingService?anyhost=true&application=demo-provider&application.version=1&background=false&bind.ip=127.0.0.1&bind.port=20880&delay=5000&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=greeting&interface=org.apache.dubbo.demo.GreetingService&methods=hello&pid=48272&revision=1.0.0&service-name-mapping=true&side=provider&timeout=5000×tamp=1666601909853&version=1.0.0URL providerUrl = getProviderUrl(originInvoker);// Subscribe the override data// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call// the same service. Because the subscribed is cached key with the name of the service, it causes the// subscription information to cover.//根据服务提供者url获取订阅的url,将协议改为provider,添加category、configurators、check三个url参数,默认值false//provider://10.253.45.126:20880/org.apache.dubbo.demo.GreetingService?anyhost=true&application=demo-provider&application.version=1&background=false&bind.ip=10.253.45.126&bind.port=20880&category=configurators&check=false&delay=5000&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=greeting&interface=org.apache.dubbo.demo.GreetingService&methods=hello&pid=49884&revision=1.0.0&service-name-mapping=true&side=provider&timeout=5000×tamp=1666602580965&version=1.0.0final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);//创建OverrideListener这个url监听器final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);//获取ProviderConfigurationListener内部的overrideListenersMap<URL, NotifyListener> overrideListeners = getProviderConfigurationListener(providerUrl).getOverrideListeners();//将OverrideListener加入到overrideListeners,key为注册中心urloverrideListeners.put(registryUrl, overrideSubscribeListener);providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);//export invoker/** 1 根据真正providerUrl的协议来导出url*/final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);// url to registry//基于Dubbo SPI机制根据注册中心url加载具体的注册中心操作类,service-discovery-registry对应着ServiceDiscoveryRegistryfinal Registry registry = getRegistry(registryUrl);//获取需要注册的服务url//dubbo://192.168.31.84:20880/org.apache.dubbo.demo.GreetingService?anyhost=true&application=demo-provider&application.version=1&background=false&delay=5000&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=greeting&interface=org.apache.dubbo.demo.GreetingService&methods=hello&pid=10920&revision=1.0.0&service-name-mapping=true&side=provider&timeout=5000×tamp=1666621305141&version=1.0.0final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);// decide if we need to delay publish (provider itself and registry should both need to register)//决定我们是否需要延迟发布(提供者本身和注册中心都需要注册)boolean register = providerUrl.getParameter(REGISTER_KEY, true) && registryUrl.getParameter(REGISTER_KEY, true);/** 2 向注册中心注册服务*/if (register) {register(registry, registeredProviderUrl);}// register stated url on provider model//注册标准urlregisterStatedUrl(registryUrl, registeredProviderUrl, register);//设置注册中心url和订阅urlexporter.setRegisterUrl(registeredProviderUrl);exporter.setSubscribeUrl(overrideSubscribeUrl);//如果不支持服务发现if (!registry.isServiceDiscovery()) {// Deprecated! Subscribe to override rules in 2.6.x or before.//订阅符合条件的注册数据,并在注册数据发生更改时自动推送。新版本不再需要了registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);}//通知exporter,实际上是通知RegistryProtocolListenernotifyExport(exporter);//根据exporter构建为一个新的DestroyableExporter返回return new DestroyableExporter<>(exporter);
}
1.1 getRegistryUrl获取注册中心url
该方法并不是真正的注册中心协议,直接返回原服务发现协议url,例如:service-discovery-registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?REGISTRY_CLUSTER=registry1&application=demo-provider&application.version=1&dubbo=2.0.2&pid=48272®istry=zookeeper&timeout=20001×tamp=1666601882084
/*** RegistryProtocol的方法*/
protected URL getRegistryUrl(Invoker<?> originInvoker) {//直接返回原服务发现协议urlreturn originInvoker.getUrl();
}
1.2 doLocalExport本地导出服务
该方法根据指定的服务url进行服务导出。可以看到,该方法内部,将会根据真正的服务提供者url创建一个新的Invoker,然后调用Protocol#export方法导出为exporter,最后会存入bounds缓存中。这里所谓的localExport,并不是此前说的injvm协议的导出,而是区别于远程注册中心协议的其他协议的导出,例如dubbo协议导出。
这里的protocol同样是Protocol的自适应扩展实现,即Protocol$Adaptive,也就是说会根据传入的url中的protocol选择对应的Protocol SPI实现类,而默认实现就是dubbo协议,即DubboProtocol。
/*** RegistryProtocol的方法* <p>* 根据真正providerUrl的协议来导出url** @param originInvoker 可执行对象* @param providerUrl providerUrl的协议url*/
@SuppressWarnings("unchecked")
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {//获取缓存key,去除dynamic、enabled参数 dubbo://10.253.45.126:20880/org.apache.dubbo.demo.GreetingService?anyhost=true&application=demo-provider&application.version=1&background=false&bind.ip=10.253.45.126&bind.port=20880&delay=5000&deprecated=false&dubbo=2.0.2&generic=false&group=greeting&interface=org.apache.dubbo.demo.GreetingService&methods=hello&pid=56001&revision=1.0.0&service-name-mapping=true&side=provider&timeout=5000×tamp=1666605432012&version=1.0.0String key = getCacheKey(originInvoker);//将exporter存入bounds缓存中return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {//新建一个invokerInvoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);//这里的这里的protocol是Protocol的自适应扩展实现,即Protocol$Adaptive//也就是说会根据传入的url中的protocol选择对应的Protocol SPI实现类,而默认实现就是dubbo协议,即DubboProtocol。return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);});
}
2 InterfaceCompatibleRegistryProtocol接口级远程服务导出协议
接口级服务远程导出协议以registry开头,其对应的Protocol实现就是InterfaceCompatibleRegistryProtocol。
实际上InterfaceCompatibleRegistryProtocol继承了RegistryProtocol,大部分代码都是一样的,例如export导出协议的方法,仅仅是一些细微的代码不一致。我们来看看它在export过程中的特有的方法。
2.1 getRegistryUrl获取注册中心url
该方法获取注册中心url,如果是RegistryProtocol协议,则直接返回originInvoker的url,并不会还原,而InterfaceCompatibleRegistryProtocol则会进行处理、还原。
获取url中的registry属性,也就是真实的注册中心协议,例如zookeeper,默认dubbo,然后替换掉registry协议并返回。
/*** 获取注册中心url* @param originInvoker* @return*/
@Override
protected URL getRegistryUrl(Invoker<?> originInvoker) {//获取注册中心url,例如://registry://xxx.xxx.xxx.xxx:2181/org.apache.dubbo.registry.RegistryService?REGISTRY_CLUSTER=registry1&application=demo-provider&application.version=1&dubbo=2.0.2&pid=38947®istry=zookeeper&timeout=20001×tamp=1666681960810URL registryUrl = originInvoker.getUrl();//如果协议是registry,即是接口级远程注册协议if (REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) {//获取url中的registry属性,也就是真实的注册中心协议,例如zookeeper,默认dubboString protocol = registryUrl.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY);//使用真实的注册中心协议替换registry协议,并且删除url中的registry参数registryUrl = registryUrl.setProtocol(protocol).removeParameter(REGISTRY_KEY);}//替换后的协议//zookeeper://xxx.xxx.xxx.xxx:2181/org.apache.dubbo.registry.RegistryService?REGISTRY_CLUSTER=registry1&application=demo-provider&application.version=1&dubbo=2.0.2&pid=38947&timeout=20001×tamp=1666681960810return registryUrl;
}
3 DubboProtocol Dubbo协议导出
上面讲到RegistryProtocol内部的doLocalExport方法会对真实协议进行导出,假设我们的Service服务协议采用dubbo协议,export方法我们在此前就说过了,会经过层层的包装,Dubbo协议的导出也不例外。区别是DubboProtocol还会进行网络通信处理。
这里我们直接看最底层DubboProtocol的export方法,它的大概步骤为:
- 根据服务提供者url构建服务key,例如:org.apache.dubbo.demo.DemoService:20880。
- 根据invoker,key构建一个DubboExporter,然后将key - invoker,存入到exporterMap这个缓存中,后续调用时,将会从exporterMap找到Exporter,然后找到Invoker进行调用。
- 导出用于调度事件的存根服务。
- 调用openServer方法,开启服务提供者端服务器,监听端口,这样就能接收consumer的远程调用请求。
- 调用optimizeSerialization方法,优化序列化。
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {checkDestroyed();//获取服务提供者url//例如: dubbo://192.168.31.84:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=demo-provider&application.version=1&background=false&bind.ip=192.168.31.84&bind.port=20880&delay=5000&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello,sayHelloAsync&pid=20496&service-name-mapping=true&side=provider&timeout=3000×tamp=1666619245421URL url = invoker.getUrl();/** 1 导出服务*/// export service.//构建服务key,例如 org.apache.dubbo.demo.DemoService:20880String key = serviceKey(url);//根据invoker,key构建一个DubboExporter//然后将key - invoker,存入到exporterMap这个缓存中,后续调用时,将会从exporterMap找到Exporter,然后找到Invoker进行调用。DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);//export a stub service for dispatching event//导出用于调度事件的存根服务boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);boolean isCallbackService = url.getParameter(IS_CALLBACK_SERVICE, false);if (isStubSupportEvent && !isCallbackService) {String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);if (stubServiceMethods == null || stubServiceMethods.length() == 0) {if (logger.isWarnEnabled()) {logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +"], has set stub proxy support event ,but no stub methods founded."));}}}/** 2 开启服务提供者端服务器*/openServer(url);/** 3 优化序列化*/optimizeSerialization(url);return exporter;
}
3.1 openServer开启服务器
开启服务提供者端服务器,监听端口,这样就能接收consumer的远程调用请求。
从服务缓存serverMap获取协议服务ProtocolServer,serverMap缓存key为url的address,因此同服务器同端口的多个Dubbo Service将会使用同一个key,即一般情况下,一个服务实例中的所有服务接口使用同一个服务器。
如果没有服务器那么调用createServer创建,否则调用reset重置,重置实际上就是更新Server的一些配置信息以及url参数信息。
/*** DubboProtocol的方法* <p>* 开启服务端* @param url 服务提供者url*/
private void openServer(URL url) {checkDestroyed();// find server.//获取服务器key,就是url的address、,例如:192.168.31.84:20880String key = url.getAddress();// client can export a service which only for server to invokeboolean isServer = url.getParameter(IS_SERVER_KEY, true);if (isServer) {//从服务map缓存获取协议服务ProtocolServer,同服务器同端口的多个Dubbo Service将会使用同一个keyProtocolServer server = serverMap.get(key);if (server == null) {//加锁synchronized (this) {//双重检测server = serverMap.get(key);if (server == null) {//创建一个ProtocolServer存入进去serverMap.put(key, createServer(url));return;}}}// server supports reset, use together with override//如果已经有了ProtocolServer,那么重置server.reset(url);}
}
3.1.1 createServer创建服务器
该方法创建生产者网络通信服务器,即dubbo提供者端网络服务,默认采用netty作为底层网络通信库。
该方法的核心就是Exchangers#bind方法,该方法默认通过HeaderExchanger创建HeaderExchangeServer,HeaderExchangeServer内部包含基于NettyTransporter创建的NettyServer,NettyServer内部包含url和ChannelHandler,创建NettyServer时构造器内部会调用doOpen方法绑定端口并启动netty服务器,而ChannelHandler的包含关系为:ChannelHandlerDispatcher -> DecodeHandler -> HeaderExchangeHandler -> requestHandler(ExchangeHandlerAdapter)。
/*** DubboProtocol的方法* 创建服务器** @param url 服务提供者url* @return 协议服务器*/
private ProtocolServer createServer(URL url) {//添加url参数url = URLBuilder.from(url)// send readonly event when server closes, it's enabled by default 当服务器关闭时发送只读事件,默认情况下是启用的.addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())// enable heartbeat by default 默认启用心跳.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT)).addParameter(CODEC_KEY, DubboCodec.NAME) //编解码器 dubbo.build();//获取指定的网络传输器,dubbo还提供了netty、grizzly、mina三种,默认dubboString transporter = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);//如果没有该传输器的扩展,那么抛出异常,目前仅提供了NettyTransporterif (StringUtils.isNotEmpty(transporter) && !url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).hasExtension(transporter)) {throw new RpcException("Unsupported server type: " + transporter + ", url: " + url);}//创建ExchangeServer交换器,启动NettyServerExchangeServer server;try {//默认通过HeaderExchanger创建HeaderExchangeServer//HeaderExchangeServer内部包含基于NettyTransporter创建的NettyServer//NettyServer内部包含url和ChannelHandler,ChannelHandlerDispatcher -> DecodeHandler -> HeaderExchangeHandler -> requestHandler(ExchangeHandlerAdapter)server = Exchangers.bind(url, requestHandler);} catch (RemotingException e) {throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);}transporter = url.getParameter(CLIENT_KEY);if (StringUtils.isNotEmpty(transporter) && !url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).hasExtension(transporter)) {throw new RpcException("Unsupported client type: " + transporter);}//包装为DubboProtocolServerDubboProtocolServer protocolServer = new DubboProtocolServer(server);//加载服务器配置,主要是配置dubbo.service.shutdown.wait属性,即服务器关闭等待超时时间,默认10000msloadServerProperties(protocolServer);return protocolServer;
}
3.1.1.1 Exchangers#bind创建交换器
该方法内部基于Dubbo SPI获取Exchanger,默认HeaderExchanger,然后调用HeaderExchanger#bind方法。
/*** Exchangers的方法** 创建交换服务器* @param url 服务提供者url* @param handler 请求处理器*/
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {if (url == null) {throw new IllegalArgumentException("url == null");}if (handler == null) {throw new IllegalArgumentException("handler == null");}//添加url属性codec,值为exchangeurl = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");//基于Dubbo SPI获取Exchanger,默认HeaderExchanger,然后调用HeaderExchanger#bind方法return getExchanger(url).bind(url, handler);
}
HeaderExchanger#bind方法中,首先对handler进行包装:DecodeHandler -> HeaderExchangeHandler -> requestHandler。
- DecodeHandler用于负责内部的dubbo协议的请求解码。
- HeaderExchangeHandler用于完成请求响应的映射。
- 用于nettyHandler真正处理请求。
随后调用Transporters#bind方法绑定并启动底层远程网络通信服务器,返回RemotingServer。Transporter是Dubbo对网络传输层的抽象接口,Exchanger依赖于Transporter。
**最后基于RemotingServer构建HeaderExchangeServer返回。
@Override
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {//包装handler:DecodeHandler -> HeaderExchangeHandler -> handler//调用Transporters#bind方法绑定并启动底层远程网络通信服务器,返回RemotingServer//基于RemotingServer构建HeaderExchangeServer返回return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
Transporters#bind方法将会在handler的最外层继续包装一层ChannelHandlerDispatcher,它所有的 ChannelHandler 接口实现都会调用其中每个 ChannelHandler 元素的相应方法。随后基于Dubbo SPI机制获取Transporter的实现,并调用bind方法完成绑定,目前仅NettyTransporter,基于netty4。
public static RemotingServer bind(URL url, ChannelHandler... handlers) throws RemotingException {if (url == null) {throw new IllegalArgumentException("url == null");}if (handlers == null || handlers.length == 0) {throw new IllegalArgumentException("handlers == null");}//继续包装一层ChannelHandlerDispatcherChannelHandler handler;if (handlers.length == 1) {handler = handlers[0];} else {handler = new ChannelHandlerDispatcher(handlers);}//基于Dubbo SPI机制获取Transporter的实现,并调用bind方法完成绑定return getTransporter(url).bind(url, handler);
}
3.1.1.2 NettyTransporter#bind创建NettyServer
该方法很简单,就是根据url和handler创建一个NettyServer实例,在NettyServer的构造器中,会调用doOpen()开启服务,创建ServerBootstrap,设置EventLoopGroup,编配ChannelHandlerPipeline,最终调用bind()绑定本地端口。
@Override
public RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException {//基于url和handler创建NettyServerreturn new NettyServer(url, handler);
}
NettyServer的构造器如下,将会调用父类构造器启动服务。
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {// you can customize name and type of client thread pool by THREAD_NAME_KEY and THREAD_POOL_KEY in CommonConstants.// the handler will be wrapped: MultiMessageHandler->HeartbeatHandler->handler//可通过CommonConstants中的THREAD_NAME_KEY和THREAD_POOL_KEY自定义客户端线程池的名称和类型//继续包装handler: MultiMessageHandler->HeartbeatHandler->handlersuper(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME), ChannelHandlers.wrap(handler, url));// read config before destroy//获取服务器关闭等待超时时间,默认10000msserverShutdownTimeoutMills = ConfigurationUtils.getServerShutdownTimeout(getUrl().getOrDefaultModuleModel());
}
AbstractServer的构造器如下,将会获取绑定的ip和端口以及其他参数,然后调用doOpen方法真正的开启netty服务。最后还会为当前url构建线程池并存入executors。
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {super(url, handler);executorRepository = url.getOrDefaultApplicationModel().getExtensionLoader(ExecutorRepository.class).getDefaultExtension();localAddress = getUrl().toInetSocketAddress();//获取绑定的ipString bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());//获取绑定的端口int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());//如果url有anyhost=true参数或者ip是本地ip,那么设置绑定ip为0.0.0.0if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {bindIp = ANYHOST_VALUE;}//构建绑定hostbindAddress = new InetSocketAddress(bindIp, bindPort);//获取accepts参数,即最大可接受链接数,默认值0this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS);try {/** 开启netty服务器*/doOpen();if (logger.isInfoEnabled()) {logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());}} catch (Throwable t) {throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()+ " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);}//为当前url构建线程池并存入executorsexecutors.add(executorRepository.createExecutorIfAbsent(url));
}
3.1.1.3 doOpen初始化并启动netty服务器
该方法用于初始化并启动netty服务器,是非常标准的netty服务端启动代码。创建ServerBootstrap,设置bossGroup和workerGroup,编配ChannelHandler,最终调用bind()绑定本地端口。至此成功启动netty服务端,可以开始监听网络请求了。
/*** NettyServer的方法** 初始化并启动netty服务器*/
@Override
protected void doOpen() throws Throwable {//创建ServerBootstrap,说明这是一个netty服务端bootstrap = new ServerBootstrap();//创建bossGroupbossGroup = createBossGroup();//创建workerGroupworkerGroup = createWorkerGroup();//创建NettyServerHandlerfinal NettyServerHandler nettyServerHandler = createNettyServerHandler();//获取 worker channelchannels = nettyServerHandler.getChannels();//初始化ServerBootstrapinitServerBootstrap(nettyServerHandler);// bind//绑定ip:port,并且生成一个ChannelFuture,启动服务器,现在可以开始监听网络请求了ChannelFuture channelFuture = bootstrap.bind(getBindAddress());//让当前线程同步等待Netty server的close事件channelFuture.syncUninterruptibly();channel = channelFuture.channel();}protected EventLoopGroup createBossGroup() {//默认1个线程,线程名NettyServerBossreturn NettyEventLoopFactory.eventLoopGroup(1, EVENT_LOOP_BOSS_POOL_NAME);
}protected EventLoopGroup createWorkerGroup() {//默认Math.min(Runtime.getRuntime().availableProcessors() + 1, 32)个线程,可通过iothreads参数指定//线程名NettyServerWorkerreturn NettyEventLoopFactory.eventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),EVENT_LOOP_WORKER_POOL_NAME);
}protected NettyServerHandler createNettyServerHandler() {//创建NettyServerHandler,当前NettyServer对象本身也是一个ChannelHandler实例,其received方法委托给创建实例时传递的内部的handler处理return new NettyServerHandler(getUrl(), this);
}protected void initServerBootstrap(NettyServerHandler nettyServerHandler) {//从url参数keep.alive获取是否保持连接,默认falseboolean keepalive = getUrl().getParameter(KEEP_ALIVE_KEY, Boolean.FALSE);//配置线程组bootstrap.group(bossGroup, workerGroup)//IO模型.channel(NettyEventLoopFactory.serverSocketChannelClass())//设置Socket 参数.option(ChannelOption.SO_REUSEADDR, Boolean.TRUE).childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE).childOption(ChannelOption.SO_KEEPALIVE, keepalive).childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)//设置处理器.childHandler(new ChannelInitializer<SocketChannel>() {//用于添加ServerSocketChannel对应的 Handle@Overrideprotected void initChannel(SocketChannel ch) throws Exception {// FIXME: should we use getTimeout()?int idleTimeout = UrlUtils.getIdleTimeout(getUrl());NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {ch.pipeline().addLast("negotiation", new SslServerTlsHandler(getUrl()));}//自定义客户端消息的业务处理逻辑Handlerch.pipeline()//解码.addLast("decoder", adapter.getDecoder())//编码.addLast("encoder", adapter.getEncoder())//心跳检测.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))//最后是此前创建的nettyServerHandler.addLast("handler", nettyServerHandler);}});
}
4 总结
对于远程导出就比较复杂,包括接口级注册中心的registry对应InterfaceCompatibleRegistryProtocol,应用级注册中心的service-discovery-registry对应RegistryProtocol。大概步骤如下:
- 需要从注册中心url的attributes属性中,获取真实的服务导出url,然后调用doLocalExport方法进行服务导出,该方法内部实际上就是重复前面的Protocol$Adaptive#export的过程。
- 此时,将会调用真实协议对应的Protocol实现,例如dubbo协议对应着DubboProtocol,而在这些协议的export方法中,除了构建Exportor加入exporterMap缓存之外,还会调用openServer方法,开启一个服务提供者端服务器,监听端口,这样就能接收consumer的远程调用请求。
- 同ip同端口(同一个dubbo服务端)的Dubbo应用中,多个Dubbo Service将会使用同一个服务器,即只有在第一次调用openServer的时候才会创建服务器。ip就是服务器的ip,端口就是20880端口。
- 创建服务器的时候,默认使用netty作为底层通信库,即创建一个netty服务端。
- 然后,就是通过Registry#register方法向远程注册中心注册服务提供者url,这部分的源码我们下回分解!
另外,其实我们发现,凡事涉及到需要依靠网络通信的框架,无论是RPC框架还是各种MQ框架等等,底层网络通信几乎都是依靠的Netty来支持。所以,虽然我们实际开发可能不会主动接触Netty,但是我们确实是一直都在使用它。
Netty作为一个的非常底层的网络通信框架,它在我们Java工程师的进阶过程中是非常重要的一个坎,直白的说,对于高级、资深Java工程师的面试特别有帮助,后续我将会免费的、开源的、具有深度的解析Netty的各种流程源码,欢迎大家关注!