服务容错之限流之 Tomcat 限流 Tomcat 线程池的拒绝策略

在文章开头,先和大家抛出两个问题:

  1. 每次提到服务限流为什么都不考虑基于 Tomcat 来做呢?
  2. 大家有遇到过 Tomcat 线程池触发了拒绝策略吗?

JUC 线程池

在谈 Tomcat 的线程池前,先看一下 JUC 中线程池的执行流程,这里使用《Java 并发编程的艺术》中的一张图:
在这里插入图片描述

即执行流程为:

  1. 收到提交任务
  2. 当前线程数小于核心线程数,创建一个新的线程来执行任务
  3. 当前线程数大于等于核心线程数,
    • 如果阻塞队列未满,将任务存储到队列
    • 如果阻塞队列已满
      • 如果当前线程数小于最大线程数,则创建一个线程来执行新提交的任务
      • 如果当前线程数大于等于最大线程数,执行拒绝策略

可以看到设计思想是任务可以等待执行,但要尽量少的创造过多线程。如果队列很大,则很难扩大到最大线程数,同时会有大量的任务等待。

Tomcat 线程池分析

Tomcat 线程池是在 LifeCycle 中创建的。跳过前面繁琐的流程,直接看 org.apache.tomcat.util.net.NioEndpoint#startInternal

    /*** Start the NIO endpoint, creating acceptor, poller threads.*/@Overridepublic void startInternal() throws Exception {if (!running) {running = true;paused = false;processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,socketProperties.getProcessorCache());eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,socketProperties.getEventCache());nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,socketProperties.getBufferPool());// Create worker collectionif ( getExecutor() == null ) {createExecutor();}initializeConnectionLatch();// Start poller threadspollers = new Poller[getPollerThreadCount()];for (int i=0; i<pollers.length; i++) {pollers[i] = new Poller();Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);pollerThread.setPriority(threadPriority);pollerThread.setDaemon(true);pollerThread.start();}startAcceptorThreads();}}

再看 org.apache.tomcat.util.net.AbstractEndpoint#createExecutor

    public void createExecutor() {internalExecutor = true;TaskQueue taskqueue = new TaskQueue(); //无界队列TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);taskqueue.setParent( (ThreadPoolExecutor) executor);}

要注意这里的 ThreadPoolExecutor 不是 JUC 里面的 java.util.concurrent.ThreadPoolExecutor,而是 Tomcat 的 org.apache.tomcat.util.threads.ThreadPoolExecutor,它继承了 JUC 的 java.util.concurrent.ThreadPoolExecutor

public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {...
}

查看它的构造方法:

    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new RejectHandler());//提前启动核心线程prestartAllCoreThreads();}

可以发现它在构造的时候就会启动核心线程,而 java.util.concurrent.ThreadPoolExecutor 则是需要手动启动。而阻塞队列使用是 org.apache.tomcat.util.threads.TaskQueue

public class TaskQueue extends LinkedBlockingQueue<Runnable> {private static final long serialVersionUID = 1L;private volatile ThreadPoolExecutor parent = null;// No need to be volatile. This is written and read in a single thread// (when stopping a context and firing the  listeners)private Integer forcedRemainingCapacity = null;public TaskQueue() {super();}...
}

而在创建 org.apache.tomcat.util.threads.TaskQueue 的时候,并没有传递 capacity,也就是说 Tomcat 的线程池使用的是无界队列。

接下来看一下最核心的org.apache.tomcat.util.threads.ThreadPoolExecutor#execute(java.lang.Runnable)

/*** {@inheritDoc}*/@Overridepublic void execute(Runnable command) {//重载 java.util.concurrent.ThreadPoolExecutor#executeexecute(command,0,TimeUnit.MILLISECONDS);}public void execute(Runnable command, long timeout, TimeUnit unit) {submittedCount.incrementAndGet();try {super.execute(command);} catch (RejectedExecutionException rx) {if (super.getQueue() instanceof TaskQueue) {final TaskQueue queue = (TaskQueue)super.getQueue();try {if (!queue.force(command, timeout, unit)) {submittedCount.decrementAndGet();throw new RejectedExecutionException("Queue capacity is full.");}} catch (InterruptedException x) {submittedCount.decrementAndGet();throw new RejectedExecutionException(x);}} else {submittedCount.decrementAndGet();throw rx;}}}

本质上还是执行的 java.util.concurrent.ThreadPoolExecutor#execute 方法:

    public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();// Tomcat 中这块逻辑不会执行,因为构造时已经初始化了核心线程if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);}//强制入队public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {if ( parent==null || parent.isShutdown() ) throw new RejectedExecutionException("Executor not running, can't force a command into the queue");return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task is rejected}

这里的 workQueueorg.apache.tomcat.util.threads.TaskQueueorg.apache.tomcat.util.threads.TaskQueue#offer

    @Overridepublic boolean offer(Runnable o) {//we can't do any checksif (parent==null) return super.offer(o);//we are maxed out on threads, simply queue the object//当前线程数达到最大,任务入队if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);//we have idle threads, just add it to the queue//如果已提交未执行完的任务数小于当前线程数(来了任务先+1,再入队,执行完才-1,说明还有空闲的worker线程),任务入队if (parent.getSubmittedCount()<(parent.getPoolSize())) return super.offer(o);//if we have less threads than maximum force creation of a new thread// 如果当前线程数小于最大线程数量,则直接返回false,java.util.concurrent.ThreadPoolExecutor#execute 会创建新的线程来执行任务if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;//if we reached here, we need to add it to the queue//任务入队(当前线程数大于最大线程数)return super.offer(o);}

再看下拒绝策略,结合 java.util.concurrent.ThreadPoolExecutor#execute 方法,需要 java.util.concurrent.ThreadPoolExecutor#addWorker 返回 false 才会触发,即达到了最大线程数才会触发,而 org.apache.tomcat.util.threads.ThreadPoolExecutor#execute(java.lang.Runnable) 在触发了拒绝策略后还有一个特殊处理:

					//如果是 TaskQueueif (super.getQueue() instanceof TaskQueue) {final TaskQueue queue = (TaskQueue)super.getQueue();try {//强制入队if (!queue.force(command, timeout, unit)) {submittedCount.decrementAndGet();throw new RejectedExecutionException("Queue capacity is full.");}} catch (InterruptedException x) {submittedCount.decrementAndGet();throw new RejectedExecutionException(x);}} else { //非 TaskQueue 直接触发拒绝策略submittedCount.decrementAndGet();throw rx;}

再看 org.apache.tomcat.util.threads.TaskQueue#force(java.lang.Runnable, long, java.util.concurrent.TimeUnit)

    public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {if ( parent==null || parent.isShutdown() ) throw new RejectedExecutionException("Executor not running, can't force a command into the queue");return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task is rejected}

说白了就是直接入队(无界队列):

    public boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException {if (e == null) throw new NullPointerException();long nanos = unit.toNanos(timeout);int c = -1;final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;putLock.lockInterruptibly();try {while (count.get() == capacity) { //capacity是Integer最大值if (nanos <= 0)return false;nanos = notFull.awaitNanos(nanos);}enqueue(new Node<E>(e));c = count.getAndIncrement();if (c + 1 < capacity)notFull.signal();} finally {putLock.unlock();}if (c == 0)signalNotEmpty();return true;}

这么看,Tomcat 的线程池基本上不会触发拒绝策略。可以写个例子试一下:

package blog.dongguabai.others.tomcat_threadpool;import org.apache.tomcat.util.threads.TaskQueue;
import org.apache.tomcat.util.threads.TaskThreadFactory;
import org.apache.tomcat.util.threads.ThreadPoolExecutor;import java.util.Date;
import java.util.concurrent.TimeUnit;/*** @author dongguabai* @date 2023-11-18 22:04*/
public class Demo {public static void main(String[] args) {//无界队列TaskQueue taskqueue = new TaskQueue();TaskThreadFactory tf = new TaskThreadFactory("dongguabai_blog" + "-exec-", false, 2);final ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS, taskqueue, tf);taskqueue.setParent(executor);observe(executor);while (true) {executor.execute(new Runnable() {public void run() {excuteForever();}});}}private static void observe(final ThreadPoolExecutor executor) {Runnable task = new Runnable() {public void run() {while (true) {try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(new Date().toLocaleString() + "->" + executor.getQueue().size());}}};new Thread(task).start();}public static void excuteForever() {while (true) {}}
}

输出:

2023-11-18 22:18:27->6541506
2023-11-18 22:18:34->14395417
2023-11-18 22:18:37->25708908
2023-11-18 22:18:50->32014458
2023-11-18 22:19:07->47236736
2023-11-18 22:19:10->65616058
2023-11-18 22:19:32->66856933
...

可以看到,队列里的任务都有六千多万了,还没有触发拒绝策略,线程池还是可以继续接收任务。

当然我们也是可以自定义的,只需要重写 org.apache.tomcat.util.net.AbstractEndpoint#getExecutor 即可:

    public Executor getExecutor() { return executor; }

org.apache.tomcat.util.net.NioEndpoint#startInternal 会进行判断:

@Overridepublic void startInternal() throws Exception {if (!running) {...if ( getExecutor() == null ) {createExecutor(); //如果没有自定义实现,就会使用默认实现}}...}

Tomcat 默认线程池优先创建线程执行任务,达到了最大线程数,不会直接执行拒绝策略,而是尝试返回等待队列,但由于等待队列的容量是 Integer 最大值,所以几乎不会触发拒绝策略。

最后

最后再回过头看文章开头的两个问题:

  1. 每次提到服务限流为什么都不考虑基于 Tomcat 来做呢?

    Tomcat 的确可以用来做限流,比如可以控制最大线程数,这样后续的任务均会在队列等待,并不会执行。org.apache.tomcat.util.net.AbstractEndpoint#setMaxConnectionsConnector 的角度设置,这块不在本文探讨范围之内。

    虽然基于 Tomcat 的限流是一种可能的方案,但在实际应用中,我们通常会选择其他的层次来实现服务限流:

    • 可扩展性:基于 Tomcat 的限流方案通常只能在单个服务实例上工作,且只能针对HTTP/HTTPS协议的请。而在微服务或者分布式系统中,我们可能需要分布式限流方案和针对多协议的 限流。
    • 灵活性:在应用层或者分布式系统层实现的限流方案通常可以提供更多的配置选项和更精细的控制。例如,请求的资源、来源或者其他属性来进行限流。
  2. 大家有遇到过 Tomcat 线程池触发了拒绝策略吗?

    Tomcat 默认无限队列,难以触发拒绝策略,所以会有内存泄漏的风险。可以基于 org.apache.tomcat.util.net.AbstractEndpoint#getExecutor 自定义线程池进行控制。

References

  • 《Java 并发编程的艺术》

欢迎关注公众号:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/192341.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Linux】进程间通信 -- 共享内存

共享内存 共享内存是SystemV标准进程间通信的一种&#xff0c;该标准还有消息队列和信号量&#xff0c;但下文主要介绍共享内存&#xff0c;然后在谈一下信号量的内容。SystemV标准的进程间通信可以看做单机版的进程间通信。 // 1. log.hpp #pragma once#include <iostrea…

Java(一)(引用类型的参数在传递,方法重载,面向对象编程基础)

基本类型和引用类型的参数在传递的时候有什么不同? 基本类型的值传递:参数传输存储的数据值 引用类型的值传递:参数传输存储的地址值 传递数组名字的时候,传递的是数组的地址,change方法可以通过地址直接访问我们在堆内存中开辟的数组,然后改变数组,数组中的元素发生变化 方…

K-Means算法进行分类

已知数据集D中有9个数据点&#xff0c;分别是&#xff08;1,2&#xff09;&#xff0c;(2,3), (2,1), (3,1),(2,4),(3,5),(4,3),(1,5),(4,2)。采用K-Means算法进行聚类&#xff0c;k2&#xff0c;设初始中心点为&#xff08;1.1,2.2&#xff09;&#xff0c;&#xff08;2.3,3.…

在Rust编程中使用泛型

1.摘要 Rust中的泛型可以让我们为像函数签名或结构体这样的项创建定义, 这样它们就可以用于多种不同的具体数据类型。下面的内容将涉及泛型定义函数、结构体、枚举和方法, 还将讨论泛型如何影响代码性能。 2.在函数定义中使用泛型 当使用泛型定义函数时&#xff0c;本来在函…

竞赛选题 疲劳驾驶检测系统 python

文章目录 0 前言1 课题背景2 Dlib人脸识别2.1 简介2.2 Dlib优点2.3 相关代码2.4 人脸数据库2.5 人脸录入加识别效果 3 疲劳检测算法3.1 眼睛检测算法3.2 打哈欠检测算法3.3 点头检测算法 4 PyQt54.1 简介4.2相关界面代码 5 最后 0 前言 &#x1f525; 优质竞赛项目系列&#x…

基于Adapter用CLIP进行Few-shot Image Classification

文章目录 【ECCV 2022】《Tip-Adapter: Training-free Adaption of CLIP for Few-shot Classification》【NeuIPS 2023】《Meta-Adapter: An Online Few-shot Learner for Vision-Language Model》 【ECCV 2022】《Tip-Adapter: Training-free Adaption of CLIP for Few-shot C…

使用Redis实现分布式锁

Hi, I’m Shendi 使用Redis实现分布式锁 需求场景 需要使用到分布式锁的场景非常多&#xff0c;例如抢单等并发场景&#xff0c;这里举一个例子。 有一个商品&#xff0c;限量出售100个&#xff0c;一个用户下单&#xff0c;数量就减少一个&#xff0c;当剩下最后一个时&…

Unity在Windows选项下没有Auto Streaming

Unity在Windows选项下没有Auto Streaming Unity Auto Streaming插件按网上说的不太好使最终解决方案 Unity Auto Streaming插件 我用的版本是个人版免费版&#xff0c;版本号是&#xff1a;2021.2.5f1c1&#xff0c;我的里边Windows下看不到Auto Streaming选项,就像下边这张图…

基于猕猴感觉运动皮层Spike信号的运动解码分析不同运动参数对解码的影响

公开数据集中文版详细描述参考前文&#xff1a;https://editor.csdn.net/md/?not_checkout1&spm1011.2124.3001.6192神经元Spike信号分析参考前文&#xff1a;https://blog.csdn.net/qq_43811536/article/details/134359566?spm1001.2014.3001.5501神经元运动调制分析参考…

CUDA编程一、基本概念和cuda向量加法

目录 一、cuda编程的基本概念入门 1、GPU架构和存储结构 2、cuda编程模型 3、cuda编程流程 二、cuda向量加法实践 1、代码实现 2、代码运行和结果 有一段时间对模型加速比较感兴趣&#xff0c;其中的一块儿内容就是使用C和cuda算子优化之类一起给模型推理提速。之前一直…

HAL库STM32串口开启DMA接收数据

STM32CubeMx的配置 此博客仅仅作为记录&#xff0c;这个像是有bug一样&#xff0c;有时候好使&#xff0c;有时候不好&#xff0c;所以趁现在好使赶紧记录一下&#xff0c;很多地方用到串口接收数据&#xff0c;DMA又是一种非常好的接收方式&#xff0c;可以节约CPU的时间&…

Unity - Cinemachine

动态获取Cinemachine的内部组件 vCam.GetCinemachineComponent<T>() 动态修改Cinemachine的Transposer属性 var vCamComp transfrom.GetComponent<CinemachineVirtualCamera>(); var transposerComp vCamComp.GetCinemachineComponent<CinemachineTransposer&…