spring boot 实现直播聊天室

spring boot 实现直播聊天室

技术方案:

  • spring boot
  • websocket
  • rabbitmq

使用 rabbitmq 提高系统吞吐量

引入依赖

<dependencies><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.42</version></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.23</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit</artifactId></dependency>
</dependencies>

websocket 实现

MHttpSessionHandshakeInterceptor

参数拦截

/*** @Date: 2023/12/8 14:52* websocket 握手拦截* 1. 参数拦截(header或者 url 参数)* 2. token 校验*/
@Slf4j
public class MHttpSessionHandshakeInterceptor extends HttpSessionHandshakeInterceptor {@Overridepublic boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {if (request instanceof ServletServerHttpRequest servletRequest){//ws://127.0.0.1:8080/group/2?username=xxxxHttpServletRequest httpServletRequest = servletRequest.getServletRequest();String requestURI = httpServletRequest.getRequestURI();String groupId = requestURI.substring(requestURI.lastIndexOf("/") + 1);String username = httpServletRequest.getParameter("username");log.info(">>>>>>> beforeHandshake groupId: {} - username: {}", groupId, username);attributes.put("username", username);//解析占位符attributes.put("groupId", groupId);}return super.beforeHandshake(request, response, wsHandler, attributes);}}
GroupWebSocketHandler

消息发送

@Slf4j
public class GroupWebSocketHandler implements WebSocketHandler {//Map<room,List<map<session,username>>>private ConcurrentHashMap<String, Queue<WebSocketSession>> sessionMap = new ConcurrentHashMap<>();@Autowiredprivate MessageClient messagingClient;@Overridepublic void afterConnectionEstablished(WebSocketSession session) throws Exception {String username = (String) session.getAttributes().get("username");String groupId = (String) session.getAttributes().get("groupId");log.info("{} 用户上线房间 {}", username, groupId);TomcatWsSession wsSession = new TomcatWsSession(session.getId(),groupId, username, session);SessionRegistry.getInstance().addSession(wsSession);}@Overridepublic void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {String groupId = (String) session.getAttributes().get("groupId");String username = (String) session.getAttributes().get("username");if (message instanceof PingMessage){log.info("PING");return;}else if (message instanceof TextMessage textMessage) {MessageDto messageDto = new MessageDto();messageDto.setSessionId(session.getId());messageDto.setGroup(groupId);messageDto.setFromUser(username);messageDto.setContent(new String(textMessage.getPayload()));messagingClient.sendMessage(messageDto);}}@Overridepublic void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {String username = (String) session.getAttributes().get("username");String groupId = (String) session.getAttributes().get("groupId");log.info(">>> handleTransportError {} 用户上线房间 {}", username, groupId);}@Overridepublic void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {String username = (String) session.getAttributes().get("username");String groupId = (String) session.getAttributes().get("groupId");log.info("{} 用户下线房间 {}", username, groupId);TomcatWsSession wsSession = new TomcatWsSession(session.getId(),groupId, username, session);SessionRegistry.getInstance().removeSession(wsSession);}@Overridepublic boolean supportsPartialMessages() {return false;}}
WebSocketConfig

websocket 配置

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {@Overridepublic void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {registry.addHandler(myHandler(), "/group/{groupId}").addInterceptors(new MHttpSessionHandshakeInterceptor()).setAllowedOrigins("*");}@Beanpublic GroupWebSocketHandler myHandler() {return new GroupWebSocketHandler();}@Beanpublic ServletServerContainerFactoryBean createWebSocketContainer() {ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();container.setMaxTextMessageBufferSize(8192);  //文本消息最大缓存container.setMaxBinaryMessageBufferSize(8192);  //二进制消息大战缓存container.setMaxSessionIdleTimeout(3L * 60 * 1000); // 最大闲置时间,3分钟没动自动关闭连接container.setAsyncSendTimeout(10L * 1000); //异步发送超时时间return container;}}

session 管理

将 websocketSession进行抽像,websocketsession可以由不同容器实现

WsSession
public interface  WsSession {/*** session 组* @return*/String group();/*** session Id* @return*/String getId();/*** 用户名或其他唯一标识* @return*/String identity();/*** 发送文本消息* @param messageDto*/void sendTextMessage(MessageDto messageDto);
}public abstract class AbstractWsSession implements WsSession {private String id;private String group;private String identity;public AbstractWsSession(String id, String group, String identity) {this.id = id;this.group = group;this.identity = identity;}@Overridepublic String group() {return this.group;}@Overridepublic String getId() {return this.id;}@Overridepublic String identity() {return this.identity;}@Overridepublic boolean equals(Object o) {if (this == o) return true;if (o == null || getClass() != o.getClass()) return false;AbstractWsSession that = (AbstractWsSession) o;//简单比较 sessionIdreturn Objects.equals(id, that.id);}@Overridepublic int hashCode() {return Objects.hash(id, group, identity);}
}
TomcatWsSession

默认session实现

@Slf4j
public class TomcatWsSession extends AbstractWsSession {private WebSocketSession webSocketSession;public TomcatWsSession(String id, String group, String identity, WebSocketSession webSocketSession) {super(id, group, identity);this.webSocketSession = webSocketSession;}@Overridepublic void sendTextMessage(MessageDto messageDto) {String content = messageDto.getFromUser() + " say: " + messageDto.getContent();try {webSocketSession.sendMessage(new TextMessage(content));} catch (IOException e) {log.error("TomcatWsSession sendTextMessage error: identity:{}-group:{}-msg: {}",super.identity(), super.group(), JSON.toJSONString(messageDto));}}
}

SessionRegistry

websocket session管理

public class SessionRegistry {private static SessionRegistry instance;private SessionRegistry() {}public static SessionRegistry getInstance() {if (instance == null) {synchronized (SessionRegistry.class) {if (instance == null) {instance = new SessionRegistry();}}}return instance;}//Map<group,List<Session>>private ConcurrentHashMap<String, Queue<WsSession>> sessionMap = new ConcurrentHashMap<>();/*** 添加 session* @param wsSession*/public void addSession(WsSession wsSession) {sessionMap.computeIfAbsent(wsSession.group(),g -> new ConcurrentLinkedDeque<>()).add(wsSession);}/*** 移除 session* @param wsSession*/public void removeSession(WsSession wsSession) {Queue<WsSession> wsSessions = sessionMap.get(wsSession.group());if (!CollectionUtils.isEmpty(wsSessions)){//重写 WsSession equals 和 hashCode 方法,不然会移除失败wsSessions.remove(wsSession);if (CollectionUtils.isEmpty(wsSessions)){sessionMap.remove(wsSession.group());}}}/*** 发送消息* @param messageDto*/public void sendGroupTextMessage(MessageDto messageDto){Queue<WsSession> wsSessions = sessionMap.get(messageDto.getGroup());if (!CollectionUtils.isEmpty(wsSessions)){for (WsSession wsSession : wsSessions) {if (wsSession.getId().equals(messageDto.getSessionId())){continue;}wsSession.sendTextMessage(messageDto);}}}/*** session 在线统计* @param groupId* @return*/public Integer getSessionCount(String groupId) {if (StrUtil.isNotBlank(groupId)) {return sessionMap.get(groupId).size();}return sessionMap.values().stream().map(l -> l.size()).collect(Collectors.summingInt(a -> a));}
}

消息队列

这里使用 rabbitmq

MessageDto

消息体

@Data
public class MessageDto {/*** sessionId*/private String sessionId;/*** 组*/private String group;/*** 消息发送者*/private String fromUser;/*** 发送内容*/private String content;
}
MessageClient
@Component
@Slf4j
public class MessageClient {private String routeKey = "bws.key";private String exchange = "bws.exchange";@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMessage(MessageDto messageDto) {try {rabbitTemplate.convertAndSend(exchange, routeKey, JSON.toJSONString(messageDto));} catch (AmqpException e) {log.error("MessageClient.sendMessage: {}", JSON.toJSONString(messageDto), e);}}
}
MessageListener
@Slf4j
@Component
public class MessageListener {@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(value = "bws.exchange", type = "topic"), value =@Queue(value = "bws.queue", durable = "true"), key = "bws.key"))public void onMessage(Message message) {String messageStr = "";try {messageStr = new String(message.getBody(), StandardCharsets.UTF_8);log.info("<<<<<<<<< MessageListener.onMessage:{}", messageStr);MessageDto messageDto = JSON.parseObject(messageStr, MessageDto.class);if (!Objects.isNull(messageDto)) {SessionRegistry.getInstance().sendGroupTextMessage(messageDto);} else {log.info("<<<<<<<<< MessageListener.onMessage is null:{}", messageStr);}} catch (Exception e) {log.error("######### MessageListener.onMessage: {}-{}", messageStr, e);}}}

application.properties配置


spring.rabbitmq.host=192.168.x.x
spring.rabbitmq.password=guest
spring.rabbitmq.port=27067
spring.rabbitmq.username=guest
spring.rabbitmq.virtual-host=my-cluster

测试

websoket链接: ws://127.0.0.1:8080/group/2?username=xxx, websocket客户端测试地址

在这里插入图片描述

good luck!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/279108.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Guardrails for Amazon Bedrock 基于具体使用案例与负责任 AI 政策实现定制式安全保障(预览版)

作为负责任的人工智能&#xff08;AI&#xff09;战略的一部分&#xff0c;您现在可以使用 Guardrails for Amazon Bedrock&#xff08;预览版&#xff09;&#xff0c;实施专为您的用例和负责任的人工智能政策而定制的保障措施&#xff0c;以此促进用户与生成式人工智能应用程…

LT8711HE方案《任天堂Switch底座方案》

LT8711HE Type-c转HDMI方案 LT8711HE是高性能的Type-C/DP1.2转HDMI2.0转换器&#xff0c;设计用于连接 USB Type-C 源或 DP1.2 源到 HDMI2.0 接收器。该LT8711HE集成了符合 DP1.2 标准的接收器和符合 HDMI2.0 标准的发射器。此外&#xff0c;两个 CC 控制器是包括用于 CC 通信以…

sizeof和strlen的对比

1.首先对sizeof和strlen有初步的认识 sizeof 是操作符 计算操作数所占内存的大小&#xff0c;单位是字节 strlen 是库函数&#xff0c;使用是要包含头文件string。h 计算字符串长度&#xff0c;统计\0之前的字符个数&#xff08;不包括\0&#xff09; #include<stdio…

题目:区间或 (蓝桥OJ 3691)

题目描述: 解题思路: 本题采用位运算.先求出全部数组每一位各自的前缀和,然后再判断区间内每一位区间和是否为0,不为0则乘上相应的2^n并将各个为的2^n相加,得ans. 实现原理图 题解: #include<bits/stdc.h> using namespace std;const int N 1e5 9;int a[N], prefix[35…

C语言——高精度加法

我们知道long long int类型的数据的最大数量级大概是10 ^ 18&#xff0c;这个数量级已经和大了是吧&#xff0c;但是实际上还有更大的数&#xff0c;例如10 ^ 100。 如果写一个加法程序&#xff0c;输入两个整数a、b&#xff0c;a、b的范围都在10 ^ 17内&#xff0c;那样可以选…

揭秘光耦合器继电器:了解技术奇迹

光耦合器继电器是现代电子产品的关键部件&#xff0c;在确保电路安全和效率方面发挥着关键作用。了解它们的功能和意义对于工程师和爱好者理解它们的应用至关重要。本文旨在揭开光耦合器继电器技术方面的神秘面纱&#xff0c;深入了解其功能、应用以及在电子领域的重要性。 什…

AD20-Excel创建IC类元件库

目录 准备模板AD操作 准备模板 AD操作 结果生成如下&#xff1a; over&#xff01;&#xff01;&#xff01;

IntelliJ IDEA 自带HTTP Client接口插件上传文件示例

如何使用IntelliJ IDEA自带的HTTP Client接口插件进行文件上传的示例。在这个示例中&#xff0c;我们将关注Controller代码、HTTP请求文件&#xff08;xxx.http&#xff09;&#xff0c;以及文件的上传和处理。 Controller代码 首先&#xff0c;让我们看一下处理文件上传的Co…

FreeRtos里的几个中断屏蔽

1、primask 寄存器 PRIMASK用于禁止除NMI和HardFalut外的所有异常和中断&#xff0c;使用方法&#xff1a; cpsid i &#xff1b; //设置primask &#xff08;禁止中断&#xff09; cpsie i ; //清除primask (使能中断) 也可以 movs r0,#1 msr primask r0; //将 1写入p…

软件测试经典面试题(答案解析+视频讲解)

前言 &#xff08;第一个就刷掉一大批人&#xff09; 有很多“会自动化”的同学来咨询技术问题&#xff0c;他总会问到我一些元素定位的问题。元素定位其实都不算自动化面试的问题。 一般我都会问&#xff1a;你是定位不到吗&#xff1f;通常结果都是说确实定位不到。 做自…

ChatGPT在指尖跳舞: open-interpreter实现本地数据采集、处理一条龙

原文&#xff1a;ChatGPT在指尖跳舞: open-interpreter实现本地数据采集、处理一条龙 - 知乎 目录 收起 Part1 前言 Part2 Open - Interpreter 简介 Part3 安装与运行 Part4 工作场景 1获取网页内容 2 pdf 文件批量转换 3 excel 文件合并 Part5总结 参考资料 往期推…

【SpringBoot】之Mybatis=Plus集成及使用(入门级)

&#x1f389;&#x1f389;欢迎来到我的CSDN主页&#xff01;&#x1f389;&#x1f389; &#x1f3c5;我是君易--鑨&#xff0c;一个在CSDN分享笔记的博主。&#x1f4da;&#x1f4da; &#x1f31f;推荐给大家我的博客专栏《SpringBoot开发之Mybatis-Plus系列》。&#x1…