Springboot项目使用原生Websocket

目录

  • 1.启用Websocket功能
  • 2.封装操作websocket session的工具
  • 3.保存websocket session的接口
  • 4.保存websocket session的类
  • 5.定义websocket 端点
  • 6.创建定时任务 ping websocket 客户端

1.启用Websocket功能

package com.xxx.robot.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;@Configuration
@EnableWebSocket
public class WebSocketConfig {@Beanpublic ServerEndpointExporter serverEndpoint() {return new ServerEndpointExporter();}}

2.封装操作websocket session的工具

package com.xxx.robot.websocket.util;import java.util.Map;import javax.websocket.Session;import org.apache.tomcat.websocket.Constants;
import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;import com.xxx.framework.security.config.MyUserDetails;
import com.xxx.framework.security.entity.LoginUser;
import com.xxx.user.entity.User;public final class WebSocketSessionUtils {private WebSocketSessionUtils() {}public static final int WEBSOCKET_MAX_TEXT_MESSAGE_BUFFER_SIZE = 8 * 1024 * 1024;public static final int WEBSOCKET_MAX_BINARY_MESSAGE_BUFFER_SIZE = 8 * 1024 * 1024;/*** websocket block 发送超时 毫秒*/public static final long WEBSOCKET_BLOCKING_SEND_TIMEOUT = 10 * 1000;/*** 从 websocket session 中找到登录用户* 其中 MyUserDetails 继承自 org.springframework.security.core.userdetails.User* LoginUser、User 从业务层自定义的类* 项目中使用了spring security框架*/public static User findUser (Session session) {UsernamePasswordAuthenticationToken uToken = (UsernamePasswordAuthenticationToken) session.getUserPrincipal();MyUserDetails userDetails = (MyUserDetails) uToken.getPrincipal();LoginUser loginUser = (LoginUser) userDetails.getUserData();return (User) loginUser.getAdditionalInfo();}/*** 给 websocket session 设置参数*/public static void setProperties(Session session) {//设置websocket文本消息的长度为8M,默认为8ksession.setMaxTextMessageBufferSize(WEBSOCKET_MAX_TEXT_MESSAGE_BUFFER_SIZE);//设置websocket二进制消息的长度为8M,默认为8ksession.setMaxBinaryMessageBufferSize(WEBSOCKET_MAX_BINARY_MESSAGE_BUFFER_SIZE);Map<String, Object> userProperties = session.getUserProperties();//设置websocket发送消息的超时时长为10秒,默认为20秒userProperties.put(Constants.BLOCKING_SEND_TIMEOUT_PROPERTY, WEBSOCKET_BLOCKING_SEND_TIMEOUT);}
}

3.保存websocket session的接口

package com.xxx.robot.websocket;import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;import javax.websocket.Session;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public interface WebSocketSessionManager {Logger log = LoggerFactory.getLogger(WebSocketSessionManager.class);String PING = "ping";String PONG = "pong";Session get (String key);List<String> keys();void add (String key, Session session);Session remove (String key);/*** ping每一个websocket客户端,如果ping超时,则触发由@OnError注释的方法*/default void pingBatch () {List<String> keyList = keys();log.info("WebSocket: {} 数量为:{}", this.getClass().getSimpleName(), keyList.size());for (String key : keyList) {if (key != null) {Session session = get(key);if (session != null) {try {session.getBasicRemote().sendPing(ByteBuffer.wrap(PING.getBytes()));try {Thread.sleep(10);} catch (InterruptedException e1) {}} catch (Exception e) {log.error("WebSocket-ping异常", e);}}}}}/*** 消除所有websocket客户端*/default void clearAllSession () {List<String> keyList = keys();int i = 0;for (String key : keyList) {if (key != null) {Session session = get(key);if (session != null) {try {remove(key);i++;session.close();} catch (IOException e1) {log.error("WebSocket-移除并关闭session异常", e1);}if (i % 10 == 0) {try {Thread.sleep(0);} catch (InterruptedException e1) {}}}}}log.info("WebSocket-移除并关闭session数量为:{}", i);}
}

4.保存websocket session的类

package com.xxx.robot.websocket.robot.manager;import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;import javax.websocket.Session;import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;import com.xxx.robot.websocket.WebSocketSessionManager;/*** 机器人模块WebSocket Session管理器*/
@Component
public class RobotSessionManager implements WebSocketSessionManager {/*** key = userId + '-' + managerId* userId 从当前登录用户中可得到, managerId由客户端连接websocket时按服务端的接口传给服务端* 因为业务中不仅要获取每一个客户端,还要获取同一个用户下的所有客户端,所以由ConcurrentHashMap改为ConcurrentSkipListMap*/private static final ConcurrentSkipListMap<String, Session> SESSION_POOL = new ConcurrentSkipListMap<>();public static final String joinKey (String userId, String managerId) {return userId + '-' + managerId;}public static final String joinKey (Long userId, String managerId) {return userId.toString() + '-' + managerId;}public static final String[] splitKey (String key) {return StringUtils.split(key, '-');}@Overridepublic Session get(String key) {return SESSION_POOL.get(key);}/*** 根据用户ID查询所有websocket session的key* @param userId* @param excludeManagerId 排除的key, 可为空* @return*/public List<String> keysByUserId(String userId, String excludeManagerId) {//'-'的ascii码为45, '.'的ascii码为46, 所以下面获得的是key以 userId + '-' 为前缀的map视图ConcurrentNavigableMap<String, Session> subMap = SESSION_POOL.subMap(userId + '-', userId + '.');NavigableSet<String> keySet = subMap.navigableKeySet();List<String> list = new ArrayList<>();if (StringUtils.isBlank(excludeManagerId)) {for (String key : keySet) {if (key != null) {list.add(key);}}} else {for (String key : keySet) {if (key != null && !key.equals(excludeManagerId)) {list.add(key);}}}return list;}@Overridepublic List<String> keys() {NavigableSet<String> keySet = SESSION_POOL.navigableKeySet();List<String> list = new ArrayList<>();for (String key : keySet) {if (key != null) {list.add(key);}}return list;}@Overridepublic synchronized void add(String key, Session session) {removeAndClose(key);SESSION_POOL.put(key, session);}@Overridepublic synchronized Session remove(String key) {return SESSION_POOL.remove(key);}/*** 必须key和value都匹配才能删除*/public synchronized void remove(String key, Session session) {SESSION_POOL.remove(key, session);}private void removeAndClose (String key) {Session session = remove(key);if (session != null) {try {session.close();} catch (IOException e) {}}}}

5.定义websocket 端点

package com.xxx.robot.websocket.robot.endpoint;import java.util.Map;import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;import org.springframework.stereotype.Component;import com.fasterxml.jackson.databind.JsonNode;
import com.xxx.framework.util.SpringBeanUtils;
import com.xxx.user.entity.User;
import com.xxx.robot.corefunc.service.RobotCoreService;
import com.xxx.robot.util.serial.BaseJsonUtils;
import com.xxx.robot.websocket.WebSocketSessionManager;
import com.xxx.robot.websocket.robot.manager.RobotSessionManager;
import com.xxx.robot.websocket.util.WebSocketSessionUtils;import lombok.extern.slf4j.Slf4j;/*** 机器人模块WebSocket接口* 每一次websocket请求,RobotWebSocketServer都是一个新的实例,所以成员变量是安全的* 以致虽然类由@Component注释,但不可使用@Autowired等方式注入bean*/
@Slf4j
@Component
@ServerEndpoint(value = "/robot/{id}")
public class RobotWebSocketServer {private volatile User user;private volatile String id;private volatile Session session;private volatile Map<String, RobotCoreService> robotCoreServiceMap;/*** 所有初始化操作都写在@OnOpen注释的方法中* 连接成功* @param session*/@OnOpenpublic void onOpen(@PathParam("id") String id, Session session) {WebSocketSessionUtils.setProperties(session);this.user = WebSocketSessionUtils.findUser(session);this.id = id;this.session = session;log.info("连接成功:{}, {}", id, this.user.getUserCode());//使用BeanUtils代替@Autowired获取bean, //RobotCoreService为业务类,不必关心robotCoreServiceMap = SpringBeanUtils.getApplicationContext().getBeansOfType(RobotCoreService.class);RobotSessionManager robotSessionManager = SpringBeanUtils.getBean(RobotSessionManager.class);//保存websocket sessionrobotSessionManager.add(RobotSessionManager.joinKey(this.user.getId(), id), session);}/*** 连接关闭* @param session*/@OnClosepublic void onClose() {log.info("连接关闭:{}, {}", this.id, this.user.getUserCode());RobotSessionManager robotSessionManager = SpringBeanUtils.getBean(RobotSessionManager.class);//连接关闭时,使用两个参数的remove方法,多线程下安全删除robotSessionManager.remove(RobotSessionManager.joinKey(this.user.getId(), this.id), this.session);}@OnErrorpublic void onError(Throwable error) {log.error("onError:id = {}, {}, {}", this.id, this.session.getId(), this.user.getUserCode(), error);RobotSessionManager robotSessionManager = SpringBeanUtils.getBean(RobotSessionManager.class);//websocket异常时,使用两个参数的remove方法,多线程下安全删除//比如ping客户端超时,触发此方法,删除该客户端robotSessionManager.remove(RobotSessionManager.joinKey(this.user.getId(), this.id), this.session);}/*** 接收到消息* @param message*/@OnMessagepublic void onMessage(String message) {log.info("onMessage:id = {}, {}, {}", this.id, this.user.getUserCode(), message);if (WebSocketSessionManager.PING.equals(message)) {//自定义ping接口,收到ping后,响应pong,客户端暂时未使用此接口this.session.getAsyncRemote().sendText(WebSocketSessionManager.PONG);return;}//用 try...catch 包裹防止抛出异常导致websocket关闭try {//业务层,使用jackson反序列化json,不必关心具体的业务JsonNode root = BaseJsonUtils.readTree(message);String apiType = root.at("/apiType").asText();//业务层代码应在子线程中执行,防止wesocket线程执行时间过长导致websocket关闭robotCoreServiceMap.get(apiType + "Service").receiveFrontMessage(this.user, RobotSessionManager.joinKey(this.user.getId(), this.id), root);} catch (Exception e) {log.error("处理消息错误", e);}}}

在这里插入图片描述

6.创建定时任务 ping websocket 客户端

package com.xxx.robot.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;/*** 启用定时任务功能* 因为websocket session是有状态的,只能保存在各自的服务端,* 所以只能使用单机式的定时任务,而不能使用分布式定时任务,* 因此 springboot自带的定时任务功能成为了首选* springboot定时任务线程池*/
@Configuration
@EnableScheduling
public class TaskExecutorConfig {@Beanpublic ThreadPoolTaskExecutor taskExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(5);executor.setMaxPoolSize(5);executor.setQueueCapacity(10);executor.setKeepAliveSeconds(60);executor.setThreadNamePrefix("scheduler-executor-");return executor;}}
package com.xxx.robot.websocket;import java.util.List;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;import lombok.extern.slf4j.Slf4j;/*** @author Sunzhihua*/
@Slf4j
@Component
public class WebSocketSchedulerTask {/*** 注入所有的 websocket session 管理器*/@Autowiredprivate List<WebSocketSessionManager> webSocketSessionManagers;/*** initialDelay 表示 延迟60秒初始化* fixedDelay 表示 上一次任务结束后,再延迟30秒执行*/@Scheduled(initialDelay = 60000, fixedDelay = 30000)public void clearInvalidSession() {try {log.info("pingBatch 开始。。。");for (WebSocketSessionManager webSocketSessionManager : webSocketSessionManagers) {webSocketSessionManager.pingBatch();}log.info("pingBatch 完成。。。");} catch (Exception e) {log.error("pingBatch异常", e);}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/7492.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

机器学习优化器和SGD和SGDM实验对比(编程实现SGD和SGDM)

机器学习优化器和SGD和SGDM实验对比 博主最近在学习优化器&#xff0c;于是呢&#xff0c;就做了一个SGD和SGDM的实验对比&#xff0c;可谓是不做不知道&#xff0c;一做吓一跳&#xff0c;这两个算法最终对结果的影响还是挺大的&#xff0c;在实验中SGDM明星要比SGD效果好太多…

springBoot配置多环境

在代码中一般有3个环境&#xff0c;为了避免频繁的每次上线需要手动更改环境的问题。 test 本地测试环境&#xff0c;代码调试的 dev 服务端开发环境-用来验证用 prod 服务端正式环境 我创建2个做示例&#xff0c;里面写的不同配置 点运行的项目会有一个Edit Configurations…

​浅谈大型语言模型

大型语言模型&#xff08;Large Language Models&#xff0c;LLMs&#xff09;是一类强大的人工智能模型&#xff0c;具有出色的自然语言处理能力。它们在许多任务中表现出色&#xff0c;如机器翻译、文本摘要、对话生成和情感分析等。下面我们将介绍大型语言模型的训练和生成过…

智谱AI-算法实习生(知识图谱方向)实习面试记录

岗位描述 没错和我的经历可以说是match得不能再match了&#xff0c;但是还是挂了hh。 面试内容 给我面试的是唐杰老师的博士生&#xff0c;方向是社交网络数据挖掘&#xff0c;知识图谱。不cue名了&#xff0c;态度很友好的 &#xff0c;很赞。 date&#xff1a;6.28 Q1 自…

【码银送书第一期】通用人工智能:初心与未来

目录 前言 正文 内容简介 作者简介 译者简介 目录 前言 自20世纪50年代图灵在其划时代论文《计算机器与智能》中提出“图灵测试”以及之后的达特茅斯研讨会开始&#xff0c;用机器来模仿人类学习及其他方面的智能&#xff0c;即实现“人工智能”&#xff08;Artificial …

ORA-31664: unable to construct unique job name when defaulted

某个环境备份不足空间问题处理后&#xff0c;手动执行expdp备份的脚本&#xff0c;报错如下 Export: Release 11.2.0.4.0 - Production on Tue Jul 4 11:46:14 2023 Copyright (c) 1982, 2011, Oracle and/or its affiliates. All rights reserved. Connected to: Oracle D…

获取移动设备的电池信息

通过BatteryManager来获取关于电池的信息 实例 package com.example.softwarepatentdemo;import android.content.BroadcastReceiver; import android.content.Context; import android.content.Intent; import android.content.IntentFilter; import android.os.BatteryManag…

2023年大学计算机专业实习心得14篇

2023年大学计算机专业实习心得精选篇1 20__年已然向我们挥手告别而去了。在20__年初之际&#xff0c;让我们对过去一年的工作做个总结。忙碌的一年里&#xff0c;在领导及各位同事的帮助下&#xff0c;我顺利的完成了20__年的工作。为了今后更好的工作&#xff0c;总结经验&…

Docker|kubernetes|本地镜像批量推送到Harbor私有仓库的脚本

前言&#xff1a; 可能有测试环境&#xff0c;而测试环境下有N多的镜像&#xff0c;需要批量导入到自己搭建的Harbor私有仓库内&#xff0c;一般涉及到批量的操作&#xff0c;自然还是使用脚本比较方便。 本文将介绍如何把某个服务器的本地镜像 推送到带有安全证书的私有Harb…

GAMES101笔记 Lecture07 Shading1(Illumination, Shading and Graphics Pipeline)

目录 Visibility / Occlusion(可见性 or 遮挡)Painters Algorithm(画家算法)Z-Buffer(深度缓冲算法) Shading(着色)A Simple Shading Model(Blinn-Phong Reflectance Model)一个简单的着色模型&#xff1a;Blinn-Phong反射模型Diffuse Reflection(漫反射) 参考资源 Visibility …

性能测试该怎么做,终于找到方法了

目录 开头 分类 服务器与场景设计 计算TPS 设计场景 场景运用 单交易最大压力&#xff1a; 单交易稳定性&#xff1a; 混合场景稳定性&#xff1a; 业务指标&#xff1a; 数据库 中间件 负载均衡&#xff1a; 最后&#xff1a; 开头 性能测试的工具有很多&#xf…

FreeRTOS学习笔记—基础知识

文章目录 一、什么是RTOS二、前后台系统三、实时内核&#xff08;可剥夺型内核&#xff09;四、RTOS系统五、FreeRTOS系统简介六、FreeRTOS源码下载 一、什么是RTOS RTOS全称为:Real Time OS&#xff0c;就是实时操作系统&#xff0c;核心在于实时性。实时操作系统又分为硬实时…