Golang 搭建 WebSocket 应用(八) - 完整代码

本文应该是本系列文章最后一篇了,前面留下的一些坑可能后面会再补充一下,但不在本系列文章中了。

整体架构

再来回顾一下我们的整体架构:

在这里插入图片描述

在我们的 demo 中,包含了以下几种角色:

  1. 客户端:一般是浏览器,用于接收消息;
  2. Hub:消息中心,用于管理所有的客户端连接,以及将消息推送给客户端;
  3. 调用 /send 发送消息的应用:用于将消息发送给 Hub,然后由 Hub 将消息推送给客户端。

然后,每一个 WebSocket 连接都有一个关联的读协程和写协程,
用于读取客户端发送的消息,以及将消息推送给客户端。

目录结构

├── LICENSE  // 协议
├── Makefile // 一些常用的命令
├── README.md
├── authenticator.go      // 认证器
├── authenticator_test.go // 认证器测试
├── bytes.go // 字符串和 []byte 之间转换的辅助方法
├── client.go // WebSocket 客户端
├── go.mod    // 项目依赖
├── go.sum    // 项目依赖
├── hub.go    // 消息中心
├── main.go   // 程序入口
├── message   // 消息记录器
│   ├── db_logger.go
│   ├── db_logger_test.go
│   ├── log.go
│   └── stdout_logger.go
├── server.go // HTTP 服务
└── server_test.go // HTTP 接口的测试

运行

注:需要 Go 1.20 或以上版本

  1. 下载依赖:

可以使用七牛云的代理加速下载。

go mod tidy
  1. 启动 WebSocket 服务端:
go run main.go

Hub 代码

最终,我们的 Hub 代码演进成了下面这样:

// bufferSize 通道缓冲区、map 初始化大小
const bufferSize = 128// Handler 错误处理函数
type Handler func(log message.Log, err error)// Hub 维护了所有的客户端连接
type Hub struct {// 注册请求register chan *Client// 取消注册请求unregister chan *Client// 记录 uid 跟 client 的对应关系userClients map[string]*Client// 互斥锁,保护 userClients 以及 clients 的读写sync.RWMutex// 消息记录器messageLogger message.Logger// 错误处理器errorHandler Handler// 验证器authenticator Authenticator// 等待发送的消息数量pending atomic.Int64
}// 默认的错误处理器
func defaultErrorHandler(log message.Log, err error) {res, _ := json.Marshal(log)fmt.Printf("send message: %s, error: %s\n", string(res), err.Error())
}func newHub() *Hub {return &Hub{register:      make(chan *Client),unregister:    make(chan *Client),userClients:   make(map[string]*Client, bufferSize),RWMutex:       sync.RWMutex{},messageLogger: &message.StdoutMessageLogger{},errorHandler:  defaultErrorHandler,authenticator: &JWTAuthenticator{},}
}// 注册、取消注册请求处理
func (h *Hub) run() {for {select {case client := <-h.register:h.Lock()h.userClients[client.uid] = clienth.Unlock()case client := <-h.unregister:h.Lock()close(client.send)delete(h.userClients, client.uid)h.Unlock()}}
}// 返回 Hub 的当前的关键指标
func metrics(hub *Hub, w http.ResponseWriter) {pending := hub.pending.Load()connections := len(hub.userClients)_, _ = w.Write([]byte(fmt.Sprintf("# HELP connections 连接数\n# TYPE connections gauge\nconnections %d\n", connections)))_, _ = w.Write([]byte(fmt.Sprintf("# HELP pending 等待发送的消息数量\n# TYPE pending gauge\npending %d\n", pending)))
}

其中:

  • Hub 中的 registerunregister 通道用于处理客户端的注册和取消注册请求;
  • Hub 中的 userClients 用于记录 uidClient 的对应关系;
  • Hub 中的 messageLogger 用于记录消息;
  • Hub 中的 errorHandler 用于处理错误;
  • Hub 中的 authenticator 用于验证客户端的身份;
  • Hub 中的 pending 用于记录等待发送的消息数量。

目前实现存在的问题:

  • registerunregister 通道被消费的时候需要加锁,这样会导致 registerunregister 变成串行的,性能不好;
  • userClients 也是需要加锁的,这样会导致 userClients 的读写也是串行的,性能不好;

对于这两个问题,前面我们讨论过,一种可行的办法分段 map,然后对每一个 map 都有一个对应的 sync.Mutex 互斥锁来保证其读写的安全。

Client 代码

Client 比较关键的方法是:

  • writePump:负责将消息推送给客户端。
  • serveWs:处理 WebSocket 连接请求。
  • send:处理消息发送请求。

writePump

这个方法会从 send 通道中获取消息,然后推送给客户端。
推送失败会调用 errorHandler 处理错误。
推送成功会将 pending 减一。

// writePump 负责推送消息给 WebSocket 客户端
//
// 该方法在一个独立的协程中运行,我们保证了每个连接只有一个 writer。
// Client 会从 send 请求中获取消息,然后在这个方法中推送给客户端。
func (c *Client) writePump() {defer func() {_ = c.conn.Close()}()// 从 send 通道中获取消息,然后推送给客户端for {messageLog, ok := <-c.send// 设置写超时时间_ = c.conn.SetWriteDeadline(time.Now().Add(writeWait))// c.send 这个通道已经被关闭了if !ok {c.hub.pending.Add(int64(-1 * len(c.send)))return}if err := c.conn.WriteMessage(websocket.TextMessage, StringToBytes(messageLog.Message)); err != nil {c.hub.errorHandler(messageLog, err)c.hub.pending.Add(int64(-1 * len(c.send)))return}c.hub.pending.Add(int64(-1))}
}

serveWs

serveWs 方法会处理 WebSocket 连接请求,然后将其注册到 Hub 中。
在连接的时候会对客户端进行认证,认证失败会断开连接。
最后会启动读写协程。

// serveWs 处理 WebSocket 连接请求
func serveWs(hub *Hub, w http.ResponseWriter, r *http.Request) {// 升级为 WebSocket 连接conn, err := upgrader.Upgrade(w, r, nil)if err != nil {w.WriteHeader(http.StatusBadRequest)_, _ = w.Write([]byte(fmt.Sprintf("upgrade error: %s", err.Error())))return}// 认证失败的时候,返回错误信息,并断开连接uid, err := hub.authenticator.Authenticate(r)if err != nil {_ = conn.SetWriteDeadline(time.Now().Add(time.Second))_ = conn.WriteMessage(websocket.TextMessage, []byte(fmt.Sprintf("authenticate error: %s", err.Error())))_ = conn.Close()return}// 注册 Clientclient := &Client{hub: hub, conn: conn, send: make(chan message.Log, bufferSize), uid: uid}client.conn.SetCloseHandler(closeHandler)// register 无缓冲,下面这一行会阻塞,直到 hub.run 中的 <-h.register 语句执行// 这样可以保证 register 成功之后才会启动读写协程client.hub.register <- client// 启动读写协程go client.writePump()go client.readPump()
}

send

send 是一个 http 接口,用于处理消息发送请求。
它会从 Hub 中获取 uid 对应的 Client,然后将消息发送给客户端。

// send 处理消息发送请求
func send(hub *Hub, w http.ResponseWriter, r *http.Request) {uid := r.FormValue("uid")if uid == "" {w.WriteHeader(http.StatusBadRequest)_, _ = w.Write([]byte("uid is required"))return}// 从 hub 中获取 uid 关联的 clienthub.RLock()client, ok := hub.userClients[uid]hub.RUnlock()if !ok {w.WriteHeader(http.StatusBadRequest)_, _ = w.Write([]byte(fmt.Sprintf("client not found: %s", uid)))return}// 记录消息messageLog := message.Log{Uid: uid, Message: r.FormValue("message")}_ = hub.messageLogger.Log(messageLog)// 发送消息client.send <- messageLog// 增加等待发送的消息数量hub.pending.Add(int64(1))
}

github

完整代码可以在 github 上进行查看:https://github.com/eleven26/go-pusher

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/419153.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

XSS漏洞:利用多次提交技巧实现存储型XSS

目录 搭建环境 XSS攻击 测试 xss系列往期文章&#xff1a; 初识XSS漏洞-CSDN博客 利用XSS漏洞打cookie-CSDN博客 XSS漏洞&#xff1a;xss-labs靶场通关-CSDN博客 XSS漏洞&#xff1a;prompt.mi靶场通关-CSDN博客 XSS漏洞&#xff1a;xss.haozi.me靶场通关-CSDN博客 本…

磁盘分区机制

lsblk查看分区 Linux分区 挂载的经典案例 1. 虚拟机增加磁盘 点击这里&#xff0c;看我的这篇文章操作 添加之后&#xff0c;需要重启系统&#xff0c;不重启在系统里看不到新硬盘哦 出来了&#xff0c;但还没有分区 2. 分区 还没有格式化 3. 格式化磁盘 4. 挂载 5. 卸载…

汇编语言----X86汇编指令

目录 1.汇编指令的构成 2.X86架构CPU中包含的寄存器 3.常见的x86汇编指令 &#xff08;1&#xff09;算数运算 &#xff08;2&#xff09;逻辑运算 &#xff08;3&#xff09;其他 4.AT&T格式 5.选择语句&#xff08;分支结构&#xff09; 6.循环语句 &#xff0…

huggingface学习 | 云服务器使用hf_hub_download下载huggingface上的模型文件

系列文章目录 huggingface学习 | 云服务器使用git-lfs下载huggingface上的模型文件 文章目录 系列文章目录一、hf_hub_download介绍二、找到需要下载的huggingface文件三、准备工作及下载过程四、全部代码 一、hf_hub_download介绍 hf_hub_download是huggingface官方支持&…

Linux中的共享内存

定义&#xff1a; 共享内存允许两个或者多个进程共享物理内存的同一块区域&#xff08;通常被称为段&#xff09;。由于一个共享内存段会称为一个进程用户空间的一部分&#xff0c;因此这种 IPC 机制无需内核介入。所有需要做的就是让一个进程将数 据复制进共享内存中&#xff…

力扣精选算法100题——串联所有单词的字串(滑动窗口专题)

本题链接——串联所有单词的字串 本题和找到字符串中所有字母异位词题目非常相似&#xff0c;思路都是一样。通过自己的大脑能发现其中的相似之处。 第一步&#xff1a;了解题意 就按实例来分析吧&#xff0c;这样更通俗易懂。 words["ab","cd","ef…

mysql从库重新搭建的流程

背景 生产环境上的主从集群&#xff0c;因为一些异常原因&#xff0c;导致主从同步失败。现记录下通过重做mysql从库的方式来解决&#xff0c;重做过程不影响主库。 步骤 1、在主库上的操作步骤 备份主库所有数据&#xff0c;并将dump.sql文件拷贝到从库/tmp目录 mysqldump …

Flutter 综述

Flutter 综述 1 介绍1.1 概述1.2 重要节点1.3 移动开发中三种跨平台框架技术对比1.4 flutter 技术栈1.5 IDE1.6 Dart 语言1.7 应用1.8 框架 2 Flutter的主要组成部分3 资料书籍 《Flutter实战第二版》Dart 语言官网Flutter中文开发者社区flutter 官网 4 搭建Flutter开发环境参考…

vue3-模版引用

模版引用 ref 属性 场景&#xff1a;需要直接访问底层 DOM 元素。 方法&#xff1a;使用特殊的 ref 属性。 <input ref"input">ref 属性 允许我们在一个特定的 DOM 元素或子组件实例被挂载后&#xff0c;获得对它的直接引用。 访问模板引用 小 Demo: 当 i…

Go 知识iota

Go 知识iota 1. 介绍2. 特性3. 原理4. 你真的理解了吗 1. 介绍 iota 是一个预定义的标识符&#xff0c;用于声明枚举常量。它在 const 声明中使用&#xff0c;表示连续的未类型化整数。其值从0开始&#xff0c;const声明块每增加一行&#xff0c;iota的值就会自增1&#xff0c…

优化您的服务请求,增强用户体验和服务交付

您的服务请求模板是否像一个复杂的迷宫&#xff0c;给您的团队带来延误和困惑&#xff1f;您的技术人员是否厌倦了为了解最终用户的需求而与他们来回奔波&#xff1f;强大且可定制的请求模板可能正是您所需要的&#xff01; 服务交付团队&#xff08;尤其是 IT&#xff09;的…

javascript入门分享(附:javascript基础入门视频教程)

javascript入门分享&#xff08;附&#xff1a;javascript基础入门视频教程&#xff09; 一、javascript入门了解 JavaScript&#xff08;简称“JS”&#xff09;是一种具有函数优先的轻量级&#xff0c;解释型或即时编译型的编程语言。 虽然它是作为开发Web页面的脚本语言而出…