【Spring Boot应用】Spring Boot + Bucket4j 实现API请求限流

news/2025/1/26 14:38:48/文章来源:https://www.cnblogs.com/o-O-oO/p/18691776

保护你的 API 免受滥用至关重要。速率限制是 API 安全的关键。它可以防止拒绝服务攻击、管理资源并确保客户端之间的公平使用。Spring Boot 3 和 Bucket4j 结合提供了一个强大且灵活的方式来为你的应用程序添加速率限制。

在本文中,我们将探讨如何在 Spring Boot 3 应用程序中使用 Bucket4j 开发速率限制功能。我们将介绍不同的方法,并提供实用的示例,供你根据需求进行调整。

先决条件

在开始之前,请确保你具备以下条件:

• Java 17 或更高版本。

• 对 Java、Spring Boot 和 API 开发有基本了解。

实现

第一步是将所需的依赖项添加到你的 pom.xml 或 build.gradle 中。

<dependency><groupId>com.bucket4j</groupId><artifactId>bucket4j-core</artifactId><version>8.3.0</version>
</dependency>
<dependency><groupId>com.bucket4j</groupId><artifactId>bucket4j-caffeine</artifactId><version>8.3.0</version>
</dependency>
<dependency><groupId>com.github.ben-manes.caffeine</groupId><artifactId>caffeine</artifactId><version>3.1.8</version>
</dependency>

我们不会直接跳到最终代码,而是逐步构建速率限制功能。让我们从创建一个基本的 REST 控制器开始。

@RestController
@RequestMapping("/api")
public class RateLimitedController {@GetMapping("/greeting")public String getGreeting() {return "Hello, World!";}
}

接下来,我们需要配置速率限制。

@Configuration
public class RateLimitConfig {@Beanpublic Bucket createNewBucket() {long overdraft = 50;Refill refill = Refill.intervally(40, Duration.ofMinutes(1));Bandwidth limit = Bandwidth.classic(overdraft, refill);return Bucket.builder().addLimit(limit).build();}
}

现在,我们需要设置一个速率限制拦截器。

@Component
@RequiredArgsConstructor
public class RateLimitInterceptor implements HandlerInterceptor {private final Bucket bucket;@Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {ConsumptionProbe probe = bucket.tryConsumeAndReturnRemaining(1);if (probe.isConsumed()) {response.addHeader("X-Rate-Limit-Remaining", String.valueOf(probe.getRemainingTokens()));return true;}long waitForRefill = probe.getNanosToWaitForRefill() / 1_000_000_000;response.addHeader("X-Rate-Limit-Retry-After-Seconds", String.valueOf(waitForRefill));response.sendError(HttpStatus.TOO_MANY_REQUESTS.value(),"You have exhausted your API Request Quota");return false;}
}

目前,我们还没有注册我们的拦截器,让我们来解决这个问题。

@Configuration
public class WebMvcConfig implements WebMvcConfigurer {private final RateLimitInterceptor interceptor;public WebMvcConfig(RateLimitInterceptor interceptor) {this.interceptor = interceptor;}@Overridepublic void addInterceptors(InterceptorRegistry registry) {registry.addInterceptor(interceptor).addPathPatterns("/api/**");}
}

我们已经实现了一个基本的速率限制器。这个基本版本并不适合实际生产环境。

IP 基础的速率限制

IP 基础的速率限制更接近实际生产场景。IP 限制提供了更细粒度的控制。

@Component
public class IpBasedRateLimitInterceptor implements HandlerInterceptor {private final Cache<String, Bucket> cache;public IpBasedRateLimitInterceptor() {this.cache = Caffeine.newBuilder().expireAfterWrite(1, TimeUnit.SECONDS).build();}@Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {String ip = getClientIP(request);Bucket bucket = cache.get(ip, this::newBucket);ConsumptionProbe probe = bucket.tryConsumeAndReturnRemaining(1);if (probe.isConsumed()) {response.addHeader("X-Rate-Limit-Remaining", String.valueOf(probe.getRemainingTokens()));return true;}long waitForRefill = probe.getNanosToWaitForRefill() / 1_000_000_000;response.addHeader("X-Rate-Limit-Retry-After-Seconds", String.valueOf(waitForRefill));response.sendError(HttpStatus.TOO_MANY_REQUESTS.value(),"Rate limit exceeded. Try again in " + waitForRefill + " seconds");return false;}private String getClientIP(HttpServletRequest request) {String xfHeader = request.getHeader("X-Forwarded-For");if (xfHeader == null) {return request.getRemoteAddr();}return xfHeader.split(",")[0];}private Bucket newBucket(String ip) {return Bucket.builder().addLimit(Bandwidth.classic(10, Refill.intervally(10, Duration.ofMinutes(1)))).build();}
}

当然,我们需要单元测试来验证我们的实现是否有效。

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
class RateLimitedControllerTest {@LocalServerPortprivate int port;@Autowiredprivate TestRestTemplate restTemplate;@Testvoid whenExceedingRateLimit_thenReceive429() {String url = "http://localhost:" + port + "/api/greeting";// 发送 10 个请求(超过我们的限制 9 次)for (int i = 0; i < 10; i++) {ResponseEntity<String> response = restTemplate.getForEntity(url, String.class);if (i < 10) {assertEquals(HttpStatus.OK, response.getStatusCode());} else {assertEquals(HttpStatus.TOO_MANY_REQUESTS, response.getStatusCode());}}}
}

基于系统负载的动态速率限制

最后但同样重要的是,让我们再构建一个速率限制器。这个速率限制器将根据应用程序的负载来限制请求。

@Slf4j
@Component
public class SystemMetricsCollector {private final OperatingSystemMXBean osBean;public SystemMetricsCollector() {this.osBean = ManagementFactory.getOperatingSystemMXBean();}public SystemMetrics collectMetrics() {double cpuLoad = getProcessCpuLoad();long freeMemory = Runtime.getRuntime().freeMemory();long totalMemory = Runtime.getRuntime().totalMemory();double memoryUsage = 1.0 - (double) freeMemory / totalMemory;return new SystemMetrics(cpuLoad, memoryUsage);}private double getProcessCpuLoad() {if (osBean instanceof com.sun.management.OperatingSystemMXBean) {return ((com.sun.management.OperatingSystemMXBean) osBean).getProcessCpuLoad();}return osBean.getSystemLoadAverage();}
}

以及:

@Data
@AllArgsConstructor
public class SystemMetrics {private double cpuLoad;private double memoryUsage;
}

然后,我们需要创建速率限制的计算组件。

@Component
@Slf4j
public class DynamicRateLimitCalculator {private static final int BASE_LIMIT = 100;private static final double CPU_THRESHOLD_HIGH = 0.8;private static final double CPU_THRESHOLD_MEDIUM = 0.5;private static final double MEMORY_THRESHOLD_HIGH = 0.8;private static final double MEMORY_THRESHOLD_MEDIUM = 0.5;public RateLimitConfig calculateLimit(SystemMetrics metrics) {int limit = BASE_LIMIT;// 根据 CPU 负载调整限制limit = adjustLimitBasedOnCpu(limit, metrics.getCpuLoad());// 根据内存使用率调整限制limit = adjustLimitBasedOnMemory(limit, metrics.getMemoryUsage());Duration refillDuration = calculateRefillDuration(metrics);log.debug("Calculated rate limit: {}/{}s", limit, refillDuration.getSeconds());return new RateLimitConfig(limit, refillDuration);}private int adjustLimitBasedOnCpu(int currentLimit, double cpuLoad) {if (cpuLoad > CPU_THRESHOLD_HIGH) {return (int) (currentLimit * 0.3); // 严重减少} else if (cpuLoad > CPU_THRESHOLD_MEDIUM) {return (int) (currentLimit * 0.6); // 适度减少}return currentLimit;}private int adjustLimitBasedOnMemory(int currentLimit, double memoryUsage) {if (memoryUsage > MEMORY_THRESHOLD_HIGH) {return (int) (currentLimit * 0.4);} else if (memoryUsage > MEMORY_THRESHOLD_MEDIUM) {return (int) (currentLimit * 0.7);}return currentLimit;}private Duration calculateRefillDuration(SystemMetrics metrics) {double maxLoad = Math.max(metrics.getCpuLoad(), metrics.getMemoryUsage());if (maxLoad > 0.8) {return Duration.ofMinutes(2);} else if (maxLoad > 0.5) {return Duration.ofMinutes(1);}return Duration.ofSeconds(30);}
}@Data
@AllArgsConstructor
public class RateLimitConfig {private int limit;private Duration refillDuration;
}

让我们创建一个灵活的速率限制器,它将作为处理程序拦截器。

@Slf4j
@Component
public class DynamicRateLimitInterceptor implements HandlerInterceptor, RateLimitConfigProvider {private final Cache<String, Bucket> bucketCache;private final SystemMetricsCollector metricsCollector;private final DynamicRateLimitCalculator calculator;private final AtomicReference<RateLimitConfig> currentConfig;private final ScheduledExecutorService scheduler;private final RateLimitMetrics metrics;public DynamicRateLimitInterceptor(SystemMetricsCollector metricsCollector,DynamicRateLimitCalculator calculator, MeterRegistry meterRegistry) {this.metricsCollector = metricsCollector;this.calculator = calculator;this.currentConfig = new AtomicReference<>(new RateLimitConfig(100, Duration.ofMinutes(1)));this.bucketCache = Caffeine.newBuilder().expireAfterWrite(1, TimeUnit.HOURS).build();this.scheduler = Executors.newSingleThreadScheduledExecutor();this.metrics = new RateLimitMetrics(meterRegistry, this);startMetricsUpdateTask();}private void startMetricsUpdateTask() {scheduler.scheduleAtFixedRate(this::updateRateLimitConfig,0,10,TimeUnit.SECONDS);}private void updateRateLimitConfig() {try {SystemMetrics metrics = metricsCollector.collectMetrics();RateLimitConfig newConfig = calculator.calculateLimit(metrics);RateLimitConfig oldConfig = currentConfig.get();if (hasSignificantChange(oldConfig, newConfig)) {currentConfig.set(newConfig);log.info("Rate limit updated: {}/{}s",newConfig.getLimit(),newConfig.getRefillDuration().getSeconds());// Clear cache to force bucket recreation with new limitsbucketCache.invalidateAll();}} catch (Exception e) {log.error("Error updating rate limit config", e);}}private boolean hasSignificantChange(RateLimitConfig oldConfig,RateLimitConfig newConfig) {double limitChange = Math.abs(1.0 -(double) newConfig.getLimit() / oldConfig.getLimit());return limitChange > 0.2; // 20% change threshold}public RateLimitConfig getRateLimitConfig() {return this.currentConfig.get();}@Overridepublic boolean preHandle(HttpServletRequest request,HttpServletResponse response,Object handler) throws Exception {String path = request.getRequestURI();String method = request.getMethod();Timer.Sample timerSample = metrics.startTimer();boolean rateLimited = false;try {metrics.recordRequest();String clientId = getClientIdentifier(request);Bucket bucket = bucketCache.get(clientId, this::createBucket);ConsumptionProbe probe = bucket.tryConsumeAndReturnRemaining(1);if (probe.isConsumed()) {addRateLimitHeaders(response, probe);return true;}metrics.incrementRateLimitExceeded();handleRateLimitExceeded(response, probe);return false;} finally {metrics.stopTimer(timerSample, path, method, rateLimited);}}private Bucket createBucket(String clientId) {RateLimitConfig config = currentConfig.get();return Bucket.builder().addLimit(Bandwidth.classic(config.getLimit(),Refill.intervally(config.getLimit(),config.getRefillDuration()))).build();}private String getClientIdentifier(HttpServletRequest request) {// Could combine multiple factors: IP, user ID, API key, etc.return request.getRemoteAddr();}private void addRateLimitHeaders(HttpServletResponse response,ConsumptionProbe probe) {RateLimitConfig config = currentConfig.get();response.addHeader("X-Rate-Limit-Limit",String.valueOf(config.getLimit()));response.addHeader("X-Rate-Limit-Remaining",String.valueOf(probe.getRemainingTokens()));response.addHeader("X-Rate-Limit-Reset",String.valueOf(probe.getNanosToWaitForRefill() /1_000_000_000));}private void handleRateLimitExceeded(HttpServletResponse response,ConsumptionProbe probe)throws IOException {response.setStatus(HttpStatus.TOO_MANY_REQUESTS.value());response.setContentType(MediaType.APPLICATION_JSON_VALUE);String errorMessage = String.format("Rate limit exceeded. Try again in %d seconds",probe.getNanosToWaitForRefill() / 1_000_000_000);response.getWriter().write(String.format("{\"error\": \"%s\", \"retryAfter\": %d}",errorMessage,probe.getNanosToWaitForRefill() / 1_000_000_000));}@PreDestroypublic void shutdown() {scheduler.shutdown();}
}

配置 Spring Boot 应用程序以使用速率限制器

现在我们需要配置 Spring Boot 应用程序以使用我们实现的速率限制器。

@Configuration
public class RateLimitConfig implements WebMvcConfigurer {@Autowiredprivate DynamicRateLimiter rateLimiter;@Overridepublic void addInterceptors(InterceptorRegistry registry) {registry.addInterceptor(rateLimiter).addPathPatterns("/api/**");}
}

通过上述配置,我们将 DynamicRateLimiter 注册为一个拦截器,并将其应用于所有以 /api 开头的请求路径。

创建自定义指标以监控应用程序

为了跟踪性能、负载、内存消耗等应用程序的各个方面,创建自定义指标是一个好主意。
以下是实现自定义指标的代码:

public class RateLimitMetrics {private final MeterRegistry meterRegistry;private final Counter rateLimitExceeded;private final Counter requestsTotal;private final Gauge currentLimit;public RateLimitMetrics(MeterRegistry registry,RateLimitConfigProvider configProvider) {this.meterRegistry = registry;this.rateLimitExceeded = Counter.builder("rate_limit.exceeded").description("Number of rate limit exceeded events").tag("type", "exceeded").register(registry);this.requestsTotal = Counter.builder("rate_limit.requests").description("Total number of requests processed").tag("type", "total").register(registry);this.currentLimit = Gauge.builder("rate_limit.current",configProvider,this::getCurrentLimit).description("Current rate limit value").tag("type", "limit").register(registry);}public Timer.Sample startTimer() {return Timer.start();}public void stopTimer(Timer.Sample sample, String path, String method, boolean rateLimited) {Timer timer = Timer.builder("rate_limit.request.duration").description("Request duration through rate limiter").tags("path", path,"method", method,"rate_limited", String.valueOf(rateLimited),"component", "rate_limiter").register(meterRegistry);sample.stop(timer);}public void incrementRateLimitExceeded() {rateLimitExceeded.increment();}public void recordRequest() {requestsTotal.increment();}private double getCurrentLimit(RateLimitConfigProvider provider) {return provider.getRateLimitConfig().getLimit();}public Map<String, Number> getCurrentMetrics() {return Map.of("rateLimitExceeded", rateLimitExceeded.count(),"totalRequests", requestsTotal.count(),"currentLimit", currentLimit.value());}
}

通过上述代码,我们创建了以下指标:

1、 rate_limit.exceeded:记录速率限制被触发的次数。

2、 rate_limit.requests:记录处理的请求总数。

3、 rate_limit.current:显示当前的速率限制值。

最佳实践和注意事项

1、 缓存实现:在生产环境中,使用分布式缓存(如 Redis)来实现集群环境中的速率限制。

2、 响应头:始终在响应头中包含速率限制信息,以帮助客户端管理其请求速率。
常见的头信息包括:

• X-Rate-Limit-Remaining:剩余的请求次数。

• X-Rate-Limit-Retry-After-Seconds:需要等待的秒数。

3、 错误处理:当用户超出速率限制时,提供清晰的错误信息。

4、 监控:设置指标以跟踪速率限制事件,并根据使用模式调整限制。

结论

本文展示了如何在 Spring Boot 3 应用程序中使用 Bucket4j 实现速率限制。我们介绍了三种方法:基于时间、基于 IP 地址和基于系统负载的速率限制。实际场景可能与本文中的示例有所不同。

速率限制是 API 安全策略的一部分。通过将其与其他安全措施结合使用,你可以构建强大的 API 保护机制。

欢迎关注 SpringForAll社区(spring4all.com),专注分享关于Spring的一切!关注公众号:SpringForAll社区

原创 s4a SpringForAll社区

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/875967.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[日记]轻量回测框架 Backtesting.py 与 Streamlit集成

找到一个目前觉得比较轻量级的框架,结构简单易用,几行代码搞定即可搞定回测。 对于回测结果提供可视化的找到一个目前觉得比较轻量级的框架,结构简单易用,几行代码搞定即可搞定回测。 对于回测结果提供可视化的图表分析。如下图:同时提供常用的收益和风险指标数据作为量化…

【开源】Pi-hole控制面板:深入解析你的网络流量和广告拦截

今天要给大家介绍一个非常实用的开源项目——Pi-hole。这是一款基于树莓派的全能广告屏蔽助手,能够在不安装任何客户端软件的前提下为设备提供网络内容屏蔽服务,非常轻量易用。Pi-hole的主要功能: 1、 全网广告拦截: Pi-hole 充当 DNS 污水坑,阻止网络上所有设备上不需要的…

Rust多线程中安全的使用变量

在Rust语言中,一个既引人入胜又可能带来挑战的特性是闭包如何从其所在环境中捕获变量,尤其是在涉及多线程编程的情境下。 如果尝试在不使用move关键字的情况下创建新线程并传递数据至闭包内,编译器将很可能返回一系列与生命周期、借用规则及所有权相关的复杂错误信息。 不过…

ARC_069 D - Menagerie 题解

atcoder 一道很有意思的模拟题啊。 思路很重要。 首先,我们只要知道连续两只动物的身份,就可以根据 \(s\) 推出所有动物的身份。 不妨假设我们知道第一只和第二只动物的身份,一共有几种情况呢? 用 \(1\) 代表羊,\(0\) 代表狼。 那么,共有 \(2^2=4\) 种情况,分别为: 00 …

『学习笔记』二分算法

今天记录二分知识点。 二分是一个简单清晰,实用性强的算法。 也是本人最喜欢的算法之一。 先给出二分模板吧!int l = 1, r = n;//初始值,根据情况而定while (l + 1 < r) {int mid = (l + r) >> 1;if (check(mid)) l = mid;// check函数判断左半部分是否不符合,更新…

回家之难难于蜀道难

回家难 之难于蜀道难 (仿写李白蜀道难)噫吁嚱,困乎难乎,回家之途,难于上班路。 盘古及女娲,开天辟地捏人烟,尓来文明已万年,难解归家争吵事。 游子无钱难上路,漂留外地护空城。 千思万想定下来,踏上归途望团年。 上有爸妈在老家,下有孩童八九岁。 列车无票不得行,驱…

MAC|Edge——下载视频

解码错误解码错误指的是当前音/视频帧与浏览器不兼容,可以尝试以下方式:1.chrome/edge 浏览器打开chrome://flags,搜索 Hardware-accelerated video decode,选择 disabled2.如果解码错误仍然存在,请对视频进行转码处理,以修复问题帧3.firefox浏览器请打开about:support,…

stdio.h的缓冲机制解析

在C语言中,由于stdio.h中的缓冲机制,printf的输出常令人感到迷惑。本文将介绍其缓冲机制的具体细节1. 令人迷惑的printf() 在C语言中,由于stdio.h中的缓冲机制,printf的输出通常会受到缓冲区的影响。 这种影响可能非常微妙,并常常令人疑惑,比如我们来看下面这段代码 #inc…

【新能源行业】新能源汽车电子驻车制动系统(EPB)谁在做?

长期以来,汽车的动力系统一直是人们所关注的焦点,然而,汽车制动系统在背后默默支撑起整个汽车安全与稳定。其重要性丝毫不亚于动力系统。行车上路,安全第一。在每一次的启程与停驻之间,唯有制动系统作为坚实保障,才能让每一次出行都安心无虞。一、制动系统分类与组成 目前…

如何从内存中提取shellcode

恶意程序有时会直接在内存中运行shellcode 。在这篇文章中,我将向你展示如何从内存中获取shellcode。 shellcode在内存中的位置 在内存中分配shellcode的常用方法是使用VirtualAlloc来分配具有所需权~限的内存。然后恶意软件使用RtlMoveMemory将shellcode写入分配的空间。然后…

施耐德UNITY中使用ST 语言计算日均值

以前做过练习,在unity中计算分钟均值和小时均值,做成自定义功能块。今天在家打算按照同样的思路,试着做一下日均值。 第一次打算建立一个三维数组PV_DAY[0..23,0..59,0..59],每秒存放一个数据,编译的时候提示数组太大。 第二次尝试建立24个数组,每个数组存放一个小时内36…

【转载】rpm 和 yum 软件包的应用

本节所讲内容:8.1 使用rpm命令-安装-查看-卸载-rpm软件包8.2 yum管理软件包8.3 CentOS8中使用DNF管理软件包8.4 实战tar源码包管理-源码包安装方法8.1 软件包的管理软件包的类型rpm二进制包------》已经使用GCC编译后的(二进制已经可以被操作系统直接执行了)tar源码包-----》…