上一篇文章大概讲了如何将自定义的protobuf类型的message转换成相应的go文件,这次就结合grpc写一个比较认真的客户端和服务器端例子
一、项目结构
client存放rpc服务的客户端文件
server存放rpc服务的服务端文件
protobuf存放自定义的proto文件
grpc存放生成的grpc、potobuf转换后的文件
utils存放工具性的文件
补充一个整个项目完成后展开后的结构图:
二、依赖下载
上篇文章中我们主要是安装了protoc指令程序,这次我们要安装一下
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.2
这是一个转换protobuf的同时生成grpc服务的插件
此外,项目之中还要现在这两个包
github.com/golang/protobuf
google.golang.org/grpc
三、proto文件魔改
因为这个例子还算比较严肃,所以我魔改了一下原先的文件,现在的结构是这样的:
service.proto
其定义了我们rpc服务的类型,如下
syntax="proto3";
package service;
option go_package="../grpc;grpc";
import "request.proto";
import "user.proto";
// 我们将定义相关的rpc服务,主要是在请求信息去请求我们User message
// 在此之前我对之前的protobuf结构进行了修改
// 首先将内部的role/parent/user全部整合到user.proto一个文件中去
service Test{rpc GetUser1(request.Request)returns(user.User){};//这是一个单对单的服务,传入带名字和id的信息,返回一个userrpc GetUser2(stream request.Request)returns(user.UserGroup){};// 传入多个信息,返回一个用户组rpc GetUser3(request.IdGroup)returns(stream user.User){};// 传入单个信息,返回多个用户rpc GetUser4(stream request.Request)returns(stream user.User){};//传入多个用户信息,返回多个用户实例
}
我们看到有四个服务,分别对应着不同的input/output类型(分别在user.proto和request.proto两个文件中定义),grpc使用的是http2协议,所以有个流式传输,也就是可以一直传输某些内容,特征是在参数或者return的值前面加上“stream”关键词。以服务GetUser3为例,用户传入一个id组,会根据这个id列表中的id一直返回对应的user。
注意这里的Test,是我们这次rpc服务的命名空间,会反映到生成的grpc服务结构体名上
request.proto
定义了两个参数类型
syntax="proto3";
package request;
import "google/protobuf/any.proto";//引入any类型
import "google/protobuf/timestamp.proto";//引入时间戳类型
option go_package="../grpc;grpc";message Request{optional string name=1;//可选name参数uint64 id=2;//id参数optional google.protobuf.Any other_msg=3;//其他信息google.protobuf.Timestamp timestamp=4;//请求的时间
}// 创建一个id组成的数组
message IdGroup{repeated uint64 ids=1;google.protobuf.Timestamp timestamp=2;//请求的时间optional google.protobuf.Any other_msg=3;//其他信息
}
分别是单独的request和id组成的数组
user.proto
syntax="proto3";//顶格声明protobuf版本,默认是protobuf2
// 注意一下语句后面要加";",否则不识别
package user;//定义一下protobuf的包名
import "google/protobuf/any.proto";//引入any类型
import "google/protobuf/timestamp.proto";//引入时间戳类型
/*
import public "some path"
这个public关键词用于控制这个包的依赖是否可以传递,例子如下a.proto:
import "b.proto"
import public "c.proto"index.proto:
import "a.proto"
那么这个index文件当中除了能使用a文件中定义的变量,还能使用c文件当中的遍历,但是b文件就不能使用*/
option go_package="../grpc;grpc";//规定生成文件的输出文件,同时规定对应文件package的名称
//这里指定的 out_path 并不是绝对路径,只是相对路径或者说只是路径的一部分,和 protoc 的 --go_out 拼接后才是完整的路径。所以我的侧率就是不写go_out// 这边我们写一个User结构体
//结构体的名称我们采取的是官网上的格式,注意和golang默认格式区分
// 具体的protobuf基础类型和对应语言之间对应表,参见https://protobuf.dev/programming-guides/proto3/#specifying-field-rulesmessage User{// 保留字段和保留数字,这个用法我不是很懂,我看的资料显示如下/*如果一个字段不再需要,如果删除或者注释掉,则其他人在修改时会再次使用这些字段编号,那么旧的引用程序就可能出现一些错误,所以使用保留字段,保留已弃用的字段编号或字段名 我个人觉得这应该涉及到可拓展性的问题(难道更改的时候不会去重新生成对应的文件吗)*/reserved "hh";reserved 99 to 100;string name=1;uint64 id=2;bool merried=3;Role role=4;//这个就是role包引入的枚举类型,枚举定义在message内部就是独占枚举optional google.protobuf.Any other_msg=5;//any类型oneof child_name{string son_name=6;string daughter_name=7;//暂时看起来不同于枚举,oneof控制的事不同字段只能选一个}repeated string hobbies=8;//可重复字段,应该会生成一个切片//内嵌字段,注意tag只是在同一个message内不能重复,内嵌的字段不算// 内嵌的字段是能单独拿出来用的,比如在另一个字段中,可以使用TestUser.GameCharacter// 注意这里的行为只是定义,要使用可以如下这样写// 内嵌的role枚举enum Role{NORMAL_USER=0;VIP_USER=1;BOSS=3;};// 内嵌的parent结构message Parent{string name=1;uint32 age=2;}// 创建一个mapmap<string,Parent> parents=9;// 创建一个时间戳类型optional google.protobuf.Timestamp timestamp=10;
}
// 我尝试了一下extend关键词,试图拓展User,但是遭到了失败,不支持extension关键词,但是生成的时候又要,陷入两难// 创建一个用户组
message UserGroup{repeated User users=1;google.protobuf.Timestamp timestamp=2;optional google.protobuf.Any other_msg=3;//any类型
}
这是我们上篇文章涉及到的文件,我对user进行了删改,并加上了用户组(user组成的数组)
other_msg.proto
syntax="proto3";
package otherMsg;
option go_package="../grpc;grpc";
// 这些信息是用来填充any类型的other_msg字段的
message MsgFromWeekday{string msg=1;
}
message MsgFromWeekend{string msg=1;
}
这个则是定义了protobuf中any的结构,需要注意,protobuf中的any不是说真的啥类型都可以的,因为要对any进行反序列化,所以这里要提前定义
四、生成服务所需要的文件
我们在protobuf文件下进入命令行,输入以下指令:
protoc --go_out=. --go-grpc_out=. *.proto
--go_out是转换成的golang文件地址
--go-grpc_out是grpc服务文件生成地址
*.proto 则是对protobuf文件下所有proto文件进行处理
则会在我们的grpc文件中生成如下文件:
我们可以看到,明显 多了一个service_grpc.pb.go.这个就是grpc服务器和客户端的构成文件。这个文件内容呢,他非常讨厌,有点多,我看是看得懂,但是说不清,主要是包含了客户端和服务器端的一些接口、方法。
五、写一下服务端
服务端是一个main.go文件
这里我可能写的有点复杂了,主要是把函数精细化了一下
照例先贴一下
package mainimport ("context""fmt"pb "grpc_test/grpc""grpc_test/utils""io""log""net""google.golang.org/grpc"
)//第一部分:虚假数据库type User_Role int32const (User_NORMAL_USER User_Role = 0User_VIP_USER User_Role = 1User_BOSS User_Role = 3
)type Parent struct {Name stringAge uint32
}
type Child struct {Gender uintName string
}type User struct {ID uintName stringMerried boolRole User_Rolehobbies []stringParents map[string]ParentChild *Child
}var users = map[uint]User{1: {ID: 1, Name: "黄烽", Merried: false, Role: User_VIP_USER, hobbies: []string{"sleep", "game"}},2: {ID: 2, Name: "张胜前", Merried: false, Role: User_NORMAL_USER},3: {ID: 3, Name: "王怡雯", Merried: true, Role: User_VIP_USER, Child: &Child{1, "汪汪"}},4: {ID: 4, Name: "王福林", Merried: false, Role: User_NORMAL_USER, hobbies: []string{"sleep", "game"}, Parents: map[string]Parent{"father": {"me", 99},"father2": {"you", 99},}},
}// 第二部分:定义一下对应的四个服务
type TestServer struct {pb.UnimplementedTestServer
}// 第一个服务
func (s *TestServer) GetUser1(ctx context.Context, in *pb.Request) (resp *pb.User, err error) {//GetUser1是一个单对单的服务fmt.Println("进入GetUser1服务")// 打印一下输入的基本信息err = Broadcast(in)if err != nil {log.Println(err)}// 根据id查找一下用户,这里就使用一个map代替数据库u, ok := users[uint(in.Id)]if ok {resp = GenaratePbUser(&u)// other_msg部分进行处理any, err := utils.GenerateOtherMsg("this msg is from weekday", true)if err != nil {log.Println(err)}resp.OtherMsg = any// 时间戳部分进行处理resp.Timestamp = utils.GenerateTimestamp()}fmt.Println("GetUser1服务结束")fmt.Println("---------------")return
}// 第二个服务
func (s *TestServer) GetUser2(server pb.Test_GetUser2Server) (err error) {// 该服务会不断接受requst,然后返回一个用户组var pbUsers []*pb.Userfmt.Println("进入GetUser2服务")// 接受一下流信息for {req, err := server.Recv()// 判断接受是否完成if err == io.EOF {fmt.Println("接受所有请求完成")break}if err != nil {log.Println("接受流信息失败:", err)break}//进行req的处理// 首先广播一下信息err = Broadcast(req)if err != nil {log.Println(err)}// 查找对象并返回结果切片中u, ok := users[uint(req.Id)]if ok {pbUsers = append(pbUsers, GenaratePbUser(&u))}}// 进行返回操作res := &pb.UserGroup{Users: pbUsers,}// 添加other_msg和时间戳any, err := utils.GenerateOtherMsg("this msg is from weekday", true)if err != nil {log.Println(err)}res.OtherMsg = anyres.Timestamp = utils.GenerateTimestamp()//发送信息并关闭通道server.SendAndClose(res)fmt.Println("GetUser2服务结束")fmt.Println("---------------")return
}// 第三个服务
func (s *TestServer) GetUser3(iG *pb.IdGroup, server pb.Test_GetUser3Server) (err error) {// 第三个服务,发过来一个id组,流式返回用户信息fmt.Println("进入GetUser3服务")// 打印一下传递进来的idfmt.Println("ids:", iG.Ids)// 打印一下other_msg和时间戳if iG.OtherMsg != nil {if err = utils.BroadcastMsg(iG); err != nil {log.Println(err)}}utils.BroadcastTimestamp(iG)// 查找用户并进行返回for _, v := range iG.Ids {u, ok := users[uint(v)]if ok {resp := GenaratePbUser(&u)// 创建信息any, err := utils.GenerateOtherMsg("this msg is from weekend", false)if err != nil {log.Println(err)}resp.OtherMsg = any// 时间戳部分进行处理resp.Timestamp = utils.GenerateTimestamp()err = server.Send(resp)if err != nil {log.Println("发送失败:", err)break}}}fmt.Println("GetUser3服务结束")fmt.Println("---------------")return
}// 第四个服务
func (s *TestServer) GetUser4(server pb.Test_GetUser4Server) (err error) {// 第四个服务是双向流fmt.Println("进入GetUser4服务")for {req, err := server.Recv()// 判断一下是否全部接受完成if err == io.EOF {fmt.Println("全部接受完成")break}if err != nil {log.Println("接受信息失败:", err)break}// 打印一下信息if err = Broadcast(req); err != nil {log.Println(err)}// 查找一下用户u, ok := users[uint(req.Id)]if ok {resp := GenaratePbUser(&u)any, err := utils.GenerateOtherMsg("this msg is from weekend", false)if err != nil {log.Println(err)}resp.OtherMsg = any// 时间戳部分进行处理resp.Timestamp = utils.GenerateTimestamp()err = server.Send(resp)if err != nil {log.Println("发送失败:", err)break}}}fmt.Println("GetUser4服务结束")fmt.Println("---------------")return
}// 第四个部分:开启服务器
func main() {// main函数之中开启一下服务器listener, err := net.Listen("tcp", "localhost:996")if err != nil {log.Fatalln("listen fail: ", err)}var opt []grpc.ServerOptiongrpcServer := grpc.NewServer(opt...)pb.RegisterTestServer(grpcServer, &TestServer{})fmt.Println("grpc客户端启动,正在localhost:996进行监听")err = grpcServer.Serve(listener)if err != nil {log.Fatalf("failed to serve: %v", err)}
}// 第三个部分:一些工具函数// 根据user结构体创建pb.User类型的函数
func GenaratePbUser(u *User) (pbUser *pb.User) {// 对结果进行赋值// 能直接赋值的直接赋值pbUser = &pb.User{Name: u.Name,Id: uint64(u.ID),Merried: u.Merried,Role: pb.User_Role(u.Role),Hobbies: u.hobbies,Parents: make(map[string]*pb.User_Parent),}// 需要经过处理的部分// 孩子部分if u.Child != nil {// 判断一下孩子性别if u.Child.Gender == 0 {pbUser.ChildName = &pb.User_SonName{SonName: u.Child.Name}} else {pbUser.ChildName = &pb.User_DaughterName{DaughterName: u.Child.Name}}}// 家长map部分进行处理if u.Parents != nil {for k, v := range u.Parents {pbUser.Parents[k] = &pb.User_Parent{Name: v.Name, Age: v.Age}}}return
}// 打印Requst之中传入的信息
func Broadcast(in *pb.Request) (err error) {// 用户id必定存在可以不判断fmt.Println("用户id:", in.GetId())// 用户name是可选属性,所以要进行一下判断if in.Name != nil {fmt.Println("用户name:", in.GetName())}// 打印一下other_msg,这就涉及到对protobuf自定义any类型的反序列化// 因为是可选所以做一下处理if in.OtherMsg != nil {if err = utils.BroadcastMsg(in); err != nil {log.Println(err)}}// 打印一下时间的信息,这个涉及到对protobuf时间戳类型进行反序列化utils.BroadcastTimestamp(in)return
}
这个文件有点长啊,涉及到主要是四个部分
第一部分:假数据库
我创建一个比较虚假的数据库,并对一些用户进行了初始化,那样就可以按照id去查找用户
第二部分:创建服务器结构体并定义四个服务
重新定义server并重新定义四个方法。在写rpc服务我们可以看到,我们写的是一个类似golang接口的东西,具体服务的实现是没有定义的,那我们这里就要将其定义起来。核心流程就是打印request之中的信息、查找用户、返回用户。
那这部分的方法的框架应该如何定义呢?很简单,在grpc下的service_grpc.pb.go这个我们生成的rpc服务中,找到TestServer这个接口就行了,我生成的这个接口给大家看一下:
type TestServer interface {GetUser1(context.Context, *Request) (*User, error)GetUser2(Test_GetUser2Server) errorGetUser3(*IdGroup, Test_GetUser3Server) errorGetUser4(Test_GetUser4Server) errormustEmbedUnimplementedTestServer()
}
如果你看过这个文件就知道了,我们是基于pb.UnimplementedTestServer这个备用结构体的基础上重新创建了自己的TestServer,然后重新定义了四个服务
type TestServer struct {pb.UnimplementedTestServer
}
第三部分:工具函数
就是GenaratePbUser和Broadcast工具函数,这部分我写的不满意,主要是写的有点复杂(各个函数的功能已经写在注释里了),此外这部分本来应该和utils包合并的,但我真心不想改,同时放一下utils包中的内容(看不懂结构的可以借助最上面的项目结构)
package utilsimport ("fmt"pb "grpc_test/grpc""google.golang.org/protobuf/types/known/anypb""google.golang.org/protobuf/types/known/timestamppb"
)// 生成any类型的other_msg信息
func GenerateOtherMsg(msg string, isWeekday bool) (any *anypb.Any, err error) {if isWeekday {any, err = anypb.New(&pb.MsgFromWeekday{Msg: msg})} else {any, err = anypb.New(&pb.MsgFromWeekday{Msg: msg})}return
}// 生成timestamp信息
func GenerateTimestamp() *timestamppb.Timestamp {return timestamppb.Now()
}// 打印other_msg的函数
// 因为需要重用,所以创建一个接口
type Messager interface {GetOtherMsg() *anypb.Any
}func BroadcastMsg(in Messager) (err error) {// protobuf中的any类型还是需要转化成protobuf之中的自定义类型(需要提前生成)//这里我选择预先不知道any所代表类型的方式进行处理// 我在这里写一个other_msg.proto去定义一下Request的other_msg的具体类型var anyMsg stringm, err := in.GetOtherMsg().UnmarshalNew()if err != nil {return err}// 因为这个other_msg可能对应两种类型,所以我们要对其进行判断switch m := m.(type) {case *pb.MsgFromWeekday:anyMsg = m.GetMsg()case *pb.MsgFromWeekend:anyMsg = m.GetMsg()default:anyMsg = "不是默认类型"// err = errors.New("传入非默认类型信息")}fmt.Println("其他信息:", anyMsg)return
}// 打印时间戳的函数
// 因为需要重用,所以还是要创建接口
type TimeCarrier interface {GetTimestamp() *timestamppb.Timestamp
}func BroadcastTimestamp(in TimeCarrier) {fmt.Println("请求/回复时间:", in.GetTimestamp().AsTime().Format("2006-01-02T15:04:05 -07:00:00"))
}
多说一下utils包的中的内容,如果你看过上篇内容就知道我们使用了protobuf的any和时间戳类型,这里GenerateOtherMsg、GenerateTimestamp分别对应了对any类型、时间戳类型的序列化,BroadcastMsg、BroadcastTimestamp则对应了反序列化(重点看一下!)
第四部分:服务端创建
grpc服务端监听,官网上抄的,后续可能会在这里加一些内容
六、客户端的创建
客户端也是一个main.go的文件,照例贴一下
package mainimport ("context""fmt"pb "grpc_test/grpc""grpc_test/utils""io""log""time""google.golang.org/grpc""google.golang.org/grpc/credentials/insecure""google.golang.org/protobuf/types/known/anypb"
)func FirstRequest(client pb.TestClient, msg *anypb.Any, id uint64, ctx context.Context) (err error) {// 该请求是发送一个请求,返回一个结果fmt.Println("开始第一个请求")req := &pb.Request{Id: id}req.OtherMsg = msgreq.Timestamp = utils.GenerateTimestamp()resp, err := client.GetUser1(ctx, req)if err != nil {log.Println("GetUser1 请求失败")return}// 简单打印一下用户名和id,otherMsg以及时间戳fmt.Println("收到反馈!")fmt.Println("用户id:", resp.Id)fmt.Println("用户名称:", resp.Name)utils.BroadcastMsg(resp)utils.BroadcastTimestamp(resp)fmt.Println("第一次请求结束!")fmt.Println("---------------")return
}func SecondRequest(client pb.TestClient, msg *anypb.Any, ids []uint64, ctx context.Context) (err error) {fmt.Println("开始第二个请求")var req = &pb.Request{}getUser2Client, err := client.GetUser2(ctx)if err != nil {log.Println("GetUser2 请求失败")return}for k, v := range ids {req.Id = vreq.OtherMsg = msgreq.Timestamp = utils.GenerateTimestamp()getUser2Client.Send(req)if k == len(ids)-1 {getUser2Client.CloseSend()break}}userGroup, err := getUser2Client.CloseAndRecv()if err != nil {log.Println("GetUser2 接受返回值失败")return}// 打印一下所有的userGroup的id和name、时间戳for _, u := range userGroup.GetUsers() {fmt.Println("收到反馈!")fmt.Printf("%v:%v\n", u.Id, u.Name)}utils.BroadcastTimestamp(userGroup)fmt.Println("第二次请求结束!")fmt.Println("---------------")return
}func ThirdRequest(client pb.TestClient, msg *anypb.Any, ids []uint64, ctx context.Context) (err error) {fmt.Println("开始第三个请求")iG := &pb.IdGroup{Ids: ids,Timestamp: utils.GenerateTimestamp(),OtherMsg: msg,}Test_GetUser3Client, err := client.GetUser3(ctx, iG)if err != nil {log.Println("GetUser3 请求失败")return}for {user, err := Test_GetUser3Client.Recv()// 首先判断一下是否完全接收if err == io.EOF {fmt.Println("所有回复接收完成")break}if err != nil {log.Println("GetUser3 接受返回值失败")return err}// 打印一下user的信息fmt.Println("收到反馈!")fmt.Println("用户id:", user.Id)fmt.Println("用户名称:", user.Name)utils.BroadcastTimestamp(user)}fmt.Println("第三次请求结束!")fmt.Println("---------------")return
}
func ForthRequest(client pb.TestClient, msg *anypb.Any, ids []uint64, ctx context.Context) (err error) {fmt.Println("开始第四个请求")var req = &pb.Request{}test_GetUser4Client, err := client.GetUser4(ctx)if err != nil {log.Println("GetUser4 请求失败")return}for i, v := range ids {// 复用上面的reqreq.Id = vreq.Timestamp = utils.GenerateTimestamp()req.OtherMsg = msgerr = test_GetUser4Client.Send(req)if err != nil {log.Println("GetUser4 发送请求失败")return}u, err := test_GetUser4Client.Recv()if err != nil {log.Println("GetUser4 接受信息失败")return err}// 打印一下返回值的信息fmt.Println("收到反馈!")fmt.Println("用户id:", u.Id)fmt.Println("用户名称:", u.Name)utils.BroadcastTimestamp(u)// 加一个关闭send的判断if i == len(ids)-1 {err = test_GetUser4Client.CloseSend()if err != nil {log.Println("关闭send失败:", err)}break}}fmt.Println("第四次请求结束!")fmt.Println("---------------")return
}func main() {// 连接服务器conn, err := grpc.Dial("localhost:996", grpc.WithTransportCredentials(insecure.NewCredentials()))if err != nil {log.Fatal("dial fail: ", err)}defer conn.Close()testClient := pb.NewTestClient(conn)// 创建共用的ctxctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)defer cancel()// 因为就两个any类型的message,所以这边事先创建一下weekdayMsg, err := utils.GenerateOtherMsg("this msg is from weekday", true)if err != nil {log.Println("创建message失败", err)}weekendMsg, err := utils.GenerateOtherMsg("this msg is from weekend", false)if err != nil {log.Println("创建message失败", err)}// 开始进行任务// 请求第一个服务// 该请求是一来一回err = FirstRequest(testClient, weekdayMsg, 1, ctx)if err != nil {log.Println("FirstRequest", err)return}// 进行第二个请求// 该请求是发送多个请求,返回一个用户组err = SecondRequest(testClient, weekendMsg, []uint64{1, 2}, ctx)if err != nil {log.Println("SecondRequest", err)return}// 进行第三个请求// 发过来一个id组,流式返回用户信息err = ThirdRequest(testClient, weekdayMsg, []uint64{3, 4}, ctx)if err != nil {log.Println("ThirdRequest", err)return}// 进行第四个请求// 双方都是流式err = ForthRequest(testClient, weekendMsg, []uint64{2, 3}, ctx)if err != nil {log.Println("ForthRequest", err)return}}
客户端比较简单,但我为了清楚,还是对四个服务的请求各自写了一个函数,主要还是请求接受回复那一套,还是比较清楚的。
需要注意的是,在第四次请求中,也就是对应GetUser4服务的这个请求(如果你记性好的话,应该记得他是一个“流对流”的服务),context值一定不能是初始值(即ctx:=context.Background()),一定要有时间的限制,否则会报错
七、服务端和客户端的运行测试
期待已久的时刻开启了,现在在项目中的server和client文件下打开命令行(server和client下都是两个main.go文件),先server,后client分别输入
go run .
完美传输
本文结束,有机会上个gitee地址。