深入分析Java线程池——ThreadPoolExecutor

文章目录

  • Java 线程池概述
    • ThreadPoolExecutor 构造方法
    • 线程池拒绝策略
    • 工作流程
    • 并发库中的线程池
      • CachedThreadPool
      • FixedThreadPool
      • SingleThreadExecutor
      • ScheduledThreadPool
  • ThreadPoolExecutor 源码分析
    • 线程池状态表示
      • 获取 runState
      • 获取 workerCount
      • 生成 ctl
    • 提交任务 execute()
      • 为什么需要二次检查
      • 创建工作线程 addWorker()
    • 工作线程 Worker
      • 主逻辑 runWorker
      • 获取任务 getTask()
    • 工作线程的退出
      • RUNNING 状态所有任务执行完成
      • shutdown 关闭线程池
        • 所有线程等待新任务
        • 所有线程繁忙
        • 队列中剩余少量的任务
  • 写在最后

Java 线程池概述

Java 语言中创建线程看上去就像创建一个对象,仅仅调用 new Thread() 即可,但实际上创建线程比创建对象复杂得多。创建对象仅仅是在 JVM 的堆里分配一块内存,而创建线程,需要 调用操作系统内核的 API,然后操作系统要为线程分配一系列的资源,这个成本很高。因此,线程是一个重量级的对象,应该避免频繁的创建和销毁。


线程池没有采用一般的池化资源设计方法(例如:连接池、对象池),因为我们无法获取一个启动的 Thread 对象,然后动态地将需要执行的任务 Runnable task 提交给线程执行。目前业界地线程池设计,普遍采用生产者-消费者模型,线程池的使用方为 Producer,而线程池中的 工作线程为 Consumer

ThreadPoolExecutor 构造方法

Java 提供的线程池相关的工具类中,最核心的是ThreadPoolExecutor。ThreadPoolExecutor 的构造函数如下:

  • corePoolSize:线程池保有的最小线程数(核心线程数)。如果线程池中的线程数小于 corePoolSize,提交任务时会创建一个核心线程,该任务作为新创建的核心线程第一个执行的任务。
  • maximumPoolSize:最大线程数。如果提交任务时任务队列已经满了,且当前工作线程数小于 maximumPoolSize,会创建新的工作线程用于执行该任务;反之如果工作线程数大于等于 maximumPoolSize,则执行拒绝策略。
  • keepAliveTime & unit:一个线程如果在时间单位为 unit 的 keepAliveTime 时间内没有执行任务,而且线程池的线程数大于 corePoolSize ,那么这个空闲的线程就要被回收。
  • workQueue:任务队列,为 BlockingQueue 实现类。
  • threadFactory:线程工厂,通过该参数可以自定义如何创建线程。ThreadFactory 是一个接口,里面是有一个 newThread 方法等待实现:
    Thread newThread(Runnable r);//接口方法默认为public abstract
    
  • handler:任务的拒绝策略,如果线程池中 所有的线程都在忙碌,且任务队列已经满了(前提是任务队列是有界队列),此时提交任务线程池会拒绝执行。决绝的策略可以通过该参数指定。

温馨提示:线程池的静态工厂类 Executors 提供了很多开箱即用的线程池,可以帮助快速创建线程池,但提供的线程池很多使用的是 无界队列 LinkedBlockingQueue,无界队列很容易导致 OOM,而 OOM 会导致所有请求都无法处理。
在阅读完本节后我们知道,在生产环境中使用线程池时需要设置 ThreadPoolExecutor 构造方法的 workQueue 参数为 ArrayBlockingQueue 等有界阻塞队列。


线程池拒绝策略

上一小节提到,构造方法中的 RejectedExecutionHandler handler 参数可以用于自定义任务拒绝策略。ThreadPoolExecutor 已经提供了 4 种拒绝策略:

  • CallerRunsPolicy:提交任务的线程自己去执行该任务。

  • AbortPolicy:默认的拒绝策略,会抛出 RejectedExecutionException。

  • DiscardPolicy:直接丢弃任务,没有任何异常抛出。

  • DiscardOldestPolicy:丢弃最老的任务,其实就是把最早进入工作队列的任务丢弃,然后把新任务加入到工作队列。


默认拒绝策略为 AbortPolicy,该拒绝策略抛出 RejectedExecutionException 为运行时异常,编译器不会强制 catch,开发人员可能会忽略,因此默认拒绝策略要慎重使用
如果线程池处理的任务非常重要,建议自定义自己的拒绝策略;并且在实际工作中, 自定义的拒绝策略往往和 降级策略 配合使用

例如:将任务信息插入数据库或者消息队列,配置 XXL-JOB 定时任务扫描失败任务表,将执行失败的任务交给专用于补偿的线程池去进行补偿


工作流程

线程池中有几个重要的概念:核心线程池(CorePool)、**空闲线程池(IdlePool)**以及 任务队列。下图为我绘制的线程池工作流程图,包含上述三个概念模型,cpSize 核心线程池中当前的线程数、cpCap 核心线程池容量、ipSize 空闲线程池中当前线程数。

请添加图片描述

我来简述下提交任务 task 时,线程池的执行流程:

  1. 如果核心线程池未满,即 cpSize 小于 cpCap,通过线程工厂 创建一个核心线程,将 task 作为新线程的第一个任务。

  2. 如果 核心线程池已满,但是任务队列仍然有空间,将 task 添加到任务队列。核心线程在执行完手头的任务后,会从任务队列中获取新的任务,继续执行。如果任务队列为空,核心线程会阻塞在任务获取阶段,直到有 新的任务提交到任务队列

  3. 如果任务队列已满,则创建空闲线程,并将 task 作为第一个执行的任务。空闲线程如果执行完手头的任务,也会从任务队列中获取新的任务。
    如果任务队列为空,空闲线程会阻塞,直到 超出 keepalive 设定的时间 或 获取到新的任务执行。如果等待新任务超时,空闲线程的生命周期就会结束了。

  4. 如果空闲线程数+核心线程数已经达到了 maximumPoolSize,创建新线程的方法会失败,此时提交的任务将被拒绝,拒绝策略由 RejectedHandler 负责执行。


并发库中的线程池

java.util.concurrent.Executors 提供了通用线程池创建方法,去创建不同配置的线程池,该工具类目前提供了五种不同的线程池创建配置:

CachedThreadPool

CachedThreadPool 是一种用来 处理大量短时间工作任务的线程池,会在先前构建的线程可用时重用已创建的工作线程,但是当工作线程空闲超过 60s,则会从线程池中移除。

任务队列为 SynchronousQueue,它是一个不存储元素的阻塞队列(容量 0),提交任务的操作必须等待工作线程的移除操作,反之亦然。

在这里插入图片描述


为什么使用 SynchronousQueue 作为任务队列

个人想法:线程池的工作逻辑是,提交任务时如果 核心线程数达到 corePoolSize 且任务队列已满,则会创建空闲线程执行。因为 SynchronousQueue 容量为 0 天然是满的,且 corePoolSize 被设置为 0,这意味着创建任务时如果没有可用线程,就会立即创建一个新线程来处理任务。

这使得 CachedThreadPool 在执行大量短期异步任务时更加高效,避免了任务对线程资源的等待,符合设计初衷:快速执行大量的短暂任务

FixedThreadPool

核心线程数和最大线程数相等,使用的是 无界任务队列 LinkedBlockingQueue。如果当前的工作线程数已经达到 nThreads,任务将被添加到任务队列中等待执行。如果有工作线程退出,下一次提交任务时将会有新的工作线程被创建来补足线程池。

SingleThreadExecutor

工作线程限制为 1 的 FixedThreadExecutor,它 保证了所有任务的都是被顺序执行

ScheduledThreadPool

ScheduledThreadPoolExecutor 允许安排一个任务在延迟指定时间后 执行,还可以 周期性地执行任务。周期性调度任务有两种类型:固定延迟和固定频率。固定延迟 是在上一个任务结束和下一个任务开始之间保持固定的延迟,而 固定频率 是以固定的频率执行任务,不管任务的执行时间多长。


ScheduledThreadPoolExecutor 中定义了内部类 DelayedWorkQueue 作为任务队列,DelayedWorkQueue 是基于堆的数据结构。队列中的元素为 RunnableScheduledFuture 类型:

private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY];

RunnableScheduledFuture 接口继承关系如下图所示:

  • Delayed 接口继承了 Comparable 接口,getDelay 方法返回任务剩余的延迟时间,返回值小于等于 0 说明延迟的时间已过;compareTo 方法用于比较任务下一次的执行时间,用于维护小顶堆属性(父节点任务的执行时间小于儿子节点)。
  • RunnableFuture 接口的 run 方法定义了需要执行的任务逻辑,Future 接口用于获取异步任务的执行结果。

DelayedWorkQueue#take 方法用于获取下一个需要执行的定时任务,代码及详细注释如下:

public RunnableScheduledFuture<?> take() throws InterruptedException {final ReentrantLock lock = this.lock;// 上锁, 避免堆数据访问产生的数据竞争lock.lockInterruptibly();try {for (;;) {// 堆顶元素: Delayed#getDelay 延迟时间最小的任务RunnableScheduledFuture<?> first = queue[0];if (first == null)// 堆中任务空, 等待新任务入堆available.await();else {long delay = first.getDelay(NANOSECONDS);if (delay <= 0)// delay小于等于0, 说明延迟时间已过, 可以执行;// finishPoll 弹出堆顶任务return finishPoll(first);first = null; // don't retain ref while waitingif (leader != null)// leader为等待堆顶任务到达执行时间的线程// leader 非空说明已经有线程正在等待堆顶任务可执行, 因此当前线程为 follower, 需要等待直到堆顶元素变更available.await();else {// 当前线程是等待堆顶元素的 leader 线程, 设置 leader 属性Thread thisThread = Thread.currentThread();leader = thisThread;try {// 等待任务延迟的时间available.awaitNanos(delay);} finally {// await期间会释放锁, leader可能因为新任务的加入而失效(当前线程可能等待的不再是堆顶任务)// 所以await超时后, 需要判断leader是否为当前线程, 为当前线程才能设为nullif (leader == thisThread)leader = null;}}}}} finally {// 任务队列非空, leader为空说明没有线程等待堆顶元素可执行, 此时唤醒 follower 线程, 尝试获取堆顶的任务if (leader == null && queue[0] != null)available.signal();lock.unlock();}
}

ThreadPoolExecutor 源码分析

线程池状态表示

ThreadPoolExecutor 最重要的状态参数为:线程池状态(rs) 以及 活跃线程数(wc)。ThreadPoolExecutor 使用一个 Integer 变量 ctl 存储这两个状态参数:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

Integer 长度位 32 bits,ctl 中最高的三位 (29-31) 存储线程池状态,低 29 位 (0~28) 存储活跃线程数,因此线程池中活跃线程数理论上限为 2 29 − 1 2^{29}-1 2291

了解了 ThreadPoolExecutor 的这种设计之后,我们来看看状态相关的位运算:

private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

CAPACITY 表示线程池中活跃线程的理论上限 2 29 − 1 2^{29}-1 2291COUNT_BITS 表示线程数位数( 32 − 3 = 29 32-3=29 323=29)。

RUNNING 、SHUTDOWN、STOP、TIDYING、TERMINATED 为线程池的五种状态。根据代码,这五种状态表示如下图所示:

在这里插入图片描述

  • RUNNING:可接收新的任务,并且处理队列中排队的任务;
  • SHUTDOWN:不接收新的任务,但会处理队列中剩下的任务;
  • STOP:不接收新任务,不处理队列中的任务,并且中断进行中的任务;
  • TIDYING:所有的任务终止,工作线程数 (workerCount) 等于 0;
  • TERMINATED:线程池关闭,terminated() 方法完成。

获取 runState

private static int runStateOf(int c)     { return c & ~CAPACITY; }

runStateOf 方法从 ctl 获取线程池运行状态,保留 ctl 的最高的三位,其余位设置为 0。以 STOP 状态、3 个活跃线程数的 ctl 为例,求 rs 的过程如下:
在这里插入图片描述


获取 workerCount

private static int workerCountOf(int c)  { return c & CAPACITY; }

workerCountOf 获取线程池中的活跃线程数,即保留 ctl 的 0-28 位,将 29-31 位设置为 0。

在这里插入图片描述

生成 ctl

private static int ctlOf(int rs, int wc) { return rs | wc; }

ctlOf 通过状态值和线程数值计算出 ctl,就是对 rs 和 wc 进行或运算,保留 wc 的 0-28 位和 rs 的 29-31 位。

提交任务 execute()

ThreadPoolExecutor#execute 方法用于提交任务给线程池执行,代码以及详细注释如下:

public void execute(Runnable command) {if (command == null)throw new NullPointerException();// 获取线程池状态参数 ctlint c = ctl.get();// 如果活跃线程数小于核心线程池容量corePoolSize, addWorker创建新线程, 以command作为第一个任务if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;// 创建新线程失败, 更新 ctlc = ctl.get();}// 创建核心线程失败, 尝试将任务添加到任务队列中if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();// 二次检查, 如果线程池不在运行状态, 需要回滚刚刚入队的任务if (!isRunning(recheck) && remove(command))// 移除任务成功, 执行拒绝策略reject(command);else if (workerCountOf(recheck) == 0)// 线程池为运行状态, 但是没有工作线程, 创建线程处理任务队列中的任务addWorker(null, false);}// 任务添加到队列失败, (1)线程池状态不是RUNNING状态 或 (2)任务队列已满// 尝试增加非核心线程, 执行 command 任务, 如果线程池不为RUNNING, addWorker会返回falseelse if (!addWorker(command, false))// 线程池不为RUNNING, 新增非核心线程失败, 执行任务拒绝策略reject(command);}

这段代码的主要逻辑很简洁:

  1. 当 wc 小于 corePoolSize 时,创建核心线程执行 command 任务;
  2. 如果核心线程数已满,则将任务缓存在任务队列中 (workQueue.offer),工作线程完成手头上的任务后,从任务队列中获取新任务。
  3. 如果任务队列也满了,offer 方法返回 false,尝试增加非核心线程执行 command。如果线程创建失败,reject 执行任务拒绝策略。

除此之外,我想在本篇博客中探讨下 execute 方法的一些实现细节,并给出我自己的观点用于抛砖引玉。


为什么需要二次检查

大家请看下面这段有关二次检查的代码,在阅读源码时,我产生了疑问: 为什么需要二次检查 ?该操作 解决了什么场景下的数据竞争

// ...
c = ctl.get(); // -------(1)
if (isRunning(c) && workQueue.offer(command)) { // -------(2)int recheck = ctl.get();  // -------(3)if (!isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);
}
// ...

执行二次检查的前提是:

  • 线程池在执行语句 (1) 的时候,是运行状态;
  • 任务队列未满,command 被添加至队列,(2) 处的 offer 方法返回 true;

假设没有二次检查,会发生什么

场景 1:在语句 (1) 后,语句(2) 执行前,线程池使用者调用了 shutdownNow 方法将线程池工作线程关闭,清空任务队列中的任务。时序图如下:


我们先来看看 shutdownNow 调用的 drainQueue 方法:

public List<Runnable> shutdownNow() {List<Runnable> tasks;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();advanceRunState(STOP);interruptWorkers();tasks = drainQueue();} finally {mainLock.unlock();}tryTerminate();return tasks;
}private List<Runnable> drainQueue() {BlockingQueue<Runnable> q = workQueue;ArrayList<Runnable> taskList = new ArrayList<Runnable>();q.drainTo(taskList);if (!q.isEmpty()) {for (Runnable r : q.toArray(new Runnable[0])) {if (q.remove(r))taskList.add(r);}}return taskList;
}

drainQueue 方法用于移除任务队列 workQueue 中的 Runnable 任务,这些未执行的任务作为 shutdownNow 方法的返回值,通知方法调用者哪些任务未执行。

如果按照上图中的执行序列,线程池的状态已经为 STOP,任务队列也被清空,但是新提交的任务 command 却被添加到任务队列中。这导致这个新任务不会被运行、也不会执行拒绝策略、也无法通过 shutdownNow 返回的任务列表通知调用者

这严重降低了线程池的健壮性,难以想象一个已提交的任务消失在线程池中!


场景 2:线程池处于运行状态,corePoolSize 设置为 0,阻塞队列的容量大于 0。

线程池刚启动时,提交任务 command 显然无法创建核心线程执行,任务会被缓冲在任务队列中,直到任务队列容量到达上限,线程池才会创建非核心线程执行任务。这导致 大量任务将不能及时被处理,甚至可能永远得不到执行

场景示意图如下(图中任务队列容量为 4,corePoolSize 等于 0):


二次检查解决了上述两种场景的问题吗?当然!!

c = ctl.get(); // -------(1)
if (isRunning(c) && workQueue.offer(command)) { // -------(2)int recheck = ctl.get();  // -------(3)if (!isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0) // -----(4)addWorker(null, false);
}

针对场景 1

如果在语句 (1) 和 语句(2) 之间,shutdownNow 被调用并执行完成,然后语句 (2) 将新任务 command 加入任务队列。在语句 (3) 重新获取最新的 ctl,此时就能得知线程池的状态已经为 STOP,使用 remove 方法回滚入队的任务,并执行 reject 方法拒绝执行任务


针对场景 2
如果线程池状态为 RUNNING,但因为线程中没有线程,语句(4) 判断为 true,创建非核心线程处理任务队列中的任务,防止异步任务长时间处于队列中得不到处理 的情况。


创建工作线程 addWorker()

addWorker 用于创建工作线程,我将其分为两部分分析:

  • 第一部分:根据外层死循环判断 ThreadPoolExecutor 的运行状态 是否能够创建线程。如果可以创建线程,通过内层死循环 CAS 更新状态参数 ctl,直到更新成功或线程池状态发生改变。

第一部分的含详细注释的代码如下:

private boolean addWorker(Runnable firstTask, boolean core) {// retry为外层循环retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// 仅 (1) RUNNING状态 或 (2) SHUTDOWN状态+队列中仍有任务+firstTask为空 时 创建工作线程// firstTask为空, 说明活跃线程数不满足线程池运行的最小数量if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;// for内层循环for (;;) {int wc = workerCountOf(c);// 如果线程数达到容量上限, 不可创建新线程// 如果core为true, 线程数大于等于corePoolSize, 不能创建核心线程// 如果 core 为 false, 线程数大于等于 maximumPoolSize, 不可以创建非核心线程if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;// CAS更新 ctl, 如果成功, 则退出 retry 循环, 执行创建流程if (compareAndIncrementWorkerCount(c))break retry;// CAS更新失败, 重新读取 ctlc = ctl.get();if (runStateOf(c) != rs)// 状态发生改变, 重新执行大循环continue retry;// else: 线程数改变导致CAS失败, 继续for循环即可}}// ... 省略第二部分
} 
  • 第二部分:状态更新成功后,执行真正的线程创建逻辑,包括:工作线程添加至 Worker 集合、启动 Thread 对象。

第二部分详细代码注释如下:

private boolean addWorker(Runnable firstTask, boolean core) {// ... 省略第一部分boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 持有锁的情况下获取 ctl, 防止 shutdown、shutdownNow 导致的状态变更int rs = runStateOf(ctl.get());// 运行状态为 RUNNING或运行状态为 SHUTDOWN 且 firstTask为空 才允许启动工作线程if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {// 线程可能已经启动, 抛出异常(例如: 自定义的ThreadFactory#newThread 方法多次调用返回同一个 Thread 对象)if (t.isAlive())throw new IllegalThreadStateException();workers.add(w); // 添加到 HashSet 中int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {// worker 成功添加到 workers 集合, 在这里真正启动工作线程t.start();workerStarted = true;}}} finally {if (! workerStarted)// 启动线程失败(可能线程已经启动 或 线程池状态发生改变), 将worker从workers中移除, 扣减 workerCountaddWorkerFailed(w);}return workerStarted;
}

大部分代码阅读注释即可了解原理,这里提一下我阅读时产生疑惑的地方:

疑惑一firstTask 等于 null 代表什么?为什么判断能否创建线程时,处于 SHUTDOWN 状态还需要 firstTask 等于 null ?

 if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;

疑惑二:为什么需要在持有 mainLock 后,需要重新检查运行状态 rs


先来看疑惑一,firstTask 等于 null 出现的场景有:

  • 预启动核心线程(所有包含 prestart 单词的方法)
public boolean prestartCoreThread() {return workerCountOf(ctl.get()) < corePoolSize &&addWorker(null, true);
}public int prestartAllCoreThreads() {int n = 0;while (addWorker(null, true))++n;return n;
}
  • 在工作线程退出时,替换死亡的工作线程。(processWorkerExit 方法)
private void processWorkerExit(Worker w, boolean completedAbruptly) {// ...int c = ctl.get();if (runStateLessThan(c, STOP)) {if (!completedAbruptly) {int min = allowCoreThreadTimeOut ? 0 : corePoolSize;if (min == 0 && ! workQueue.isEmpty())min = 1;if (workerCountOf(c) >= min)return; // replacement not needed}addWorker(null, false);}
}
  • 提交的新任务被缓冲在队列,但活跃线程数 workCount 等于 0。(execute方法)

在 addWorker 方法中,Running 状态可以创建工作线程,SHUTDOWN 状态仅可以在 firstTask 等于 null 的条件下创建线程。这符合 SHUTDOWN 状态的设计初衷:不接受新的任务、仅处理已添加至阻塞队列中的任务

除了预启动场景,execute 场景和 processWorkerExit场景 均是为了确保已经添加到任务队列中的任务不被放弃,能够成功执行。


再来看疑惑二:为什么在持有 mainLock 的情况下获取运行状态 rs?

这是为了防止 shutdown、shutdownNow 方法关闭线程池,改变运行状态。
为了确保 shutdown 和 shutdownNow 方法执行时 worker 集合的稳定,从而保证方法执行过程的原子性,这两种方法都会 在持有 mainLock 的情况下,修改 runState

因此,如果创建 worker 时 rs 发生了改变从而不应该增加工作线程,应该退出创建流程。(例如 RUNNING 变为 STOP 状态,此时不应该创建线程,因为任务都被丢弃了)。

mainLock.lock();int rs = runStateOf(ctl.get());// 确保运行状态 rs 可以创建新的线程if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {// ...}
mainLock.unlock();

下面是我绘制的 addWorker 工作流程图,作为本小节的总结:


工作线程 Worker

ThreadPoolExecutor 中的线程资源被包装为 Worker 对象,它持有一个 Thread 对象,实现了 Runnable 接口,又继承了 AQS,因此也具有锁的性质。

需要指出的是,它没有利用 AQS 中的 CLH 队列管理等待资源的线程,因为 Worker 并 不存在多个线程争抢所有权,它的 lock 方法仅由内部持有的 线程调用。

private final class Workerextends AbstractQueuedSynchronizerimplements Runnable{final Thread thread;/** Initial task to run.  Possibly null. */Runnable firstTask;/** Per-thread task counter */volatile long completedTasks;Worker(Runnable firstTask) {setState(-1); // AQS state 属性初始化为 -1this.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}// 线程的执行逻辑就是 runWorker方法public void run() {runWorker(this);}// runWorker方法中, 线程在执行任务前持有锁, 将state更改为 1public void lock()        { acquire(1); }// shutdown 关闭空闲线程时, 使用 tryLock 尝试获取锁 public boolean tryLock()  { return tryAcquire(1); }// 任务执行完成释放锁, state更改为 0public void unlock()      { release(1); }public boolean isLocked() { return isHeldExclusively(); }// ...
}

AQS 在 Worker 中的主要作用是维护 state 属性。Worker 构造函数中,state 初始化为 -1,执行 runWorker() 方法时会被设置为 0。state 等于 0 说明线程是空闲的,state 等于 1 说明线程正在处理任务

  • Worker#lock() 方法在仅在 runWorker 方法中被调用,线程在执行任务前调用该方法持有锁,将state更改为 1。
  • Worker#unlock() 方法在执行完任务后被调用,释放锁,将 state 更改为 0。
  • Worker#tryLock() 方法在 shutdown() 方法中被调用,用于中断空闲的工作线程,因为空闲的 Worker state 等于 0,因为 tryLock 能返回 true。

主逻辑 runWorker

runWorker 代码及详细注释如下:

    final void runWorker(Worker w) {Thread wt = Thread.currentThread();// 工作线程的第一个任务, 创建核心线程 或 线程池已满创建非核心线程时, firstTask非空Runnable task = w.firstTask;w.firstTask = null;// 将 state 由初始值 -1 修改为 0w.unlock(); // allow interruptsboolean completedAbruptly = true;try {// getTask如果为空, 说明任务队列中已经没有任务可以执行, 工作线程正常退出while (task != null || (task = getTask()) != null) {w.lock(); // 在执行任务前, 清除线程的中断标记(较为费解, 随后详细解释)if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {// 执行任务前的钩子方法, 继承ThreadPoolExecutor的类可重写beforeExecute(wt, task);Throwable thrown = null;try {// 执行任务task.run(); } catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {// 执行任务完成后的钩子方法, 继承ThreadPoolExecutor的类可重写afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock(); // state修改为0, 工作线程空闲}}completedAbruptly = false;} finally {// 处理线程退出:// 1. 从 worker 集合中移除当前工作线程// 2. 如果活跃线程数不满足线程池运行的最低要求, 或者线程因为执行异常而终止, 创建新线程替换processWorkerExit(w, completedAbruptly);}}

工作线程的运行流程概括起来为:

  1. getTask 从线程池中获取 Runnable 任务;
  2. 按照 beforeExecuteRunnable#runafterExecute 的顺序执行,beforeExecute 和 afterExecute 为 ThreadPoolExecutor 提供的两个扩展点,子类可以重写这两个方法满足打点、日志等自定义需求。
  3. 如果任务顺利执行,进行下一轮循环,通过 getTask 获取新任务。
    如果 getTask 返回 null,说明任务队列中没有任务 或者 当前线程因为线程池关闭而被中断
  4. 如果任务 或 钩子函数执行时抛出了异常,线程同样会退出,completedAbruptly 为 true。

在讲解完工作线程的主要流程后,我们来讨论下面这个 if 语句的含义:

Thread wt = Thread.currentThread();
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();
// ...

这段代码执行的目的是:

工作线程 worker 已经领取了一个任务准备执行,如果线程池状态为 RUNNING 或 SHUTDOWN,应该确保当前线程的中断标记被清除,从而不影响任务的执行。Thread.interrupted() 方法会 返回当前线程的中断标记,并将线程中断标记清空

如果线程池的状态为 STOP,且当前线程未被中断,wt.interrupt() 为当前线程打上中断标记。

下面我来分类讨论,帮助大家更好的理解:

  • runStateAtLeast(ctl.get(), STOP) == true && !wt.isInterrupted() == true
    当前线程池的状态至少为 STOP,当前线程却没有中断标记。if 判断为 true,中断当前线程;
  • (runStateAtLeast(ctl.get(), STOP) == falseThread.interrupted() == true && runStateAtLeast(ctl.get(), STOP) == false
    if 判断为 false,当前线程的状态为 RUNNING 或 SHUTDOWN,且已经有一个即将执行的任务,Thread.interrupted() 将中断标记清除。
  • (runStateAtLeast(ctl.get(), STOP) == falseThread.interrupted() == true && runStateAtLeast(ctl.get(), STOP) == true!wt.isInterrupted() == true
    这种情况非常反直觉,但是有可能出现的。下图操作序列很好说明了这种情况:因为 错误地将 STOP 中断标记给清除,所以 if 也会判断为 true,执行 wt.interrupt() 中断当前线程。
  • (runStateAtLeast(ctl.get(), STOP) == falseThread.interrupted() == false && runStateAtLeast(ctl.get(), STOP) == true
    这种情况类似上一种,只是线程池状态设置为 STOP,还未中断当前线程,if 操作会返回 false。


获取任务 getTask()

工作线程通过 getTask 从任务队列中获取任务,如果 getTask 返回 null,线程就会退出 runWorker 中的死循环。

getTask 何时返回 null

条件一:线程池状态为STOP、TIDYING、TERMINATED;或者是 SHUTDOWN且工作队列为空

条件二:工作线程 wc 大于最大线程数或当前工作线程已经超时, 且还有其他工作线程或任务队列为空。

当前线程超时的条件:(【核心线程可以超时】或【线程数大于核心线程数】)且 上一轮循环从阻塞队列的 poll 方法超时返回。

private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();int rs = runStateOf(c); // rs保留c的高3位, 低29位全部清零// 大小顺序为TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING// 条件一if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount(); // cas扣减线程数return null;}int wc = workerCountOf(c);// timed表示当前线程是否能够超时(设置了【核心线程超时】或线程数超过了核心线程)boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;// 条件二if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {// 可能有多个线程同时满足条件二, 需要使用cas扣减if (compareAndDecrementWorkerCount(c))return null;continue;}try {// 超出核心线程数时, poll等待存在超时时间; 反之, 使用take阻塞Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS):workQueue.take();if (r != null)return r;timedOut = true; // poll取任务超时, timedOut设置为true} catch (InterruptedException retry) {timedOut = false;}}
}

getTask 的流程图如下:


随后,我将在【工作线程的退出】章节,详细介绍 不同场景线程池回收工作线程的过程 ,会结合 getTask 方法分析。

工作线程的退出

RUNNING 状态所有任务执行完成

这种场景下,会将工作线程的数量减少到核心线程数大小。

int wc = workerCountOf(c);
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;
}

timed 表示是否允许线程因为超时被回收;timedOut 记录上一轮循环中,线程从阻塞队列获取任务是否超时了。

假设线程池核心线程数为2,最大线程数为4。线程数低于核心线程数时,使用execute 提交任务便会创建核心线程;线程数达到 2 后,任务被添加至阻塞队列,如果阻塞队列也满了,将工作线程逐渐增加到 4。当全部任务执行完成后:

  1. 工作队列为空,四个线程阻塞在 workQueue.poll 上,各自等待 keepAliveTime 时间后,超时返回,timedOut 设置为 true。

  2. 进入下一轮循环,因为 wc 等于 4 大于 corePoolSize=2,因此四个线程 timed 均为 true,从而 timed&timedOut 为 true 且 当前任务队列为空,情况二成立,4 个线程都可以被超时回收。

  3. 四个线程尝试 CAS 扣减 wc 为 3(仅有一个线程能扣减成功,getTask 返回 null)。其余三个线程继续循环,直到线程数达到核心线程数,timed 等于 false。

shutdown 关闭线程池

调用 shutdown() 后,线程池状态流转为 SHUTDOWN,随后向所有的空闲工作线程发送中断信号

public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();advanceRunState(SHUTDOWN);interruptIdleWorkers(); // 中断所有空闲线程onShutdown(); // hook for ScheduledThreadPoolExecutor} finally {mainLock.unlock();}tryTerminate();
}

处于 getTask 获取任务阶段的工作线程是空闲的,并没有锁定 Worker。我将分三种情况探讨工作线程如何响应中断信号。

  • 任务全部完成,所有线程在等待;
  • 任务队列中积压了大量任务,所有线程在繁忙;
  • 队列中剩余的任务少于空闲线程数;
所有线程等待新任务
// getTask(): 条件一
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount(); // cas扣减线程数return null;
}
...
try {// 超出核心线程数时, poll等待存在超时时间; 反之, 使用take阻塞Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true; // poll取任务超时, timedOut设置为true
} catch (InterruptedException retry) {timedOut = false;
}

中断信号将阻塞的线程唤醒,进入下一轮循环。当到达条件一处,检查到 rs 等于SHUTDOWN,且工作队列为空,满足条件,扣减线程数后返回null。在runWorker 中退出循环,结束线程。

所有线程繁忙

此时任务队列中积压了很多任务,工作线程因为 shutdown 而被中断,在获取任务时 调用 poll 或 take 方法都会抛出 InterruptedException 异常,然后被 catch 捕获,重新进行循环。

第二次循环到达条件一,虽然 rs 为 SHUTDOWN,但是工作队列非空,不满足退出条件。

// 工作队列非空, 条件1不满足
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;
}

timedOut 为 false,不是因为 poll 超时而返回,因此条件 2 也不满足:

// timedOut false
if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;
}

因此,shutdown 方法在线程池繁忙的情况下,相当于让 正在获取任务的线程空转了一次,不影响线程池运行。

队列中剩余少量的任务

假设情形

线程池状态已经是SHUTDOWN,但任务队列中剩余两个任务,A、B、C、D四个线程同时通过条件一和条件二,尝试从阻塞队列中获取任务。线程A、B成功获取任务,而线程 C、D因队列为空而阻塞。

线程A、B执行完任务后再次调用 getTask(),条件一的判断为true(线程池运行状态为SHUTDOWN且工作队列为空),于是返回 null,线程退出 runWorker 死循环,准备进行回收。

final void runWorker(Worker w) {boolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {...}}finally {// 回收退出的线程processWorkerExit(w, completedAbruptly);
}

在回收前,还需要执行 processWorkerExit 方法。在该方法中会将 worker 移除出 worker 集合,并调用tryTerminate()。

private void processWorkerExit(Worker w, boolean completedAbruptly) {// 执行任务时抛出异常退出, 而非getTask()返回null退出, 需要更新ctl属性反映线程数的变化if (completedAbruptly) decrementWorkerCount();final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {completedTaskCount += w.completedTasks; // 统计完成的任务数workers.remove(w); // 将Worker对象移除工作线程集合} finally {mainLock.unlock();}tryTerminate();...
}

在 tryTerminate 中,线程A、B判断线程池状态为 SHUTDOWN 且工作队列为空,不会在第一个 if 处返回。

然后判断出当前workers中的工作线程数不为0(因为线程C、D正阻塞),然后调用 interruptIdleWorkers(ONLY_ONE)

注意:此时线程A、线程B的线程数已经从ctl扣减,Worker实例也从workers中移除。

final void tryTerminate() {for (;;) {int c = ctl.get();if (isRunning(c) ||runStateAtLeast(c, TIDYING) ||(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))return;// 线程池状态为SHUTDOWN, 但仍然有线程阻塞在take或poll方法处if (workerCountOf(c) != 0) { // Eligible to terminateinterruptIdleWorkers(ONLY_ONE);return;}...}
}

interruptIdleWorkers 的入参 onlyOne 为true,因此只会中断一个空闲线程,然后break循环。假设先中断线程C,线程C从阻塞中被唤醒,抛出InterruptedException异常,被 catch 住异常后重新进行一轮循环,发现条件一满足,更新 ctl 并返回null。

private void interruptIdleWorkers(boolean onlyOne) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {for (Worker w : workers) {Thread t = w.thread;// 正在执行任务的Worker是无法获取锁的, 因此这里只能回收空闲线程if (!t.isInterrupted() && w.tryLock()) {try {t.interrupt();    } catch (SecurityException ignore) {} finally {w.unlock();}}// 仅中断一个空闲线程if (onlyOne)break;}} finally {mainLock.unlock();}
}

随后,线程 D 可以由上一个退出的线程中断唤醒(例如线程 C),从而让工作线程优雅地退出。

写在最后

感谢各位读者阅读本片博客,本篇博客的创作过程中参考了大量资料,笔者也详细阅读了 ThreadPoolExecutor 源码。笔者将很多阅读源码的思考融入本篇博客,尽可能去体会 Doug Lea 大神每一行代码的用意。这些细节可能很少有博客涉及,因此很可能存在纰漏和理解错误。如果有异见,欢迎在评论区指教,笔者将虚心倾听

创作过程耗时费力,但我乐在其中(钻研源码的过程和分享知识是让人快乐的事情),如果大家喜欢这种图文结合、代码详细注释的写作风格,就给我点一个免费的赞吧!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/526022.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ARTS Week 20

Algorithm 本周的算法题为 1222. 可以攻击国王的皇后 在一个 下标从 0 开始 的 8 x 8 棋盘上&#xff0c;可能有多个黑皇后和一个白国王。 给你一个二维整数数组 queens&#xff0c;其中 queens[i] [xQueeni, yQueeni] 表示第 i 个黑皇后在棋盘上的位置。还给你一个长度为 2 的…

日期问题---算法精讲

前言 今天讲讲日期问题&#xff0c;所谓日期问题&#xff0c;在蓝桥杯中出现众多&#xff0c;但是解法比较固定。 一般有判断日期合法性&#xff0c;判断是否闰年&#xff0c;判断日期的特殊形式&#xff08;回文或abababab型等&#xff09; 目录 例题 题2 题三 总结 …

WPF 窗口添加投影效果Effect

BlurRadius&#xff1a;阴影半径 Color&#xff1a;颜色 Direction&#xff1a;投影方向 ShadowDepth&#xff1a;投影的深度 <Window.Effect><DropShadowEffect BlurRadius"10" Color"#FF858484" Direction"300" ShadowDepth&quo…

云计算项目十一:构建完整的日志分析平台

检查k8s集群环境&#xff0c;master主机操作&#xff0c;确定是ready 启动harbor [rootharbor ~]# cd /usr/local/harbor [rootharbor harbor]# /usr/local/bin/docker-compose up -d 检查head插件是否启动&#xff0c;如果没有&#xff0c;需要启动 [rootes-0001 ~]# system…

代码学习记录15

随想录日记part15 t i m e &#xff1a; time&#xff1a; time&#xff1a; 2024.03.09 主要内容&#xff1a;今天的主要内容是二叉树的第四部分&#xff0c;主要涉及平衡二叉树的建立&#xff1b;二叉树的路径查找&#xff1b;左叶子之和&#xff1b;找树左下角的值&#xff…

毅速3D打印随形透气钢:模具困气排气革新之选

在注塑生产过程中&#xff0c;模具内的气体若无法有效排出&#xff0c;往往会引发困气现象&#xff0c;导致产品表面出现气泡、烧焦等瑕疵。这些瑕疵不仅影响产品的美观度&#xff0c;更可能对其性能造成严重影响&#xff0c;甚至导致产品报废&#xff0c;从而增加生产成本。 传…

用C语言执行SQLite3的gcc编译细节

错误信息&#xff1a; /tmp/cc3joSwp.o: In function main: execSqlite.c:(.text0x100): undefined reference to sqlite3_open execSqlite.c:(.text0x16c): undefined reference to sqlite3_exec execSqlite.c:(.text0x174): undefined reference to sqlite3_close execSqlit…

数据结构入门篇 之 【单链表】的实现讲解(附单链表的完整实现代码以及用单链表完成通讯录的实现代码)

虽然封面是顶针&#xff0c;但是我们还是要好好学习❀ 一.单链表 1.单链表的概念 2.单链表的结构 3.单链表的实现 1&#xff09;.尾插函数 SLTPushBack 2&#xff09;.打印函数 SLPrint 3&#xff09;. 头插函数 SLTPushFront 4&#xff09;.尾删函数 SLTPopBack 5&am…

spring-cloud-openfeign 3.0.0(对应spring boot 2.4.x之前版本)之前版本feign整合ribbon请求流程

在之前写的文章配置基础上 https://blog.csdn.net/zlpzlpzyd/article/details/136060312 下图为自己整理的

IPsec VPN之安全联盟

一、何为安全联盟 IPsec在两个端点建立安全通信&#xff0c;此时这两个端点被称为IPsec对等体。安全联盟&#xff0c;即SA&#xff0c;是指通信对等体之间对某些要素的约定&#xff0c;定义了两个对等体之间要用何种安全协议、IP报文的封装方式、加密和验证算法。SA是IPsec的基…

MySQL-锁:共享锁(读)、排他锁(写)、表锁、行锁、意向锁、间隙锁,锁升级

MySQL-锁&#xff1a;共享锁&#xff08;读&#xff09;、排他锁&#xff08;写&#xff09;、表锁、行锁、意向锁、间隙锁 共享锁&#xff08;读锁&#xff09;、排他锁表锁行锁意向锁间隙锁锁升级 MySQL数据库中的锁是控制并发访问的重要机制&#xff0c;它们确保数据的一致性…

《剑指 Offer》专项突破版 - 面试题 76 : 数组中第 k 大的数字(C++ 实现)

目录 详解快速排序 面试题 76 : 数组中第 k 大的数字 详解快速排序 快速排序是一种非常高效的算法&#xff0c;从其名字可以看出这种排序算法最大的特点是快。当表现良好时&#xff0c;快速排序的速度比其他主要对手&#xff08;如归并排序&#xff09;快 2 ~ 3 倍。 快速排…