Netty 实现dubbo rpc

一、RPC 的基本介绍

  RPC (Remote Procedure Call) 远程过程调用,是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外的为这个交互编程。也就是说可以达到两个或者多个应用程序部署在不同的服务器上,他们之间的调用都像是本地方法调用一样。RPC 的调用如下图。

常用的RPC 框架有阿里的dubbo,Google的gRPC,Go 语言的rpcx,Apache的thrift,Spring的Spring Cloud.

若想了解dubbo与Spring Cloud的区别参考:SpringCloud 与 Dubbo 的区别,终于有人讲明白了...-腾讯云开发者社区-腾讯云

二、RPC 调用的过程

在RPC 中,Client 端叫做服务消费者,Server 叫做服务提供者。

调用流程说明

  • 服务消费方(client)以本地调用方式调用服务
  • client stub 接收到调用后负责将方法、参数等封装成能够进行网络传输的消息体
  • client stub 将消息进行编码并发送到服务端
  • server stub 接收到消息后进行解码
  • server stub 根据解码结果调用本地的服务
  • 本地服务执行并将结果返回给server stub
  • server stub 将返回导入结果进行编码并发送给消费方
  • client stub 接收到消息并进行解码
  • 服务消费方(client) 得到结果
  • 其中,RPC 框架的目标就是把2-8 这些步骤封装起来,用户无需关心这些细节,可以像调用本地方法一样即可完成远程服务调用。

三、dubbo RPC

1.需求说明

dubbo 底层使用了Netty 作为网络通信框架,要求用netty 实现一个简单的RPC框架。

模仿dubbo,消费者和提供者约定接口和协议,消费者远程调用提供者的服务,提供者返回一个字符串,消费者打印提供者返回的数据。底层网络通信给予Netty 4.x。

2.设计说明

创建一个接口,定义抽象方法。用于消费者和提供者之间的约定。

创建一个提供者,该类需要监听消费者的请求,并按照约定返回数据。

创建一个消费者,该类需要透明的调用自己不存在的方法,内部需要使用netty请求提供者返回数据。 开发的分析图如下:

3.代码实现

netty用的包:4.1.20.Final。pom.xml如下:

<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.20.Final</version>
</dependency>

1)公共接口

/*** @author: fqtang* @date: 2024/05/05/21:51* @description: 服务提供方和服务消费方都需要*/
public interface HelloService {String say(String mes);
}

2)公共接口实现类

import org.springframework.util.StringUtils;
import com.tfq.netty.dubborpc.publicinterface.HelloService;/*** @author: fqtang* @date: 2024/05/05/21:53* @description: 描述*/
public class HelloServiceImpl implements HelloService {private static int count = 0;/*** 当有消费方调用该方法时就返回一个结果** @param mes 传入消息* @return 返回结果*/@Overridepublic String say(String mes) {System.out.println("收到客户端消息=" + mes);if(StringUtils.isEmpty(mes)) {return "你好客户端,我已经收到你的消息 ";}else{return "你好客户端,我已经收到你的消息:【" + mes+"】,第 "+(++count)+"次。";}}
}

3)服务提供者

import com.tfq.netty.dubborpc.netty.NettyServer;/*** @author: fqtang* @date: 2024/05/05/21:57* @description: 启动服务提供者,就是NettyServer*/
public class ServerBootstrap {public static void main(String[] args) {String hostName="127.0.0.1";int port = 8001;NettyServer.startServer(hostName,port);}}import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;/*** @author: fqtang* @date: 2024/05/05/21:59* @description: 描述*/
public class NettyServer {public static void startServer(String hostName,int port){startServer0(hostName,port);}/*** 编写一个方法,完成对Netty Server的初始化工作和启动* @param hostName* @param port*/private static void startServer0(String hostName,int port){EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup();try{ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new StringDecoder());pipeline.addLast(new StringEncoder());pipeline.addLast(new NettyServerHandler());}});ChannelFuture channelFuture = serverBootstrap.bind(hostName,port).sync();System.out.println("服务提供方开始提供服务~~~");channelFuture.channel().closeFuture().sync();}catch(Exception e){e.printStackTrace();}finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import com.tfq.netty.dubborpc.consumer.ClientBootstrap;
import com.tfq.netty.dubborpc.provider.HelloServiceImpl;/*** @author: fqtang* @date: 2024/05/05/22:03* @description: 描述*/
public class NettyServerHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//获取客户端调用的消息,并调用服务System.out.println("msg = " + msg);//客户端在调用服务器的时候,需要定义一个协议。比如我们要求每次发消息时,都必须以某个字符器开头//比如:dubboserver#hello#xxxxif(msg.toString().startsWith(ClientBootstrap.ProtocolHeader)) {String res = new HelloServiceImpl().say(msg.toString().substring(msg.toString().lastIndexOf("#") + 1));ctx.writeAndFlush(res);}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();}
}

4)消费者

import com.tfq.netty.dubborpc.netty.NettyClient;
import com.tfq.netty.dubborpc.publicinterface.HelloService;/*** @author: fqtang* @date: 2024/05/05/23:26* @description: 消费者*/
public class ClientBootstrap {/*** 这里定义协议头*/public static final String ProtocolHeader = "dubboserver#say#";public static void main(String[] args) throws InterruptedException {//创建一个消费者NettyClient customer = new NettyClient();//创建代理对象HelloService helloService = (HelloService) customer.getBean(HelloService.class, ProtocolHeader);while(true) {Thread.sleep(10 * 1000);//通过代理对象调用提供者的方法(服务)String res = helloService.say("你好 dubbo~");System.out.println("调用的结果 res = " + res);}}
}import java.lang.reflect.Proxy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;/*** @author: fqtang* @date: 2024/05/05/23:04* @description: 描述*/
public class NettyClient {//创建一个线程池private static ExecutorService executorService = Executors.newFixedThreadPool(2);private static NettyClientHandler clientHandler;/*** 编写方法使用代理模式,获取一个代理对象* @param serviceClass* @param protocolHeader* @return*/public Object getBean(final Class<?> serviceClass, final String protocolHeader) {return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),new Class<?>[]{serviceClass}, (proxy, method, args) -> {if(clientHandler == null) {initClient("127.0.0.1", 8001);}//设置要发送给服务器端的信息,protocolHeader为协议头[dubboserver#hello#],//args[0] 就是客户端调用api say(???),参数clientHandler.setParam(protocolHeader + args[0]);return executorService.submit(clientHandler).get();});}private static void initClient(String hostName, int port) {EventLoopGroup worker = new NioEventLoopGroup();try {clientHandler = new NettyClientHandler();Bootstrap bootstrap = new Bootstrap();bootstrap.group(worker).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline channelPipeline = ch.pipeline();channelPipeline.addLast(new StringDecoder());channelPipeline.addLast(new StringEncoder());channelPipeline.addLast(clientHandler);}});ChannelFuture channelFuture = bootstrap.connect(hostName, port).sync();/*channelFuture.channel().closeFuture().sync();*/} catch(InterruptedException e) {e.printStackTrace();} /*finally {worker.shutdownGracefully();}*/}
}package com.tfq.netty.dubborpc.netty;import java.util.concurrent.Callable;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;/*** @author: fqtang* @date: 2024/05/05/22:48* @description: 描述*/
public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {private ChannelHandlerContext context;/*** 返回的结果*/private String result;/*** 客户端调用方法返回的参数*/private String param;/*** 与服务器的连接创建后,就会被调用,这个方法被第一个,调用(1)* @param ctx* @throws Exception*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {//因为在其他方法会使用到这个ctxcontext = ctx;System.out.println("调用(1) channelActive--->连接到服务器");}/***  被调用(4)* 收到服务器的数据后,调用方法* @param ctx* @param msg* @throws Exception*/@Overridepublic synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {result = (String) msg;System.out.println("调用(4)channelRead--->从服务器读取到数据:"+result);//唤醒等待的线程notify();System.out.println("调用(4)channelRead---notify()---->从服务器读取到数据后唤醒线程.....");}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();}/*** 被调用(3), 被调用(5)* 被代理对象调用,发送数据给服务器,--->wait ---> 等待被唤醒 --->返回结果* @return* @throws Exception*/@Overridepublic synchronized Object call() throws Exception {context.writeAndFlush(param);System.out.println("调用(3) call()--->被代理对象调用,发送数据给服务器.....");//进行wait,等待channelRead 方法获取到服务器的结果后,唤醒wait();System.out.println("调用(5) call()--->wait() 等待channelRead 方法获取到服务器的结果后.....");return result;}/*** 被调用(2)* @param param*/void setParam(String param){System.out.println("调用(2) setParam()--->被代理对象调用,发送数据给服务器.....");this.param = param;}
}

若有问题请留言。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/681042.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

YOLOv5入门 | 重要性能衡量指标、训练结果评价及分析及影响mAP的因素

在深度学习目标检测领域&#xff0c;YOLOv5成为了备受关注的模型之一。训练结束后&#xff0c;对训练结果的仔细分析至关重要。这就涉及到了重要性能的衡量指标。本文将手把手教学如何进行YOLOv5的结果分析和重要性能指标的参考&#xff0c;以帮助您更好地学习深度学习目标检测…

操作系统原理与实验——实验十分段存储管理

实验指南 运行环境&#xff1a; Dev c 算法思想&#xff1a; 本实验是模拟分段存储管理&#xff0c;系统需要建立两张分区表&#xff0c;分别是已分配和未分配分区表&#xff0c;首先根据装入作业的大小判断是否小于空闲分区的总容量&#xff0c;若满足&#xff0c;则对该作业继…

自注意力架构大成者_Transformer(Pytorch 17)

1 模型简介 在上节比较了 卷积神经网络&#xff08;CNN&#xff09;、循环神经网络&#xff08;RNN&#xff09;和 自注意力&#xff08;self‐attention&#xff09;。值得注意的是&#xff0c; 自注意力同时具有并行计算和最短的最大路径长度这两个优势。因此&#xff0c;使…

数字工厂管理系统如何实现生产过程透明化

随着科技的飞速发展&#xff0c;数字化转型已成为制造业不可逆转的趋势。数字工厂管理系统作为实现生产自动化、智能化的重要工具&#xff0c;其在提升生产效率、降低运营成本、优化资源配置等方面的作用日益凸显。其中&#xff0c;实现生产过程的透明化是数字工厂管理系统的重…

Netty详解,含EventLoop、Channel、Handler、Pipeline和ByteBuf等组件详解(长文)

前言&#xff1a;本文是博主学习视频后所整理的笔记&#xff0c;用于回顾。 Netty中的工作原理&#xff1a;首先使用 Bootstrap/ServerBootstrap 启动器启动&#xff0c;通过使用包含了多个EventLoop 的 EventLoopGroup 去处理多组 Channel 的事件循环。而每个 Channel 是一个产…

Java设计模式 _结构型模式_外观模式

一、外观模式 1、外观模式 外观模式&#xff08;Facade Pattern&#xff09;是一种结构型模式。主要特点为隐藏系统的复杂性&#xff0c;并向客户端提供了一个客户端可以访问系统的接口。这有助于降低系统的复杂性&#xff0c;提高可维护性。当客户端与多个子系统之间存在大量…

46. UE5 RPG 增加角色受击反馈

在前面的文章中&#xff0c;我们实现了对敌人的属性的初始化&#xff0c;现在敌人也拥有的自己的属性值&#xff0c;技能击中敌人后&#xff0c;也能够实现血量的减少。 现在还需要的就是在技能击中敌人后&#xff0c;需要敌人进行一些击中反馈&#xff0c;比如敌人被技能击中后…

社交新时代:Facebook如何塑造我们的互动方式

在当今社交媒体充斥着人们日常生活的情况下&#xff0c;Facebook作为影响力最大的社交平台之一&#xff0c;已经深深地影响了我们的互动方式和社交行为。从初期的大学校园社交网络发展到如今的全球社交巨头&#xff0c;Facebook已经成为许多人日常生活中不可或缺的组成部分。本…

学习大数据,所需更要的shell基础(2)

文章目录 read读取控制台输入函数系统函数bashnamedirname 自定义函数Shell工具&#xff08;重点&#xff09;cutawk 正则表达式入门常规匹配常用特殊字符 read读取控制台输入 1&#xff09;基本语法 read (选项) (参数) ①选项&#xff1a; -p&#xff1a;指定读取值时的提示…

补强板大全

一&#xff0e;名词介绍&#xff1a; 补强板又叫Stiffeners&#xff0c;加强板&#xff0c;增强板&#xff0c;支撑板&#xff0c;保强板&#xff0c;裙托板&#xff0c;撑托板&#xff0c;托强板&#xff0c;加强筋。补强板主要用在建筑&#xff0c;石油管道&#xff0c;机工设…

STL 总结

STL 在 C 标准模板库&#xff08;STL&#xff09;中&#xff0c;主要包含了一系列的容器、迭代器、算法、函数对象、适配器。 容器 容器是用于存储数据的类模板。STL 容器可以分为序列型容器、关联型容器和链表型容器三类&#xff1a;序列型容器&#xff1a;vector、deque、…

网工路由基础——静态路由

一、静态路由的定义 静态路由是一种需要管理员手动配置的特殊路由。 二、静态路由的目的或背景 1&#xff09;当网络结构比较简单时&#xff0c;只需要配置静态路由就可以使网络正常工作&#xff1b; 2&#xff09;在复杂网络中&#xff0c;配置静态路由可以改进网络的性能&am…