高并发幂等计数器的设计与实现

🌷🍁 博主猫头虎 带您 Go to New World.✨🍁
🦄 博客首页——猫头虎的博客🎐
🐳《面试题大全专栏》 文章图文并茂🦕生动形象🦖简单易学!欢迎大家来踩踩~🌺
🌊 《IDEA开发秘籍专栏》学会IDEA常用操作,工作效率翻倍~💐
🌊 《100天精通Golang(基础入门篇)》学会Golang语言,畅玩云原生,走遍大小厂~💐

🪁🍁 希望本文能够给您带来一定的帮助🌸文章粗浅,敬请批评指正!🍁🐥

文章目录

  • 高并发幂等计数器的设计与实现
    • 摘要
    • 引言
    • 问题描述:
    • 依赖组件
    • 实现思路
    • Go 代码示例
    • Java 代码示例
      • 简单代码:
      • 详细代码:
      • 思路解析
      • 优化问题
      • 解决方法
        • 解决方案一(不使用Redis):
        • 方案二: 使用Redis
    • Python 代码示例
    • 拓展:
      • 1. 大量请求同时到来
        • 扩展性:
        • 限流:
        • 缓存:
        • 异步处理:
      • 2. 合适的过期时间
      • 如果不用redis呢?
      • 1. 数据库唯一索引
      • 2. 应用内存
      • 3. 分布式锁
      • 4. 消息队列
      • 5. 文件系统
    • 总结
  • 原创声明

高并发幂等计数器的设计与实现

在这里插入图片描述

摘要

本文探讨了如何实现一个高并发、幂等的计数器服务,该服务用于处理外部的 inc 请求以增加特定视频的播放计数。考虑到网络延迟和重试等因素,该服务需要确保每个请求至少被处理一次,同时避免重复计数。我们使用了 MySQL 用于持久化存储计数数据,并用 Redis 进行幂等性检查。本文通过 Go、Java 和 Python 三种编程语言展示了具体的实现代码,并对核心逻辑进行了详细解释。Java 代码部分更是进行了全流程的展示,包括幂等性检查、数据库更新和已处理请求的记录。这样的设计不仅确保了高并发处理能力,还实现了请求的幂等性。

引言

在分布式系统中,高并发和幂等性是两个非常关键的问题。本文将探讨如何实现一个高并发、幂等的计数器服务。该服务接受外部的 inc 请求,用于增加特定视频的播放计数。由于网络延迟和请求重试等原因,多个相同或不同的 inc 请求可能并发到达服务。因此,服务需要确保每个请求至少被处理一次(at least once),同时避免重复计数。我们将使用 Go、Java 和 Python 来分别演示这一实现。

问题描述:

高并发幂等计数器题目
问题描述:
1.实现一个计数器服务
2.服务接收外部的 inc 请求,每个请求具有全局唯一 request id 和视频 id
3.因为网络和重试的原因,请求可能会重复的到达
4.时序上,多个重复的请求可能并发达到,两次重复请求之间的间隔不可预期
5.需要保证 at least once ,计数值不能丢失
6.可以依赖一些外部组件, mysql redis

依赖组件

  • MySQL: 用于持久化存储计数器的数据。
  • Redis: 用于高速缓存和临时存储已经接收到的 request id。

实现思路

  1. 接收请求: 使用 Web 框架接收 inc 请求,并提取其中的 request_idvideo_id
  2. 幂等检查: 使用 Redis 查询该 request_id,如果已存在,则该请求已被处理。
  3. 队列或缓存: 如果是新的 request_id,则将其存入 Redis,并进行数据库更新操作。
  4. 计数逻辑: 从 MySQL 中获取当前计数,然后加 1,并更新回数据库。

Go 代码示例

// 导入相应的包
import ("github.com/go-redis/redis/v8""database/sql"// 其他必要的包
)func incHandler(requestID string, videoID string) string {if isProcessed(requestID) {return "Already Processed"}// 更新数据库updateCounter(videoID)return "OK"
}func isProcessed(requestID string) bool {// Redis 检查val, _ := redisClient.Get(ctx, requestID).Result()return val != ""
}func updateCounter(videoID string) {// MySQL 更新// 省略具体实现
}

Java 代码示例

简单代码:

import redis.clients.jedis.Jedis;
import org.springframework.jdbc.core.JdbcTemplate;
// 其他必要的导入@RestController
public class CounterController {@AutowiredJedis jedis;@AutowiredJdbcTemplate jdbcTemplate;@RequestMapping("/inc")public String inc(@RequestParam String requestId, @RequestParam String videoId) {if (isProcessed(requestId)) {return "Already Processed";}// 更新数据库updateCounter(videoId);return "OK";}public boolean isProcessed(String requestId) {// Redis 检查return jedis.exists(requestId);}public void updateCounter(String videoId) {// MySQL 更新// 省略具体实现}
}

详细代码:


import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.web.bind.annotation.*;
import redis.clients.jedis.Jedis;@RestController
public class CounterController {@Autowiredprivate Jedis jedis;@Autowiredprivate JdbcTemplate jdbcTemplate;@GetMapping("/inc")public String inc(@RequestParam String requestId, @RequestParam String videoId) {// Step 1: 幂等性检查if (isProcessed(requestId)) {return "Already Processed";}// Step 2: 更新数据库if (updateCounter(videoId)) {// Step 3: 将 requestId 存入 Redis 以保证幂等性jedis.set(requestId, "true");return "OK";}return "Failed";}private boolean isProcessed(String requestId) {// 使用 Redis 检查 requestId 是否已处理return jedis.exists(requestId);}private boolean updateCounter(String videoId) {// 使用 JdbcTemplate 和 MySQL 更新计数器String query = "SELECT count FROM video_counter WHERE video_id = ?";Integer count = jdbcTemplate.queryForObject(query, new Object[]{videoId}, Integer.class);if (count == null) {// 如果视频尚未有计数,初始化为 1jdbcTemplate.update("INSERT INTO video_counter(video_id, count) VALUES(?, 1)", videoId);} else {// 如果视频已有计数,加 1jdbcTemplate.update("UPDATE video_counter SET count = ? WHERE video_id = ?", count + 1, videoId);}return true;}
}

思路解析

幂等性检查: 使用 Redis 进行了幂等性检查。如果请求已经被处理过(即在 Redis 中有记录),我们就直接返回。

数据库更新: 我们使用 Spring 的 JdbcTemplate 来与 MySQL 进行交互。如果这是一个全新的 video_id,我们将其添加到数据库并初始化计数为 1;否则,我们找到当前计数并加 1。

记录处理过的请求: 最后,我们将处理过的 request_id 添加到 Redis 中,以便进行未来的幂等性检查。

这样,我们就得到了一个高并发、幂等的计数器服务。

优化问题

如果两个重复的请求,进入到Step1 都没有查到,不是就都进入step2

解决方法

如果两个相同的 request_id 几乎同时到达,并且都通过了 Step 1 的幂等性检查,那么它们都会进入 Step 2,从而违反了我们想要的幂等性。

在不使用 Redis 或其他外部锁服务的情况下,这个问题变得更加复杂。一种可能的解决方案是使用数据库的原子操作和唯一约束来确保幂等性。具体来说,您可以使用数据库事务来解决这个问题。

解决方案一(不使用Redis):

  1. 创建一个新表(比如叫做 processed_requests)用于存储已经处理过的 request_id。该表有一个唯一约束在 request_id 上。

  2. 在处理一个新请求之前,尝试将 request_id 插入到 processed_requests 表中。如果插入成功,说明这是第一次处理该请求,然后再去更新视频的播放计数。如果插入失败(因为唯一约束),则说明该请求已经被处理过,直接返回。

  3. 这一切都应在一个数据库事务中完成,以确保操作的原子性。

这里是更新后的 Java 代码片段:

import org.springframework.transaction.annotation.Transactional;// ...@RestController
public class CounterController {@Autowiredprivate JdbcTemplate jdbcTemplate;@Transactional@GetMapping("/inc")public String inc(@RequestParam String requestId, @RequestParam String videoId) {// Step 1: 幂等性检查和插入try {jdbcTemplate.update("INSERT INTO processed_requests(request_id) VALUES(?)", requestId);} catch (DuplicateKeyException e) {return "Already Processed";}// Step 2: 更新数据库updateCounter(videoId);return "OK";}private void updateCounter(String videoId) {String query = "SELECT count FROM video_counter WHERE video_id = ?";Integer count = jdbcTemplate.queryForObject(query, new Object[]{videoId}, Integer.class);if (count == null) {jdbcTemplate.update("INSERT INTO video_counter(video_id, count) VALUES(?, 1)", videoId);} else {jdbcTemplate.update("UPDATE video_counter SET count = ? WHERE video_id = ?", count + 1, videoId);}}
}

这样,在并发环境中,由于数据库的唯一约束和事务,我们可以确保每个 request_id 只会被处理一次。这解决了您提到的问题。

方案二: 使用Redis

如果使用 Redis,一种常见的方法是利用 Redis 的原子操作来解决这个问题。具体来说,可以使用 SETNX (SET if Not eXists)命令来设置 request_id 的值,并检查操作是否成功。这个命令只有在键不存在时才会设置值,并返回一个标识(通常是 1 表示成功,0 表示失败)。因为 Redis 的这个操作是原子性的,所以即使有多个相同的请求几乎同时到达,也只有一个能够成功设置该 request_id

下面是 Java 代码中这部分逻辑的示例:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import redis.clients.jedis.Jedis;@RestController
public class CounterController {@Autowiredprivate Jedis jedis;@Autowiredprivate JdbcTemplate jdbcTemplate;@GetMapping("/inc")public String inc(@RequestParam String requestId, @RequestParam String videoId) {// Step 1: 幂等性检查if (!setIfNotExists(requestId)) {return "Already Processed";}// Step 2: 更新数据库updateCounter(videoId);return "OK";}private boolean setIfNotExists(String requestId) {return jedis.setnx(requestId, "true") == 1;}private void updateCounter(String videoId) {String query = "SELECT count FROM video_counter WHERE video_id = ?";Integer count = jdbcTemplate.queryForObject(query, new Object[]{videoId}, Integer.class);if (count == null) {jdbcTemplate.update("INSERT INTO video_counter(video_id, count) VALUES(?, 1)", videoId);} else {jdbcTemplate.update("UPDATE video_counter SET count = ? WHERE video_id = ?", count + 1, videoId);}}
}

这里,setIfNotExists 方法使用 Redis 的 SETNX 命令来尝试设置一个 request_id。如果设置成功(返回 1),则继续更新数据库。如果设置失败(返回 0,表示该 request_id 已经存在),则直接返回 “Already Processed”,实现幂等性。

这样,在高并发环境下,即使多个相同的 request_id 几乎同时到达,也只有一个会被实际处理,其他的都会在 setIfNotExists 阶段被拦截,从而确保了幂等性。

Python 代码示例

from flask import Flask, request
import redis
import mysql.connectorapp = Flask(__name__)
r = redis.Redis(host='localhost', port=6379, db=0)
conn = mysql.connector.connect(user='root', password='password', database='test')@app.route('/inc', methods=['POST'])
def inc():request_id = request.form['request_id']video_id = request.form['video_id']if r.exists(request_id):return 'Already Processed'r.set(request_id, 1)cursor = conn.cursor()# 更新 MySQL# 省略具体实现return 'OK'

拓展:

面试官提问:1.我就是同一个时间段大量的请求到来了怎么办?
2.还有这个合适的过期时间怎么合适

1. 大量请求同时到来

当大量请求在同一时间段到来时,一般有以下几种应对方式:

扩展性:

  1. 横向扩展(水平扩展): 通过增加更多的服务器来分摊负载。
  2. 垂直扩展(纵向扩展): 通过增加单个服务器的计算资源(例如,CPU、RAM)。

限流:

  1. 令牌桶或漏桶算法: 控制进入到系统的请求速率。
  2. 优先级队列: 为不同类型的请求设置不同的优先级。

缓存:

  1. 读缓存: 对常读不变或少变的数据进行缓存。
  2. 写缓存(Write-Through、Write-Back): 缓存写操作,批量或异步地更新到数据库。

异步处理:

  1. 消息队列: 使用如 RabbitMQ、Kafka 等消息队列服务将处理请求和更新数据库的操作解耦。

2. 合适的过期时间

设置合适的过期时间主要依赖于业务需求和系统资源。以下几点可能有助于确定一个合适的过期时间:

  1. 业务逻辑: 如果请求是短暂的(比如秒杀活动),那么短的过期时间可能更合适。

  2. 资源限制: 如果 Redis 或其他缓存资源有限,较短的过期时间可以更快地回收资源。

  3. 数据一致性: 长的过期时间可能导致数据不一致,短的过期时间虽然可以减少这种风险,但会增加系统的复杂性。

  4. 网络延迟和重试策略: 如果网络延迟和重试可能导致长时间的重复请求,应设置相对长一些的过期时间。

综合以上因素,过期时间的设置通常需要在性能和准确性之间找到一个平衡点。这通常需要通过实验和性能测试来不断调整和优化。

如果不用redis呢?

如果不使用 Redis,我们有几种其他方法也可以实现高并发和幂等性。

1. 数据库唯一索引

可以在数据库中为 request_id 设置唯一索引。当重复请求到来时,尝试插入相同的 request_id 将会触发数据库错误,从而达到幂等性的目的。

private boolean isProcessed(String requestId) {try {jdbcTemplate.update("INSERT INTO request_ids(request_id) VALUES(?)", requestId);} catch (DuplicateKeyException e) {return true;}return false;
}

2. 应用内存

在应用级别维护一个已处理 request_id 的集合。这样不需要外部存储,但这种方式不适合分布式系统,因为每个实例都有自己的内存。

private Set<String> processedRequestIds = Collections.synchronizedSet(new HashSet<>());private boolean isProcessed(String requestId) {if (processedRequestIds.contains(requestId)) {return true;}processedRequestIds.add(requestId);return false;
}

3. 分布式锁

使用数据库或者其他分布式锁机制确保同一时间只有一个请求在处理。这种方法可以确保幂等性但可能影响系统性能。

private boolean isProcessed(String requestId) {if (acquireLock(requestId)) {// 进行处理releaseLock(requestId);return false;}return true;
}

4. 消息队列

使用消息队列(如 Kafka、RabbitMQ 等)确保消息的幂等处理。大多数现代消息队列都提供了这种机制。

5. 文件系统

在文件系统中为每个 request_id 创建一个唯一文件。通过检查文件存在与否,来确定是否处理了该请求。但这种方法不适合高并发场景。

每种方法都有其优点和局限性。选择哪种方法取决于具体需求、可用资源以及你愿意接受的复杂性。

总结

通过使用 Redis 进行幂等性检查,以及使用 MySQL 进行持久化存储,我们成功地实现了一个高并发、幂等的计数器服务。这种设计能够在高并发条件下保证 at least once 语义,同时也实现了幂等性。

该设计还有进一步优化和扩展的空间,例如,可以加入更多的负载均衡和高可用性特性,或者使用消息队列来进一步解耦生产者和消费者。

感谢您的阅读,希望本文能为您提供有用的信息和启示。如有任何问题或建议,请随时留言。


作者: [猫头虎]
发布时间: [2023.08.30]

在这里插入图片描述

原创声明

======= ·

  • 原创作者: 猫头虎

作者wx: [ libin9iOak ]

学习复习

本文为原创文章,版权归作者所有。未经许可,禁止转载、复制或引用。

作者保证信息真实可靠,但不对准确性和完整性承担责任

未经许可,禁止商业用途。

如有疑问或建议,请联系作者。

感谢您的支持与尊重。

点击下方名片,加入IT技术核心学习团队。一起探索科技的未来,共同成长。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/94590.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

PSP - 蛋白质结构预测 OpenFold Multimer 训练模型的数据加载

欢迎关注我的CSDN&#xff1a;https://spike.blog.csdn.net/ 本文地址&#xff1a;https://spike.blog.csdn.net/article/details/132597659 OpenFold Multimer 是基于深度学习的方法&#xff0c;预测蛋白质的多聚体结构和相互作用。利用大规模的蛋白质序列和结构数据&#xff…

【GUI开发】用python爬YouTube博主信息,并开发成exe软件

文章目录 一、背景介绍二、代码讲解2.1 爬虫2.2 tkinter界面2.3 存日志 三、软件演示视频四、说明 一、背景介绍 你好&#xff0c;我是马哥python说&#xff0c;一名10年程序猿。 最近我用python开发了一个GUI桌面软件&#xff0c;目的是爬取相关YouTube博主的各种信息&#…

腾讯云网站备案详细流程_审核时间说明

腾讯云网站备案流程先填写基础信息、主体信息和网站信息&#xff0c;然后提交备案后等待腾讯云初审&#xff0c;初审通过后进行短信核验&#xff0c;最后等待各省管局审核&#xff0c;前面腾讯云初审时间1到2天左右&#xff0c;最长时间是等待管局审核时间&#xff0c;网站备案…

Python Opencv实践 - 凸包检测(ConvexHull)

import cv2 as cv import numpy as np import matplotlib.pyplot as pltimg cv.imread("../SampleImages/stars.png") plt.imshow(img[:,:,::-1])img_contour img.copy() #得到灰度图做Canny边缘检测 img_gray cv.cvtColor(img_contour, cv.COLOR_BGR2GRAY) edges…

手写RPC框架--2.介绍Zookeeper

RPC框架-Gitee代码(麻烦点个Starred, 支持一下吧) RPC框架-GitHub代码(麻烦点个Starred, 支持一下吧) 该项目的RPC通信将采用NettyZookeeper&#xff0c;所以会在前两章介绍使用方法 介绍Zookeeper Zookeepera.概述1) 数据模型2) Watcher机制 b.安装和基本操作1) Java操作zooke…

前端Vue仿企查查 天眼查知识产权标准信息列表组件

引入Vue仿企查查天眼查知识产权标准信息列表组件 随着技术的不断发展&#xff0c;传统的开发方式使得系统的复杂度越来越高。在传统开发过程中&#xff0c;一个小小的改动或者一个小功能的增加可能会导致整体逻辑的修改&#xff0c;造成牵一发而动全身的情况。为了解决这个问题…

C# Solidworks二次开发:创建距离配合以及移动组件API详解

今天要讲的文章是关于如何创建距离配合和移动组件的API详解。 &#xff08;1&#xff09;创建配合API&#xff0c;CreateMate() 这个API的解释是根据指定的特性数据对象来创建配合&#xff0c;也就可以理解为输入什么样的特征对象就可以创建出什么配合&#xff0c;这个API的输…

async的用法

有以下几种形式 //从c11到c17有 template< class Function, class... Args > std::future<typename std::result_of<typename std::decay<Function>::type(typename std::decay<Args>::type...)>::type>async( Function&& f, Args&…

VBA:对Excel单元格进行合并操作

Sub hb()Dim nn 3For i 3 To 18If Range("b" & i) <> Range("b" & i 1) ThenRange("b" & n & ":b" & i).Mergen i 1End IfNextEnd Sub

JavaScript -【第一周】

文章来源于网上收集和自己原创&#xff0c;若侵害到您的权利&#xff0c;请您及时联系并删除~~~ JavaScript 介绍 变量、常量、数据类型、运算符等基础概念 能够实现数据类型的转换&#xff0c;结合四则运算体会如何编程。 体会现实世界中的事物与计算机的关系理解什么是数据并…

SpringCloudGateway集成SpringDoc

SpringCloudGateway集成SpringDoc 最近在搞Spring版本升级&#xff0c;按客户要求升级Spring版本&#xff0c;原来用着SpringBoot 2.2.X版本&#xff0c;只需要升级SpringBoot 2.X最新版本也就可以满足客户Spring版本安全要求&#xff0c;可是好像最新的SpringBoot 2.X貌似也不…

Empowering Long-tail Item Recommendation through Cross Decoupling Network (CDN)

Empowering Long-tail Item Recommendation through Cross Decoupling Network (CDN) 来源&#xff1a; KDD’2023Google Research 文章目录 Empowering Long-tail Item Recommendation through Cross Decoupling Network (CDN)长尾问题分析CDNItem Memorization and General…