关于ASP.NET Core WebSocket实现集群的思考

前言

    提到WebSocket相信大家都听说过,它的初衷是为了解决客户端浏览器与服务端进行双向通信,是在单个TCP连接上进行全双工通讯的协议。在没有WebSocket之前只能通过浏览器到服务端的请求应答模式比如轮询,来实现服务端的变更响应到客户端,现在服务端也可以主动发送数据到客户端浏览器。WebSocket协议和Http协议平行,都属于TCP/IP四层模型中的第四层应用层。由于WebSocket握手阶段采用HTTP协议,所以也需要进行跨域处理。它的协议标识是wswss对应了常规标识和安全通信协议标识。本文重点并不是介绍WebSocket协议相关,而是提供一种基于ASP.NET Core原生WebSocket的方式实现集群的实现思路。关于这套思路其实很早之前我就构思过了,只是之前一直没有系统的整理出来,本篇文章就来和大家分享一下,由于主要是提供一种思路,所以涉及到具体细节或者业务相关的可能没有体现出来,还望大家理解。

实现

咱们的重点关键字就是两个WebSocket集群,实现的框架便是基于ASP.NET Core,我也基于golang实现了一套,本文涉及到的相关源码和golang版本的实现都已上传至我的github,具体仓库地址可以转到文末自行跳转到#示例源码中查看。既然涉及到集群,这里咱们就用nginx作为反向代理,来搭建一个集群实例。大致的示例结构如下图所示`

redis在这里扮演的角色呢,是用来处理Server端的消息相互传递用的,主要是使用的redis的pub/sub`功能来实现的,这里便涉及到几个核心问题

  • 首先,集群状态每个用户被分发到具体的哪台服务器上是不得而知的
  • 其次,处在不同Server端的不同用户间的相互通信是需要一个传递媒介
  • 最后,针对不同的场景比如单发消息、分组消息、全部通知等要有不同的处理策略

这里需要考虑的是,如果需要搭建实时通信服务器的话,需要注意集群的隔离性,主要是和核心业务进行隔离,毕竟WebSocket需要保持长链接、且消息的大小需要评估。

上面提到了redis的主要功能就是用来传递消息用的,毕竟每个server服务器是无状态的。这当然不是必须的,任何可以进行消息分发的中间件都可以,比如消息队列rabbitmq、kafka、rocketmq、mqtt等,甚至只要能把要处理的消息存储起来都可以比如缓存甚至是关系型数据库等等。这压力使用redis主要是因为操作起来简单、轻量级、灵活,让大家关注点在思路上,而不是使用中案件的代码上。

nginx配置

通过上面的图我们可以看到,我们这里构建集群示例使用的nginx,如果让nginx支持WebSocket的话,需要额外的配置,这个在网上有很多相关的文章介绍,这里就来列一下咱们示例的nginx配置,在配置文件nginx.conf

//上游服务器地址也就是websocket服务的真实地址
upstream wsbackend {server 127.0.0.1:5001;server 127.0.0.1:5678;
}server {listen       5000;server_name  localhost;location ~/chat/{//upstream地址proxy_pass http://wsbackend;proxy_connect_timeout 60s; proxy_read_timeout 3600s;proxy_send_timeout 3600s;//记得转发避免踩坑proxy_set_header Host $host;proxy_http_version 1.1; //http升级成websocket协议的头标识proxy_set_header Upgrade $http_upgrade;proxy_set_header Connection "Upgrade";}
}

这套配置呢,在搜索引擎上能收到很多,不过不妨碍我把使用的粘贴出来。这一套亲测有效,也是我使用的配置,请放心使用。个人认为如果是线上环境采用的负载均衡策略可以选择ip_hash的方式,保证同一个ip的客户端用户可以分发到一台WebSocket实例中去,这样的话能尽量避免使用redis的用户频道做消息传递。好了,接下来准备开始展示具体实现的代码了。

一对一发送

首先介绍的就是一对一发送的情况,也就是我把消息发给你,聊天的时候私聊的情况。这里呢涉及到两种情况

  • 如果你需要通信的客户端和你连接在一个Server端里,这样的话可以直接在链接里找到这个端的通信实例直接发送。
  • 如果你需要通信的客户端和你不在一个Server端里,这个时候咱们就需要借助redis的pub/sub的功能,把消息传递给另一个Server端。

咱们通过一张图大致的展示一下它的工作方式

解释一下,每个客户端注册到WebSocket服务里的时候会在redis里订阅一个user:用户唯一标识的频道,这个频道用于接收和当前WebSocket连接不在一个服务端的其他WebSocket发送过来的消息。每次发送消息的时候你会知道你要发送给谁,不在当前服务器的话则发送到redis的user:用户唯一标识频道,这样的话目标WebSocket就能收到消息了。首先是注入相关的依赖项,这里我使用的redis客户端是freeredis,主要是因为操作起来简单,具体实现代码如下

var builder = WebApplication.CreateBuilder(args);
//注册freeredis
builder.Services.AddSingleton(provider => {var logger = provider.GetService<ILogger<WebSocketChannelHandler>>();RedisClient cli = new RedisClient("127.0.0.1:6379");cli.Notice += (s, e) => logger?.LogInformation(e.Log);return cli;
});
//注册WebSocket具体操作的类
builder.Services.AddSingleton<WebSocketHandler>();
builder.Services.AddControllers();var app = builder.Build();var webSocketOptions = new WebSocketOptions
{KeepAliveInterval = TimeSpan.FromMinutes(2)
};
//注册WebSocket中间件
app.UseWebSockets(webSocketOptions);app.MapGet("/", () => "Hello World!");
app.MapControllers();app.Run();

接下来我们定义一个Controller用来处理WebSocket请求

public class WebSocketController : ControllerBase
{private readonly ILogger<WebSocketController> _logger;private readonly WebSocketHandler _socketHandler;public WebSocketController(ILogger<WebSocketController> logger, WebSocketHandler socketHandler, WebSocketChannelHandler webSocketChannelHandler){_logger = logger;_socketHandler = socketHandler;}//这里的id代表当前连接的客户端唯一标识比如用户唯一标识[HttpGet("/chat/user/{id}")]public async Task ChatUser(string id){//判断是否是WebSocket请求if (HttpContext.WebSockets.IsWebSocketRequest){_logger.LogInformation($"user:{id}-{Request.HttpContext.Connection.RemoteIpAddress}:{Request.HttpContext.Connection.RemotePort} join");var webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync();//处理请求相关await _socketHandler.Handle(id, webSocket);}else{HttpContext.Response.StatusCode = StatusCodes.Status400BadRequest;}}
}

这里的WebSocketHandler是用来处理具体逻辑用的,咱们看一下相关代码

public class WebSocketHandler:IDisposable
{//存储当前服务用户的集合private readonly UserConnection UserConnection = new();//redis频道前缀private readonly string userPrefix = "user:";//用户对应的redis频道private readonly ConcurrentDictionary<string, IDisposable> _disposables = new();private readonly ILogger<WebSocketHandler> _logger;//redis客户端private readonly RedisClient _redisClient;public WebSocketHandler(ILogger<WebSocketHandler> logger, RedisClient redisClient){_logger = logger;_redisClient = redisClient;}public async Task Handle(string id, WebSocket webSocket){//把当前用户连接存储起来_ = UserConnection.GetOrAdd(id, webSocket);//订阅一个当前用户的频道await SubMsg($"{userPrefix}{id}");var buffer = new byte[1024 * 4];//接收发送过来的消息,这个方法是阻塞的,如果没收到消息则一直阻塞var receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);//循环接收消息while (webSocket.State == WebSocketState.Open){try{//因为缓冲区长度是固定的所以要获取实际长度string msg = Encoding.UTF8.GetString(buffer[..receiveResult.Count]).TrimEnd('\0');//接收的到消息转换成实体MsgBody msgBody = JsonConvert.DeserializeObject<MsgBody>(msg);//发送到其他客户端的数据byte[] sendByte = Encoding.UTF8.GetBytes($"user {id} send:{msgBody.Msg}");_logger.LogInformation($"user {id} send:{msgBody.Msg}");//判断目标客户端是否在当前当前服务,如果在当前服务直接扎到目标连接直接发送if (UserConnection.TryGetValue(msgBody.Id, out var targetSocket)){if (targetSocket.State == WebSocketState.Open){await targetSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), receiveResult.MessageType, true, CancellationToken.None);}}else{//如果要发送的目标端不在当前服务,则发送给目标redis端的频道ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = id, ToId = msgBody.Id, Msg = msgBody.Msg };//目标的redis频道_redisClient.Publish($"{userPrefix}{msgBody.Id}", JsonConvert.SerializeObject(channelMsgBody));}//继续阻塞循环接收消息receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);}catch (Exception ex){_logger.LogError(ex, ex.Message);break;}}//循环结束意味着当前端已经退出//从当前用户的集合移除当前用户_ = UserConnection.TryRemove(id, out _);//关闭当前WebSocket连接await webSocket.CloseAsync(receiveResult.CloseStatus.Value, receiveResult.CloseStatusDescription, CancellationToken.None);//在当前订阅集合移除当前用户_disposables.TryRemove($"{userPrefix}{id}", out var disposable);//关闭当前用户的通道disposable.Dispose();}private async Task SubMsg(string channel){//订阅当前用户频道var sub = _redisClient.Subscribe(channel,  async (channel, data) => {//接收过来当前频道数据,说明发送端不在当前服务ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString());byte[] sendByte = Encoding.UTF8.GetBytes($"user {msgBody.FromId} send:{msgBody.Msg}");//在当前服务找到目标的WebSocket连接并发送消息if (UserConnection.TryGetValue(msgBody.ToId, out var targetSocket)){if (targetSocket.State == WebSocketState.Open){await targetSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);}}});//把redis订阅频道添加到集合中_disposables.TryAdd(channel, sub);}//程序退出的时候取消当前服务订阅的redis频道public void Dispose(){foreach (var disposable in _disposables){disposable.Value.Dispose();}_disposables.Clear();}
}

这里涉及到几个辅助相关的类,其中UserConnection类是存储注册到当前服务的连接,MsgBody类用来接受客户端发送过来的消息,ChannelMsgBody是用来发送redis频道的相关消息,因为要把相关消息通过redis发布出去,咱们列一下这几个类的相关代码

//注册到当前服务的连接
public class UserConnection : IEnumerable<KeyValuePair<string, WebSocket>>
{//存储用户唯一标识和WebSocket的对应关系private ConcurrentDictionary<string, WebSocket> _users = new ConcurrentDictionary<string, WebSocket>();//当前服务的用户数量public int Count => _users.Count;public WebSocket GetOrAdd(string userId, WebSocket webSocket){return _users.GetOrAdd(userId, webSocket);}public bool TryGetValue(string userId, out WebSocket webSocket){return _users.TryGetValue(userId, out webSocket);}public bool TryRemove(string userId, out WebSocket webSocket){return _users.TryRemove(userId, out webSocket);}public void Clear(){_users.Clear();}public IEnumerator<KeyValuePair<string, WebSocket>> GetEnumerator(){return _users.GetEnumerator();}IEnumerator IEnumerable.GetEnumerator(){return this.GetEnumerator();}
}//客户端消息
public class MsgBody
{//目标用户标识public string Id { get; set; }//要发送的消息public string Msg { get; set; }
}//频道订阅消息
public class ChannelMsgBody
{//用户标识public string FromId { get; set; }//目标用户标识,也就是要发送给谁public string ToId { get; set; }//要发送的消息public string Msg { get; set; }
}

这样的话关于一对一发送消息的相关逻辑就实现完成了,启动两个Server端,由于nginx默认的负载均衡策略是轮询,所以注册两个用户的话会被分发到不同的服务里去

Postman连接三个连接唯一标识分别是1、2、3,模拟一下消息发送,效果如下,发送效果

接收效果

群组发送

上面我们展示了一对一发送的情况,接下来我们来看一下,群组发送的情况。群组发送的话就是只要大家都加入一个群组,只要客户端在群组里发送一条消息,则注册到当前群组内的所有客户端都可以收到消息。相对于一对一的情况就是如果当前WebSocket服务端如果存在用户加入某个群组,则当前当前WebSocket服务端则可以订阅一个group:群组唯一标识的redis频道,集群中的其他WebSocket服务器通过这个redis频道接收群组消息,通过一张图描述一下
群组的实现方式相对于一对一要简单一点

  • 发送端可以不用考虑当前服务中的客户端连接,一股脑的交给redis把消息发布出去
  • 如果有WebSocket服务中的用户订阅了当前分组则可以接受消息,获取组内的用户循环发送消息

展示一下代码实现的方式,首先是定义一个action用于表示群组的相关场景

//包含两个标识一个是组别标识一个是注册到组别的用户
[HttpGet("/chat/group/{groupId}/{userId}")]
public async Task ChatGroup(string groupId, string userId)
{if (HttpContext.WebSockets.IsWebSocketRequest){_logger.LogInformation($"group:{groupId} user:{userId}-{Request.HttpContext.Connection.RemoteIpAddress}:{Request.HttpContext.Connection.RemotePort} join");var webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync();//调用HandleGroup处理群组相关的消息await _socketHandler.HandleGroup(groupId, userId, webSocket);}else{HttpContext.Response.StatusCode = StatusCodes.Status400BadRequest;}
}

接下来看一下HandleGroup的相关逻辑,还是在WebSocketHandler类中,看一下代码实现

public class WebSocketHandler:IDisposable
{private readonly UserConnection UserConnection = new();private readonly GroupUser GroupUser = new();private readonly SemaphoreSlim _lock = new(1, 1);private readonly ConcurrentDictionary<string, IDisposable> _disposables = new();private readonly string groupPrefix = "group:";private readonly ILogger<WebSocketHandler> _logger;private readonly RedisClient _redisClient;public WebSocketHandler(ILogger<WebSocketHandler> logger, RedisClient redisClient){_logger = logger;_redisClient = redisClient;}public async Task HandleGroup(string groupId, string userId, WebSocket webSocket){//因为群组的集合可能会存在很多用户一起访问所以限制访问数量await _lock.WaitAsync();//初始化群组容器 群唯一标识为key 群员容器为valuevar currentGroup = GroupUser.Groups.GetOrAdd(groupId, new UserConnection { });//当前用户加入当前群组_ = currentGroup.GetOrAdd(userId, webSocket);//只有有当前WebSocket服务的第一个加入当前组的时候才去订阅群组频道//如果不限制的话则会出现如果当前WebSocket服务有多个用户在一个组内则会重复收到redis消息if (currentGroup.Count == 1){//订阅redis频道await SubGroupMsg($"{groupPrefix}{groupId}");}_lock.Release();var buffer = new byte[1024 * 4];//阻塞接收WebSocket消息var receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);//服务不退出的话则一直等待接收while (webSocket.State == WebSocketState.Open){try{string msg = Encoding.UTF8.GetString(buffer[..receiveResult.Count]).TrimEnd('\0');_logger.LogInformation($"group 【{groupId}】 user 【{userId}】 send:{msg}");//组装redis频道发布的消息,目标为群组标识ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = userId, ToId = groupId, Msg = msg };//通过redis发布消息_redisClient.Publish($"{groupPrefix}{groupId}", JsonConvert.SerializeObject(channelMsgBody));receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);}catch (Exception ex){_logger.LogError(ex, ex.Message);break;}}//如果客户端退出则在当前群组集合删除当前用户_ = currentGroup.TryRemove(userId, out _);await webSocket.CloseAsync(receiveResult.CloseStatus.Value, receiveResult.CloseStatusDescription, CancellationToken.None);}private async Task SubGroupMsg(string channel){var sub = _redisClient.Subscribe(channel, async (channel, data) => {ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString());byte[] sendByte = Encoding.UTF8.GetBytes($"group 【{msgBody.ToId}】 user 【{msgBody.FromId}】 send:{msgBody.Msg}");//在当前WebSocket服务器找到当前群组里的用户GroupUser.Groups.TryGetValue(msgBody.ToId, out var currentGroup);//循环当前WebSocket服务器里的用户发送消息foreach (var user in currentGroup){//不用给自己发送了if (user.Key == msgBody.FromId){continue;}if (user.Value.State == WebSocketState.Open){await user.Value.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);}}});//把当前频道加入订阅集合_disposables.TryAdd(channel, sub);}
}

这里涉及到了GroupUser类,是来存储群组和群组用户的对应关系的,定义如下

public class GroupUser
{//key为群组的唯一标识public ConcurrentDictionary<string, UserConnection> Groups = new ConcurrentDictionary<string, UserConnection>();
}

演示一下把两个用户添加到一个群组内,然后发送接收消息的场景,用户u1发送

用户u2接收

发送所有人

发送给所有用户的逻辑比较简单,不用考虑到用户限制,只要用户连接到了WebSocket集群则都可以接收到这个消息,大致工作方式如下图所示

这个比较简单,咱们直接看实现代码,首先是定义一个地址,用于发布消息

//把用户注册进去
[HttpGet("/chat/all/{id}")]
public async Task ChatAll(string id)
{if (HttpContext.WebSockets.IsWebSocketRequest){_logger.LogInformation($"all user:{id}-{Request.HttpContext.Connection.RemoteIpAddress}:{Request.HttpContext.Connection.RemotePort} join");var webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync();await _socketHandler.HandleAll(id, webSocket);}else{HttpContext.Response.StatusCode = StatusCodes.Status400BadRequest;}
}

具体的实现逻辑还是在HandleGroup类里,是HandleAll方法,看一下具体实现

public class WebSocketHandler:IDisposable
{private readonly UserConnection AllConnection = new();private readonly ConcurrentDictionary<string, IDisposable> _disposables = new();private readonly string all = "all";private readonly ILogger<WebSocketHandler> _logger;private readonly RedisClient _redisClient;public WebSocketHandler(ILogger<WebSocketHandler> logger, RedisClient redisClient){_logger = logger;_redisClient = redisClient;}public async Task HandleAll(string id, WebSocket webSocket){await _lock.WaitAsync();//把用户加入用户集合_ = AllConnection.GetOrAdd(id, webSocket);//WebSocket集群中的每个服务只定义一次if (AllConnection.Count == 1){await SubAllMsg(all);}_lock.Release();var buffer = new byte[1024 * 4];//阻塞接收信息var receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);while (webSocket.State == WebSocketState.Open){try{string msg = Encoding.UTF8.GetString(buffer[..receiveResult.Count]).TrimEnd('\0');_logger.LogInformation($"user {id} send:{msg}");//获取接收信息ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = id, Msg = msg };//把消息通过redis发布到集群中的其他服务_redisClient.Publish(all, JsonConvert.SerializeObject(channelMsgBody));receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);}catch (Exception ex){_logger.LogError(ex, ex.Message);break;}}//用户退出则删除集合中的当前用户信息_ = AllConnection.TryRemove(id, out _);await webSocket.CloseAsync(receiveResult.CloseStatus.Value, receiveResult.CloseStatusDescription, CancellationToken.None);}private async Task SubAllMsg(string channel){var sub = _redisClient.Subscribe(channel, async (channel, data) => {ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString());byte[] sendByte = Encoding.UTF8.GetBytes($"user 【{msgBody.FromId}】 send all:{msgBody.Msg}");//接收到消息后遍历用户集合把消息发送给所有用户foreach (var user in AllConnection){   //如果包含当前用户跳过if (user.Key == msgBody.FromId){continue;}if (user.Value.State == WebSocketState.Open){await user.Value.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);}}});_disposables.TryAdd(channel, sub);}
}

效果在这里就不展示了,和群组的效果是类似的,只是一个是部分用户,一个是全部的用户。

整合到一起

上面我们分别展示了一对一、群组、所有人的场景,但是实际使用的时候,每个用户只需要注册到WebSocket集群一次也就是保持一个连接即可,而不是一对一一个连接、注册群组一个连接、所有消息的时候一个连接。所以我们需要把上面的演示整合一下,一个用户只需要连接到WebSocket集群一次即可,至于发送给谁,加入什么群组,接收全部消息等都是连接后通过一些标识区分的,而不必每个类型的操作都注册一次,就和微信和QQ一样我只要登录了即可,至于其他操作都是靠数据标识区分的。接下来咱们就整合一下代码达到这个效果,大致的思路是

  • 用户连接到WebSocket集群,把用户和连接保存到当前WebSocket服务器的用户集合中去。
  • 一对一发送的时候,只需要在具体的服务器中找到具体的客户端发送消息
  • 群组的时候,先把当前用户标识加入群组集合即可,接收消息的时候根据群组集合里的用户标识去用户集合里去拿具体的WebSocket连接发送消息
  • 全员消息的时候,直接遍历集群中的每个WebSocket服务里的用户集合里的WebSocket连接训话发送消息

这样的话就保证了每个客户端用户在集群中只会绑定一个连接,首先还是单独定义一个action,用于让客户端用户连接上来,具体实现代码如下所示

public class WebSocketChannelController : ControllerBase
{private readonly ILogger<WebSocketController> _logger;private readonly WebSocketChannelHandler _webSocketChannelHandler;public WebSocketChannelController(ILogger<WebSocketController> logger, WebSocketChannelHandler webSocketChannelHandler){_logger = logger;_webSocketChannelHandler = webSocketChannelHandler;}//只需要把当前用户连接到服务即可[HttpGet("/chat/channel/{id}")]public async Task Channel(string id){if (HttpContext.WebSockets.IsWebSocketRequest){_logger.LogInformation($"user:{id}-{Request.HttpContext.Connection.RemoteIpAddress}:{Request.HttpContext.Connection.RemotePort} join");var webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync();await _webSocketChannelHandler.HandleChannel(id, webSocket);}else{HttpContext.Response.StatusCode = StatusCodes.Status400BadRequest;}}
}

接下来看一下WebSocketChannelHandler类的HandleChannel方法实现,用于处理不同的消息,比如一对一、群组、全员消息等不同类型的消息

public class WebSocketChannelHandler : IDisposable
{//用于存储当前WebSocket服务器链接上来的所有用户对应关系private readonly UserConnection UserConnection = new();//用于存储群组和用户关系,用户集合采用HashSet保证每个用户只加入一个群组一次private readonly ConcurrentDictionary<string, HashSet<string>> GroupUser = new ConcurrentDictionary<string, HashSet<string>>();private readonly SemaphoreSlim _lock = new(1, 1);//存放redis订阅实例private readonly ConcurrentDictionary<string, IDisposable> _disposables = new();//一对一redis频道前缀private readonly string userPrefix = "user:";//群组redis频道前缀private readonly string groupPrefix = "group:";//全员redis频道private readonly string all = "all";private readonly ILogger<WebSocketHandler> _logger;private readonly RedisClient _redisClient;public WebSocketChannelHandler(ILogger<WebSocketHandler> logger, RedisClient redisClient){_logger = logger;_redisClient = redisClient;}public async Task HandleChannel(string id, WebSocket webSocket){await _lock.WaitAsync();//每次连接进来就添加到用户集合_ = UserConnection.GetOrAdd(id, webSocket);//每个WebSocket服务实例只需要订阅一次全员消息频道await SubMsg($"{userPrefix}{id}");if (UserConnection.Count == 1){await SubAllMsg(all);}_lock.Release();var buffer = new byte[1024 * 4];//接收客户端消息var receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);while (webSocket.State == WebSocketState.Open){try{string msg = Encoding.UTF8.GetString(buffer[..receiveResult.Count]).TrimEnd('\0');//读取客户端消息ChannelData channelData = JsonConvert.DeserializeObject<ChannelData>(msg);//判断消息类型switch (channelData.Method){//一对一case "One":await HandleOne(id, channelData.MsgBody, receiveResult);break;//把用户加入群组case "UserGroup":await AddUserGroup(id, channelData.Group, webSocket);break;//处理群组消息case "Group":await HandleGroup(channelData.Group, id, webSocket, channelData.MsgBody);break;//处理全员消息default:await HandleAll(id, channelData.MsgBody);break;}receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);}catch (Exception ex){_logger.LogError(ex, ex.Message);break;}}await webSocket.CloseAsync(receiveResult.CloseStatus.Value, receiveResult.CloseStatusDescription, CancellationToken.None);//在群组中移除当前用户foreach (var users in GroupUser.Values){lock (users){users.Remove(id);}}//当前客户端用户退出则移除连接_ = UserConnection.TryRemove(id, out _);//取消用户频道订阅_disposables.Remove($"{userPrefix}{id}", out var sub);sub?.Dispose();}public void Dispose(){foreach (var disposable in _disposables){disposable.Value.Dispose();}_disposables.Clear();}
}

这里涉及到了ChannelData类是用于接收客户端消息的类模板,具体定义如下

public class ChannelData
{//消息类型 比如一对一 群组 全员public string Method { get; set; }//群组标识public string Group { get; set; }//消息体public object MsgBody { get; set; }
}

类中并不会包含当前用户信息,因为连接到当前服务的时候已经提供了客户端唯一标识。结合上面的处理代码我们可以看出,客户端用户连接到WebSocket实例之后,先注册当前用户的redis订阅频道并且当前实例仅注册一次全员消息的redis频道,用于处理非当前实例注册客户端的一对一消息处理和全员消息处理,然后等待接收客户端消息,根据客户端消息的消息类型来判断是进行一对一、群组、或者全员的消息类型处理,它的工作流程入下图所示
由代码和上面的流程图可知,它根据不同的标识去处理不同类型的消息,接下来我们可以看下每种消息类型的处理方式。

一对一处理

首先是一对一的消息处理情况,看一下具体的处理逻辑,首先是一对一发布消息

 private async Task HandleOne(string id, object msg, WebSocketReceiveResult receiveResult){MsgBody msgBody = JsonConvert.DeserializeObject<MsgBody>(JsonConvert.SerializeObject(msg));byte[] sendByte = Encoding.UTF8.GetBytes($"user {id} send:{msgBody.Msg}");_logger.LogInformation($"user {id} send:{msgBody.Msg}");//判断目标用户是否在当前WebSocket服务器if (UserConnection.TryGetValue(msgBody.Id, out var targetSocket)){if (targetSocket.State == WebSocketState.Open){await targetSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), receiveResult.MessageType, true, CancellationToken.None);}}else{//如果不在当前服务器,则直接把消息发布到具体的用户频道去,由具体用户去订阅ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = id, ToId = msgBody.Id, Msg = msgBody.Msg };_redisClient.Publish($"{userPrefix}{msgBody.Id}", JsonConvert.SerializeObject(channelMsgBody));}
}

接下来是用于处理订阅其他用户发送过来消息的逻辑,这个和整合之前的逻辑是一致的,在当前服务器中找到用户对应的连接,发送消息

private async Task SubMsg(string channel)
{var sub = _redisClient.Subscribe(channel, async (channel, data) =>{ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString());byte[] sendByte = Encoding.UTF8.GetBytes($"user {msgBody.FromId} send:{msgBody.Msg}");if (UserConnection.TryGetValue(msgBody.ToId, out var targetSocket)){if (targetSocket.State == WebSocketState.Open){await targetSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);}else{_ = UserConnection.TryRemove(msgBody.FromId, out _);}}});//把订阅实例加入集合_disposables.TryAdd(channel, sub);
}

如果给某个用户发送消息则可以使用如下的消息格式

{"Method":"One", "MsgBody":{"Id":"2","Msg":"Hello"}}

Method为One代表着是私聊一对一的情况,消息体内Id为要发送给的具体用户标识和消息体。

群组处理

接下来看群组处理方式,这个和之前的逻辑是有出入的,首先是用户要先加入到某个群组然后才能接收群组消息或者在群组中发送消息,之前是一个用户对应多个连接,整合了之后集群中每个用户只关联唯一的一个WebSocket连接,首先看用户加入群组的逻辑

private async Task AddUserGroup(string user, string group, WebSocket webSocket)
{//获取群组信息var currentGroup = GroupUser.GetOrAdd(group, new HashSet<string>());lock (currentGroup){//把用户标识加入当前组_ = currentGroup.Add(user);}//每个组的redis频道,在每台WebSocket服务器实例只注册一次订阅if (currentGroup.Count == 1){//订阅当前组消息await SubGroupMsg($"{groupPrefix}{group}");}string addMsg = $"user 【{user}】 add  to group 【{group}】";byte[] sendByte = Encoding.UTF8.GetBytes(addMsg);await webSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);//如果有用户加入群组,则通知其他群成员ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = user, ToId = group, Msg = addMsg };_redisClient.Publish($"{groupPrefix}{group}", JsonConvert.SerializeObject(channelMsgBody));
}

用户想要在群组内发消息,则必须先加入到一个具体的群组内,具体的加入群组的格式如下

{"Method":"UserGroup", "Group":"g1"}

Method为UserGroup代表着用户加入群组的业务类型,Group代表着你要加入的群组唯一标识。接下来就看下,用户发送群组消息的逻辑了

private async Task HandleGroup(string groupId, string userId, WebSocket webSocket, object msgBody)
{//判断群组是否存在var hasValue = GroupUser.TryGetValue(groupId, out var users);if (!hasValue){byte[] sendByte = Encoding.UTF8.GetBytes($"group【{groupId}】 not exists");await webSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);return;}//只有加入到当前群组,才能在群组内发送消息if (!users.Contains(userId)){byte[] sendByte = Encoding.UTF8.GetBytes($"user 【{userId}】 not in 【{groupId}】");await webSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);return;}_logger.LogInformation($"group 【{groupId}】 user 【{userId}】 send:{msgBody}");//发送群组消息ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = userId, ToId = groupId, Msg = msgBody.ToString() };_redisClient.Publish($"{groupPrefix}{groupId}", JsonConvert.SerializeObject(channelMsgBody));
}

加入群组之后则可以发送和接收群组内的消息了,给群组发送消息的格式如下

{"Method":"Group", "Group":"g1", "MsgBody":"Hi All"}

Method为Group代表着用户加入群组的业务类型,Group则代表你要发送到具体的群组的唯一标识,MsgBody则是发送到群组内的消息。最后再来看下订阅群组内消息的情况,也就是处理群组消息的逻辑

private async Task SubGroupMsg(string channel)
{var sub = _redisClient.Subscribe(channel, async (channel, data) =>{//接收群组订阅消息ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString());byte[] sendByte = Encoding.UTF8.GetBytes($"group 【{msgBody.ToId}】 user 【{msgBody.FromId}】 send:{msgBody.Msg}");//获取当前服务器实例中当前群组的所有用户连接GroupUser.TryGetValue(msgBody.ToId, out var currentGroup);foreach (var user in currentGroup){if (user == msgBody.FromId){continue;}//通过群组内的用户标识去用户集合获取用户集合里的用户唯一连接发送消息if (UserConnection.TryGetValue(user, out var targetSocket) && targetSocket.State == WebSocketState.Open){await targetSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);}else{currentGroup.Remove(user);}}});_disposables.TryAdd(channel, sub);
}
全员消息处理

全员消息处理相对来说思路比较简单,因为当服务启动的时候就会监听redis的全员消息频道,这样的话具体的实现也就只包含发送和接收全员消息了,首先看一下全员消息发送的逻辑

private async Task HandleAll(string id, object msgBody)
{_logger.LogInformation($"user {id} send:{msgBody}");//直接给redis的全员频道发送消息ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = id, Msg = msgBody.ToString() };_redisClient.Publish(all, JsonConvert.SerializeObject(channelMsgBody));
}

全员消息的发送数据格式如下所示

{"Method":"All", "MsgBody":"Hello All"}

Method为All代表着全员消息类型,MsgBody则代表着具体消息。接收消息出里同样很简单,订阅redis全员消息频道,然后遍历当前WebSocket服务器实例内的所有用户获取连接发送消息,具体逻辑如下

private async Task SubAllMsg(string channel)
{var sub = _redisClient.Subscribe(channel, async (channel, data) =>{ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString());byte[] sendByte = Encoding.UTF8.GetBytes($"user 【{msgBody.FromId}】 send all:{msgBody.Msg}");//获取当前服务器实例内所有用户的连接foreach (var user in UserConnection){//不给自己发送消息,因为发送的时候可以通过具体的业务代码处理if (user.Key == msgBody.FromId){continue;}//给每个用户发送消息if (user.Value.State == WebSocketState.Open){await user.Value.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);}else{_ = UserConnection.TryRemove(user.Key, out _);}}});_disposables.TryAdd(channel, sub);
}

示例源码

由于篇幅有限,没办法设计到全部的相关源码,因此在这里贴出来github相关的地址,方便大家查看和运行源码。相关的源码我这里实现了两个版本,一个是基于asp.net core的版本,一个是基于golang的版本。两份源码的实现思路是一致的,所以这两份代码可以运行在一套集群示例里,配置在一套nginx里,并且连接到同一个redis实例里即可

  • asp.net core源码示例 https://github.com/softlgl/WebsocketCluster
  • golang源码示例 https://github.com/softlgl/websocket-cluster

仓库里还涉及到本人闲暇之余开源的其他仓库,由于本人能力有限难登大雅之堂,就不做广告了,有兴趣的同学可以自行浏览一下。

总结

    本文基于ASP.NET Core框架提供了一个基于WebSocket做集群的示例,由于思想是通用的,所以基于这个思路楼主也实现了golang版本。其实在之前就想自己动手搞一搞关于WebSocket集群方面的设计,本篇文章算是对之前想法的一个落地操作。其核心思路文章已经做了相关介绍,由于这些只是博主关于构思的实现,可能有很多细节尚未体现到,还希望大家多多理解。其核心思路总结一下

  • 首先是,利用可以构建WebSocket服务的框架,在当前服务实例中保存当前客户端用户和WebSocket的连接关系
  • 如果消息的目标客户端不在当前服务器,可以利用redis频道、消息队列相关、甚至是数据库类的共享回话发送的消息,由目标服务器获取目标是否属于自己的ws会话
  • 本文设计的思路使用的是无状态的方式,即WebSocket服务实例之间不存在直接的消息通信和相互的服务地址存储,当然也可以利用redis等存储在线用户信息等,这个可以参考具体业务自行设计

读万卷书,行万里路。在这个时刻都在变化点的环境里,唯有不断的进化自己,多接触多尝试不用的事物,多扩展自己的认知思维,方能构建自己的底层逻辑。毕竟越底层越抽象,越通用越抽象。面对未知的挑战,自身作为自己坚强的后盾,可能才会让自己更踏实。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/617004.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Java探索之旅】方法重载 递归

&#x1f3a5; 屿小夏 &#xff1a; 个人主页 &#x1f525;个人专栏 &#xff1a; Java编程秘籍 &#x1f304; 莫道桑榆晚&#xff0c;为霞尚满天&#xff01; 文章目录 &#x1f4d1;前言一、方法重载1.1 为什么要有方法重载1.2 方法重载的概念与使用1.3 方法签名 二、递归2…

全国各省环境规制强度数据(2004-2022年)

01、数据简介 以保护环境为目的&#xff0c;对各种环境污染行为进行规制&#xff0c;政府相关政策规制&#xff0c;是社会性规制的重要内容&#xff0c;包含大气、水、废弃物、噪声污染等外部行为&#xff0c;对这些行为进行规制就是要将整个社会为其承担的成本转化为其自身承…

最新视频理解大模型之MiniGPT4-video

前言 随着大模型的爆火&#xff0c;多模态大模型也随之卷了起来&#xff0c;基本每隔一小段时间就会冒出一个新模型。 今天给大家带来一个最新发现的关于视频理解的多模态大模型。 它的名字是MiniGPT4-video&#xff0c;可以看的出来其是MiniGPT4的一个分支&#xff1b;Mini…

编曲知识18:EQ均衡器 齿音处理 呼吸音处理 口水音处理

EQ均衡器 齿音处理 呼吸音处理 口水音处理小鹅通-专注内容付费的技术服务商https://app8epdhy0u9502.pc.xiaoe-tech.com/live_pc/l_66151c90e4b092c1187ac699?course_id=course_2XLKtQnQx9GrQHac7OPmHD9tqbv 均衡器 均衡器 Equalizer(简称EQ) 人耳接受频率:20hz—20khz …

几种常见系统的下载地址,centos、debian、ubuntu、docker

以下是几种常见系统的安装及下载地址&#xff1a; CentOS&#xff1a; 安装&#xff1a;CentOS的安装通常涉及从官方网站下载ISO镜像文件&#xff0c;并使用这些镜像文件创建可启动的USB驱动器或CD/DVD来安装。在安装过程中&#xff0c;您需要选择所需的软件包和配置系统设置…

Qt控件---布局管理类

文章目录 QVBoxLayout&#xff08;垂直布局&#xff09;QHBoxLayout&#xff08;水平布局&#xff09;QGridLayout&#xff08;网格布局&#xff09;拉伸 QFormLayout&#xff08;表单布局&#xff09;QSpacerItem&#xff08;空白&#xff09; QVBoxLayout&#xff08;垂直布局…

【算法】分治-快排

个人主页 &#xff1a; zxctscl 如有转载请先通知 题目 前言1. 75. 颜色分类1.1 分析1.2 代码 2. 912. 排序数组2.1 分析2.2 代码 3. 215. 数组中的第K个最大元素3.1 分析3.2 代码 4. LCR 159. 库存管理 III4.1 分析4.2 代码 前言 分治就是分而治之 1. 75. 颜色分类 1.1 分析…

MySQL Innodb 中的排它锁、共享锁、意向锁、记录锁、间隙锁、临键锁、死锁讲解

一、MySQL 锁机制 MySQL作为流行的关系型数据库管理系统之一&#xff0c;在处理并发访问时&#xff0c;锁起着至关重要的作用。锁的使用可以确保数据的完整性&#xff0c;同时也是实现并发操作的必备工具。在MySQL Innodb 引擎中锁可以理解为两个方向的东西&#xff0c;一个是…

MES生产管理系统:私有云、公有云与本地化部署的比较分析

随着信息技术的迅猛发展&#xff0c;云计算作为一种新兴的技术服务模式&#xff0c;已经深入渗透到企业的日常运营中。在众多部署方式中&#xff0c;私有云、公有云和本地化部署是三种最为常见的选择。它们各自具有独特的特点和适用场景&#xff0c;并在不同程度上影响着企业的…

lanqiao.602 迷宫

题目&#xff1a; 代码&#xff1a; #include<iostream> #include<cstring> #include<algorithm> #include<queue> using namespace std; char mp[31][51]; //稍微开大一点 char k[4]{D,L,R,U}; //按字典序记录路径 int dirx[]{1,0,0,-1},d…

【ARM 裸机】汇编 led 驱动之原理分析

1、我们为什么要学习汇编&#xff1f;&#xff1f;&#xff1f; 之前我们或许接触过 STM32 以及其他的 32 位的 MCU ,都是基于 C 语言环境进行编程的&#xff0c;都没怎么注意汇编&#xff0c;是因为 ST 公司早已将启动文件写好了&#xff0c;新建一个 STM32 工程的时候&#…

transformer上手(3) —— 开箱即用的 pipelines

1 开箱即用的 pipelines Transformers 库将目前的 NLP 任务归纳为几下几类&#xff1a; 文本分类&#xff1a;例如情感分析、句子对关系判断等&#xff1b;对文本中的词语进行分类&#xff1a;例如词性标注 (POS)、命名实体识别 (NER) 等&#xff1b;文本生成&#xff1a;例如…