开源模型应用落地-业务优化篇(三)

一、前言

    假如您跟随我的脚步,学习到上一篇的内容,到这里,相信细心的您,已经发现了,在上一篇中遗留的问题。那就是IM服务过载的时候,如何进行水平扩容?

    因为在每个IM服务中,我们用JVM缓存了用户与WS的通道的绑定关系,并且使用Redis队列进行解耦。那扩展了IM服务实例之后,如何确保Redis队列的消息能正常消费,即如何能找回对应的用户通道?别着急,接下来,我将给您做详细的解释。


二、术语

2.1.水平扩容

    是指通过增加系统中的资源实例数量来提高系统的处理能力和吞吐量。在计算机领域,水平扩容通常用于应对系统负载的增加或需要处理更多请求的情况。

2.2.无状态

    无状态(stateless)是指系统或组件在处理请求时不依赖于之前的请求或会话信息。换句话说,每个请求都是独立的,系统不会在不同的请求之间保存任何状态或上下文信息。

    在无状态系统中,每个请求被视为一个独立的事件,系统只关注当前请求所包含的信息和参数,而不依赖于之前的请求历史。这使得系统更加简单、可伸缩和易于管理。


三、前置条件

3.1. 已经完成前两篇的学习


四、技术实现

4.1、实现思路

    首先,IM服务是状态的(AI服务是无状态),每个实例中,会缓存用户与WebSocket通道之间的信息。那是否可以采用中间共享存储的方式,将状态信息保存至Redis或外部存储中?答案是:不行。WebScoket的通道信息,无法进行序列化。

    要实现IM服务水平扩容的方式有多种,但目前我们采用以下的方案:

  1.   每个IM服务保存对应的用户和WS通道的关系;
  2.   每个IM服务对应唯一一个redis队列;
  3.   前置的SLB(App入口)能根据用户标识(哈希)将请求转发至指定的IM服务;
  4.   当某一IM服务出现故障的时候,由App端发起重连,重新建立WebSocket连接。

4.2、调整配置文件

# 每个IM服务实例指定全局唯一的ID,例如:下面指定的node:0

ws:server:node: 0

PS:具体参数可以在外部指定,作为JVM的运行参数传入

4.3、调整业务逻辑处理类

# 将原有Redis的单一队列名,改为拼接上全局唯一ID的方式

# Redis中缓存的数据如下

4.4、调整任务处理类

# 将原有Redis的单一队列名,改为拼接上全局唯一ID的方式


五、测试

# 这次换一下测试方式,用离线页面的方式进行测试

5.1.  建立连接

5.2.  业务初始化

5.3.  业务对话


六、附带说明

6.1. BusinessHandler完整代码

import com.alibaba.fastjson.JSON;
import io.netty.channel.ChannelHandler;
import lombok.extern.slf4j.Slf4j;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.apache.commons.lang3.StringUtils;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;/*** @Description: 处理消息的handler*/
@Slf4j
@ChannelHandler.Sharable
@Component
public class BusinessHandler extends AbstractBusinessLogicHandler<TextWebSocketFrame> {public static final String LINE_UP_QUEUE_NAME = "AI-REQ-QUEUE";private static final String LINE_UP_LOCK_NAME = "AI-REQ-LOCK";private static final int MAX_QUEUE_SIZE = 100;//    @Autowired
//    private TaskUtils taskExecuteUtils;@Autowiredprivate RedisUtils redisUtils;@Autowiredprivate RedissonClient redissonClient;@Autowiredprivate NettyConfig nettyConfig;@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {String channelId = ctx.channel().id().asShortText();log.info("add client,channelId:{}", channelId);}@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {String channelId = ctx.channel().id().asShortText();log.info("remove client,channelId:{}", channelId);}@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame)throws Exception {// 获取客户端传输过来的消息String content = textWebSocketFrame.text();// 兼容在线测试if (StringUtils.equals(content, "PING")) {buildResponse(channelHandlerContext, ApiRespMessage.builder().code(String.valueOf(StatusCode.SUCCESS.getCode())).respTime(String.valueOf(System.currentTimeMillis())).msgType(String.valueOf(MsgType.HEARTBEAT.getCode())).contents("心跳测试,很高兴收到你的心跳包").build());return;}log.info("接收到客户端发送的信息: {}", content);Long userIdForReq;String msgType = "";String contents = "";try {ApiReqMessage apiReqMessage = JSON.parseObject(content, ApiReqMessage.class);msgType = apiReqMessage.getMsgType();contents = apiReqMessage.getContents();userIdForReq = apiReqMessage.getUserId();// 用户身份标识校验if (null == userIdForReq || (long) userIdForReq <= 10000) {ApiRespMessage apiRespMessage = ApiRespMessage.builder().code(String.valueOf(StatusCode.SYSTEM_ERROR.getCode())).respTime(String.valueOf(System.currentTimeMillis())).contents("用户身份标识有误!").msgType(String.valueOf(MsgType.SYSTEM.getCode())).build();buildResponseAndClose(channelHandlerContext, apiRespMessage);return;}if (StringUtils.equals(msgType, String.valueOf(MsgType.CHAT.getCode()))) {// 对用户输入的内容进行自定义违规词检测// 对用户输入的内容进行第三方在线违规词检测// 对用户输入的内容进行组装成Prompt// 对Prompt根据业务进行增强(完善prompt的内容)// 对history进行裁剪或总结(检测history是否操作模型支持的上下文长度,例如qwen-7b支持的上下文长度为8192)// ...//                通过线程池来处理
//                String messageId = apiReqMessage.getMessageId();
//                List<ChatContext> history = apiReqMessage.getHistory();
//                AITaskReqMessage aiTaskReqMessage = AITaskReqMessage.builder().messageId(messageId).userId(userIdForReq).contents(contents).history(history).build();
//                taskExecuteUtils.execute(aiTaskReqMessage);//                通过队列来缓冲boolean flag = true;RLock lock = redissonClient.getLock(LINE_UP_LOCK_NAME);String queueName = LINE_UP_QUEUE_NAME+"-"+nettyConfig.getNode();//尝试获取锁,最多等待3秒,锁的自动释放时间为10秒if (lock.tryLock(3, 10, TimeUnit.SECONDS)) {try {if (redisUtils.queueSize(queueName) < MAX_QUEUE_SIZE) {redisUtils.queueAdd(queueName, content);log.info("当前线程为:{}, 添加请求至redis队列",Thread.currentThread().getName());} else {flag = false;}} catch (Throwable e) {log.error("系统处理异常", e);} finally {lock.unlock();}} else {flag = false;}if (!flag) {buildResponse(channelHandlerContext, ApiRespMessage.builder().code(String.valueOf(StatusCode.SUCCESS.getCode())).respTime(String.valueOf(System.currentTimeMillis())).msgType(String.valueOf(MsgType.SYSTEM.getCode())).contents("当前排队人数较多,请稍后再重试!").build());}} else if (StringUtils.equals(msgType, String.valueOf(MsgType.INIT.getCode()))) {//一、业务黑名单检测(多次违规,永久锁定)//二、账户锁定检测(临时锁定)//三、多设备登录检测//四、剩余对话次数检测//检测通过,绑定用户与channel之间关系addChannel(channelHandlerContext, userIdForReq);String respMessage = "用户标识: " + userIdForReq + " 登录成功";buildResponse(channelHandlerContext, ApiRespMessage.builder().code(String.valueOf(StatusCode.SUCCESS.getCode())).respTime(String.valueOf(System.currentTimeMillis())).msgType(String.valueOf(MsgType.INIT.getCode())).contents(respMessage).build());} else if (StringUtils.equals(msgType, String.valueOf(MsgType.HEARTBEAT.getCode()))) {buildResponse(channelHandlerContext, ApiRespMessage.builder().code(String.valueOf(StatusCode.SUCCESS.getCode())).respTime(String.valueOf(System.currentTimeMillis())).msgType(String.valueOf(MsgType.HEARTBEAT.getCode())).contents("心跳测试,很高兴收到你的心跳包").build());} else {log.info("用户标识: {}, 消息类型有误,不支持类型: {}", userIdForReq, msgType);}} catch (Exception e) {log.warn("【BusinessHandler】接收到请求内容:{},异常信息:{}", content, e.getMessage(), e);// 异常返回return;}}}

6.2. TaskUtils完整代码

import com.alibaba.fastjson.JSON;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;@Component
@Slf4j
public class TaskUtils implements ApplicationRunner {private static ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);@Autowiredprivate AIChatUtils aiChatUtils;@Autowiredprivate RedisUtils redisUtils;@Autowiredprivate NettyConfig nettyConfig;@Overridepublic void run(ApplicationArguments args) throws Exception {while(true){String queueName = BusinessHandler.LINE_UP_QUEUE_NAME+"-"+nettyConfig.getNode();
//             执行定时任务的逻辑String content = redisUtils.queuePoll(queueName);if(StringUtils.isNotEmpty(content) && StringUtils.isNoneBlank(content)){try{ApiReqMessage apiReqMessage = JSON.parseObject(content, ApiReqMessage.class);String messageId = apiReqMessage.getMessageId();String contents = apiReqMessage.getContents();Long userIdForReq = apiReqMessage.getUserId();List<ChatContext> history = apiReqMessage.getHistory();AITaskReqMessage aiTaskReqMessage = AITaskReqMessage.builder().messageId(messageId).userId(userIdForReq).contents(contents).history(history).build();execute(aiTaskReqMessage);}catch (Throwable e){log.error("处理消息出现异常",e);//将请求再次返回去队列//将请求丢弃//其他处理?}}else{TimeUnit.SECONDS.sleep(1);}}}public void execute(AITaskReqMessage aiTaskReqMessage) {executorService.execute(() -> {Long userId = aiTaskReqMessage.getUserId();if (null == userId || (long) userId < 10000) {log.warn("用户身份标识有误!");return;}ChannelHandlerContext channelHandlerContext = BusinessHandler.getContextByUserId(userId);if (channelHandlerContext != null) {try {aiChatUtils.chatStream(aiTaskReqMessage);} catch (Throwable exception) {exception.printStackTrace();}}});}public static void destory() {executorService.shutdownNow();executorService = null;}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/444482.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MySQL数据控制语言DCL

MySQL数据控制语言DCL 目录 MySQL数据控制语言DCLDCL关键字1.事务事务的四大特性START TRANSACTION&#xff1a;开始事务ROLLBACK&#xff1a;回滚COMMIT&#xff1a;提交事务 2.用户权限CREATE USER&#xff1a;创建新的用户并指定权限DROP USER&#xff1a;删除用户ALTER USE…

RHCE DNS域名解析服务器

目录 1. 正向解析 1.1 安装必要软件 1.2 配置静态ip 1.3 DNS配置 1.4 测试 2. 反向解析 2.1 关闭安全软件&#xff0c;安装必要软件 2.2 配置静态ip 2.3 DNS配置 2.4 测试 1. 正向解析 1.1 安装必要软件 1.2 配置静态ip 服务器配置 nmcli c modify ens32 ipv4.method man…

【JavaSE】抽象类与接口

一、抽象类 在面向对象的概念中&#xff0c;所有的对象都是通过类来描绘的&#xff0c;但是反过来&#xff0c;并不是所有的类都是用来描绘对象的&#xff0c;如果一个类中没有包含足够的信息来描绘一个具体的对象&#xff0c;这样的类就是抽象类 我们把没有实际工作的方法设…

跟着cherno手搓游戏引擎【16】Camera和Uniform变量的封装

相机封装&#xff1a; OrthographicCamera.h: #pragma once #include <glm/glm.hpp> namespace YOTO {class OrthographicCamera{public:OrthographicCamera(float left,float right , float bottom,float top);const glm::vec3& GetPosition()const { return m_Pos…

1.26囚徒困境(单次,多次(有限次数,无限次数)),四种策略(netlogo建模最优,利益矩阵)

单次囚徒困境 转为奖励性矩阵就是说&#xff0c;被判时间越长那么奖励越少&#xff0c;反之奖励越多 有限次数博弈 就是说最后一次了&#xff0c;就随便破罐子破摔&#xff0c;不再继续合作&#xff0c;直接选择自己利益最大化了&#xff0c;如果有方式可以使其在原来、之前的…

SpringMVC-基本概念

一、引子 我们在上篇文章Spring集成Web中抛出了一个问题&#xff1a;为什么我们一直在自用Java Web阶段使用的Servlet来承接客户端浏览器的请求呢&#xff0c;我们熟知甚至是已经在日常开发中经常使用的Controller又与之有什么关系呢&#xff1f;我们将在本篇文章解答读者的这…

盲盒小程序开发,小程序带来的优势

我国盲盒行业的产品主要是以手办、公仔、动漫周边等为主&#xff0c;与各类知名IP合作推出的盲盒产品引起了年轻人的兴趣&#xff0c;盲盒市场得到了快速发展。目前&#xff0c;我国盲盒行业已经进入了蓬勃发展时期&#xff0c;商业机遇较多&#xff01; 在互联网时代下&#…

神经网络与深度学习Pytorch版 Softmax回归 笔记

Softmax回归 目录 Softmax回归 1. 独热编码 2. Softmax回归的网络架构是一个单层的全连接神经网络。 3. Softmax回归模型概述及其在多分类问题中的应用 4. Softmax运算在多分类问题中的应用及其数学原理 5. 小批量样本分类的矢量计算表达式 6. 交叉熵损失函数 7. 模型预…

韶音、南卡、Oladance值不值得买?全面对比测评拒绝智商税!

​在目前市场上&#xff0c;有许多质量不佳、音质差的开放式耳机产品。这些产品不仅会影响音频的质量&#xff0c;还可能对用户的听力健康造成潜在风险。作为一名经验丰富的音频设备评测师&#xff0c;我深知在选择耳机时&#xff0c;必须谨慎选择那些具有专业实力的品牌。基于…

美睫师睫毛嫁接零基础学习,日式美睫与开花嫁接实战教学

一、教程描述 大家都说女人的钱好挣&#xff0c;这是因为每个女人在每年&#xff0c;都要花很多钱来打扮自己。本套教程是关于日式美睫和开花嫁接的&#xff0c;从零基础学习到店铺经营都有涉及&#xff0c;就做美睫和睫毛嫁接这两项业务&#xff0c;月收入万元以上应该问题不…

关于bypassuac的探究——uac程序特性探究

通常以shell\open\command命名的键值对存储的是可执行文件的路径&#xff0c;如果exe程序运行的时候找到该键值对&#xff0c;就会运行该键值对的程序&#xff0c;而因为exe运行的时候是静默提升了权限&#xff0c;所以运行的该键值对的程序就已经过了uac。所以我们把恶意的exe…

docker私有库

1.registry私有仓库 拉取registry镜像 docker pull registry 修改docker配置文件并重启 vim /etc/docker/daemon.json {"insecure-registries": ["172.16.23.23:5000"], #添加&#xff0c;注意用逗号结尾"registry-mirrors": ["ht…