14-RPC-自研微服务框架

RPC

RPC 框架是分布式领域核心组件,也是微服务的基础。

RPC (Remote Procedure Call)全称是远程过程调用,相对于本地方法调用,在同一内存空间可以直接通过方法栈实现调用,远程调用则跨了不同的服务终端,并不能直接调用。

RPC框架 要解决的就是远程方法调用的问题,并且实现调用远程服务像调用本地服务一样简单,框架内部封装实现了网络调用的细节。

在这里插入图片描述

1. 通信协议选择

根据不同的需求来选择通信协议,UDP是不可靠传输,一般来说很少做为RPC框架的选择。

TCP和HTTP是最佳选择。

HTTP虽然有很多无用的头部信息,传输效率上会比较低,但是HTTP通用性更强,跨语言,跨平台,更易移植。

TCP可靠传输,需要自定义协议,传输效率更高,但是通用性不强。

1.1 HTTP/1.0和HTTP/1.1的区别

HTTP1.0最早在网页中使用是在1996年,那个时候只是使用一些较为简单的网页上和网络请求上,而HTTP1.1则在1999年才开始广泛应用于现在的各大浏览器网络请求中,同时HTTP1.1也是当前使用最为广泛的HTTP协议。 主要区别主要体现在:

  1. 缓存处理,在HTTP1.0中主要使用header里的If-Modified-Since,Expires来做为缓存判断的标准,HTTP1.1则引入了更多的缓存控制策略例如Entity tag,If-Unmodified-Since, If-Match, If-None-Match等更多可供选择的缓存头来控制缓存策略。
  2. 带宽优化及网络连接的使用,HTTP1.0中,存在一些浪费带宽的现象,例如客户端只是需要某个对象的一部分,而服务器却将整个对象送过来了,并且不支持断点续传功能,HTTP1.1则在请求头引入了range头域,它允许只请求资源的某个部分,即返回码是206(Partial Content),这样就方便了开发者自由的选择以便于充分利用带宽和连接。
  3. 错误通知的管理,在HTTP1.1中新增了24个错误状态响应码,如409(Conflict)表示请求的资源与资源的当前状态发生冲突;410(Gone)表示服务器上的某个资源被永久性的删除。
  4. Host头处理,在HTTP1.0中认为每台服务器都绑定一个唯一的IP地址,因此,请求消息中的URL并没有传递主机名(hostname)。但随着虚拟主机技术的发展,在一台物理服务器上可以存在多个虚拟主机(Multi-homed Web Servers),并且它们共享一个IP地址。HTTP1.1的请求消息和响应消息都应支持Host头域,且请求消息中如果没有Host头域会报告一个错误(400 Bad Request)。
  5. 长连接,HTTP 1.1支持长连接(PersistentConnection)和请求的流水线(Pipelining)处理,在一个TCP连接上可以传送多个HTTP请求和响应,减少了建立和关闭连接的消耗和延迟,在HTTP1.1中默认开启Connection: keep-alive,一定程度上弥补了HTTP1.0每次请求都要创建连接的缺点。

1.2 HTTP/1.1和HTTP/2的区别

  • 新的二进制格式(Binary Format),HTTP1.x的解析是基于文本。基于文本协议的格式解析存在天然缺陷,文本的表现形式有多样性,要做到健壮性考虑的场景必然很多,二进制则不同,只认0和1的组合。基于这种考虑HTTP2.0的协议解析决定采用二进制格式,实现方便且健壮。
  • 多路复用(MultiPlexing),即连接共享,即每一个request都是是用作连接共享机制的。一个request对应一个id,这样一个连接上可以有多个request,每个连接的request可以随机的混杂在一起,接收方可以根据request的 id将request再归属到各自不同的服务端请求里面。
  • header压缩,如上文中所言,对前面提到过HTTP1.x的header带有大量信息,而且每次都要重复发送,HTTP2.0使用encoder来减少需要传输的header大小,通讯双方各自cache一份header fields表,既避免了重复header的传输,又减小了需要传输的大小。
  • 服务端推送(server push)HTTP2.0也具有server push功能。

grpc采用了http2协议,由于http的通用性,所以现在的几乎所有的rpc框架都支持grpc

2. 序列化协议

数据在网络中传输,必须是二进制的,所以我们需要先将传输的对象进行序列化之后,才能传输。

接收方通过反序列化将数据解析出来。

序列化协议有XML、 JSON、Protobuf、Thrift 等,Golang 原生支持的 Gob 协议。

3. 编解码

如果使用TCP,我们需要定义数据传输的格式,防止在传输过程中出现的粘包,拆包等问题。

在这里插入图片描述

假设客户端分别发送了两个数据包D1和D2给服务端,由于服务端一次读取到字节数是不确定的,故可能存在以下四种情况:

  1. 服务端分两次读取到了两个独立的数据包,分别是D1和D2,没有粘包和拆包
  2. 服务端一次接受到了两个数据包,D1和D2粘合在一起,称之为TCP粘包
  3. 服务端分两次读取到了数据包,第一次读取到了完整的D1包和D2包的部分内容,第二次读取到了D2包的剩余内容,这称之为TCP拆包
  4. 服务端分两次读取到了数据包,第一次读取到了D1包的部分内容D1_1,第二次读取到了D1包的剩余部分内容D1_2和完整的D2包。

特别要注意的是,如果TCP的接受滑窗非常小,而数据包D1和D2比较大,很有可能会发生第五种情况,即服务端分多次才能将D1和D2包完全接收,期间发生多次拆包

自定义格式可以使用定长的头和不定长的体,标识数据长度即可

1字节1字节4字节1字节1字节1字节8字节不定
魔法数(Magic Number)版本(Version)消息长度(full length)消息类型(messageType)压缩类型(compress)序列化类型(serialize)请求id(requestId)请求体(body)
  • magic number : 通信双方协商的一个暗号 魔数的作用是用于服务端在接收数据时先解析出魔数做正确性对比。如果和协议中的魔数不匹配,则认为是非法数据
  • version : 不同版本的协议对应的解析方法可能是不同的,应对业务变化需求
  • full length: 记录了整个消息的长度
  • messageType:普通请求、普通响应、心跳等,根据消息类型做出不同的解析
  • compress: 序列化的字节流,还可以进行压缩,使得体积更小,在网络传输更快,不一定要使用
  • serialize:序列化方式,比如json,protostuff,glob等
  • request id:每个请求分配好请求Id,这样响应数据的时候,才能对的上
  • body:具体的数据

4. 实现

4.1 http方式

package rpcimport ("bufio""bytes""encoding/json""errors""fmt""io""net/http""net/url""strings""time"
)type MsHttpClient struct {client http.Client
}// NewHttpClient Transport请求分发,协程安全,支持连接池s
func NewHttpClient() *MsHttpClient {client := http.Client{Timeout: time.Duration(3) * time.Second,Transport: &http.Transport{MaxIdleConnsPerHost:   5,MaxConnsPerHost:       100,IdleConnTimeout:       90 * time.Second,TLSHandshakeTimeout:   10 * time.Second,ExpectContinueTimeout: 1 * time.Second,},}return &MsHttpClient{client: client}
}func (c *MsHttpClient) GetRequest(method string, url string, args map[string]any) (*http.Request, error) {if args != nil && len(args) > 0 {url = url + "?" + c.toValues(args)}req, err := http.NewRequest(method, url, nil)if err != nil {return nil, err}return req, nil
}func (c *MsHttpClient) FormRequest(method string, url string, args map[string]any) (*http.Request, error) {req, err := http.NewRequest(method, url, strings.NewReader(c.toValues(args)))if err != nil {return nil, err}return req, nil
}func (c *MsHttpClient) JsonRequest(method string, url string, args map[string]any) (*http.Request, error) {jsonStr, _ := json.Marshal(args)req, err := http.NewRequest(method, url, bytes.NewReader(jsonStr))if err != nil {return nil, err}return req, nil
}func (c *MsHttpClient) Get(url string, args map[string]any) ([]byte, error) {if args != nil && len(args) > 0 {url = url + "?" + c.toValues(args)}req, err := http.NewRequest("GET", url, nil)if err != nil {return nil, err}return c.handleResponse(req)
}func (c *MsHttpClient) PostForm(url string, args map[string]any) ([]byte, error) {req, err := http.NewRequest("POST", url, strings.NewReader(c.toValues(args)))if err != nil {return nil, err}return c.handleResponse(req)
}func (c *MsHttpClient) PostJson(url string, args map[string]any) ([]byte, error) {jsonStr, _ := json.Marshal(args)req, err := http.NewRequest("POST", url, bytes.NewReader(jsonStr))if err != nil {return nil, err}return c.handleResponse(req)
}func (c *MsHttpClient) Response(req *http.Request) ([]byte, error) {return c.handleResponse(req)
}
func (c *MsHttpClient) handleResponse(req *http.Request) ([]byte, error) {var err errorresponse, err := c.client.Do(req)if err != nil {return nil, err}if response.StatusCode != 200 {return nil, errors.New(response.Status)}buffLen := 79buff := make([]byte, buffLen)body := make([]byte, 0)reader := bufio.NewReader(response.Body)for {n, err := reader.Read(buff)if err == io.EOF || n == 0 {break}body = append(body, buff[:n]...)if n < buffLen {break}}defer response.Body.Close()if err != nil {return nil, err}return body, nil
}func (c *MsHttpClient) toValues(args map[string]any) string {if args != nil && len(args) > 0 {params := url.Values{}for k, v := range args {params.Set(k, fmt.Sprintf("%v", v))}return params.Encode()}return ""
}

ordercenter:

package mainimport ("encoding/json""fmt""github.com/mszlu521/msgo""github.com/mszlu521/msgo/rpc""net/http"
)type Result struct {Code int    `json:"code"`Msg  string `json:"msg"`Data any    `json:"data"`
}type Goods struct {Id   int64  `json:"id"`Name string `json:"name"`
}func main() {engine := msgo.Default()client := rpc.NewHttpClient()g := engine.Group("order")g.Get("/find", func(ctx *msgo.Context) {//查询商品bytes, err := client.Get("http://localhost:9002/goods/find", nil)if err != nil {ctx.Logger.Error(err)}fmt.Println(string(bytes))v := &Result{}json.Unmarshal(bytes, v)ctx.JSON(http.StatusOK, v)})engine.Run(":9003")
}

goodsCenter:

package mainimport ("github.com/mszlu521/msgo""net/http"
)type Result struct {Code int    `json:"code"`Msg  string `json:"msg"`Data any    `json:"data"`
}type Goods struct {Id   int64  `json:"id"`Name string `json:"name"`
}func main() {engine := msgo.Default()g := engine.Group("goods")g.Get("/find", func(ctx *msgo.Context) {//查询商品goods := Goods{Id: 1000, Name: "商品中心9001商品"}ctx.JSON(http.StatusOK, &Result{Code: 200, Msg: "success", Data: goods})})engine.Run(":9002")
}

4.2 改造http方式

config:

package rpcimport "strconv"type Config struct {Protocol stringHost     stringPort     intSsl      bool
}func (c Config) Url() string {switch c.Protocol {case HTTP, HTTP2:prefix := "http://"if c.Ssl {prefix = "https://"}return prefix + c.Host + ":" + strconv.FormatInt(int64(c.Port), 10)}return ""
}const (HTTP  = "HTTP"HTTP2 = "HTTP2"TCP   = "TCP"
)const (GET      = "GET"POSTForm = "POST_FORM"POSTJson = "POST_JSON"
)

rpc.go:

package rpctype MsService interface {Env() Config
}

func (c *MsHttpClient) Use(name string, s MsService) {if c.serviceMap == nil {c.serviceMap = make(map[string]MsService)}c.serviceMap[name] = s
}func (c *MsHttpClient) Do(name string, method string) MsService {s, ok := c.serviceMap[name]if !ok {panic(errors.New(name + " not exist, please action"))}t := reflect.TypeOf(s)v := reflect.ValueOf(s)if t.Kind() != reflect.Pointer {panic(errors.New("service must be pointer"))}tVar := t.Elem()vVar := v.Elem()findIndex := -1for i := 0; i < tVar.NumField(); i++ {field := tVar.Field(i)name := field.Nameif method == name {findIndex = i}}if findIndex == -1 {panic(errors.New(method + " not exist"))}requestPath := tVar.Field(findIndex).Tag.Get("msrpc")if requestPath == "" {panic(errors.New("msrpc tag not exist"))}split := strings.Split(requestPath, ",")mt := split[0]path := split[1]co := s.Env()prefix := co.Url()f := func(args map[string]any) ([]byte, error) {if mt == GET {return c.Get(prefix+path, args)}if mt == POSTForm {return c.PostForm(prefix+path, args)}if mt == POSTJson {return c.PostJson(prefix+path, args)}return nil, nil}value := reflect.ValueOf(f)vVar.Field(findIndex).Set(value)return s
}

goods:

package serviceimport ("github.com/mszlu521/msgo/rpc"
)type Goods struct {Id   int64  `json:"id"`Name string `json:"name"`
}type GoodsService struct {Find func(args map[string]any) ([]byte, error) `msrpc:"GET,/goods/find"`
}func (r *GoodsService) Env() rpc.Config {c := rpc.Config{Host:     "localhost",Port:     9002,Protocol: rpc.HTTP,}return c
}
package mainimport ("encoding/json""fmt""github.com/mszlu521/msgo""github.com/mszlu521/msgo/rpc""github.com/mszlu521/ordercenter/model""github.com/mszlu521/ordercenter/service""net/http"
)func main() {engine := msgo.Default()client := rpc.NewHttpClient()g := engine.Group("order")goodsService := &service.GoodsService{}client.Use("goodsService", goodsService)g.Get("/find", func(ctx *msgo.Context) {//查询商品v := &model.Result{}bytes, err := client.Do("goodsService", "Find").(*service.GoodsService).Find(nil)if err != nil {ctx.Logger.Error(err)}fmt.Println(string(bytes))json.Unmarshal(bytes, v)ctx.JSON(http.StatusOK, v)})engine.Run(":9003")
}

通过上述改造,我们可以比较轻易的使用框架,来实现http方式的rpc调用

记住:框架的目的是易用,但同时需要遵守规则,所以定义规则也是框架的一部分

4.3 http2(grpc)方式

有关grpc的使用可以先去看教程,教程地址

go get google.golang.org/grpc
protoc  --go_out=./ --go-grpc_out=./  .\api\goods.proto

goodscenter服务端:

syntax = "proto3";//import "google/protobuf/any.proto";option go_package="/api";package api;service GoodsApi {rpc Find(GoodsRequest) returns (GoodsResponse);
}message GoodsRequest {}message GoodsResponse {int64 Code = 1;string Msg = 2;Goods Data = 3;
}message Goods {int64 Id = 1;string Name = 2;
}
package serviceimport ("context""github.com/mszlu521/goodscenter/api"
)type GoodsApiService struct {
}func (GoodsApiService) Find(context.Context, *api.GoodsRequest) (*api.GoodsResponse, error) {goods := &api.Goods{Id: 1000, Name: "商品中心9002商品,grpc提供"}res := &api.GoodsResponse{Code: 200,Msg:  "success",Data: goods,}return res, nil
}
func (GoodsApiService) mustEmbedUnimplementedGoodsApiServer() {}

grpc服务端:

listen, _ := net.Listen("tcp", ":9111")server := grpc.NewServer()api.RegisterGoodsApiServer(server, &api.GoodsApiService{})err := server.Serve(listen)log.Println(err)

grpc客户端:

g.Get("/findGrpc", func(ctx *msgo.Context) {//查询商品var serviceHost = "127.0.0.1:9111"conn, err := grpc.Dial(serviceHost, grpc.WithTransportCredentials(insecure.NewCredentials()))if err != nil {fmt.Println(err)}defer conn.Close()client := api.NewGoodsApiClient(conn)rsp, err := client.Find(context.TODO(), &api.GoodsRequest{})if err != nil {fmt.Println(err)}ctx.JSON(http.StatusOK, rsp)})
4.3.1 形成框架工具

服务端:

package rpcimport ("google.golang.org/grpc""net"
)type MsGrpcServer struct {listen     net.ListenergrpcServer *grpc.Serverregisters  []func(grpcServer *grpc.Server)ops        []grpc.ServerOption
}func NewGrpcServer(address string, ops ...MsGrpcOption) (*MsGrpcServer, error) {listen, err := net.Listen("tcp", address)if err != nil {return nil, err}ms := &MsGrpcServer{listen: listen,}for _, op := range ops {op.Apply(ms)}s := grpc.NewServer(ms.ops...)ms.grpcServer = sreturn ms, nil
}func (s *MsGrpcServer) Run() error {for _, register := range s.registers {register(s.grpcServer)}return s.grpcServer.Serve(s.listen)
}func (s *MsGrpcServer) Register(register func(grpServer *grpc.Server)) {s.registers = append(s.registers, register)
}type MsGrpcOption interface {Apply(s *MsGrpcServer)
}type DefaultGrpcOption struct {f func(s *MsGrpcServer)
}func (d DefaultGrpcOption) Apply(s *MsGrpcServer) {d.f(s)
}func WithGrpcOptions(options ...grpc.ServerOption) MsGrpcOption {return DefaultGrpcOption{f: func(s *MsGrpcServer) {s.ops = append(s.ops, options...)}}
}
   grpcServer, _ := rpc.NewGrpcServer(":9111")grpcServer.Register(func(grpServer *grpc.Server) {api.RegisterGoodsApiServer(grpServer, &api.GoodsApiService{})})err := grpcServer.Run()

type MsGrpcClient struct {Conn *grpc.ClientConn
}func NewGrpcClient(config *MsGrpcClientConfig) (*MsGrpcClient, error) {var ctx = context.Background()var dialOptions = config.dialOptionsif config.Block {//阻塞if config.DialTimeout > time.Duration(0) {var cancel context.CancelFuncctx, cancel = context.WithTimeout(ctx, config.DialTimeout)defer cancel()}dialOptions = append(dialOptions, grpc.WithBlock())}if config.KeepAlive != nil {dialOptions = append(dialOptions, grpc.WithKeepaliveParams(*config.KeepAlive))}conn, err := grpc.DialContext(ctx, config.Address, dialOptions...)if err != nil {return nil, err}return &MsGrpcClient{Conn: conn,}, nil
}type MsGrpcClientConfig struct {Address     stringBlock       boolDialTimeout time.DurationReadTimeout time.DurationDirect      boolKeepAlive   *keepalive.ClientParametersdialOptions []grpc.DialOption
}func DefaultGrpcClientConfig() *MsGrpcClientConfig {return &MsGrpcClientConfig{dialOptions: []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials()),},DialTimeout: time.Second * 3,ReadTimeout: time.Second * 2,Block:       true,}
}

4.4 TCP方式

tcp方式就需要实现序列化,编解码等操作了

序列化协议支持两种:

Protobuf 和 go的Gob协议。

4.4.1 server端

type Serializer interface {Serialize(i interface{}) ([]byte, error)Deserialize(data []byte, i interface{}) error
}
type GobSerializer struct{}func (c GobSerializer) Serialize(data any) ([]byte, error) {var buffer bytes.Bufferencoder := gob.NewEncoder(&buffer)if err := encoder.Encode(data); err != nil {return nil, err}return buffer.Bytes(), nil
}func (c GobSerializer) Deserialize(data []byte, target any) error {buffer := bytes.NewBuffer(data)decoder := gob.NewDecoder(buffer)return decoder.Decode(target)
}type MsRpcMessage struct {//头Header *Header//消息体Data any
}const mn byte = 0x1d
const version = 0x01type CompressType byteconst (Gzip CompressType = iota
)type SerializeType byteconst (Gob SerializeType = iotaProtoBuff
)type MessageType byteconst (msgRequest MessageType = iotamsgResponsemsgPingmsgPong
)type Header struct {MagicNumber   byteVersion       byteFullLength    int32MessageType   MessageTypeCompressType  CompressTypeSerializeType SerializeTypeRequestId     int64
}type MsRpcRequest struct {RequestId   int64ServiceName stringMethodName  stringArgs        []any
}type MsRpcResponse struct {RequestId     int64Code          int16Msg           stringCompressType  CompressTypeSerializeType SerializeTypeData          any
}type MsRpcServer interface {Register(name string, service interface{})Run()Stop()
}type MsTcpServer struct {listener   net.ListenerHost       stringPort       intNetwork    stringserviceMap map[string]interface{}
}type MsTcpConn struct {s       *MsTcpServerconn    net.ConnrspChan chan *MsRpcResponse
}func (c *MsTcpConn) writeHandle() {ctx := context.Background()_, cancel := context.WithTimeout(ctx, time.Duration(3)*time.Second)defer cancel()select {case rsp := <-c.rspChan://编码数据err := c.Send(c.conn, rsp)if err != nil {log.Println(err)}returncase <-ctx.Done():log.Println("超时了")return}
}func (c *MsTcpConn) Send(conn net.Conn, rsp *MsRpcResponse) error {headers := make([]byte, 17)//magic numberheaders[0] = mn//versionheaders[1] = version//full length//消息类型headers[6] = byte(msgResponse)//压缩类型headers[7] = byte(rsp.CompressType)//序列化headers[8] = byte(rsp.SerializeType)//请求idbinary.BigEndian.PutUint64(headers[9:], uint64(rsp.RequestId))serializer, err := loadSerialize(rsp.SerializeType)if err != nil {return err}body, err := serializer.Serialize(rsp)if err != nil {return err}body, err = compress(body, rsp.CompressType)if err != nil {return err}fullLen := 17 + len(body)binary.BigEndian.PutUint32(headers[2:6], uint32(fullLen))_, err = conn.Write(headers[:])if err != nil {return err}err = binary.Write(c.conn, binary.BigEndian, body[:])if err != nil {return err}log.Println("发送数据成功")return nil
}func NewTcpServer(host string, port int) *MsTcpServer {return &MsTcpServer{Host:    host,Port:    port,Network: "tcp",}
}
func (s *MsTcpServer) Register(name string, service interface{}) {if s.serviceMap == nil {s.serviceMap = make(map[string]interface{})}v := reflect.ValueOf(service)if v.Kind() != reflect.Pointer {panic(errors.New("service not pointer"))}s.serviceMap[name] = service
}
func (s *MsTcpServer) Run() {addr := fmt.Sprintf("%s:%d", s.Host, s.Port)listen, err := net.Listen(s.Network, addr)if err != nil {panic(err)}s.listener = listenfor {conn, err := s.listener.Accept()if err != nil {log.Println(err)continue}msConn := &MsTcpConn{conn: conn, rspChan: make(chan *MsRpcResponse, 1), s: s}go s.readHandle(msConn)go msConn.writeHandle()}
}func (s *MsTcpServer) readHandle(msConn *MsTcpConn) {defer func() {if err := recover(); err != nil {log.Println(err)msConn.conn.Close()}}()msg := s.decodeFrame(msConn.conn)if msg == nil {msConn.rspChan <- nilreturn}//根据请求if msg.Header.MessageType == msgRequest {req := msg.Data.(*MsRpcRequest)//查找注册的服务匹配后进行调用,调用完发送到一个channel当中service, ok := s.serviceMap[req.ServiceName]rsp := &MsRpcResponse{RequestId: req.RequestId, CompressType: msg.Header.CompressType, SerializeType: msg.Header.SerializeType}if !ok {rsp.Code = 500rsp.Msg = "no service found"msConn.rspChan <- rspreturn}v := reflect.ValueOf(service)reflectMethod := v.MethodByName(req.MethodName)args := make([]reflect.Value, len(req.Args))for i := range req.Args {args[i] = reflect.ValueOf(req.Args[i])}result := reflectMethod.Call(args)if len(result) == 0 {//无返回结果rsp.Code = 200msConn.rspChan <- rspreturn}resArgs := make([]interface{}, len(result))for i := 0; i < len(result); i++ {resArgs[i] = result[i].Interface()}var err errorif _, ok := result[len(result)-1].Interface().(error); ok {err = result[len(result)-1].Interface().(error)}if err != nil {rsp.Code = 500rsp.Msg = err.Error()}rsp.Code = 200rsp.Data = resArgs[0]msConn.rspChan <- rsplog.Println("接收数据成功")return}
}func (s *MsTcpServer) Close() {if s.listener != nil {s.listener.Close()}
}func (*MsTcpServer) decodeFrame(conn net.Conn) *MsRpcMessage {//读取数据 先读取header部分//1+1+4+1+1+1+8 = 17字节headers := make([]byte, 17)_, err := io.ReadFull(conn, headers)if err != nil {log.Println(err)return nil}//magic numbermagicNumber := headers[0]if magicNumber != mn {log.Println("magic number not valid : ", magicNumber)return nil}//versionversion := headers[1]//fullLength := headers[2:6]//mt := headers[6]messageType := MessageType(mt)//压缩类型compressType := headers[7]//序列化类型serializeType := headers[8]//请求idrequestId := headers[9:]//将body解析出来,包装成request 根据请求内容查找对应的服务,完成调用//网络调用 大端fl := int32(binary.BigEndian.Uint32(fullLength))bodyLen := fl - 17body := make([]byte, bodyLen)_, err = io.ReadFull(conn, body)log.Println("读完了")if err != nil {log.Println(err)return nil}//先解压body, err = unCompress(body, CompressType(compressType))if err != nil {log.Println(err)return nil}//反序列化serializer, err := loadSerialize(SerializeType(serializeType))if err != nil {log.Println(err)return nil}header := &Header{}header.MagicNumber = magicNumberheader.FullLength = flheader.CompressType = CompressType(compressType)header.Version = versionheader.SerializeType = SerializeType(serializeType)header.RequestId = int64(binary.BigEndian.Uint64(requestId))header.MessageType = messageTypeif messageType == msgRequest {msg := &MsRpcMessage{}msg.Header = headerreq := &MsRpcRequest{}err := serializer.Deserialize(body, req)if err != nil {log.Println(err)return nil}msg.Data = reqreturn msg}if messageType == msgResponse {msg := &MsRpcMessage{}msg.Header = headerrsp := &MsRpcResponse{}err := serializer.Deserialize(body, rsp)if err != nil {log.Println(err)return nil}msg.Data = rspreturn msg}return nil
}func loadSerialize(serializeType SerializeType) (Serializer, error) {switch serializeType {case Gob://gobs := &GobSerializer{}return s, nil}return nil, errors.New("no serializeType")
}func compress(body []byte, compressType CompressType) ([]byte, error) {switch compressType {case Gzip://return body, nil//gzip//创建一个新的 byte 输出流var buf bytes.Bufferw := gzip.NewWriter(&buf)_, err := w.Write(body)if err != nil {return nil, err}if err := w.Close(); err != nil {return nil, err}return buf.Bytes(), nil}return nil, errors.New("no compressType")
}func unCompress(body []byte, compressType CompressType) ([]byte, error) {switch compressType {case Gzip://return body, nil//gzipreader, err := gzip.NewReader(bytes.NewReader(body))defer reader.Close()if err != nil {return nil, err}buf := new(bytes.Buffer)// 从 Reader 中读取出数据if _, err := buf.ReadFrom(reader); err != nil {return nil, err}return buf.Bytes(), nil}return nil, errors.New("no compressType")
}
tcpServer := rpc.NewTcpServer("localhost", 9112)gob.Register(&model.Result{})gob.Register(&model.Goods{})tcpServer.Register("goods", &service.GoodsRpcService{})go tcpServer.Run()go engine.Run(":9002")quit := make(chan os.Signal)signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP, syscall.SIGQUIT)<-quittcpServer.Close()
package serviceimport ("github.com/mszlu521/goodscenter/model"
)type GoodsRpcService struct {
}func (*GoodsRpcService) Find(id int64) *model.Result {goods := model.Goods{Id: 1000, Name: "商品中心9002商品"}return &model.Result{Code: 200, Msg: "success", Data: goods}
}
4.4.2 client端

type MsRpcClient interface {Connect() errorInvoke(context context.Context, serviceName string, methodName string, args []any) (any, error)Close() error
}type MsTcpClient struct {conn   net.Connoption TcpClientOption
}type TcpClientOption struct {Retries           intConnectionTimeout time.DurationSerializeType     SerializeTypeCompressType      CompressTypeHost              stringPort              int
}var DefaultOption = TcpClientOption{Host:              "127.0.0.1",Port:              9112,Retries:           3,ConnectionTimeout: 5 * time.Second,SerializeType:     Gob,CompressType:      Gzip,
}func NewTcpClient(option TcpClientOption) *MsTcpClient {return &MsTcpClient{option: option}
}func (c *MsTcpClient) Connect() error {addr := fmt.Sprintf("%s:%d", c.option.Host, c.option.Port)conn, err := net.DialTimeout("tcp", addr, c.option.ConnectionTimeout)if err != nil {return err}c.conn = connreturn nil
}var reqId int64func (c *MsTcpClient) Invoke(ctx context.Context, serviceName string, methodName string, args []any) (any, error) {var cancel context.CancelFuncctx, cancel = context.WithTimeout(ctx, c.option.ConnectionTimeout)defer cancel()req := &MsRpcRequest{}req.RequestId = atomic.AddInt64(&reqId, 1)req.ServiceName = serviceNamereq.MethodName = methodNamereq.args = argsheaders := make([]byte, 17)//magic numberheaders[0] = mn//versionheaders[1] = version//full length//消息类型headers[6] = byte(msgRequest)//压缩类型headers[7] = byte(c.option.CompressType)//序列化headers[8] = byte(c.option.SerializeType)//请求idbinary.BigEndian.PutUint64(headers[9:], uint64(req.RequestId))serializer, err := loadSerialize(c.option.SerializeType)if err != nil {return nil, err}body, err := serializer.Serialize(req)if err != nil {return nil, err}body, err = compress(body, c.option.CompressType)if err != nil {return nil, err}fullLen := 17 + len(body)binary.BigEndian.PutUint32(headers[2:6], uint32(fullLen))_, err = c.conn.Write(headers[:])if err != nil {return nil, err}err = binary.Write(c.conn, binary.BigEndian, body[:])if err != nil {return nil, err}rspChan := make(chan *MsRpcResponse)go c.readHandle(rspChan)rsp := <-rspChanreturn rsp, nil
}func (c *MsTcpClient) Close() error {if c.conn != nil {return c.conn.Close()}return nil
}func (c *MsTcpClient) readHandle(rspChan chan *MsRpcResponse) {defer func() {if err := recover(); err != nil {log.Println(err)c.conn.Close()}}()for {msg := c.decodeFrame(c.conn)if msg == nil {log.Println("未解析出任何数据")rspChan <- nilreturn}//根据请求if msg.Header.MessageType == msgResponse {rsp := msg.Data.(*MsRpcResponse)rspChan <- rspreturn}}
}func (*MsTcpClient) decodeFrame(conn net.Conn) *MsRpcMessage {//读取数据 先读取header部分//1+1+4+1+1+1+8 = 17字节headers := make([]byte, 17)_, err := io.ReadFull(conn, headers)if err != nil {log.Println(err)return nil}//magic numbermagicNumber := headers[0]if magicNumber != mn {log.Println("magic number not valid : ", magicNumber)return nil}//versionversion := headers[1]//fullLength := headers[2:6]//mt := headers[6]messageType := MessageType(mt)//压缩类型compressType := headers[7]//序列化类型serializeType := headers[8]//请求idrequestId := headers[9:]//将body解析出来,包装成request 根据请求内容查找对应的服务,完成调用//网络调用 大端fl := int32(binary.BigEndian.Uint32(fullLength))bodyLen := fl - 17body := make([]byte, bodyLen)_, err = io.ReadFull(conn, body)log.Println("读完了")if err != nil {log.Println(err)return nil}//先解压body, err = unCompress(body, CompressType(compressType))if err != nil {log.Println(err)return nil}//反序列化serializer, err := loadSerialize(SerializeType(serializeType))if err != nil {log.Println(err)return nil}header := &Header{}header.MagicNumber = magicNumberheader.FullLength = flheader.CompressType = CompressType(compressType)header.Version = versionheader.SerializeType = SerializeType(serializeType)header.RequestId = int64(binary.BigEndian.Uint64(requestId))header.MessageType = messageTypeif messageType == msgRequest {msg := &MsRpcMessage{}msg.Header = headerreq := &MsRpcRequest{}err := serializer.Deserialize(body, req)if err != nil {log.Println(err)return nil}msg.Data = reqreturn msg}if messageType == msgResponse {msg := &MsRpcMessage{}msg.Header = headerrsp := &MsRpcResponse{}err := serializer.Deserialize(body, rsp)if err != nil {log.Println(err)return nil}msg.Data = rspreturn msg}return nil
}type MsTcpClientProxy struct {client *MsTcpClientoption TcpClientOption
}func NewMsTcpClientProxy(option TcpClientOption) *MsTcpClientProxy {return &MsTcpClientProxy{option: option}
}func (p *MsTcpClientProxy) Call(ctx context.Context, serviceName string, methodName string, args []any) (any, error) {client := NewTcpClient(p.option)p.client = clienterr := client.Connect()if err != nil {return nil, err}for i := 0; i < p.option.Retries; i++ {result, err := client.Invoke(ctx, serviceName, methodName, args)if err != nil {if i >= p.option.Retries-1 {log.Println(errors.New("already retry all time"))client.Close()return nil, err}continue}client.Close()return result, nil}return nil, errors.New("retry time is 0")
}
g.Get("/findTcp", func(ctx *msgo.Context) {//查询商品gob.Register(&model.Result{})gob.Register(&model.Goods{})args := make([]any, 1)args[0] = 1result, err := proxy.Call(context.Background(), "goods", "Find", args)if err != nil {panic(err)}ctx.JSON(http.StatusOK, result)})
4.4.3 protobuf序列化支持

type ProtobufSerializer struct{}func (c ProtobufSerializer) Serialize(data any) ([]byte, error) {marshal, err := proto.Marshal(data.(proto.Message))if err != nil {return nil, err}return marshal, nil
}func (c ProtobufSerializer) Deserialize(data []byte, target any) error {message := target.(proto.Message)return proto.Unmarshal(data, message)
}
protoc  --go_out=./ --go-grpc_out=./  .\rpc\tcp.proto 
syntax = "proto3";import "google/protobuf/struct.proto";option go_package="/rpc";package rpc;message Request {int64 RequestId = 1;string ServiceName = 2;string MethodName = 3;repeated google.protobuf.Value Args = 4;
}message Response {int64 RequestId = 1;int32 Code = 2;string Msg = 3;int32 CompressType = 4;int32 SerializeType = 5;google.protobuf.Value Data = 6;
}
package rpcimport ("bytes""compress/gzip""context""encoding/binary""encoding/gob""encoding/json""errors""fmt""google.golang.org/protobuf/proto""google.golang.org/protobuf/types/known/structpb""io""log""net""reflect""sync/atomic""time"
)type Serializer interface {Serialize(i interface{}) ([]byte, error)Deserialize(data []byte, i interface{}) error
}
type GobSerializer struct{}func (c GobSerializer) Serialize(data any) ([]byte, error) {var buffer bytes.Bufferencoder := gob.NewEncoder(&buffer)if err := encoder.Encode(data); err != nil {return nil, err}return buffer.Bytes(), nil
}func (c GobSerializer) Deserialize(data []byte, target any) error {buffer := bytes.NewBuffer(data)decoder := gob.NewDecoder(buffer)return decoder.Decode(target)
}type ProtobufSerializer struct{}func (c ProtobufSerializer) Serialize(data any) ([]byte, error) {marshal, err := proto.Marshal(data.(proto.Message))if err != nil {return nil, err}return marshal, nil
}func (c ProtobufSerializer) Deserialize(data []byte, target any) error {message := target.(proto.Message)return proto.Unmarshal(data, message)
}type MsRpcMessage struct {//头Header *Header//消息体Data any
}const mn byte = 0x1d
const version = 0x01type CompressType byteconst (Gzip CompressType = iota
)type SerializeType byteconst (Gob SerializeType = iotaProtoBuff
)type MessageType byteconst (msgRequest MessageType = iotamsgResponsemsgPingmsgPong
)type Header struct {MagicNumber   byteVersion       byteFullLength    int32MessageType   MessageTypeCompressType  CompressTypeSerializeType SerializeTypeRequestId     int64
}type MsRpcRequest struct {RequestId   int64ServiceName stringMethodName  stringArgs        []any
}type MsRpcResponse struct {RequestId     int64Code          int16Msg           stringCompressType  CompressTypeSerializeType SerializeTypeData          any
}type MsRpcServer interface {Register(name string, service interface{})Run()Stop()
}type MsTcpServer struct {listener   net.ListenerHost       stringPort       intNetwork    stringserviceMap map[string]interface{}
}type MsTcpConn struct {s       *MsTcpServerconn    net.ConnrspChan chan *MsRpcResponse
}func (c *MsTcpConn) writeHandle() {ctx := context.Background()_, cancel := context.WithTimeout(ctx, time.Duration(3)*time.Second)defer cancel()select {case rsp := <-c.rspChan://编码数据err := c.Send(c.conn, rsp)if err != nil {log.Println(err)}returncase <-ctx.Done():log.Println("超时了")return}
}func (c *MsTcpConn) Send(conn net.Conn, rsp *MsRpcResponse) error {headers := make([]byte, 17)//magic numberheaders[0] = mn//versionheaders[1] = version//full length//消息类型headers[6] = byte(msgResponse)//压缩类型headers[7] = byte(rsp.CompressType)//序列化headers[8] = byte(rsp.SerializeType)//请求idbinary.BigEndian.PutUint64(headers[9:], uint64(rsp.RequestId))serializer, err := loadSerialize(SerializeType(rsp.SerializeType))if err != nil {return err}var body []byteif ProtoBuff == rsp.SerializeType {pRsp := &Response{}pRsp.SerializeType = int32(rsp.SerializeType)pRsp.CompressType = int32(rsp.CompressType)pRsp.Code = int32(rsp.Code)pRsp.Msg = rsp.MsgpRsp.RequestId = rsp.RequestId//value, err := structpb.//	log.Println(err)m := make(map[string]any)marshal, _ := json.Marshal(rsp.Data)_ = json.Unmarshal(marshal, &m)value, err := structpb.NewStruct(m)log.Println(err)pRsp.Data = structpb.NewStructValue(value)body, err = serializer.Serialize(pRsp)} else {body, err = serializer.Serialize(rsp)}if err != nil {return err}body, err = compress(body, CompressType(rsp.CompressType))if err != nil {return err}fullLen := 17 + len(body)binary.BigEndian.PutUint32(headers[2:6], uint32(fullLen))_, err = conn.Write(headers[:])if err != nil {return err}err = binary.Write(c.conn, binary.BigEndian, body[:])if err != nil {return err}log.Println("发送数据成功")return nil
}func NewTcpServer(host string, port int) *MsTcpServer {return &MsTcpServer{Host:    host,Port:    port,Network: "tcp",}
}
func (s *MsTcpServer) Register(name string, service interface{}) {if s.serviceMap == nil {s.serviceMap = make(map[string]interface{})}v := reflect.ValueOf(service)if v.Kind() != reflect.Pointer {panic(errors.New("service not pointer"))}s.serviceMap[name] = service
}
func (s *MsTcpServer) Run() {addr := fmt.Sprintf("%s:%d", s.Host, s.Port)listen, err := net.Listen(s.Network, addr)if err != nil {panic(err)}s.listener = listenfor {conn, err := s.listener.Accept()if err != nil {log.Println(err)continue}msConn := &MsTcpConn{conn: conn, rspChan: make(chan *MsRpcResponse, 1), s: s}go s.readHandle(msConn)go msConn.writeHandle()}
}func (s *MsTcpServer) readHandle(msConn *MsTcpConn) {defer func() {if err := recover(); err != nil {log.Println(err)msConn.conn.Close()}}()msg := s.decodeFrame(msConn.conn)if msg == nil {msConn.rspChan <- nilreturn}//根据请求if msg.Header.MessageType == msgRequest {req := msg.Data.(*Request)//查找注册的服务匹配后进行调用,调用完发送到一个channel当中service, ok := s.serviceMap[req.ServiceName]rsp := &MsRpcResponse{RequestId: req.RequestId, CompressType: msg.Header.CompressType, SerializeType: msg.Header.SerializeType}if !ok {rsp.Code = 500rsp.Msg = "no service found"msConn.rspChan <- rspreturn}v := reflect.ValueOf(service)reflectMethod := v.MethodByName(req.MethodName)args := make([]reflect.Value, len(req.Args))for i := range req.Args {of := reflect.ValueOf(req.Args[i].AsInterface())of = of.Convert(reflectMethod.Type().In(i))args[i] = of}result := reflectMethod.Call(args)if len(result) == 0 {//无返回结果rsp.Code = 200msConn.rspChan <- rspreturn}resArgs := make([]interface{}, len(result))for i := 0; i < len(result); i++ {resArgs[i] = result[i].Interface()}var err errorif _, ok := result[len(result)-1].Interface().(error); ok {err = result[len(result)-1].Interface().(error)}if err != nil {rsp.Code = 500rsp.Msg = err.Error()}rsp.Code = 200rsp.Data = resArgs[0]msConn.rspChan <- rsplog.Println("接收数据成功")return}
}func (s *MsTcpServer) Close() {if s.listener != nil {s.listener.Close()}
}func (*MsTcpServer) decodeFrame(conn net.Conn) *MsRpcMessage {//读取数据 先读取header部分//1+1+4+1+1+1+8 = 17字节headers := make([]byte, 17)_, err := io.ReadFull(conn, headers)if err != nil {log.Println(err)return nil}//magic numbermagicNumber := headers[0]if magicNumber != mn {log.Println("magic number not valid : ", magicNumber)return nil}//versionversion := headers[1]//fullLength := headers[2:6]//mt := headers[6]messageType := MessageType(mt)//压缩类型compressType := headers[7]//序列化类型serializeType := headers[8]//请求idrequestId := headers[9:]//将body解析出来,包装成request 根据请求内容查找对应的服务,完成调用//网络调用 大端fl := int32(binary.BigEndian.Uint32(fullLength))bodyLen := fl - 17body := make([]byte, bodyLen)_, err = io.ReadFull(conn, body)log.Println("读完了")if err != nil {log.Println(err)return nil}//先解压body, err = unCompress(body, CompressType(compressType))if err != nil {log.Println(err)return nil}//反序列化serializer, err := loadSerialize(SerializeType(serializeType))if err != nil {log.Println(err)return nil}header := &Header{}header.MagicNumber = magicNumberheader.FullLength = flheader.CompressType = CompressType(compressType)header.Version = versionheader.SerializeType = SerializeType(serializeType)header.RequestId = int64(binary.BigEndian.Uint64(requestId))header.MessageType = messageTypeif messageType == msgRequest {msg := &MsRpcMessage{}msg.Header = headerif ProtoBuff == SerializeType(serializeType) {req := &Request{}err := serializer.Deserialize(body, req)if err != nil {log.Println(err)return nil}msg.Data = req} else {req := &MsRpcRequest{}err := serializer.Deserialize(body, req)if err != nil {log.Println(err)return nil}msg.Data = req}return msg}if messageType == msgResponse {msg := &MsRpcMessage{}msg.Header = headerif ProtoBuff == SerializeType(serializeType) {rsp := &Response{}err := serializer.Deserialize(body, rsp)if err != nil {log.Println(err)return nil}msg.Data = rsp} else {rsp := &MsRpcResponse{}err := serializer.Deserialize(body, rsp)if err != nil {log.Println(err)return nil}msg.Data = rsp}return msg}return nil
}func loadSerialize(serializeType SerializeType) (Serializer, error) {switch serializeType {case Gob://gobs := &GobSerializer{}return s, nilcase ProtoBuff:s := &ProtobufSerializer{}return s, nil}return nil, errors.New("no serializeType")
}func compress(body []byte, compressType CompressType) ([]byte, error) {switch compressType {case Gzip://return body, nil//gzip//创建一个新的 byte 输出流var buf bytes.Bufferw := gzip.NewWriter(&buf)_, err := w.Write(body)if err != nil {return nil, err}if err := w.Close(); err != nil {return nil, err}return buf.Bytes(), nil}return nil, errors.New("no compressType")
}func unCompress(body []byte, compressType CompressType) ([]byte, error) {switch compressType {case Gzip://return body, nil//gzipreader, err := gzip.NewReader(bytes.NewReader(body))defer reader.Close()if err != nil {return nil, err}buf := new(bytes.Buffer)// 从 Reader 中读取出数据if _, err := buf.ReadFrom(reader); err != nil {return nil, err}return buf.Bytes(), nil}return nil, errors.New("no compressType")
}type MsRpcClient interface {Connect() errorInvoke(context context.Context, serviceName string, methodName string, args []any) (any, error)Close() error
}type MsTcpClient struct {conn   net.Connoption TcpClientOption
}type TcpClientOption struct {Retries           intConnectionTimeout time.DurationSerializeType     SerializeTypeCompressType      CompressTypeHost              stringPort              int
}var DefaultOption = TcpClientOption{Host:              "127.0.0.1",Port:              9112,Retries:           3,ConnectionTimeout: 5 * time.Second,SerializeType:     Gob,CompressType:      Gzip,
}func NewTcpClient(option TcpClientOption) *MsTcpClient {return &MsTcpClient{option: option}
}func (c *MsTcpClient) Connect() error {addr := fmt.Sprintf("%s:%d", c.option.Host, c.option.Port)conn, err := net.DialTimeout("tcp", addr, c.option.ConnectionTimeout)if err != nil {return err}c.conn = connreturn nil
}var reqId int64func (c *MsTcpClient) Invoke(ctx context.Context, serviceName string, methodName string, args []any) (any, error) {var cancel context.CancelFuncctx, cancel = context.WithTimeout(ctx, c.option.ConnectionTimeout)defer cancel()req := &MsRpcRequest{}req.RequestId = atomic.AddInt64(&reqId, 1)req.ServiceName = serviceNamereq.MethodName = methodNamereq.Args = argsheaders := make([]byte, 17)//magic numberheaders[0] = mn//versionheaders[1] = version//full length//消息类型headers[6] = byte(msgRequest)//压缩类型headers[7] = byte(c.option.CompressType)//序列化headers[8] = byte(c.option.SerializeType)//请求idbinary.BigEndian.PutUint64(headers[9:], uint64(req.RequestId))serializer, err := loadSerialize(c.option.SerializeType)if err != nil {return nil, err}var body []byteif ProtoBuff == c.option.SerializeType {pReq := &Request{}pReq.RequestId = atomic.AddInt64(&reqId, 1)pReq.ServiceName = serviceNamepReq.MethodName = methodNamelist, err := structpb.NewList(args)log.Println(err)pReq.Args = list.Valuesbody, err = serializer.Serialize(pReq)} else {body, err = serializer.Serialize(req)}fmt.Println(body)if err != nil {return nil, err}log.Println(body)body, err = compress(body, c.option.CompressType)if err != nil {return nil, err}fullLen := 17 + len(body)binary.BigEndian.PutUint32(headers[2:6], uint32(fullLen))_, err = c.conn.Write(headers[:])if err != nil {return nil, err}log.Println(body)log.Println("len:", len(body))err = binary.Write(c.conn, binary.BigEndian, body[:])if err != nil {return nil, err}rspChan := make(chan *MsRpcResponse)go c.readHandle(rspChan)rsp := <-rspChanreturn rsp, nil
}func (c *MsTcpClient) Close() error {if c.conn != nil {return c.conn.Close()}return nil
}func (c *MsTcpClient) readHandle(rspChan chan *MsRpcResponse) {defer func() {if err := recover(); err != nil {log.Println(err)c.conn.Close()}}()for {msg := c.decodeFrame(c.conn)if msg == nil {log.Println("未解析出任何数据")rspChan <- nilreturn}//根据请求if msg.Header.MessageType == msgResponse {if msg.Header.SerializeType == ProtoBuff {rsp := msg.Data.(*Response)asInterface := rsp.Data.AsInterface()marshal, _ := json.Marshal(asInterface)rsp1 := &MsRpcResponse{}json.Unmarshal(marshal, rsp1)rspChan <- rsp1} else {rsp := msg.Data.(*MsRpcResponse)rspChan <- rsp}return}}
}func (*MsTcpClient) decodeFrame(conn net.Conn) *MsRpcMessage {//读取数据 先读取header部分//1+1+4+1+1+1+8 = 17字节headers := make([]byte, 17)_, err := io.ReadFull(conn, headers)if err != nil {log.Println(err)return nil}//magic numbermagicNumber := headers[0]if magicNumber != mn {log.Println("magic number not valid : ", magicNumber)return nil}//versionversion := headers[1]//fullLength := headers[2:6]//mt := headers[6]messageType := MessageType(mt)//压缩类型compressType := headers[7]//序列化类型serializeType := headers[8]//请求idrequestId := headers[9:]//将body解析出来,包装成request 根据请求内容查找对应的服务,完成调用//网络调用 大端fl := int32(binary.BigEndian.Uint32(fullLength))bodyLen := fl - 17body := make([]byte, bodyLen)_, err = io.ReadFull(conn, body)log.Println("读完了")if err != nil {log.Println(err)return nil}//先解压body, err = unCompress(body, CompressType(compressType))if err != nil {log.Println(err)return nil}//反序列化serializer, err := loadSerialize(SerializeType(serializeType))if err != nil {log.Println(err)return nil}header := &Header{}header.MagicNumber = magicNumberheader.FullLength = flheader.CompressType = CompressType(compressType)header.Version = versionheader.SerializeType = SerializeType(serializeType)header.RequestId = int64(binary.BigEndian.Uint64(requestId))header.MessageType = messageTypeif messageType == msgRequest {msg := &MsRpcMessage{}msg.Header = headerif ProtoBuff == SerializeType(serializeType) {req := &Request{}err := serializer.Deserialize(body, req)if err != nil {log.Println(err)return nil}msg.Data = req} else {req := &MsRpcRequest{}err := serializer.Deserialize(body, req)if err != nil {log.Println(err)return nil}msg.Data = req}return msg}if messageType == msgResponse {msg := &MsRpcMessage{}msg.Header = headerif ProtoBuff == SerializeType(serializeType) {rsp := &Response{}err := serializer.Deserialize(body, rsp)if err != nil {log.Println(err)return nil}msg.Data = rsp} else {rsp := &MsRpcResponse{}err := serializer.Deserialize(body, rsp)if err != nil {log.Println(err)return nil}msg.Data = rsp}return msg}return nil
}type MsTcpClientProxy struct {client *MsTcpClientoption TcpClientOption
}func NewMsTcpClientProxy(option TcpClientOption) *MsTcpClientProxy {return &MsTcpClientProxy{option: option}
}func (p *MsTcpClientProxy) Call(ctx context.Context, serviceName string, methodName string, args []any) (any, error) {client := NewTcpClient(p.option)p.client = clienterr := client.Connect()if err != nil {return nil, err}for i := 0; i < p.option.Retries; i++ {result, err := client.Invoke(ctx, serviceName, methodName, args)if err != nil {if i >= p.option.Retries-1 {log.Println(errors.New("already retry all time"))client.Close()return nil, err}continue}client.Close()return result, nil}return nil, errors.New("retry time is 0")
}

对rpc做了初步实现,属于简单实现,并没有处理更为复杂的心跳,超时,连接管理等,需要大家自行去完善
ize(SerializeType(serializeType))
if err != nil {
log.Println(err)
return nil
}
header := &Header{}
header.MagicNumber = magicNumber
header.FullLength = fl
header.CompressType = CompressType(compressType)
header.Version = version
header.SerializeType = SerializeType(serializeType)
header.RequestId = int64(binary.BigEndian.Uint64(requestId))
header.MessageType = messageType

if messageType == msgRequest {msg := &MsRpcMessage{}msg.Header = headerif ProtoBuff == SerializeType(serializeType) {req := &Request{}err := serializer.Deserialize(body, req)if err != nil {log.Println(err)return nil}msg.Data = req} else {req := &MsRpcRequest{}err := serializer.Deserialize(body, req)if err != nil {log.Println(err)return nil}msg.Data = req}return msg
}
if messageType == msgResponse {msg := &MsRpcMessage{}msg.Header = headerif ProtoBuff == SerializeType(serializeType) {rsp := &Response{}err := serializer.Deserialize(body, rsp)if err != nil {log.Println(err)return nil}msg.Data = rsp} else {rsp := &MsRpcResponse{}err := serializer.Deserialize(body, rsp)if err != nil {log.Println(err)return nil}msg.Data = rsp}return msg
}
return nil

}

type MsTcpClientProxy struct {
client *MsTcpClient
option TcpClientOption
}

func NewMsTcpClientProxy(option TcpClientOption) *MsTcpClientProxy {
return &MsTcpClientProxy{option: option}
}

func (p *MsTcpClientProxy) Call(ctx context.Context, serviceName string, methodName string, args []any) (any, error) {
client := NewTcpClient(p.option)
p.client = client
err := client.Connect()
if err != nil {
return nil, err
}
for i := 0; i < p.option.Retries; i++ {
result, err := client.Invoke(ctx, serviceName, methodName, args)
if err != nil {
if i >= p.option.Retries-1 {
log.Println(errors.New(“already retry all time”))
client.Close()
return nil, err
}
continue
}
client.Close()
return result, nil
}
return nil, errors.New(“retry time is 0”)
}


> 对rpc做了初步实现,属于简单实现,并没有处理更为复杂的心跳,超时,连接管理等,需要大家自行去完善

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/508508.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【大厂AI课学习笔记NO.64】机器学习开发框架

机器学习开发框架本质上是一种编程库或工具&#xff0c;目的是能够让开发人员更容易、更快速地构建机器学习模型。 机器学习开发框架封装了大量的可重用代码&#xff0c;可以直接调用&#xff0c;目的是避免“重复造轮子’大幅降低开发人员的开发难度&#xff0c;提高开发效率…

mongodb 图形界面工具 -- Studio 3T(下载、安装、连接mongodb数据库)

目录 mongodb 图形界面工具 -- Studio 3T下载安装第一次使用&#xff1a;注册添加一个连接&#xff08;连接 mongodb 数据库&#xff09;1、点击【添加新连接】&#xff0c;选择【手动配置我的连接设置】2、对 Server 设置连接数据3、连接的用户认证设置&#xff08;创建数据库…

某大型制造企业数字化转型规划方案(附下载)

目录 一、项目背景和目标 二、业务现状 1. 总体应用现状 2. 各模块业务问题 2.1 设计 2.2 仿真 2.3 制造 2.4 服务 2.5 管理 三、业务需求及预期效果 1. 总体业务需求 2. 各模块业务需求 2.1 设计 2.2 仿真 2.3 制造 2.4 服务 2.5 管理 四、…

代码随想录刷题笔记 DAY 35 | 无重叠区间 No.435 | 划分字母区间 No.763 | 合并区间 No.56

文章目录 Day 3501. 无重叠区间&#xff08;No. 435&#xff09;<1> 题目<2> 笔记<3> 代码 02. 划分字母区间&#xff08;No. 763&#xff09;<1> 题目<2> 笔记<3> 代码 03. 合并区间&#xff08;No. 56&#xff09;<1> 题目<2&g…

UE5中实现后处理深度描边

后处理深度描边可以通过取得边缘深度变化大的区域进行描边&#xff0c;一方面可以用来做角色的等距内描边&#xff0c;避免了菲尼尔边缘光不整齐的问题&#xff0c;另一方面可以结合场景扫描等特效使用&#xff0c;达到更丰富的效果&#xff1a; 后来解决了开启TAA十字线和锯齿…

华为配置攻击检测功能示例

配置攻击检测功能示例 组网图形 图1 配置攻击检测功能示例组网图 业务需求组网需求数据规划配置思路配置注意事项操作步骤配置文件 业务需求 企业用户通过WLAN接入网络&#xff0c;以满足移动办公的最基本需求。且在覆盖区域内移动发生漫游时&#xff0c;不影响用户的业务使用。…

Vue3:使用 Composition API 不需要 Pinia

在 Vue.js 开发的动态环境中&#xff0c;在单个组件中处理复杂的业务逻辑可能会导致笨重的文件和维护噩梦。虽然 Pinia 提供集中式状态管理&#xff0c;但仅依赖它来处理复杂的业务逻辑可能会导致代码混乱。本文探讨了使用 Composition API 的替代方法&#xff0c;说明开发人员…

DAP-Link DIY复刻指南

DAP-Link DIY复刻指南 文章目录 DAP-Link DIY复刻指南1. 概述2. 获取工程资源2.1 工具安装2.2 源码拉取2.3 硬件资源获取 3. 工程下载验证3.1 下载bootload3.2 下载 APP3.3 修改IO配置 4. 验证4.1 虚拟串口验证4.2 Keil 无法识别 DAPLink&#xff1f;4.3 keil 可以识别DAPLink但…

Vue2+ElementUI列表、表格组件的封装

Vue2ElementUI列表组件的封装&#xff1a;引言 在日常开发中&#xff0c;我们经常会遇到需要展示列表数据的场景。ElementUI 提供的 el-table 组件是一个功能强大的表格组件&#xff0c;可以满足大部分的需求。但是&#xff0c;在实际应用中&#xff0c;我们往往需要根据业务需…

Java基础 - 7 - 常用API(三)

API&#xff08;全称 Application Programming Interface&#xff1a;应用程序编程接口&#xff09; API就是Java帮我们已经写好的一些程序&#xff0c;如类、方法等&#xff0c;可以直接拿过来用 JDK8 API文档&#xff1a;Java Platform SE 8 一. JDK8之前传统的日期、时间 …

并行和并发的区别

并行和并发的区别是并行指的是多个任务在同一时间点上同时执行&#xff0c;而并发指的是多个任务在同一时间段内交替执行。并行需要多个处理器或者多核处理器&#xff0c;每个任务都有独立的资源&#xff0c;不会互相干扰。并发可以在单核或者多核处理器上实现&#xff0c;多个…

【c++】继承深度解剖

> 作者简介&#xff1a;დ旧言~&#xff0c;目前大二&#xff0c;现在学习Java&#xff0c;c&#xff0c;c&#xff0c;Python等 > 座右铭&#xff1a;松树千年终是朽&#xff0c;槿花一日自为荣。 > 目标&#xff1a;了解什么事继承&#xff0c;基类和派生类的使用和…