go语言 grpc 拦截器

文章目录

    • 拦截器
      • 服务端拦截器
        • 一元拦截器
        • 流拦截器
      • 客户端拦截器
        • 一元拦截器
        • 流拦截`
      • 多个拦截器
  • 代码仓库

拦截器

gRPC拦截器(interceptor)是一种函数,它可以在gRPC调用之前和之后执行一些逻辑,例如认证、授权、日志记录、监控和统计等。拦截器函数是gRPC中非常重要的概念,它允许我们在服务端和客户端添加自定义逻辑,以满足业务需求和运维需求。

在gRPC中,拦截器函数通常通过实现grpc.UnaryServerInterceptor和grpc.StreamServerInterceptor接口来定义。UnaryServerInterceptor用于拦截一元RPC请求,而StreamServerInterceptor用于拦截流式RPC请求。在客户端中,我们可以使用grpc.UnaryClientInterceptor和grpc.StreamClientInterceptor来拦截gRPC调用。

在gRPC中,拦截器函数可以被链接起来,形成一个拦截器链。在这个拦截器链中,每个拦截器函数都可以处理请求并将其转发给下一个拦截器函数,或者直接返回响应。因此,我们可以在拦截器函数中编写不同的逻辑,例如实现认证、授权、监控和统计等。
以下是一些常见的gRPC拦截器:

  • 认证和授权拦截器:用于对gRPC调用进行身份验证和权限控制,例如检查token、验证用户名和密码、检查访问控制列表等;
  • 日志记录拦截器:用于记录gRPC调用的日志,例如记录请求的方法、参数、响应状态等;
  • 监控和统计拦截器:用于监控gRPC调用的性能和吞吐量,例如记录调用次数、响应时间、错误率等;
  • 缓存拦截器:用于在服务端或客户端缓存一些数据,例如缓存计算结果、缓存数据库查询结果等。

服务端拦截器

在这里插入图片描述

一元拦截器
package mainimport ("context""flag""log""time""google.golang.org/grpc""google.golang.org/grpc/credentials/insecure"pb "mygrpc/proto/hello" // 引入编译生成的包
)const (defaultName = "world"
)var (addr = flag.String("addr", "localhost:50051", "the address to connect to")name = flag.String("name", defaultName, "Name to greet")
)func main() {flag.Parse()// 与服务建立连接.conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))if err != nil {log.Fatalf("did not connect: %v", err)}defer conn.Close()// 创建指定服务的客户端c := pb.NewGreeterClient(conn)// 连接服务器并打印出其响应。ctx, cancel := context.WithTimeout(context.Background(), time.Second)defer cancel()// 调用指定方法r, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name})if err != nil {log.Fatalf("could not greet: %v", err)}log.Printf("Greeting: %s", r.GetMessage())
}

结果

2023/12/07 14:52:55 ======= [Server Interceptor]  /hello.Greeter/SayHello
2023/12/07 14:52:55  Pre Proc Message : name:"world"
2023/12/07 14:52:55 Received: world
2023/12/07 14:52:55  Post Proc Message : message:"Hello world"
流拦截器

流式拦截器需要对grpc.ServerStream进行包装,重新实现RecvMsg和SendMsg方法。

func (s *server) SearchOrders(req *pb.HelloRequest, stream pb.Greeter_SearchOrdersServer) error {log.Printf("Recved %v", req.GetName())// 具体返回多少个response根据业务逻辑调整for i := 0; i < 10; i++ {// 通过 send 方法不断推送数据err := stream.Send(&pb.HelloReply{})if err != nil {log.Fatalf("Send error:%v", err)return err}}return nil
}type wrappedStream struct {// 包装器流grpc.ServerStream
}
// 接受信息拦截器
func (w *wrappedStream) RecvMsg(m interface{}) error {log.Printf("====== [Server Stream Interceptor Wrapper] Receive a message (Type: %T) at %s", m, time.Now().Format(time.RFC3339))return w.ServerStream.RecvMsg(m)
}
// 发送消息拦截器
func (w *wrappedStream) SendMsg(m interface{}) error {log.Printf("====== [Server Stream Interceptor Wrapper] Send a message (Type: %T) at %v", m, time.Now().Format(time.RFC3339))return w.ServerStream.SendMsg(m)
}func newWrappedStream(s grpc.ServerStream) grpc.ServerStream {return &wrappedStream{s}
}func orderServerStreamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {// 前置处理log.Println("====== [Server Stream Interceptor] ", info.FullMethod)// 包装器流调用 流RPCerr := handler(srv, newWrappedStream(ss))if err != nil {log.Printf("RPC failed with error %v", err)}return err
}
func main() {flag.Parse()lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))if err != nil {log.Fatalf("failed to listen: %v", err)}// 开启rpcs := grpc.NewServer(grpc.StreamInterceptor(orderServerStreamInterceptor))// 注册服务pb.RegisterGreeterServer(s, &server{})log.Printf("service listening at %v", lis.Addr())if err := s.Serve(lis); err != nil {log.Fatalf("failed to serve: %v", err)}
}

结果

GOROOT=D:\software\Go #gosetup
GOPATH=D:\software\golibrary #gosetup
D:\software\Go\bin\go.exe build -o C:\Users\29071\AppData\Local\JetBrains\GoLand2023.3\tmp\GoLand\___go_build_mygrpc_service_steamInterceptorservice.exe mygrpc/service/steamInterceptorservice #gosetup
C:\Users\29071\AppData\Local\JetBrains\GoLand2023.3\tmp\GoLand\___go_build_mygrpc_service_steamInterceptorservice.exe
2023/12/07 15:07:48 service listening at [::]:50051
2023/12/07 15:08:07 ====== [Server Stream Interceptor]  /hello.Greeter/searchOrders
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Receive a message (Type: *hello.HelloRequest) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 Recved 开始服务端rpc流测试
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
Process finished with the exit code -1073741510 (0xC000013A: interrupted by Ctrl+C)

客户端拦截器

在这里插入图片描述

一元拦截器
func orderUnaryClientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {// 前置处理逻辑log.Println("Method : " + method)// 调用invoker 执行远程方法err := invoker(ctx, method, req, reply, cc, opts...)// 后置处理逻辑log.Println(reply)return err
}func main() {flag.Parse()// 与服务建立连接.conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()),grpc.WithUnaryInterceptor(orderUnaryClientInterceptor)) //添加拦截器if err != nil {log.Fatalf("did not connect: %v", err)}defer conn.Close()// 创建指定服务的客户端c := pb.NewGreeterClient(conn)// 连接服务器并打印出其响应。ctx, cancel := context.WithTimeout(context.Background(), time.Second) // 设置超时时间为一秒defer cancel()// 调用指定方法r, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name})if err != nil {log.Fatalf("could not greet: %v", err)}log.Printf("Greeting: %s", r.GetMessage())
}

结果

2023/12/07 16:37:28 Method : /hello.Greeter/SayHello
2023/12/07 16:37:28 message:"Hello world"
2023/12/07 16:37:28 Greeting: Hello worl
流拦截`
type wrappedStream struct {grpc.ClientStream
}func (w *wrappedStream) RecvMsg(m interface{}) error {log.Printf("====== [Client Stream Interceptor] Receive a message (Type: %T) at %v", m, time.Now().Format(time.RFC3339))return w.ClientStream.RecvMsg(m)
}func (w *wrappedStream) SendMsg(m interface{}) error {log.Printf("====== [Client Stream Interceptor] Send a message (Type: %T) at %v", m, time.Now().Format(time.RFC3339))return w.ClientStream.SendMsg(m)
}func newWrappedStream(s grpc.ClientStream) grpc.ClientStream {return &wrappedStream{s}
}func clientStreamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {// 前置处理逻辑log.Println("======= [Client Interceptor] ", method)// 调用streamer 来获取客户端流s, err := streamer(ctx, desc, cc, method, opts...)if err != nil {return nil, err}return newWrappedStream(s), nil
}func main(){// 注册拦截器到客户端流conn,err:=grpc.Dial(address,grpc.WithInsecure(),grpc.WithStreamInterceptor(clientStreamInterceptor))if err != nil {log.Fatalf("did not connect: %v", err)}defer conn.Close()c := pb.NewOrderManagementClient(conn)ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)defer cancel()// 调用客户端流RPC方法searchStream, _ := c.SearchOrders(ctx, &wrapper.StringValue{Value: "Google"})for {searchOrder, err := searchStream.Recv()if err == io.EOF {log.Print("EOF")break}if err == nil {log.Print("Search Result : ", searchOrder)}}
}

结果

2023/12/07 17:10:43 ====== [Client Stream Interceptor] Send a message (Type: *hello.HelloRequest) at 2023-12-07T17:10:43+08:00
2023/12/07 17:10:43 ====== [Client Stream Interceptor] Send a message (Type: *hello.HelloRequest) at 2023-12-07T17:10:43+08:00
2023/12/07 17:10:43 客户端流传输结束

多个拦截器

在grpc中默认的拦截器不可以传多个,因为在源码中,存在一些问题

func chainUnaryClientInterceptors(cc *ClientConn) {interceptors := cc.dopts.chainUnaryIntsif cc.dopts.unaryInt != nil {interceptors = append([]UnaryClientInterceptor{cc.dopts.unaryInt}, interceptors...)}var chainedInt UnaryClientInterceptorif len(interceptors) == 0 {chainedInt = nil} else if len(interceptors) == 1 {chainedInt = interceptors[0]} else {chainedInt = func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error {return interceptors[0](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, 0, invoker), opts...)}}cc.dopts.unaryInt = chainedInt
}

当存在多个拦截器时,取的就是第一个拦截器。因此结论是允许传多个,但并没有用。

如果真的需要多个拦截器,可以使用 go-grpc-middleware 提供的 grpc.UnaryInterceptor 和 grpc.StreamInterceptor 链式方法。核心方法如下

func ChainUnaryClient(interceptors ...grpc.UnaryClientInterceptor) grpc.UnaryClientInterceptor {n := len(interceptors)if n > 1 {lastI := n - 1return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {var (chainHandler grpc.UnaryInvokercurI         int)chainHandler = func(currentCtx context.Context, currentMethod string, currentReq, currentRepl interface{}, currentConn *grpc.ClientConn, currentOpts ...grpc.CallOption) error {if curI == lastI {return invoker(currentCtx, currentMethod, currentReq, currentRepl, currentConn, currentOpts...)}curI++err := interceptors[curI](currentCtx, currentMethod, currentReq, currentRepl, currentConn, chainHandler, currentOpts...)curI--return err}return interceptors[0](ctx, method, req, reply, cc, chainHandler, opts...)}}...
}

代码仓库

https://github.com/onenewcode/mygrpc.git

也可以直接下载绑定的资源。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/259906.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

希尔排序详解:一种高效的排序方法

在探索排序算法的世界中&#xff0c;我们经常遇到需要对大量数据进行排序的情况。传统的插入排序虽然简单&#xff0c;但在处理大规模数据时效率并不高。这时&#xff0c;希尔排序&#xff08;Shell Sort&#xff09;就显得尤为重要。本文将通过深入解析希尔排序的逻辑&#xf…

降维技术——PCA、LCA 和 SVD

一、说明 降维在数据分析和机器学习中发挥着关键作用&#xff0c;为高维数据集带来的挑战提供了战略解决方案。随着数据集规模和复杂性的增长&#xff0c;特征或维度的数量通常变得难以处理&#xff0c;导致计算需求增加、潜在的过度拟合和模型可解释性降低。降维技术通过捕获数…

【已解决】SpringBoot Maven 打包失败:class lombok.javac.apt.LombokProcessor 错误

文章目录 出错原因解决办法总结 最新项目部署的时候&#xff0c;出现了一个maven打包失败的问题&#xff0c;主要是lombok这个组件出的问题&#xff0c;具体的错误信息如下&#xff1a; 我的lombok版本如下&#xff1a; <dependency><groupId>org.projectlombok&l…

Spring Cloud切换内嵌Tomcat为宝兰德Application Server

目录 替换Tomcat中间件Tomcat是什么Spring Cloud剔除tomcat引入宝兰德Application Server打包运行授权导入 替换Tomcat中间件 Tomcat是什么 Spring Cloud剔除tomcat <!--集成springmvc框架 --><dependency><groupId>org.springframework.boot</groupId&…

AWS Remote Control ( Wi-Fi ) on i.MX RT1060 EVK - 3 “编译 NXP i.MX RT1060”( 完 )

此章节叙述如何修改、建构 i.MX RT1060 的 Sample Code“aws_remote_control_wifi_nxp” 1. 点击“Import SDK example(s)” 2. 选择“MIMXRT1062xxxxA”>“evkmimxrt1060”&#xff0c;并确认 SDK 版本后&#xff0c;点击“Next>” 3. 选择“aws_examples”>“aw…

《ReactJS实践入门》:引领JavaScript前端开发的革新之旅

在当今的软件开发世界中&#xff0c;ReactJS无疑是最为引人注目的JavaScript库之一。对于初学者来说&#xff0c;如何深入理解并掌握这一强大的前端工具&#xff0c;进而应用到实际开发中&#xff0c;一直是他们所面临的问题。而《ReactJS实践入门》一书&#xff0c;正是为了解…

SAP UI5 walkthrough step7 JSON Model

这个章节&#xff0c;帮助我们理解MVC架构中的M 我们将会在APP中新增一个输入框&#xff0c;并将输入的值绑定到model&#xff0c;然后将其作为描述&#xff0c;直接显示在输入框的右边 首先修改App.controllers.js webapp/controller/App.controller.js sap.ui.define([&…

MYSQL提权

一、环境准备&#xff1a; 靶场机&#xff1a;windows7&#xff1a;192.168.200.34 攻击机&#xff1a;kali&#xff1a;192.168.200.14 二、原理&#xff1a; UDF&#xff08;User-Defined Function&#xff09;提权指的是通过在MySQL数据库中编写自定义函数&#xff08;UD…

【vtkWidgetRepresentation】第六期 vtkFinitePlaneRepresentation

很高兴在雪易的CSDN遇见你 &#xff0c;给你糖糖 欢迎大家加入雪易社区-CSDN社区云 前言 本文分享VTK中的平面Plane表示方法&#xff0c;希望对各位小伙伴有所帮助&#xff01; 感谢各位小伙伴的点赞关注&#xff0c;小易会继续努力分享&#xff0c;一起进步&#xff01; …

unity中:搭建在线AR应用

使用Imagine WebAR - Image Tracker插件部署WebGL应用 在使用Imagine WebAR - Image Tracker插件进行WebGL应用开发时&#xff0c;有两个关键知识点需要掌握&#xff1a; 1. 部署到支持HTTPS的服务器 由于WebGL应用需要访问用户的摄像头&#xff0c;因此必须在支持HTTPS的服…

【前端】CSS基础(学习笔记)

一、简介 1、HTML局限性 HTML只关注内容的语义&#xff0c;但是丑&#xff01; 2、CSS概要 CSS 是层叠样式表 ( Cascading Style Sheets ) 的简称&#xff0c;有时我们也会称之为 CSS 样式表或级联样式表。 CSS 是也是一种标记语言 CSS 主要用于设置 HTML 页面中的文本内…

面向对象中的单例模式

1、什么是设计模式 设计模式就是前人根据实际的问题提出的问题解决方案&#xff0c;我们把这种就称之为设计模式。 2、单例模式 单例模式是一种常见的设计模式&#xff01; 所谓的设计模式&#xff0c;不是一种新的语法&#xff0c;而是人们在实际的应用中&#xff0c;面对…