这Bug只能通过压测发现

大家好,我是洋子。之前发布过一篇有关于在性能测试当中发现Bug的文章《因为一个Bug,差点损失了100w》 这篇文章当时还登上了CSDN全站综合热榜TOP1,最近工作在做性能测试时,又发现了几个比较有意思得Bug,本期分享其中的一个,涉及Redis并发场景下的应用

有意思的是,这个Bug因为没有代码语法错误,并发量少的情况下,下游的监控也不会出现报警,所以光靠功能测试是没有办法发现,只能通过压测(性能测试)或者下游的监控报警才能发现

我们先来看一段Go语言实现的代码,这段代码的意思就是先获取(Get) Redis Key 的值,这个值只有true 或者 false 两种情况 ,如果是true 则直接返回,不执行后续代码逻辑,如果是 false 则 先设置(Set) Redis Key 的值为true,再执行后续代码逻辑

	var flag = false// 获取锁redisCache := redis.NewCache()if err := redisCache.Get(SetUserSizeRedisKey, &flag); err != nil {ctx.WarningF("request redis to get _user_size_flag fail, err: %s", err)}// flag = true说明已经有实例在请求了, 直接返回// 否则 设置redis锁if flag {ctx.Notice("request_user_size_flag is true, return")return} else {ctx.Notice("request_user_size_flag is false, request im to flush room user size")if err := redisCache.Set(SetUserSizeRedisKey, true, ScriptMaxRunTime); err != nil {ctx.WarningF("set request_user_size_flag to redis fail, err: %s", err)}}// ... 后续具体的业务代码逻辑,可忽略

然而单纯利用从Redis 当中获取一个Bool值,以此来充当互斥锁,这种实现方案,在同一时刻只有一个用户请求能满足需求,但是在并发场景会出现无法锁住的情况,如下图,在初始条件下,即Redis Key 还从来没有被Set时(Key不存在时),当3个用户同时从Redis 读取到的值均为False ,就有3个用户同时去Set Redis Key,并且走到后续的代码逻辑

所以并发场景下,“锁”失效了
在这里插入图片描述
"锁"失效了有什么影响,继续给出完整代码逻辑,这段代码其实是定时任务的一部分,在执行期间,会请求下游服务获得相关数据

在并发场景下,“锁”失效了会导致下游的服务压力上涨,假设下游只能抗50QPS,现在QPS 已经到5000了,严重情况下还会出现IO打满,CPU和内存打满,服务宕机等风险

package mainimport "time"var (CrontabTime      = 20  // 每20s执行一次脚本ScriptMaxRunTime = 150 // 脚本最长运行时间150s
)func SetUserSize(ctx *gin.Context) {var flag = false// 获取锁redisCache := redis.NewCache()if err := redisCache.Get(SetUserSizeRedisKey, &flag); err != nil {ctx.WarningF("request redis to get _user_size_flag fail, err: %s", err)}// flag = true说明已经有实例在请求了, 直接返回// 否则 设置redis锁if flag {ctx.Notice("request_user_size_flag is true, return")return} else {ctx.Notice("request_user_size_flag is false, request im to flush room user size")if err := redisCache.Set(SetUserSizeRedisKey, true, ScriptMaxRunTime); err != nil {ctx.WarningF("set request_user_size_flag to redis fail, err: %s", err)}}// 以下是定时任务的具体逻辑ctx.NoticeF("run time start:%d", util.GetMilliSecond())// 执行定时任务前,前置获取相关必要信息info, err := GetInfo()if err != nil {ctx.WarningF("request  info fail, error: %s", err)// 获取信息失败,提前释放锁_ = redisCache.Del(SetUserSizeRedisKey)return}ctx.NoticeF("run time start req im:%d", util.GetMilliSecond())// 定时任务具体逻辑 for _, people := range info {//请求下游// ...}ctx.NoticeF("run time end:%d", util.GetMilliSecond())
}

如何解决请求下游数量超限这个问题呢,有两种解法:第一种是在请求下游前,增加判断当前QPS。第二种是使用Redis 分布式锁setnx

限定QPS

先看第一种方案是在请求下游前,判断是否超过最大的QPS,如何获取QPS呢,QPS是在做性能测试时,我们常用的性能指标,指每秒的查询数量,用来衡量系统每秒处理的请求数量

那么要获取QPS,自然要获得当前的秒数,如果是在同一秒请求,我们用当前秒数作为Redis Key ,值初始为0,同一秒内每有一次请求,就把Redis 的值加1,这样就拿到了QPS(见代码当中的GetLimitRequest方法)

now := util.GetSecond()
nowMilli := util.GetMilliSecond() 
res := GetLimitRequest(ctx, now) // GetLimitRequest 获取当前qps
func GetLimitRequest(now int64) int {key := fmt.Sprintf("limit_key_request_service_%v", now)redisCache := redis.NewCache(ctx)res, _ := redisCache.Incr(key)if res > 0 {go redisCache.Expire(key, 60)}return res
}

那QPS超出限额了怎么办,得计算到下1秒还有多少时间,要精确计算的话,我们只能可以获取比秒更小的单位-毫秒进行计算,分别获取下1s的时间(毫秒为单位),以及当前这1s的时间(同样毫秒为单位),两者相减,这样就知道到下1秒还差多少毫秒(对应下面代码的变量gap),让系统sleep gap对应毫秒数,这样就可以使得请求的维持在最大的QPS范围内

完整的代码片段如下

package mainimport "time"var (CrontabTime      = 20  // 每20s执行一次脚本ScriptMaxRunTime = 150 // 脚本最长运行时间150s
)func SetUserSize() {var flag = false// 获取锁redisCache := redis.NewCache()if err := redisCache.Get(SetUserSizeRedisKey, &flag); err != nil {ctx.WarningF("request redis to get _user_size_flag fail, err: %s", err)}// flag = true说明已经有实例在请求了, 直接返回// 否则 设置redis锁if flag {ctx.Notice("request_user_size_flag is true, return")return} else {ctx.Notice("request_user_size_flag is false, request im to flush room user size")if err := redisCache.Set(SetUserSizeRedisKey, true, ScriptMaxRunTime); err != nil {ctx.WarningF("set request_user_size_flag to redis fail, err: %s", err)}}ctx.NoticeF("run time start:%d", util.GetMilliSecond())// 执行定时任务前,前置获取相关必要信息info, err := GetInfo()if err != nil {ctx.WarningF("request  info fail, error: %s", err)// 获取信息失败,提前释放锁_ = redisCache.Del(SetUserSizeRedisKey)return}ctx.NoticeF("run time start req im:%d", util.GetMilliSecond())// 定时任务具体逻辑,可忽略for _, people := range info {now := util.GetSecond()nowMilli := util.GetMilliSecond() res := GetLimitRequest(ctx, now)if res > MaxQps {gap := (now+1)*1000 - nowMilliif gap > 0 {time.Sleep(time.Duration(gap) * time.Millisecond) // 在sleep 期间 不再请求下游}} else if res == 0 { //异常time.Sleep(time.Duration(40) * time.Millisecond)}}// 请求下游具体代码逻辑,可忽略// ...// ...// 定时任务执行完毕,主动释放锁_ = redisCache.Del(SetUserSizeRedisKey)ctx.NoticeF("run time end:%d", util.GetMilliSecond())}/ GetLimitRequest 获取当前qps
func GetLimitRequest(now int64) int {key := fmt.Sprintf("limit_key_request_service_%v", now)redisCache := redis.NewCache(ctx)res, _ := redisCache.Incr(key)if res > 0 {go redisCache.Expire(key, 60)}return res
}

使用Redis分布式锁

setnx是Redis的一个命令,它代表"Set if Not eXists"。这个命令尝试在Redis中设置一个键值对,但仅当指定的键不存在时才会成功。如果键已经存在,setnx操作将失败

我们可以使用setnx命令来创建Redis分布式锁,分布式锁是一种机制,用于确保在分布式系统中的多个节点或线程不会同时访问或修改共享资源,以避免竞态条件(race conditions)

分布式锁的主要目的是确保在分布式系统中,只有一个客户端(或线程)能够成功获得锁,以执行关键任务,而其他客户端必须等待

setnx的使用方式是,客户端通常会使用setnx命令尝试创建一个带有唯一标识的锁,然后在锁上设置一个过期时间,以防止锁被永久占用。当客户端不再需要锁时,可以使用del命令来释放锁

对于上面的并发问题,我们还可以使用SetNX来解决

func SetUserSize(ctx *gin.Context) {ExpireTime:= int64(3)client := redis.NewCache(ctx)key := fmt.Sprintf("set_locker_%s", "param_ex")res, err := client.SetNX(ctx, key, time.Now().Unix(), ExpireTime) //创建redis 分布式锁,ExpireTime过期时间为3秒if err != nil {ctx.WarningF("SetQuestionStatus get lock fail, err: %v", err)errno.ErrRet(ctx, errno.ErrCallCacheFail)return}if res != true {errno.ErrRet(ctx, errno.ErrSetInfo)return}defer client.Del(ctx, key) //执行完删除Redis分布式锁,让其他线程能正常获取锁,避免永久等待//... 执行后续逻辑
}

用一张图片再来对比一下两种实现方案的区别,使用Redis分布式锁能帮助解决高并发下互斥任务的问题,但需要注意设置过期时间,避免永久锁住资源
在这里插入图片描述
下一期我会继续分享压测中发现的性能问题以及排查、调优实战解决方案,欢迎星标【测试开发Guide】公众号,及时获取最新推文

《测试开发面试宝典》已发布,现在订阅免费加入CSDN测试社区(内含测开面试录音,简历案例库,学习资源等多种重磅福利)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/145318.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

估值 10 亿美金的红帽 (Red Hat) 公司究竟是从哪冒出来的?

译者序 原文链接:The History of Red Hat[1] 译者水平有限,不免存在遗漏或错误之处。如有疑问,敬请查阅原文。 以下是译文。 Bob Young 的创业之路 成长背景 Bob Young 于 1954 年出生于加拿大安大略的汉密尔顿。他与祖母住得很近&#xff0c…

DC电源模块的数字电源优势

BOSHIDA DC电源模块的数字电源优势 数字电源模块是指在电源的设计和控制上采用数字式方案,采用数字化技术,将传统的电源模块从模拟传统电源转变为数字电源变成的模块。 传统的电源模块使用模拟技术,其主要优势在于可控性高、稳定性好&#…

跨越单线程限制:Thread类的魅力,引领你进入Java并发编程的新纪元

线程的概述 线程是一个程序的多个执行路径,执行调度的单位,依托于进程存在。 线程不仅可以共享进程的内存,而且还拥有一个属于自己的内存空间,这段内存空间也叫做线程栈,是在建立线程时由系统分配的,主要用…

学习笔记2——Nosql

学习笔记系列开头惯例发布一些寻亲消息 链接:https://baobeihuijia.com/bbhj/contents/3/194205.html 跟学链接 跟学视频链接:https://www.bilibili.com/video/BV1S54y1R7SB/?spm_id_from333.999.0.0 (建议有java基础的同学学习或者一直…

一天吃透Java面试题

给大家分享我整理的Java高频面试题,有小伙伴靠他拿到字节offer了。 Java基础面试题 Java的特点Java 与 C 的区别JDK/JRE/JVM三者的关系Java程序是编译执行还是解释执行?面向对象和面向过程的区别?面向对象有哪些特性?数组到底是…

为什么嵌入通常优于TF-IDF:探索NLP的力量

塔曼纳 一、说明 自然语言处理(NLP)是计算机科学的一个领域,涉及人类语言的处理和分析。它用于各种应用程序,例如聊天机器人、情绪分析、语音识别等。NLP 中的重要任务之一是文本分类,我们根据文本的内容将文本分类为不…

VS Code关闭受限模式,关闭信任工作区

打开VS code每次出现这个界面,烦戳死!今天,贷款也要把它关掉! 1、打开设置: 2、搜索以下值 security.workspace.trust3、重新启动VS Code即可! 4、或者直接在用户的设置文件 settings.json中加入以下: &…

CMMI V2.2模型介绍

1、CMMI模型内容编排 模型内容主要由三篇组成,包括概述、实践域(PA)和附录,共分为6个章节。 章节 标题说明篇章:概述第1章关于CMMI V2.0概述CMMI V2.0产品套件,包括模型的执行摘要。第2章 成…

whois人员信息python批处理读入与文本输出

使用pytho读取一个ip列表文本,批量获取whois输出并写入到一个文本 import socketif __name__ __main__:# 江苏电信DNS地址mylog open(whois.log, mode a,encodingutf-8)for line in open("ip.txt"):s socket.socket(socket.AF_INET, socket.SOCK_STR…

2023年中职组“网络安全”赛项云南省竞赛任务书

2023年中职组“网络安全”赛项 云南省竞赛任务书 一、竞赛时间 总计:360分钟 竞赛阶段 竞赛阶段 任务阶段 竞赛任务 竞赛时间 分值 A模块 A-1 登录安全加固 180分钟 200分 A-2 本地安全策略配置 A-3 流量完整性保护 A-4 事件监控 A-5 服务加固…

算法基础学习|前缀和差分

前缀和 代码模板 一维前缀和模板 S[i] a[1] a[2] ... a[i] a[l] ... a[r] S[r] - S[l - 1]二维前缀和模板 S[i, j] 第i行j列格子左上部分所有元素的和 以(x1, y1)为左上角,(x2, y2)为右下角的子矩阵的和为: S[x2, y2] - S[x1 - 1, y2] - S[x2, …

S4.2.4.3 Electrical Idle Sequence(EIOS)

一 本章节主讲知识点 1.1 EIOS的具体码型 1.2 EIOS的识别规则 1.3 EIEOS的具体码型 二 本章节原文翻译 当某种状态下,发送器想要进入电器空闲状态的时候,发送器必须发送EIOSQ,也既是:电器Electrical Idle Odered Set Sequenc…