Dubbo分层设计之Exchange层

前言

Dubbo 框架采用分层设计,自上而下共分为十层。Exchange 层位于倒数第三层,它在 协议层 的下方、数据传输层的上方。
第一次看源码的时候,大家应该都会有一个疑问:都已经有 Transport 层了,为啥还要定义 Exchange 层?
Dubbo 这么做自然有它的原因,今天我们一起解开这个疑惑。

理解Exchange

Exchange 层也叫 数据交换层,它和数据传输层有什么区别呢?
Transport 层是对 Netty、Mina 的统一封装,用来做网络数据传输的。一次 RPC 调用在 Dubbo 看来,本质上也就是一次请求报文和响应报文的传输过程。这么一看,好像完全没必要再单独抽象出 Exchange 层嘛。但是我们忽略了一个事情,那就是 Transport 层并没有实现 请求-应答 消息交换模式。

一般来说,我们发起一次 RPC 调用以后,业务线程会阻塞,期望拿到一个服务端发来的结果,再继续往下走。
Transport 层只有一个 tcp 长连接,tcp 本身是没有 Request、Response 概念的。它只具备消息收发的能力,至于收发的消息是 Request 还是 Response 它是不知道的,消息的语义需要靠上层来定义,一般是在协议头用一个专门的比特位来标记。以 HTTP 协议为例,它是七层协议,在传输层看来,报文是不分 Request、Response 的,这完全靠 HTTP 服务器自行实现。

正因如此,Dubbo 要直接基于 tcp 来实现 RPC 调用,就得自己实现 Request-Response 模型。

设计实现

首先我们要清楚 Dubbo 的调用流程,才好去理解这些接口的作用。
客户端发送 Request 和收到 Response 的流程是这样的:
image.png
服务端处理请求的流程是这样的:
image.png

Exchanger

Dubbo Exchange 层的核心 SPI 接口是org.apache.dubbo.remoting.exchange.Exchanger ,同样也分别提供了bindconnect 方法供服务端和客户端使用。

@SPI(HeaderExchanger.NAME)
public interface Exchanger {@Adaptive({Constants.EXCHANGER_KEY})ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException;@Adaptive({Constants.EXCHANGER_KEY})ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException;
}

它和 Transporter 主要区别是:Transporter Channel 处理器是 ChannelHandler 接口,只具备消息收发的能力。Exchanger Channel 处理器是 ExchangeHandler,它在前者的基础上增加了reply 能力,也就是对于一个 Request,服务端可以回复一个 Response,这就是 请求-应答 模型。

public interface ExchangeHandler extends ChannelHandler, TelnetHandler {CompletableFuture<Object> reply(ExchangeChannel channel, Object request) throws RemotingException;
}

至于 ExchangeServer 和 ExchangeClient,只是在 Transport 层的 RemotingServer、Client 上做了一些封装。

HeaderExchanger

Exchanger 官方只提供了一种实现:HeaderExchanger,因为是在 Transport 上层,所以是基于 Transporter 二次封装。主要是创建了 HeaderExchangeClient 和 HeaderExchangeServer,核心是 HeaderExchangeHandler 实现。

public class HeaderExchanger implements Exchanger {public static final String NAME = "header";@Overridepublic ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);}@Overridepublic ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));}
}

HeaderExchangeServer

HeaderExchangeServer 是对 RemotingServer 的二次封装,主要是把传输层的 Channel 封装成了交换层的 ExchangeChannel。

@Override
public Collection<ExchangeChannel> getExchangeChannels() {Collection<ExchangeChannel> exchangeChannels = new ArrayList<ExchangeChannel>();Collection<Channel> channels = server.getChannels();if (CollectionUtils.isNotEmpty(channels)) {for (Channel channel : channels) {exchangeChannels.add(HeaderExchangeChannel.getOrAddChannel(channel));}}return exchangeChannels;
}

HeaderExchangeClient

HeaderExchangeClient 是对传输层 Client 的二次封装,主要是把 Client 封装成了 HeaderExchangeChannel,实现了 Request-Response 语义。

public HeaderExchangeClient(Client client, boolean startTimer) {Assert.notNull(client, "Client can't be null");this.client = client;this.channel = new HeaderExchangeChannel(client);if (startTimer) {URL url = client.getUrl();startReconnectTask(url);startHeartBeatTask(url);}
}

HeaderExchangeHandler

HeaderExchangeHandler 封装了协议层传过来的 ExchangeHandler,重写了receivedsent 方法,实现了对 Request、Response 对象的处理。

消息发送时,它会把 Channel 封装成 HeaderExchangeChannel 再交给后续 handler 处理。

@Override
public void sent(Channel channel, Object message) throws RemotingException {Throwable exception = null;try {// 封装ChannelExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);handler.sent(exchangeChannel, message);} catch (Throwable t) {exception = t;HeaderExchangeChannel.removeChannelIfDisconnected(channel);}if (message instanceof Request) {Request request = (Request) message;// 记录发送时间DefaultFuture.sent(channel, request);}......
}

收到消息时,会针对 Request、Response 分别做处理。如果收到的是 Request,会调用业务 handler 执行业务逻辑,再返回结果。

void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {Response res = new Response(req.getId(), req.getVersion());Object msg = req.getData();try {// 调用业务handler、执行业务逻辑CompletionStage<Object> future = handler.reply(channel, msg);future.whenComplete((appResult, t) -> {try {if (t == null) {res.setStatus(Response.OK);res.setResult(appResult);} else {res.setStatus(Response.SERVICE_ERROR);res.setErrorMessage(StringUtils.toString(t));}// 发送结果channel.send(res);} catch (RemotingException e) {logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);}});} catch (Throwable e) {res.setStatus(Response.SERVICE_ERROR);res.setErrorMessage(StringUtils.toString(e));channel.send(res);}
}

如果收到的是 Response,说明是服务端对客户端请求的响应结果,则会给 DefaultFuture 设置 Result,唤醒业务线程。

static void handleResponse(Channel channel, Response response) throws RemotingException {if (response != null && !response.isHeartbeat()) {DefaultFuture.received(channel, response);}
}
public static void received(Channel channel, Response response, boolean timeout) {try {DefaultFuture future = FUTURES.remove(response.getId());if (future != null) {Timeout t = future.timeoutCheckTask;if (!timeout) {t.cancel();}// 设置结果,业务线程被唤醒future.doReceived(response);}} finally {CHANNELS.remove(response.getId());}
}

HeaderExchangeChannel

最后是 HeaderExchangeChannel,它是 交换层 ExchangeChannel 的实现。ExchangeChannel 是对传输层 Channel 的增强。Channel 只定义了send() 数据发送的能力,ExchangeChannel 增加了request() 支持发送 Request,拿到 Response。

public interface ExchangeChannel extends Channel {CompletableFuture<Object> request(Object request, ExecutorService executor) throws RemotingException;CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException;ExchangeHandler getExchangeHandler();@Overridevoid close(int timeout);
}

HeaderExchangeChannel 主要是对 Channel 的一个二次封装,它会把实例化自身并放到 Channel 属性里

static HeaderExchangeChannel getOrAddChannel(Channel ch) {if (ch == null) {return null;}HeaderExchangeChannel ret = (HeaderExchangeChannel) ch.getAttribute(CHANNEL_KEY);if (ret == null) {ret = new HeaderExchangeChannel(ch);if (ch.isConnected()) {ch.setAttribute(CHANNEL_KEY, ret);}}return ret;
}

自定义Exchange

Dubbo Exchanger 也可以基于 SPI 一键替换,我们实现一个自定义的 Exchanger,加深理解。
首先,我们新建一个模块dubbo-extension-exchange-custom,并引入依赖:

<dependencies><dependency><groupId>org.apache.dubbo</groupId><artifactId>dubbo-remoting-api</artifactId><version>${dubbo.version}</version></dependency>
</dependencies>

新建 dubbo.extension.remoting.exchange.CustomExchanger,重写 Exchanger 接口,返回我们自定义的 Server、Client 实现。

public class CustomExchanger implements Exchanger {public static final String NAME = "custom";@Overridepublic ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {return new CustomeExchangeServer(Transporters.bind(url, new DecodeHandler(new CustomExchangeHandler(handler))));}@Overridepublic ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {return new CustomExchangeClient(Transporters.connect(url, new DecodeHandler(new CustomExchangeHandler(handler))));}
}

CustomeExchangeServer 本身只是对 RemotingServer 的一个封装,核心是把 Channel 封装成 ExchangeChannel

public class CustomeExchangeServer extends RemotingServerDelegate {public CustomeExchangeServer(RemotingServer server) {super(server);}@Overrideprotected ExchangeChannel toExchangeChannel(Channel channel) {return CustomExchangeChannel.getOrAddChannel(channel);}
}

CustomExchangeClient 也只是为了把 Client 封装成 ExchangeChannel,让 传输层的 Channel 拥有request能力

public class CustomExchangeClient extends ClientDelegate {private final ExchangeChannel exchangeChannel;public CustomExchangeClient(Client client) {super(client);this.exchangeChannel = new CustomExchangeChannel(client);}@Overridepublic CompletableFuture<Object> request(Object request) throws RemotingException {return exchangeChannel.request(request);}......省略几个request()
}

CustomExchangeHandler 是对业务 ExchangeHandler 的封装,增加对 Request、Response 对象的处理

public class CustomExchangeHandler implements ChannelHandlerDelegate {private final ExchangeHandler handler;public CustomExchangeHandler(ExchangeHandler handler) {this.handler = handler;}@Overridepublic ChannelHandler getHandler() {return handler;}@Overridepublic void connected(Channel channel) throws RemotingException {handler.connected(toExchangeChannel(channel));}@Overridepublic void disconnected(Channel channel) throws RemotingException {handler.disconnected(toExchangeChannel(channel));}@Overridepublic void sent(Channel channel, Object message) throws RemotingException {handler.sent(toExchangeChannel(channel), message);}@Overridepublic void received(Channel channel, Object message) throws RemotingException {System.err.println("CustomExchangeHandler received:" + message);ExchangeChannel exchangeChannel = toExchangeChannel(channel);if (message instanceof Request) {handleRequest(exchangeChannel, (Request) message);} else if (message instanceof Response) {handleResponse(exchangeChannel, (Response) message);} else {handler.received(exchangeChannel, message);}}private void handleResponse(ExchangeChannel exchangeChannel, Response response) {DefaultFuture.received(exchangeChannel, response);}private void handleRequest(ExchangeChannel exchangeChannel, Request req) {try {Response res = new Response(req.getId(), req.getVersion());CompletableFuture<Object> future = handler.reply(exchangeChannel, req.getData());future.whenComplete((r, e) -> {if (e == null) {res.setStatus((byte) 20);res.setResult(r);} else {res.setStatus((byte) 70);res.setErrorMessage(e.getMessage());}try {exchangeChannel.send(res);} catch (Exception exception) {exception.printStackTrace();}});} catch (Exception e) {e.printStackTrace();}}@Overridepublic void caught(Channel channel, Throwable exception) throws RemotingException {handler.caught(toExchangeChannel(channel), exception);}private ExchangeChannel toExchangeChannel(Channel channel) {return CustomExchangeChannel.getOrAddChannel(channel);}
}

最后是 CustomExchangeChannel,它是对 Channel 的封装,增加了request的能力,发送请求后支持返回一个 CompletableFuture,并在收到响应后设置结果。

public class CustomExchangeChannel extends ExchangeChannelDelegate {private static final String CHANNEL_KEY = CustomExchangeChannel.class.getName() + ".CHANNEL";public CustomExchangeChannel(Channel channel) {super(channel);}@Overridepublic CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {if (isClosed()) {throw new RemotingException(this.getLocalAddress(), (InetSocketAddress) null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");}System.err.println("CustomExchangeChannel request:" + request);Request req = new Request();req.setVersion(Version.getProtocolVersion());req.setTwoWay(true);req.setData(request);DefaultFuture future = DefaultFuture.newFuture(this, req, timeout, executor);send(req);return future;}public static CustomExchangeChannel getOrAddChannel(Channel channel) {CustomExchangeChannel exchangeChannel = (CustomExchangeChannel) channel.getAttribute(CHANNEL_KEY);if (exchangeChannel == null) {channel.setAttribute(CHANNEL_KEY, exchangeChannel = new CustomExchangeChannel(channel));}return exchangeChannel;}
}

尾巴

Dubbo Exchange 层在 Transport 层之上实现了 Request-Response 模型。传输层只有一个 tcp 连接,只具备单纯的消息收发能力,对于消息收发的格式和语义是不关心的。tcp 没有 Request-Response 的概念,Dubbo 基于 tcp 长连接实现 RPC 调用,就必须自己实现一套 Request-Response 消息交换模型,Exchange 层就是对这套请求应答模型的抽象。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/413176.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

游戏分组 - 华为OD统一考试

OD统一考试 分值&#xff1a; 100分 题解&#xff1a; Java / Python / C 题目描述 部门准备举办一场王者荣耀表演赛&#xff0c;有 10 名游戏爱好者参与&#xff0c;分为两队&#xff0c;每队 5 人。 每位参与者都有一个评分&#xff0c;代表着他的游戏水平。为了表演赛尽可…

解密PGSQL数据库引擎:探索数据世界的秘密

目录 1、引言 1.1 什么是PGSQL数据库引擎 1.2 数据库引擎的重要性 1.3 解密PGSQL数据库引擎的意义 2、PGSQL数据库引擎的基础知识 2.1 什么是数据库引擎 2.2 PGSQL数据库引擎的历史和发展 2.3 PGSQL数据库引擎的特点和优势 2.4 PGSQL数据库引擎的架构和组件 3、PGSQL…

无法解析服务器的名称或地址/Wsl/0x80072eff/win10 WSL2问题解决Wsl 0x800701bc/Wsl:0x80041002

无法解析服务器的名称或地址 和 Wsl/0x80072eff 1.连VPN&#xff0c;推荐的VPN如下。(如一直显示无法连接&#xff0c;则推荐使用VPN) Anycast加速器 (any4ga.com) 优点&#xff1a;无限GB 缺点&#xff1a;较贵&#xff0c;通过银行卡充值9折后的价格是每月45元左右 …

如何获取一个德国容器

1.注册discord账号 discord注册网址:https://discord.com/ 下面是容器的discord邀请链接 https://discord.com/Discord邀请链接:https://discord.com/invite/jVMSWrchC4 2.进入discord群聊点击link 在点击网址,这个网址每星期都会变就是图中的② 3.进入容器网址,进入界面…

shell简单截取curl GET返回的body消息体

目录 需求背景&#xff1a; 示例&#xff1a; 解决方式&#xff1a; 需求背景&#xff1a; 用shell解析 curl命令GET到的消息体&#xff0c;获取body消息体里的某个字段的值,只是个简单的示例&#xff0c;可以在此基础上更改满足自己的需求 示例&#xff1a; curl一个API…

Jenkins-Pipeline

Pipeline 1 安装插件 2 新建一个 Pipline 工程 3 配置Pipeline 脚本 agent的使用可以参考这个文档 pipeline {agent anystages {stage(Build) { steps {echo Building project...}}stage(Test) { steps {echo Testing project...}}stage(Deploy) { steps {echo Deploying …

【安装VMware Tools】实现Vmware虚拟机和主机之间复制、粘贴内容、拖拽文件

https://www.bilibili.com/video/BV1rN411277B/?spm_id_from333.788.recommend_more_video.6&vd_sourcefb8dcae0aee3f1aab700c21099045395 https://blog.csdn.net/wxqian25/article/details/19406673 待解决方案&#xff1a; 重新下载ubuntu&#xff0c;然后按照 https://…

瑞_Java开发手册_(六)工程结构

文章目录 工程结构的意义(一) 应用分层(二) 二方库依赖(三) 服务器 &#x1f64a;前言&#xff1a;本文章为瑞_系列专栏之《Java开发手册》的工程结构篇&#xff0c;主要介绍应用分层、二方库依赖、服务器。由于博主是从阿里的《Java开发手册》学习到Java的编程规约&#xff0c…

美国智库发布《用人工智能展望网络未来》的解析

文章目录 前言一、人工智能未来可能改善网络安全的方式二、人工智能可能损害网络安全的方式三、人工智能使用的七条建议四、人工智能的应用和有效使用AI五、安全有效地使用人工智能制定具体建议六、展望网络未来的人工智能&#xff08;一&#xff09;提高防御者的效率&#xff…

c++基础3

一 、构造函数的初始化列表 可以指定成员对象的初始化方式 构造函数的初始化列表是在 C 中用于初始化成员变量的一种机制。它在构造函数的参数列表之后&#xff0c;构造函数的函数体之前使用&#xff0c;并使用冒号 : 分隔。初始化列表可以用于给成员变量赋初值&#xff0c;而不…

C#,入门教程(19)——循环语句(for,while,foreach)的基础知识

上一篇&#xff1a; C#&#xff0c;入门教程(18)——分支语句&#xff08;switch-case&#xff09;的基础知识https://blog.csdn.net/beijinghorn/article/details/124039953 一、for循环 当老师进入教室&#xff0c;从门口开始分别按行、列点名&#xff0c;看看哪位翘课&…

Elastic Stack(1):Elastic Stack简介

1 简介 ELK是一个免费开源的日志分析架构技术栈总称&#xff0c;官网https://www.elastic.co/cn。包含三大基础组件&#xff0c;分别是Elasticsearch、Logstash、Kibana。但实际上ELK不仅仅适用于日志分析&#xff0c;它还可以支持其它任何数据搜索、分析和收集的场景&#xf…