微服务 云原生:gRPC 客户端、服务端的通信原理

gRPC Hello World

在这里插入图片描述
protoc 是 Protobuf 的核心工具,用于编写 .proto 文件并生成 protobuf 代码。在这里,以 Go 语言代码为例,进行 gRPC 相关代码编写。

  • 下载 protoc 工具:https://github.com/protocolbuffers/protobuf/releases,并添加到环境变量。
  • 下载依赖包:
go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.28
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.2
  • 定义目录结构,在 proto 目录下编写 helloworld.proto 文件:
    在这里插入图片描述
syntax="proto3";
package proto;option go_package = "./;proto";service Greeter {rpc SayHello (HelloRequest) returns (HelloReply) {}
}message HelloRequest {string name = 1;
}message HelloReply {string message = 1;
}

运行 protoc -I . helloworld.proto --go_out=:. --go-grpc_out=require_unimplemented_servers=false:. 命令用于生成与 Protocol Buffers 和 gRPC 服务相关的代码。
在这里插入图片描述

  • 分别在 server 目录和 client 目录编写服务端和客户端代码,启动 sever 和 client,client 端可看到运行结果:Hello: Alex。
// server.go
package mainimport ("context""net""google.golang.org/grpc""your_moudle/proto"
)type Server struct {}func (s *Server) SayHello(ctx context.Context, request *proto.HelloRequest) (*proto.HelloReply, error) {return &proto.HelloReply{Message: "Hello " + request.GetName()}, nil
}func main() {lis, err := net.Listen("tcp", "0.0.0.0:8000")if err != nil {panic("failed to listen: " + err.Error())}g := grpc.NewServer()proto.RegisterGreeterServer(g, &Server{})err = g.Serve(lis)if err != nil {panic("failed to start grpc: " + err.Error())}
}// client.gopackage mainimport ("context""fmt""google.golang.org/grpc""google.golang.org/grpc/credentials/insecure""your_moudle/proto"
)func main() {conn, err := grpc.Dial("127.0.0.1:8000", grpc.WithTransportCredentials(insecure.NewCredentials()))if err != nil {panic(err)}defer conn.Close()c := proto.NewGreeterClient(conn)r, err := c.SayHello(context.Background(), &proto.HelloRequest{Name: "Alex"})if err != nil {panic(err)}fmt.Println(r.Message)
}

从上面例子中可以看出,一个完整的客户端与服务端进行通信的流程如下:
在这里插入图片描述

服务端:

  • 服务端实现 proto 文件中定义的接口:
func (s *Server) SayHello(ctx context.Context, request *proto.HelloRequest) (*proto.HelloReply, error) {return &proto.HelloReply{Message: "Hello " + request.GetName()}, nil
}
  • 监听 TCP 端口:
lis, _ := net.Listen("tcp", "0.0.0.0:8000")
  • 注册和启动服务:
g := grpc.NewServer()
proto.RegisterGreeterServer(g, &Server{})
_ = g.Serve(lis)

客户端:

  • 建立 TCP 连接:
conn, _ := grpc.Dial("127.0.0.1:8000", grpc.WithTransportCredentials(insecure.NewCredentials()))
  • 创建 Client:
c := proto.NewGreeterClient(conn)
  • 执行 RPC 调用,并接收信息:
r, _ := c.SayHello(context.Background(), &proto.HelloRequest{Name: "Alex"})

gRPC 内部怎样实现方法调用

服务端

通过 NewServer() 方法初始化一个 gRPC server,用于后续注册服务以及接受请求

创建带默认值的 gRPC server 结构体对象,初始化描述协议的各种参数选项,包括发送和接收的消息大小、buffer大小等等各种,类似于 http Headers。

func NewServer(opt ...ServerOption) *Server {// 描述协议的各种参数选项,包括发送和接收的消息大小、buffer大小等等各种,类似于 http Headersopts := defaultServerOptionsfor _, o := range globalServerOptions {o.apply(&opts)}for _, o := range opt {o.apply(&opts)}// 创建带默认值的 gRPC server 结构体对象s := &Server{lis:      make(map[net.Listener]bool),opts:     opts,conns:    make(map[string]map[transport.ServerTransport]bool),services: make(map[string]*serviceInfo),quit:     grpcsync.NewEvent(),done:     grpcsync.NewEvent(),czData:   new(channelzData),}// 配置拦截器chainUnaryServerInterceptors(s)chainStreamServerInterceptors(s)s.cv = sync.NewCond(&s.mu)// 配置简单的链路工具if EnableTracing {_, file, line, _ := runtime.Caller(1)s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))}if s.opts.numServerWorkers > 0 {s.initServerWorkers()}s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")channelz.Info(logger, s.channelzID, "Server created")return s
}

通过 RegisterService 方法来实现服务注册

使用 proto.RegisterGreeterServer(g, &Server{}) 来注册服务时,其在 xxx_grpc.pb.go 中通过 RegisterService 方法来实现服务注册。

func RegisterGreeterServer(s grpc.ServiceRegistrar, srv GreeterServer) {// 第二个参数为我们自定义实现了相应接口的实现类。s.RegisterService(&Greeter_ServiceDesc, srv)
}
// Greeter_ServiceDesc 是 greter 服务的 grpc.ServiceDesc,不能被自省或修改(即使作为副本)。
var Greeter_ServiceDesc = grpc.ServiceDesc{// 声明了名称、路由、方法及其他元数据属性ServiceName: "proto.Greeter",HandlerType: (*GreeterServer)(nil),Methods: []grpc.MethodDesc{{MethodName: "SayHello",Handler:    _Greeter_SayHello_Handler,},},Streams:  []grpc.StreamDesc{},Metadata: "helloworld.proto",
}

RegisterService 方法的具体实现如下:

func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {// 判断 ServiceServer 是否实现 ServiceDesc 中描述的 HandlerType,如果实现了则调用 s.register 方法注册if ss != nil {ht := reflect.TypeOf(sd.HandlerType).Elem()st := reflect.TypeOf(ss)if !st.Implements(ht) {logger.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)}}s.register(sd, ss)
}

首先做一些前置判断,接着 register 根据 sd 中的 Method 创建对应的 map,并将名称作为键,方法描述(指针)作为值,添加到相应的 map 中,最后按照服务名为 key,将 serviceInfo 信息注入到 Server 的 services map 中。

func (s *Server) register(sd *ServiceDesc, ss interface{}) {s.mu.Lock()defer s.mu.Unlock()// 一些前置性判断,注册服务必须要在 server() 方法之前调用s.printf("RegisterService(%q)", sd.ServiceName)if s.serve {logger.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)}if _, ok := s.services[sd.ServiceName]; ok {logger.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)}// 在 Server 结构体中,services 字段存放的是 {service name -> service info} map// serviceInfo 中有两个重要的属性:methods 和 streamsinfo := &serviceInfo{serviceImpl: ss,methods:     make(map[string]*MethodDesc),streams:     make(map[string]*StreamDesc),mdata:       sd.Metadata,}// register 根据 sd 中的 Method 创建对应的 map,并将名称作为键,方法描述(指针)作为值,添加到相应的 map 中for i := range sd.Methods {d := &sd.Methods[i]info.methods[d.MethodName] = d}for i := range sd.Streams {d := &sd.Streams[i]info.streams[d.StreamName] = d}// 按照服务名为 key,将 serviceInfo 信息注入到 Server 的 services map 中s.services[sd.ServiceName] = info
}

register 方法可以看出,其实对于不同的 RPC 请求,根据 services 中不同的 serviceName 去 services map 中取出不同的 handler 进行处理.

通过 Serve() 启动服务

通过死循环的方式在某一个端口实现监听,然后 client 对这个端口发起连接请求,握手成功后建立连接,然后 server 处理 client 发送过来的请求数据,根据请求参数,调用不同的 handler 进行处理,回写响应数据。

func (s *Server) Serve(lis net.Listener) error {// ...for {rawConn, err := lis.Accept()// ...s.serveWG.Add(1)go func() {// gRPC 是基于 HTTP2.0 实现// handleRawConn 实现了 http 的 handshakes.handleRawConn(lis.Addr().String(), rawConn)s.serveWG.Done()}()}//...
}
func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) {// 。。。// Finish handshaking (HTTP2)st := s.newHTTP2Transport(rawConn)rawConn.SetDeadline(time.Time{})// 。。。go func() {// 继续调用 serveStreams 方法s.serveStreams(st)s.removeConn(lisAddr, st)}()
}func (s *Server) serveStreams(st transport.ServerTransport) {// 。。。st.HandleStreams(func(stream *transport.Stream) {// 。。。go func() {defer wg.Done()// 根据 serviceName 取 server 中的 services maps.handleStream(st, stream, s.traceInfo(st, stream))}()}, func(ctx context.Context, method string) context.Context {// 。。。})wg.Wait()
}func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {// ...service := sm[:pos]method := sm[pos+1:]// ...// 取出 handler 进行处理srv, knownService := s.services[service]if knownService {if md, ok := srv.methods[method]; ok {// 在该方法中实现 handler 对 rpc 的处理,以及处理后的 response s.processUnaryRPC(t, stream, srv, md, trInfo)return}if sd, ok := srv.streams[method]; ok {s.processStreamingRPC(t, stream, srv, sd, trInfo)return}}// ...
}

客户端

通过拨号 Dial() 建立连接

func Dial(target string, opts ...DialOption) (*ClientConn, error) {return DialContext(context.Background(), target, opts...)
}

调用 DialContext() 方法主要返回一个初始化的 ClientConn{} 结构体对象:

// 压缩解压缩、是否需要认证、超时时间、是否重试等信息
cc := &ClientConn{target: target,// 连接的状态管理器,每个连接具有 "IDLE"、"CONNECTING"、"READY"、"TRANSIENT_FAILURE"、"SHUTDOW N"、"Invalid-State" 这几种状态csMgr:  &connectivityStateManager{},conns:  make(map[*addrConn]struct{}),dopts:  defaultDialOptions(),// 监测 server 和 channel 的状态czData: new(channelzData),// 。。。
}

通过 NewGreeterClient 创建客户端

通过 proto.NewGreeterClient(conn) 创建 Client 对象,其在 xxx_grpc.pb.go 中通过 NewGreeterClient 创建客户端

func NewGreeterClient(cc grpc.ClientConnInterface) GreeterClient {return &greeterClient{cc}
}

调用 Invoke 发起 RPC 调用

执行 c.SayHello(context.Background(), &proto.HelloRequest{Name: "Alex"}) 调用,并接收信息,SayHello 方法通过调用 Invoke 发起 RPC 调用

func (c *greeterClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) {out := new(HelloReply)// Invoke 中调用 invokeerr := c.cc.Invoke(ctx, "/proto.Greeter/SayHello", in, out, opts...)if err != nil {return nil, err}return out, nil
}func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error {// 。。。return invoke(ctx, method, args, reply, cc, opts...)
}func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)// 。。。if err := cs.SendMsg(req); err != nil {return err}return cs.RecvMsg(reply)
}

cs.SendMsg(req) 方法中,首先准备数据:

hdr, payload, data, err := prepareMsg(m, cs.codec, cs.cp, cs.comp)

接着调用 csAttempt 这个结构体中的 sendMsg 方法:

op := func(a *csAttempt) error {// 这个 sendMsg 方法中通过 a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams}) 发出的数据写操作return a.sendMsg(m, hdr, payload, data)
}

cs.RecvMsg(reply) 方法中调用 csAttemptrecvMsg 方法:

func (cs *clientStream) RecvMsg(m interface{}) error {// 。。。err := cs.withRetry(func(a *csAttempt) error {return a.recvMsg(m, recvInfo)}, cs.commitAttemptLocked)// 。。。
}

通过层层调用,最终是通过一个 io.Reader 类型的 p.r 来读取数据。

func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) {// ...err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decomp)// ...
}func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interface{}, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) error {// 通过 recvAndDecompress 方法,接收数据并解压缩d, err := recvAndDecompress(p, s, dc, maxReceiveMessageSize, payInfo, compressor)// 。。。
}func recvAndDecompress(p *parser, s *transport.Stream, dc Decompressor, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) ([]byte, error) {pf, d, err := p.recvMsg(maxReceiveMessageSize)// 。。。
}func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byte, err error) {if _, err := p.r.Read(p.header[:]); err != nil {return 0, nil, err}// 。。。
}

代理模式

定义:提供一个代理对象,并由代理对象控制对原对象的引用。

代理模式下存在三类角色:

  • 抽象角色 Subject:通过接口或抽象类声明业务方法,需要代理角色和真实角色来实现。
  • 代理角色 Proxy:暴露给客户端,是真实角色的代理,通过真实角色的业务逻辑方法来实现抽象方法,并可以附加额外方法。
  • 真实角色 RealSubject:实现真实角色的业务逻辑,供代理角色调用。
    在这里插入图片描述

如上图所示,Client 客户端通过 toRequest 发起请求。抽象角色 Subject 作为一个接口,其中有一个Request方法。下面还有两个实现类,RealSubject 是实际的实现类,Proxy 是一个代理类,它内部是直接调用 RealSubject 的 request方法。当然,在 Proxy 中,它可以增加自己的 preRequest 和 afterRequest 处理逻辑。

不用代理模式的情况下,Client 就会直接请求 RealSubject。而使用代理模式,Client 直接调用的 Proxy,由 Proxy 先处理一遍,再由 Proxy调用 RealSubject,最后再返回给 Client。整个过程,Client 看到的只有 Proxy,对它来说,Proxy 就是真实的 Subject 实现类。而RealSubject 原来要服务很多的Client,但是现在只需要暴露给 Proxy,它的风险就小很多了,因为只需要信任 Proxy 就够了。

综上,代理模式起到中介隔离效果,避免直接暴露 RealSubject,同时这还符合开闭原则,对扩展开放,对修改关闭。Proxy 就是一个扩展,有新的需求可以在 Proxy 中实现,从而减少对 RealSubject 的修改。RealSubject 从而可以更加聚焦自己的核心能力,把一些边缘性的经常变化的扩展性需求放在 Proxy 中来实现。

作为补充,常见的 gRPC Proxy 原理:

  • 启动一个 gRPC 代理服务端;
  • 拦截 gRPC 的服务,转发到代理的一个函数中执行;
  • 接收客户端的请求,完成相关处理后转发给服务端;
  • 接收服务端的响应,完成相关处理后转发给客户端;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/26487.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【单谐波非线性振动问题求解器 GUI 】使用单个谐波表示解决 MDOF 非线性振动问题(Matlab代码实现)

💥💥💞💞欢迎来到本博客❤️❤️💥💥 🏆博主优势:🌞🌞🌞博客内容尽量做到思维缜密,逻辑清晰,为了方便读者。 ⛳️座右铭&a…

ceph存储的应用

ceph存储的应用 一:创建 CephFS 文件系统 MDS 接口1.服务端操作1)在管理节点创建 mds 服务2)查看各个节点的 mds 服务3)创建存储池,启用 ceph 文件系统4)查看mds状态,一个up,其余两个…

Java使用JNI实现C文件的调用

1.使用IDEA新建工程 构建最基本的maven类型就行,文件结构如下: 其中最主要的类如下: package org.linx;public class TestJNI {static {/*** 加载jni库,有一个重要的点就是生成的为libnative.so,下面加载代码需要消…

Python应用实例(二)数据可视化(四)

数据可视化(四)下载数据 1.CSV文件格式1.1 分析CSV文件头‘1.2 打印文件头及其位置1.3 提取并读取数据1.4 绘制温度图表1.5 在图表中添加日期 从网上下载数据,并对其进行可视化。网上的数据多得令人难以置信,大多未经仔细检查。如…

设计模式day03

01gradle极速安装与配置入门 下载6.8.2版本,配置环境变量 配置镜像仓库 给gradle安装目录下init.d文件夹,放一个init.gradle文件,内容如下: gradle.projectsLoaded {rootProject.allprojects {buildscript {repositories {def JCENTER_URL…

西贝柳斯Sibelius2023旗舰版曲谱大师必备音乐软件

乐谱太复杂,打起来太费时间?革命性的省时功能,如磁性布局和动态分谱,能够快速创作复杂的乐谱。音色库太简陋,找起来麻烦?收藏丰富的音色库供您直接使用,涵盖最广泛的专业级乐器,支持…

TortoiseGit 入门指南08:浏览引用以及在引用间切换

在上一节 创建分支 中,我们学会了在分支上开发新功能,那么随之而来的问题是:如何查看项目又多少分支?如何再切换到主分支?这节来解决这些问题。 在回答之前,需要先了解一个 Git 术语:引用&…

docker-compose安装redis高可用哨兵集群(一主二从三哨兵)

以redis 7.0为例子 直接上代码 docker-compose.yaml version: 3.3 services:master:image: redis:7.0container_name: redis-master#restart: alwayscommand: redis-server --port 6379 --requirepass root --appendonly yes --masterauth root --replica-announce-ip 192.1…

duilib绝对定位与相对定位

文章目录 前言1、绝对位置(floattrue)2、窗口3、布局及控件4、相对位置(floatfalse)5、窗口6、布局与控件7、嵌套在布局与控件之中的布局与控件 前言 duilib中窗口,布局,控件等在屏幕上的显示位置都是按照…

mpVue 微信小程序基于vant-weapp 组件的二次封装TForm 表单组件(修改源码插槽使用)

一、前言 1、mpVue微信小程序不支持动态组件&#xff08;<component> &#xff09; 2、mpVue微信小程序不支持动态属性及事件穿透&#xff08;$attrs和$listeners&#xff09; 3、mpVue微信小程序不支持render函数 二、最终效果 三、配置参数&#xff08;Attributes&…

WEB:lottery

背景知识 dirsearch扫描 题目 原题目应该使用dirsearch扫描发现git泄露然后使用Githack复原的但是攻防世界这边直接把源码给了我们 下载附件可得到 打开文件后进行代码审计 function buy($req){require_registered();require_min_money(2);$money $_SESSION[money];$numbers…

VBA命令及语法列表之字典Dictionaries相关

【分享成果&#xff0c;随喜正能量】真正的修佛之人。首先&#xff0c;得明白自己的来处和归宿&#xff0c;懂得敬畏自己的生命和他人的生命&#xff0c;尽其所能释放善意。学会悲天悯人&#xff0c;渡人的同时渡己&#xff0c;始终走在止于至善的路上。真正的慈悲为怀&#xf…