前言
Dubbo 框架采用分层设计,自上而下共分为十层。Exchange 层位于倒数第三层,它在 协议层 的下方、数据传输层的上方。
第一次看源码的时候,大家应该都会有一个疑问:都已经有 Transport 层了,为啥还要定义 Exchange 层?
Dubbo 这么做自然有它的原因,今天我们一起解开这个疑惑。
理解Exchange
Exchange 层也叫 数据交换层,它和数据传输层有什么区别呢?
Transport 层是对 Netty、Mina 的统一封装,用来做网络数据传输的。一次 RPC 调用在 Dubbo 看来,本质上也就是一次请求报文和响应报文的传输过程。这么一看,好像完全没必要再单独抽象出 Exchange 层嘛。但是我们忽略了一个事情,那就是 Transport 层并没有实现 请求-应答 消息交换模式。
一般来说,我们发起一次 RPC 调用以后,业务线程会阻塞,期望拿到一个服务端发来的结果,再继续往下走。
Transport 层只有一个 tcp 长连接,tcp 本身是没有 Request、Response 概念的。它只具备消息收发的能力,至于收发的消息是 Request 还是 Response 它是不知道的,消息的语义需要靠上层来定义,一般是在协议头用一个专门的比特位来标记。以 HTTP 协议为例,它是七层协议,在传输层看来,报文是不分 Request、Response 的,这完全靠 HTTP 服务器自行实现。
正因如此,Dubbo 要直接基于 tcp 来实现 RPC 调用,就得自己实现 Request-Response 模型。
设计实现
首先我们要清楚 Dubbo 的调用流程,才好去理解这些接口的作用。
客户端发送 Request 和收到 Response 的流程是这样的:
服务端处理请求的流程是这样的:
Exchanger
Dubbo Exchange 层的核心 SPI 接口是org.apache.dubbo.remoting.exchange.Exchanger
,同样也分别提供了bind
和connect
方法供服务端和客户端使用。
@SPI(HeaderExchanger.NAME)
public interface Exchanger {@Adaptive({Constants.EXCHANGER_KEY})ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException;@Adaptive({Constants.EXCHANGER_KEY})ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException;
}
它和 Transporter 主要区别是:Transporter Channel 处理器是 ChannelHandler 接口,只具备消息收发的能力。Exchanger Channel 处理器是 ExchangeHandler,它在前者的基础上增加了reply
能力,也就是对于一个 Request,服务端可以回复一个 Response,这就是 请求-应答 模型。
public interface ExchangeHandler extends ChannelHandler, TelnetHandler {CompletableFuture<Object> reply(ExchangeChannel channel, Object request) throws RemotingException;
}
至于 ExchangeServer 和 ExchangeClient,只是在 Transport 层的 RemotingServer、Client 上做了一些封装。
HeaderExchanger
Exchanger 官方只提供了一种实现:HeaderExchanger,因为是在 Transport 上层,所以是基于 Transporter 二次封装。主要是创建了 HeaderExchangeClient 和 HeaderExchangeServer,核心是 HeaderExchangeHandler 实现。
public class HeaderExchanger implements Exchanger {public static final String NAME = "header";@Overridepublic ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);}@Overridepublic ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));}
}
HeaderExchangeServer
HeaderExchangeServer 是对 RemotingServer 的二次封装,主要是把传输层的 Channel 封装成了交换层的 ExchangeChannel。
@Override
public Collection<ExchangeChannel> getExchangeChannels() {Collection<ExchangeChannel> exchangeChannels = new ArrayList<ExchangeChannel>();Collection<Channel> channels = server.getChannels();if (CollectionUtils.isNotEmpty(channels)) {for (Channel channel : channels) {exchangeChannels.add(HeaderExchangeChannel.getOrAddChannel(channel));}}return exchangeChannels;
}
HeaderExchangeClient
HeaderExchangeClient 是对传输层 Client 的二次封装,主要是把 Client 封装成了 HeaderExchangeChannel,实现了 Request-Response 语义。
public HeaderExchangeClient(Client client, boolean startTimer) {Assert.notNull(client, "Client can't be null");this.client = client;this.channel = new HeaderExchangeChannel(client);if (startTimer) {URL url = client.getUrl();startReconnectTask(url);startHeartBeatTask(url);}
}
HeaderExchangeHandler
HeaderExchangeHandler 封装了协议层传过来的 ExchangeHandler,重写了received
和sent
方法,实现了对 Request、Response 对象的处理。
消息发送时,它会把 Channel 封装成 HeaderExchangeChannel 再交给后续 handler 处理。
@Override
public void sent(Channel channel, Object message) throws RemotingException {Throwable exception = null;try {// 封装ChannelExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);handler.sent(exchangeChannel, message);} catch (Throwable t) {exception = t;HeaderExchangeChannel.removeChannelIfDisconnected(channel);}if (message instanceof Request) {Request request = (Request) message;// 记录发送时间DefaultFuture.sent(channel, request);}......
}
收到消息时,会针对 Request、Response 分别做处理。如果收到的是 Request,会调用业务 handler 执行业务逻辑,再返回结果。
void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {Response res = new Response(req.getId(), req.getVersion());Object msg = req.getData();try {// 调用业务handler、执行业务逻辑CompletionStage<Object> future = handler.reply(channel, msg);future.whenComplete((appResult, t) -> {try {if (t == null) {res.setStatus(Response.OK);res.setResult(appResult);} else {res.setStatus(Response.SERVICE_ERROR);res.setErrorMessage(StringUtils.toString(t));}// 发送结果channel.send(res);} catch (RemotingException e) {logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);}});} catch (Throwable e) {res.setStatus(Response.SERVICE_ERROR);res.setErrorMessage(StringUtils.toString(e));channel.send(res);}
}
如果收到的是 Response,说明是服务端对客户端请求的响应结果,则会给 DefaultFuture 设置 Result,唤醒业务线程。
static void handleResponse(Channel channel, Response response) throws RemotingException {if (response != null && !response.isHeartbeat()) {DefaultFuture.received(channel, response);}
}
public static void received(Channel channel, Response response, boolean timeout) {try {DefaultFuture future = FUTURES.remove(response.getId());if (future != null) {Timeout t = future.timeoutCheckTask;if (!timeout) {t.cancel();}// 设置结果,业务线程被唤醒future.doReceived(response);}} finally {CHANNELS.remove(response.getId());}
}
HeaderExchangeChannel
最后是 HeaderExchangeChannel,它是 交换层 ExchangeChannel 的实现。ExchangeChannel 是对传输层 Channel 的增强。Channel 只定义了send()
数据发送的能力,ExchangeChannel 增加了request()
支持发送 Request,拿到 Response。
public interface ExchangeChannel extends Channel {CompletableFuture<Object> request(Object request, ExecutorService executor) throws RemotingException;CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException;ExchangeHandler getExchangeHandler();@Overridevoid close(int timeout);
}
HeaderExchangeChannel 主要是对 Channel 的一个二次封装,它会把实例化自身并放到 Channel 属性里
static HeaderExchangeChannel getOrAddChannel(Channel ch) {if (ch == null) {return null;}HeaderExchangeChannel ret = (HeaderExchangeChannel) ch.getAttribute(CHANNEL_KEY);if (ret == null) {ret = new HeaderExchangeChannel(ch);if (ch.isConnected()) {ch.setAttribute(CHANNEL_KEY, ret);}}return ret;
}
自定义Exchange
Dubbo Exchanger 也可以基于 SPI 一键替换,我们实现一个自定义的 Exchanger,加深理解。
首先,我们新建一个模块dubbo-extension-exchange-custom
,并引入依赖:
<dependencies><dependency><groupId>org.apache.dubbo</groupId><artifactId>dubbo-remoting-api</artifactId><version>${dubbo.version}</version></dependency>
</dependencies>
新建 dubbo.extension.remoting.exchange.CustomExchanger,重写 Exchanger 接口,返回我们自定义的 Server、Client 实现。
public class CustomExchanger implements Exchanger {public static final String NAME = "custom";@Overridepublic ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {return new CustomeExchangeServer(Transporters.bind(url, new DecodeHandler(new CustomExchangeHandler(handler))));}@Overridepublic ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {return new CustomExchangeClient(Transporters.connect(url, new DecodeHandler(new CustomExchangeHandler(handler))));}
}
CustomeExchangeServer 本身只是对 RemotingServer 的一个封装,核心是把 Channel 封装成 ExchangeChannel
public class CustomeExchangeServer extends RemotingServerDelegate {public CustomeExchangeServer(RemotingServer server) {super(server);}@Overrideprotected ExchangeChannel toExchangeChannel(Channel channel) {return CustomExchangeChannel.getOrAddChannel(channel);}
}
CustomExchangeClient 也只是为了把 Client 封装成 ExchangeChannel,让 传输层的 Channel 拥有request
能力
public class CustomExchangeClient extends ClientDelegate {private final ExchangeChannel exchangeChannel;public CustomExchangeClient(Client client) {super(client);this.exchangeChannel = new CustomExchangeChannel(client);}@Overridepublic CompletableFuture<Object> request(Object request) throws RemotingException {return exchangeChannel.request(request);}......省略几个request()
}
CustomExchangeHandler 是对业务 ExchangeHandler 的封装,增加对 Request、Response 对象的处理
public class CustomExchangeHandler implements ChannelHandlerDelegate {private final ExchangeHandler handler;public CustomExchangeHandler(ExchangeHandler handler) {this.handler = handler;}@Overridepublic ChannelHandler getHandler() {return handler;}@Overridepublic void connected(Channel channel) throws RemotingException {handler.connected(toExchangeChannel(channel));}@Overridepublic void disconnected(Channel channel) throws RemotingException {handler.disconnected(toExchangeChannel(channel));}@Overridepublic void sent(Channel channel, Object message) throws RemotingException {handler.sent(toExchangeChannel(channel), message);}@Overridepublic void received(Channel channel, Object message) throws RemotingException {System.err.println("CustomExchangeHandler received:" + message);ExchangeChannel exchangeChannel = toExchangeChannel(channel);if (message instanceof Request) {handleRequest(exchangeChannel, (Request) message);} else if (message instanceof Response) {handleResponse(exchangeChannel, (Response) message);} else {handler.received(exchangeChannel, message);}}private void handleResponse(ExchangeChannel exchangeChannel, Response response) {DefaultFuture.received(exchangeChannel, response);}private void handleRequest(ExchangeChannel exchangeChannel, Request req) {try {Response res = new Response(req.getId(), req.getVersion());CompletableFuture<Object> future = handler.reply(exchangeChannel, req.getData());future.whenComplete((r, e) -> {if (e == null) {res.setStatus((byte) 20);res.setResult(r);} else {res.setStatus((byte) 70);res.setErrorMessage(e.getMessage());}try {exchangeChannel.send(res);} catch (Exception exception) {exception.printStackTrace();}});} catch (Exception e) {e.printStackTrace();}}@Overridepublic void caught(Channel channel, Throwable exception) throws RemotingException {handler.caught(toExchangeChannel(channel), exception);}private ExchangeChannel toExchangeChannel(Channel channel) {return CustomExchangeChannel.getOrAddChannel(channel);}
}
最后是 CustomExchangeChannel,它是对 Channel 的封装,增加了request
的能力,发送请求后支持返回一个 CompletableFuture,并在收到响应后设置结果。
public class CustomExchangeChannel extends ExchangeChannelDelegate {private static final String CHANNEL_KEY = CustomExchangeChannel.class.getName() + ".CHANNEL";public CustomExchangeChannel(Channel channel) {super(channel);}@Overridepublic CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {if (isClosed()) {throw new RemotingException(this.getLocalAddress(), (InetSocketAddress) null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");}System.err.println("CustomExchangeChannel request:" + request);Request req = new Request();req.setVersion(Version.getProtocolVersion());req.setTwoWay(true);req.setData(request);DefaultFuture future = DefaultFuture.newFuture(this, req, timeout, executor);send(req);return future;}public static CustomExchangeChannel getOrAddChannel(Channel channel) {CustomExchangeChannel exchangeChannel = (CustomExchangeChannel) channel.getAttribute(CHANNEL_KEY);if (exchangeChannel == null) {channel.setAttribute(CHANNEL_KEY, exchangeChannel = new CustomExchangeChannel(channel));}return exchangeChannel;}
}
尾巴
Dubbo Exchange 层在 Transport 层之上实现了 Request-Response 模型。传输层只有一个 tcp 连接,只具备单纯的消息收发能力,对于消息收发的格式和语义是不关心的。tcp 没有 Request-Response 的概念,Dubbo 基于 tcp 长连接实现 RPC 调用,就必须自己实现一套 Request-Response 消息交换模型,Exchange 层就是对这套请求应答模型的抽象。