从零开始实现分布式服务系统

文章目录

  • 开发前言
  • 分布式模型
  • 系统图解
  • 注册中心模块
  • 基础服务模块
  • 被依赖的服务模块(日志服务)
  • 服务模块(访问服务)
  • 运行效果
  • 开发总结

开发前言

分布式系统具有高可靠性、高性能、可扩展性、灵活性、数据共享、可靠性和地理分布等优点,使得其在各种应用场景下都具有巨大的优势,当然分布式系统实现复杂度要高于单体系统🫠

项目代码使用纯粹的Go语言标准库实现,不借用任何其它第三方库😁

我是醉墨居士,废话不多说,我们现在开始吧🤗

分布式模型

  • Hub & Spoke模型
  • 优点:集中管理,安全性,降低成本
  • 缺点:单点故障,延迟,有限的扩展性

在这里插入图片描述

  • Peer to Peer模型
  • 优点:去中心化,高度可扩展性,资源共享
  • 缺点:管理复杂性,安全性,性能问题

在这里插入图片描述

  • Message Queues模型
  • 优点:解耦合,异步处理,可靠性
  • 缺点:系统复杂度,准确性,消息顺序

在这里插入图片描述

我们将要开发的分布式系统将会取其精华,去其糟粕,使用采用上述模型的混合模式😎

系统图解

在这里插入图片描述

注册中心模块

  • 注册信息
package registryimport ("encoding/json""fmt""io""strings"
)type Registration struct {ServiceName stringServiceAddr stringRequiredServices []string
}func buildRegistration(reader io.ReadCloser) (*Registration, error) {defer reader.Close()data, err := io.ReadAll(reader)if err != nil {return nil, err}registration := new(Registration)err = json.Unmarshal(data, registration)if err != nil {return nil, err}return registration, nil
}func buildServiceInfo(reader io.ReadCloser) ([]string, error) {defer reader.Close()data, err := io.ReadAll(reader)if err != nil {return nil, err}parts := strings.SplitN(string(data), " ", 2)if len(parts) != 2 {return nil, fmt.Errorf("Parse service failed with length %d", len(parts))}return parts, nil
}
  • 注册信息表
package registryimport ("bytes""encoding/json""fmt""io""log""math/rand""net/http""sync"
)type serviceTable struct {serviceInfos map[string][]*Registrationlock *sync.RWMutex
}func newServiceTable() *serviceTable {return &serviceTable{serviceInfos: make(map[string][]*Registration),lock: new(sync.RWMutex),}
}func (t *serviceTable) parseServiceInfos(reader io.ReadCloser) (err error){data, err := io.ReadAll(reader)if err != nil {return err}defer func() {err = reader.Close()}()t.lock.Lock()defer t.lock.Unlock()err = json.Unmarshal(data, &t.serviceInfos)return
}func (t *serviceTable) buildRequiredServiceInfos(registration *Registration) map[string][]*Registration {m := make(map[string][]*Registration, len(registration.RequiredServices))t.lock.RLock()defer t.lock.RUnlock()for _, serviceName := range registration.RequiredServices {m[serviceName] = t.serviceInfos[serviceName]}return m
}func (t *serviceTable) notify(method string, registration *Registration) error {if method != http.MethodPost && method != http.MethodDelete {fmt.Println(method, method == http.MethodPost, method == http.MethodDelete)return fmt.Errorf("Method not allowed with method: %s", method)}t.lock.RLock()defer t.lock.RUnlock()data, err := json.Marshal(registration)if err != nil {return err}for _, registrations := range t.serviceInfos {for _, reg := range registrations {for _, requiredServiceName := range reg.RequiredServices {if requiredServiceName == registration.ServiceName {req, err := http.NewRequest(method, "http://" + reg.ServiceAddr + "/services", bytes.NewReader(data))if err != nil {continue}log.Println("update url: ", reg.ServiceAddr + "/services")http.DefaultClient.Do(req)}}}}return nil
}func (t *serviceTable) add(registration *Registration) {t.lock.Lock()defer t.lock.Unlock()log.Printf("Service table add %s with address %s\n", registration.ServiceName, registration.ServiceAddr)if registrations, ok := t.serviceInfos[registration.ServiceName]; ok {registrations = append(registrations, registration)} else {t.serviceInfos[registration.ServiceName] = []*Registration{registration}}
}func (t *serviceTable) remove(registration *Registration) {t.lock.Lock()defer t.lock.Unlock()log.Printf("Service table remove %s with address %s\n", registration.ServiceName, registration.ServiceAddr)if registrations, ok := t.serviceInfos[registration.ServiceName]; ok {for i := len(registrations) - 1; i >= 0; i-- {if registrations[i].ServiceAddr == registration.ServiceAddr {registrations = append(registrations[:i], registrations[i+1:]...)}}}
}func (t *serviceTable) get(serviceName string) *Registration {t.lock.RLock()defer t.lock.RUnlock()regs := t.serviceInfos[serviceName]return regs[rand.Intn(len(regs))]
}
  • 注册服务
package registryimport ("encoding/json""net/http""time"
)const (serviceName = "Registry Service"serviceAddr = "127.0.0.1:20000"
)type RegistryService struct {serviceInfos *serviceTableheartBeatWorkerNumber intheartBeatAttempCount intheartBeatAttempDuration time.DurationheartBeatCheckDuration time.Duration
}func Default() *RegistryService {return New(3, 3, time.Second, 30 * time.Second)
}func New(heartBeatWorkerNumber, heartBeatAttempCount int, heartBeatAttempDuration, heartBeatCheckDuration time.Duration) *RegistryService {return &RegistryService{serviceInfos: newServiceTable(),heartBeatWorkerNumber: heartBeatWorkerNumber,heartBeatAttempCount: heartBeatAttempCount,heartBeatAttempDuration: heartBeatAttempDuration,heartBeatCheckDuration: heartBeatCheckDuration,}
}func (s *RegistryService) Run() error {go s.heartBeat()http.HandleFunc("/services", func(w http.ResponseWriter, r *http.Request) {statusCode := http.StatusOKswitch r.Method {case http.MethodPost:registration, err := buildRegistration(r.Body)if err != nil {statusCode = http.StatusInternalServerErrorgoto END}err = s.regist(registration)if err != nil {statusCode = http.StatusInternalServerErrorgoto END}serviceInfos := s.serviceInfos.buildRequiredServiceInfos(registration)data, err := json.Marshal(&serviceInfos)if err != nil {statusCode = http.StatusInternalServerErrorgoto END}defer w.Write(data)case http.MethodDelete:registration, err := buildRegistration(r.Body)if err != nil {statusCode = http.StatusInternalServerErrorgoto END}s.unregist(registration)if err != nil {statusCode = http.StatusInternalServerErrorgoto END}default:statusCode = http.StatusMethodNotAllowedgoto END}END:w.WriteHeader(statusCode)})return http.ListenAndServe(serviceAddr, nil)
}func (s *RegistryService) heartBeat() {channel := make(chan *Registration, 1)for i := 0; i < s.heartBeatWorkerNumber; i++ {go func() {for reg := range channel {for j := 0; j < s.heartBeatAttempCount; j++ {resp, err := http.Get("http://" + reg.ServiceAddr + "/heart-beat")if err == nil && resp.StatusCode == http.StatusOK {goto NEXT}time.Sleep(s.heartBeatAttempDuration)}s.unregist(reg)NEXT:}}()}for {s.serviceInfos.lock.RLock()for _, registrations := range s.serviceInfos.serviceInfos {for i := len(registrations) - 1; i >= 0; i-- {channel <- registrations[i]}}s.serviceInfos.lock.RUnlock()time.Sleep(s.heartBeatCheckDuration)}
}func (s *RegistryService) regist(registration *Registration) error {s.serviceInfos.add(registration)return s.serviceInfos.notify(http.MethodPost, registration)
}func (s *RegistryService) unregist(registration *Registration) error {s.serviceInfos.remove(registration)return s.serviceInfos.notify(http.MethodDelete, registration)
}
  • 注册服务客户端接口
package registryimport ("bytes""encoding/json""fmt""net/http"
)func registerMonitorHandler() {http.HandleFunc("/services", func(w http.ResponseWriter, r *http.Request) {defer r.Body.Close()switch r.Method {case http.MethodPost:registration, err := buildRegistration(r.Body)if err != nil {w.WriteHeader(http.StatusInternalServerError)return}provider.add(registration)fmt.Printf("add service %s\n", registration.ServiceName)case http.MethodDelete:registration, err := buildRegistration(r.Body)if err != nil {w.WriteHeader(http.StatusInternalServerError)return}provider.remove(registration)fmt.Printf("remove service %s\n", registration.ServiceName)default:w.WriteHeader(http.StatusMethodNotAllowed)return}w.WriteHeader(http.StatusOK)})http.HandleFunc("/heart-beat", func(w http.ResponseWriter, r *http.Request) {w.WriteHeader(http.StatusOK)})
}func RegistService(registration *Registration) error {registerMonitorHandler()data, err := json.Marshal(registration)if err != nil {return err}resp, err := http.Post("http://" + serviceAddr + "/services", "application/json", bytes.NewReader(data))if err != nil {return err}if resp.StatusCode != http.StatusOK {return fmt.Errorf("Regist %s error with code %d", registration.ServiceName, resp.StatusCode)}err = provider.parseServiceInfos(resp.Body)if err != nil {return err}return nil
}func UnregistService(registration *Registration) error {data, err := json.Marshal(registration)if err != nil {return err}req, err := http.NewRequest(http.MethodDelete, "http://" + serviceAddr + "/services", bytes.NewReader(data))if err != nil {return err}resp, err := http.DefaultClient.Do(req)if err != nil {return err}if resp.StatusCode != http.StatusOK {return fmt.Errorf("Unregist %s error with code %d", registration.ServiceName, resp.StatusCode)}return nil
}var provider = newServiceTable()func Get(serviceName string) *Registration {return provider.get(serviceName)
}
  • 服务入口
package mainimport ("log""services/registry"
)func main() {registryService := registry.Default()err := registryService.Run()if err != nil {log.Fatalln(err)}
}

基础服务模块

package serviceimport ("context""fmt""net/http""services/registry"
)type Service interface {Init()
}func Run(registration *registry.Registration) (err error) {err = registry.RegistService(registration)if err != nil {return err}defer func() {err = registry.UnregistService(registration)}()srv := http.Server{Addr: registration.ServiceAddr}go func() {fmt.Println("Press any key to stop.")var s stringfmt.Scan(&s)srv.Shutdown(context.Background())}()err = srv.ListenAndServe()if err != nil {return err}return nil
}

被依赖的服务模块(日志服务)

  • 业务服务
package logserviceimport ("io""log""net/http""os"
)type logService struct {destination stringlogger *log.Logger
}func Init(destination string) {s := &logService{destination: destination,}s.logger = log.New(s, "Go:", log.Ltime | log.Lshortfile)s.register()
}func (s *logService)Write(data []byte) (int, error) {file, err := os.OpenFile(s.destination, os.O_CREATE | os.O_APPEND | os.O_WRONLY, 0600)if err != nil {return 0, err}defer file.Close()return file.Write(data)
}func (s *logService)register() {http.HandleFunc("/log", func(w http.ResponseWriter, r *http.Request) {if r.Method != http.MethodPost {w.WriteHeader(http.StatusMethodNotAllowed)return}data, err := io.ReadAll(r.Body)if err != nil || len(data) == 0 {w.WriteHeader(http.StatusBadRequest)return}s.logger.Println(string(data))})
}
  • 客户端接口
package logserviceimport ("bytes""fmt""net/http""services/registry"
)func Println(registration *registry.Registration, s string) error {resp, err := http.Post("http://"+registration.ServiceAddr+"/log", "text/plain", bytes.NewReader([]byte(s)))if err != nil {return err}if resp.StatusCode != http.StatusOK {return fmt.Errorf("Response Error with code: %d", resp.StatusCode)}return nil
}
  • 服务入口
package mainimport ("log""services/logservice""services/registry""services/service"
)func main() {logservice.Init("./services.log")err := service.Run(&registry.Registration{ServiceName:      "LogService",ServiceAddr:      "127.0.0.1:20002",RequiredServices: make([]string, 0),})if err != nil {log.Fatalln(err)}
}

服务模块(访问服务)

  • 业务服务
package visistserviceimport ("log""net/http""services/logservice""services/registry""strconv""sync/atomic"
)type visistService struct {visistCount atomic.Int32
}func Init() {s := &visistService{visistCount: atomic.Int32{},}s.register()
}func (s *visistService) register() {http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {s.visistCount.Add(1)count := strconv.Itoa(int(s.visistCount.Load()))err := logservice.Println(registry.Get("LogService"), count)if err != nil {w.WriteHeader(http.StatusInternalServerError)log.Printf("Log service println error: %s\n", err)return}w.WriteHeader(http.StatusOK)w.Write([]byte(count))})
}
  • 服务入口
package mainimport ("log""services/registry""services/service""services/visistservice"
)func main() {visistservice.Init()err := service.Run(&registry.Registration{ServiceName:      "VisistService",ServiceAddr:      "127.0.0.1:20003",RequiredServices: []string{"LogService"},})if err != nil {log.Fatalln(err)}
}

运行效果

依次运行注册服务,日志服务,浏览服务
在这里插入图片描述
运行完毕之后,访问http://127.0.0.1:20003,返回访问量
在这里插入图片描述

日志记录对应访问量数据
在这里插入图片描述
这里只是用了一个简单的示例,你可以使用这套基础组件,然后让服务变得更加复杂,更加丰富。

开发总结

恭喜你,我们一起完成了简易分布式系统的开发,麻雀虽小,五脏俱全😉
希望这个项目能让你有所收获😊
如果有什么错误,请你评论区或者私信我指出,让我们一起进步✌️

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/284660.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

一款开源免费美观的WinForm UI控件库 - ReaLTaiizor

前言 今天推荐一款基于MIT license开源、免费、美观的.NET WinForm UI控件库&#xff1a;ReaLTaiizor。 什么是WinForm&#xff1f; WinForm是一个传统的桌面应用程序框架&#xff0c;它基于 Windows 操作系统的原生控件和窗体。通过简单易用的 API&#xff0c;开发者可以快速…

pycharm通过ssh连接远程服务器的docker容器进行运行和调试代码

pycharm连接远程服务器的docker容器通常有两种方法&#xff1a; 第一种&#xff1a;pycharm通过ssh连接已在运行中的docker容器 第二种&#xff1a;pycharm连接docker镜像&#xff0c;pycharm运行代码再自动创建容器 第一种方法比较通用简单&#xff0c;作者比较推崇。 条件…

频谱论文:基于张量Tucker分解的频谱地图构建算法

#频谱# [1]陈智博,胡景明,张邦宁 郭道省.(2023).基于张量Tucker分解的频谱地图构建算法.电子与信息学报(11),4161-4169. &#xff08;陆军工程大学&#xff09; 研究内容 将动态电磁环境的时变频谱地图建模为3维频谱张量&#xff0c;通过张量Tucker分解提取出具有物理意义的核…

Vue 自定义搜索输入框SearchInput

效果如下&#xff1a; 组件代码 <template><div class"search-input flex flex-space-between flex-center-cz"><input type"text" v-model"value" :ref"inpuName" :placeholder"placeholder" keyup.enter&…

双向链表原来是这样实现的!

文章目录 前言1. 双向链表的结构2. 双链表的定义和结构3. 定义结构体(ListNode)2.创建返回链表的头结点CreateList函数实现: 3.初始化双向链表ListCreate定义函数&#xff1a;实现函数&#xff1a; 4. 双向链表打印(ListPrint)定义函数&#xff1a;实现函数&#xff1a; 5. 尾插…

Python---多任务的介绍

1. 提问 利用现学知识能够让两个函数或者方法同时执行吗? 不能&#xff0c;因为之前所写的程序都是单任务的&#xff0c;也就是说一个函数或者方法执行完成另外一个函数或者方法才能执行&#xff0c;要想实现这种操作就需要使用多任务。 多任务的最大好处是充分利用CPU资源&…

算法学习——栈与队列

栈与队列 栈与队列理论基础用栈实现队列思路代码 用队列实现栈思路代码 删除字符串中的所有相邻重复项思路代码 有效的括号思路代码 逆波兰表达式求值思路代码 滑动窗口最大值思路代码未完待续 前 K 个高频元素思路代码拓展 总结栈在系统中的应用括号匹配问题字符串去重问题逆波…

redis之五种基本数据类型

redis存储任何类型的数据都是以key-value形式保存&#xff0c;并且所有的key都是字符串&#xff0c;所以讨论基础数据结构都是基于value的数据类型 常见的5种数据类型是&#xff1a;String、List、Set、Zset、Hash 一) 字符串(String) String是redis最基本的类型&#xff0c;v…

【SpringBoot篇】基于布隆过滤器,缓存空值,解决缓存穿透问题 (商铺查询时可用)

文章目录 &#x1f354;什么是缓存穿透&#x1f384;解决办法⭐缓存空值处理&#x1f388;优点&#x1f388;缺点&#x1f38d;代码实现 ⭐布隆过滤器&#x1f38d;代码实现 &#x1f354;什么是缓存穿透 缓存穿透是指在使用缓存机制时&#xff0c;大量的请求无法从缓存中获取…

docker-compose介绍和用法

docker-compose介绍和用法详解 1、docker-compose介绍2、docker-compose build3、docker-compose down4、docker-compose up -d 1、docker-compose介绍 Docker Compose是一个用于快速配置多个Docker容器的工具。它是一个定义和运行多容器的Docker应用工具&#xff0c;通过YAML…

设计模式-GOF对各个模式的定义

以下内容是对设计模式之父GOF的著作《设计模式——可复用面向对象软件的基础》定义的摘抄 1 抽象工厂 意图 提供一个接口以创建一系列相关或相互依赖的对象&#xff0c;而无须指定它们具体的类。 适用性 在以下情况下使用抽象工厂模式&#xff1a; 一个系统要独立于它的产…

docker基本命令

1.docker命令图解 2. 从仓库拉取镜像 #下载最新版 docker pull nginx # 镜像名:版本名&#xff08;标签&#xff09; docker pull nginx:1.20.1docker rmi 镜像名:版本号/镜像id3. 容器启动及停止 docker run [OPTIONS] IMAGE [COMMAND] [ARG...] docker run [设置项] 镜…