gRPC 服务注册实现以及原理讲解

一、注册中心

为了高可用,生产环境我们的服务都是以集群的方式对外提供服务,集群的ip随时可能变化,比如重启,发布,扩容等。我们也就需要用一本 “通讯录”去保存和更新服务的节点,而这个通讯录就是注册中心,注册中心在整个微服务架构中是必要的。既然是微服务机构,那么必然有客户端和服务端。对于注册中心来说,服务端启动完成的时候注册服务,客户端就是订阅服务。

 

二、服务注册

在服务提供方启动的时候,应该通过某种形式(比如调用API、产生上线事件消息、在Etcd ,Zookeeper,数据库)把自己服务的信息通知给服务注册中心,注册中心将这个服务节点的IP和接口保存下来

三、服务订阅

在服务调用方启动的时候,去注册中心查找并订阅服务提供方的IP,然后缓存在缓存。其中这个过程是把一个服务标识(服务名)转化服务实际位置(ip地址)的过程。我们不仅需要根据客户端去查找服务而起我们也需要监听注册中心发生的事件(update,delete)更新本地的服务列表

四、gRPC的服务注册和发现

1、注册中心的接口

注册中心我们采用Etcd,我们实现了 Registry 就实现了注册中心,Register 服务注册,UnRegister 服务的取消注册,这两个方法是服务端用的。ListServices 根据 serviceName 到注册中心获取服务,Subscribe 是注册中心发生事件通知服务端更新服务列表,这两个方法是客户端用的

type Registry interface {Register(ctx context.Context, si ServiceInstance) errorUnRegister(ctx context.Context, si ServiceInstance) errorListServices(ctx context.Context, serviceName string) ([]ServiceInstance, error)Subscribe(serviceName string) (<-chan Event, error)io.Closer
}

 

2、保存到Etcd注册中心的数据字段
type ServiceInstance struct {Name    stringAddress stringWeight  int32Group   string
}
​
type Event struct {
}
3、实现注册定义的接口
type Registry struct {c      *clientv3.Client //etcdsess   *concurrency.Session  //租约 canels []func()mutex  sync.Mutex
}
​
func NewRegistry(c *clientv3.Client) (*Registry, error) {session, err := concurrency.NewSession(c)if err != nil {return nil, err}return &Registry{c:    c,sess: session,}, nil
}
​
func (r *Registry) Register(ctx context.Context, si registry.ServiceInstance) error {marshal, err := json.Marshal(si) if err != nil {return err}// 写入到etcd,并设置租约,客户端就可以自动续约_, err = r.c.Put(ctx, r.instanceKey(si), string(marshal), clientv3.WithLease(r.sess.Lease()))if err != nil {return err}return nil
}
​
func (r *Registry) UnRegister(ctx context.Context, si registry.ServiceInstance) error {_, err := r.c.Delete(ctx, r.instanceKey(si))if err != nil {return err}return nil
}
​
func (r *Registry) ListServices(ctx context.Context, serviceName string) ([]registry.ServiceInstance, error) {val, err := r.c.Get(ctx, r.ServiceKey(serviceName), clientv3.WithPrefix())if err != nil {return nil, err}res := make([]registry.ServiceInstance, 0, len(val.Kvs))for _, v := range val.Kvs {var t registry.ServiceInstanceerr := json.Unmarshal(v.Value, &t)if err != nil {continue}res = append(res, t)}return res, nil
}
​
func (r *Registry) Subscribe(serviceName string) (<-chan registry.Event, error) {ctx, cancel := context.WithCancel(context.Background())r.mutex.Lock()r.canels = append(r.canels, cancel)r.mutex.Unlock()defer cancel()ctx = clientv3.WithRequireLeader(ctx)re := r.c.Watch(ctx, r.ServiceKey(serviceName), clientv3.WithPrefix())res := make(chan registry.Event)go func() {for {select {case t := <-re:if t.Err() != nil {return}if t.Canceled {return}for range t.Events {res <- registry.Event{}}case <-ctx.Done():return}}}()return res, nil
}
​
func (r *Registry) Close() error {err := r.sess.Close()r.mutex.Lock()cancels := r.canelsr.canels = nilr.mutex.Unlock()for _, cancel := range cancels {cancel()}return err
}
​
func (r *Registry) instanceKey(si registry.ServiceInstance) string {s := strings.Builder{}s.WriteString("/micro")s.WriteString("/" + si.Name)s.WriteString("/" + si.Address)return s.String()
}
​
func (r *Registry) ServiceKey(servername string) string {s := strings.Builder{}s.WriteString("/micro")s.WriteString("/" + servername)return s.String()
}
五、服务端服务注册

服务端服务注册是在启动服务的时候进行注册的,所以我们对server 进行了封装

type usermsg struct {gen.UnimplementedUserMesServiceServer
}
​
func (u usermsg) GetById(ctx context.Context, req *gen.GetByIdReq) (*gen.GetByIDResp, error) {return &gen.GetByIDResp{User: &gen.User{Id:   1,Name: "test",},}, nil
}
​
func TestUsermsg(t *testing.T) {us := &usermsg{}client, err2 := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:2379",},})assert.NoError(t, err2)registry, err2 := etcd.NewRegistry(client)assert.NoError(t, err2)newServer, err2 := micro.NewServer("user-server",micro.WithServerRegistryOption(registry),micro.WithRegisterTimeOut(10*time.Second),)assert.NoError(t, err2)gen.RegisterUserMesServiceServer(newServer.Server, us)//server newServer.Start("localhost:8082")
}
type Server struct {name            stringregister        registry.RegistryregisterTimeout time.Duration*grpc.Server//权重weight int32
​group string
}
​
type ServerOption func(server *Server)
​
func WithServerRegistryOption(registry registry.Registry) ServerOption {return func(server *Server) {server.register = registry}
}
​
func WithRegisterTimeOut(timeout time.Duration) ServerOption {return func(server *Server) {server.registerTimeout = timeout}
}
​
func WithRegisterWeight(weight int32) ServerOption {return func(server *Server) {server.weight = weight}
}
​
func WithRegisterGroup(group string) ServerOption {return func(server *Server) {server.group = group}
}
​
func NewServer(name string, opts ...ServerOption) (*Server, error) {t := &Server{name:   name,Server: grpc.NewServer(),}for _, o := range opts {o(t)}return t, nil
}
​
func (s *Server) Start(addr string) error {listen, err := net.Listen("tcp", addr)if err != nil {return err}if s.register != nil {ctx, cancel := context.WithTimeout(context.Background(), s.registerTimeout)defer cancel()err := s.register.Register(ctx, registry.ServiceInstance{Name:    s.name,Address: listen.Addr().String(),Weight:  s.weight,Group:   s.group,})if err != nil {return err}}err = s.Serve(listen)return err
}
​
func (s *Server) Close() error {if s.register != nil {err := s.register.Close()if err != nil {return err}}s.GracefulStop()return nil
}
六、服务的发现

gRPC提供了自定义的Resolver的能力实现服务发现的,所以我们只要实现 Resolver 就实现了客户端的服务发现,自定义的Resolver需要实现Builder接口。gRPC 对接口定义如下。

先说下 Scheme() 方法的作用,该方法返回一个stirng。注册的 Resolver 会被保存在一个全局的变量m中,m是一个map,这个map的key即为 Scheme() 方法返回的字符串。也就是多个Resolver是通过Scheme来进行区分的,所以我们定义 Resolver 的时候 Scheme 不要重复,否则 Resolver 就会被覆盖。

// Builder creates a resolver that will be used to watch name resolution updates.
type Builder interface {Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)Scheme() string
}

那么我们实现Build 方法就可以完成客户端服务发现,代码如下:

type Builder struct {registry registry.Registrytimeout  time.Durationclose    chan struct{}
}
​
type itemConfigTimeout func(builder *Builder)
​
func WithItemConfigTimeout(timeout time.Duration) itemConfigTimeout {return func(builder *Builder) {builder.timeout = timeout}
}
​
func NewRegistryBuilder(r registry.Registry, opts ...itemConfigTimeout) (*Builder, error) {t := &Builder{registry: r,timeout:  10 * time.Second,close:    make(chan struct{}),}for _, opt := range opts {opt(t)}return t, nil
}
​
func (b *Builder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {re := &Resolver{t:         target,registry:  b.registry,cc:        cc,timeout:   b.timeout,close:     b.close,closeOnce: &sync.Once{},}re.ResolveNow(resolver.ResolveNowOptions{})go re.watch()return re, nil
}
​
func (b *Builder) Scheme() string {return "registry"
}
​
type Resolver struct {t         resolver.Targetregistry  registry.Registrycc        resolver.ClientConntimeout   time.Durationclose     chan struct{}closeOnce *sync.Once
}
​
func (r *Resolver) ResolveNow(options resolver.ResolveNowOptions) {r.resolve()
}
​
func (r *Resolver) watch() error {subscribe, err := r.registry.Subscribe(r.t.Endpoint())if err != nil {return err}for {select {case <-subscribe:r.resolve()case <-r.close:return nil}}
}
​
func (r *Resolver) resolve() {ctx, cancel := context.WithTimeout(context.Background(), r.timeout)defer cancel()servicesInstance, err := r.registry.ListServices(ctx, r.t.Endpoint())if err != nil {r.cc.ReportError(err)}re := make([]resolver.Address, 0, len(servicesInstance))for _, add := range servicesInstance {re = append(re, resolver.Address{Addr: add.Address, Attributes: weight})}state := resolver.State{Addresses: re}err = r.cc.UpdateState(state)if err != nil {r.cc.ReportError(err)}
}
​
func (r *Resolver) Close() {r.closeOnce.Do(func() {close(r.close)})
}
七、client 端与Build 结合实现服务发现,完成一次gRPC 请求
client, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:2379",},
})
assert.NoError(t, err)
registry, err3 := etcd.NewRegistry(client)
assert.NoError(t, err3)
builder, err2 := NewRegistryBuilder(registry, WithItemConfigTimeout(20*time.Second))
assert.NoError(t, err2)
dial, err := grpc.Dial("registry://user-server", grpc.WithInsecure(),grpc.WithResolvers(builder),
)
assert.NoError(t, err)
clien := gen.NewUserMesServiceClient(dial)
ctx, cancel := context.WithTimeout(context.Background(), 10000*time.Second)
defer cancel()
ctx = context.WithValue(ctx, "group", "A")
res, err := clien.GetById(ctx, &gen.GetByIdReq{Id: 12323})
assert.NoError(t, err)
fmt.Println(res)
八、gRPC 是怎么实现的,深入底层源码讲解
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {//这册一个http2.0的调用,此时并没有调用,而是注册一个回掉函数,等到找到了Resolver的时候完成回掉发送一次http2.0 的请求cc.csMgr = newConnectivityStateManager(cc.ctx, cc.channelzID)//根据 Scheme 找到对应的 resolver,如果多个resolver,他们的Scheme相同就可能被覆盖if err := cc.parseTargetAndFindResolver(); err != nil {return nil, err}// 通过找到 parseTargetAndFindResolver找到resolver 完成方法调用,最终发送一次http2.0 的请求最终if err := cc.exitIdleMode(); err != nil {return nil, err}
}

未完待续......

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/209301.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

使用Python的turtle模块创建一幅哆啦A梦

1.1引言&#xff1a; 在Python中&#xff0c;turtle模块是一个非常有趣且强大的工具&#xff0c;它允许我们以一个可视化和互动的方式学习编程。通过调用各种命令&#xff0c;我们可以引导turtle画出一个指定的图形。在本博客中&#xff0c;我们将使用turtle模块来绘制一幅哆啦…

气候变化和人类活动对中国植被固碳的贡献量化数据月度合成产品

简介&#xff1a; 气候变化和人类活动对中国植被固碳的贡献量化数据月度合成产品包括中国2001~2018年地表短波波段反照率、植被光合有效辐射吸收比、叶面积指数、森林覆盖度和非森林植被覆盖度、地表温度、地表净辐射、地表蒸散发、地上部分自养呼吸、地下部分自养呼吸、总初级…

ts实现合并数组对象中key相同的数据

背景 在平常的业务中&#xff0c;后端同学会返回以下类似的结构数据 // 后端返回的数据结构 [{ id: 1, product_id: 1, pid_name: "Asia", name: "HKG01" },{ id: 2, product_id: 1, pid_name: "Asia", name: "SH01" },{ id: 3, pro…

2023亚太杯数学建模C题思路代码 - 我国新能源电动汽车的发展趋势

1 赛题 问题C 我国新能源电动汽车的发展趋势 新能源汽车是指以先进技术原理、新技术、新结构的非常规汽车燃料为动力来源( 非常规汽车燃料指汽油、柴油以外的燃料&#xff09;&#xff0c;将先进技术进行汽车动力控制和驱动相结 合的汽车。新能源汽车主要包括四种类型&#x…

华大基因认知障碍基因检测服务,助力认知障碍疾病防控

认知障碍是一种严重的神经系统疾病&#xff0c;对人类的脑健康产生了重大影响。据报告显示&#xff0c;在我国65岁以上的人群中&#xff0c;存在轻度认知障碍的患者约为3,800万&#xff0c;而中重度痴呆患者则约为1,500万&#xff0c;患病人口数量庞大。这种疾病不仅会对患者的…

电线电缆行业生产管理怎么数字化?

行业介绍 随着市场环境的变化和现代生产管理理念的不断更新&#xff0c;电缆的生产模式也在发生转变&#xff0c;批量小&#xff0c;规格多&#xff0c;交期短的新型制造需求逐年上升&#xff0c;所以企业车间管理的重要性越发凸显&#xff0c;作为企业良性运营的关键&#xf…

div中添加el-loading(局部loading的使用)

效果&#xff1a;在div中实现el-loading <div class"content-main">{{ hotList }}</div>getHotList(columnType) {this.$nextTick(() > {var loading this.$loading({lock: true,text: "努力加载中...",spinner: "el-icon-loading&qu…

中年人怎么发展?持续发展?

现在ai这么火&#xff0c;就像当年的xxx&#xff0c;如果没有抓住&#xff0c;会xxx吗&#xff1f; 为了ai&#xff0c;多学学python也是也是好的啊。 在学习之余&#xff0c;还是想做做自媒体的。不求马上赚到钱。我的想法是&#xff0c;现在每天下班回家都是刷刷抖音&#…

记录一次因内存不足而导致hiveserver2和namenode进程宕机的排查

背景 最近发现集群主节点总有进程宕机&#xff0c;定位了大半天才找到原因&#xff0c;分享一下 排查过程 查询hiveserver2和namenode日志&#xff0c;都是正常的&#xff0c;突然日志就不记录了&#xff0c;直到我重启之后又恢复工作了。 排查各种日志都是正常的&#xff0…

geemap学习笔记012:如何搜索Earth Engine Python脚本

前言 本节主要是介绍如何查询Earth Engine中已经集成好的Python脚本案例。 1 导入库 !pip install geemap #安装geemap库 import ee import geemap2 搜索Earth Engine Python脚本 很简单&#xff0c;只需要一行代码。 geemap.ee_search()使用方法 后记 大家如果有问题需…

AI写代码 可以代替人工吗?

近年AI技术非常火热&#xff0c;有人就说&#xff0c;用AI写代码程序员不就都得下岗吗&#xff1f;对此我的回答是否定的&#xff0c;因为AI虽然已经有了编写代码的能力&#xff0c;但它现在的水平大多还仅限于根据业务需求搭建框架&#xff0c;而具体的功能实现还尚且稚嫩&…

字符串和内存函数(2)

文章目录 2.13 memcpy2.14 memmove2.15 memcmp2.16 memset 2.13 memcpy void* memcpy(void* destination, const void* source, size_t num); 函数memcpy从source的位置开始向后复制num个字节的数据到destination的内存位置。这个函数在遇到 ‘\0’ 的时候并不会停下来。如果so…