Java 并发编程 —— 透过源码剖析 ForkJoinPool

目录

一. 前言

二. 工作窃取的实现原理

2.1. WorkQueue(工作队列)

2.2. 工作窃取流程

三. ForkJoinPool 源码解析

3.1. ForkJoinPool 的字段

3.1.1. 常量

3.1.2. 成员变量

3.1.3. ctl(5个部分组成)

3.2. 构造函数

3.3. ForkJoinPool 的基本组成

3.4. ForkJoinPool 外部任务的提交

3.4.1. invoke

3.4.2. execute

3.4.3. submit

3.5. 外部提交过程分析

3.5.1. externalPush

3.5.2. externalSubmit

3.5.3. signalWork

3.5.4. tryAddWorker

3.5.5. createWorker

3.5.6. registerWorker

3.5.7. deregisterWorker

3.5.8. 外部线程提交过程总结

3.6. workQueue 的工作过程(任务执行)

3.6.1. runWorker

3.6.2. scan

3.6.3. tryRelease

3.6.4. awaitWork

3.6.5. workQueue的执行过程总结

3.7. join

四. 总结


一. 前言

    ForkJoinPool 是 JDK7 引入的,由 Doug Lea 编写的高性能线程池。核心思想是将大的任务拆分成多个小任务(即 Fork),然后在将多个小任务处理汇总到一个结果上(即 Join),非常像MapReduce 处理原理。同时,它提供基本的线程池功能,支持设置最大并发线程数,支持任务排队,支持线程池停止,支持线程池使用情况监控,也是 AbstractExecutorService 的子类,主要引入了“工作窃取”机制,在多 CPU 计算机上处理性能更佳。其广泛用在 Java8 的 Stream 中。

Fork/Join Pool 采用优良的设计、代码实现和硬件原子操作机制等多种思路保证其执行性能。其中包括(但不限于):计算资源共享、高性能队列、避免伪共享、工作窃取机制等。 

二. 工作窃取的实现原理

2.1. WorkQueue(工作队列)

先对 ForkJoinPool 里的队列有一个感性认识,这个队列和 Java 线程池的队列有很大区别,Java线程池的任务队列是一个阻塞队列。而这里的队列是一个队列的数组,也就是多个队列,即ForkJoinPool 每个线程都有自己的队列。另外偶数下标的队列用来保存外部提交的任务,奇数下标装的是线程自己的任务。

ForkJoinPool 类中的 WorkQueue 正是实现工作窃取的队列,javadoc 中的注释如下:

大意是大多数操作都发生在工作窃取队列中(在嵌套类工作队列中)。这些是特殊形式的Deques,主要有 push、pop、poll 操作。

Deque 是双端队列(double ended queue缩写),头部和尾部任何一端都可以进行插入,删除,获取的操作,即支持 FIFO(队列)也支持 LIFO(栈)顺序。

Deque 接口的实现最常见的是 LinkedList,除此还有 ArrayDeque、ConcurrentLinkedDeque 等。详细介绍可参见《Java 之 Queue & Deque 介绍》、《Java 之 Queue 家族》。

2.2. 工作窃取流程

工作窃取模式主要分以下几个步骤:

  1. 每个线程都有自己的双端队列。
  2. 当调用 fork 方法时,将任务放进队列头部,线程以 LIFO 顺序,使用 push/pop 方式处理队列中的任务。
  3. 如果自己队列里的任务处理完后,会从其他线程维护的队列尾部使用 poll 的方式窃取任务,以达到充分利用 CPU 资源的目的。
  4. 从尾部窃取可以减少同原线程的竞争。
  5. 当队列中剩最后一个任务时,通过 cas 解决原线程和窃取线程的竞争。

流程大致如下图所示:

工作窃取便是 ForkJoinPool 线程池的优势所在,在一般的线程池比如 ThreadPoolExecutor 中,如果一个线程正在执行的任务由于某种原因无法继续运行,那么该线程会处于等待状态,包括singleThreadPool、fixedThreadPool、cachedThreadPool 这几种线程池。 

而在 ForkJoinPool 中,那么线程会主动寻找其他尚未被执行的任务然后窃取过来执行,减少线程等待时间。

JDK8 中的并行流(parallelStream)功能是基于 ForkJoinPool 实现的,另外还有java.util.concurrent.CompletableFuture 异步回调 future,内部使用的线程池也是 ForkJoinPool。

三. ForkJoinPool 源码解析

3.1. ForkJoinPool 的字段

3.1.1. 常量

// Bounds 控制线程池相关边界的常量
// 二进制形式0b00000000_00000000_11111111_11111111  线程池索引掩码
static final int SMASK = 0xffff;        // short bits == max index// 二进制形式0b00000000_000000000_1111111_11111111   工作者线程数的最大值
static final int MAX_CAP = 0x7fff;        // max #workers - 1// 二进制形式0b00000000_00000000_11111111_11111110
// 用来取workQueues偶数槽下标,(二进制最低位强置为0,当最低位为0时,表示偶数)
static final int EVENMASK = 0xfffe;        // even short bits// 二进制形式0b00000000_00000000_00000000_01111110  最大的槽位值(限制最多只有64们偶数槽位)
static final int SQMASK = 0x007e;        // max 64 (even) slots// Masks and units for WorkQueue.scanState and ctl sp subfield  
// 控制WorkQueue的scanState属性 、及ForkJoinPool的ctl属性的低32位sp子属性
// 二进制形式0b00000000_00000000_00000000_00000001  ,runState的最低位,1表示正在扫描,0表示正在执行任务
static final int SCANNING = 1;             // false when running tasks// 二进制形式0b10000000_00000000_00000000_00000000    runState最高位为1,表示此线程是INACTIVE空闲的
static final int INACTIVE = 1 << 31;       // must be negative// 0b00000000_00000001_00000000_00000000  版本号,让线程池索引加1,取下一个线程
static final int SS_SEQ = 1 << 16;       // version count// Mode bits for ForkJoinPool.config and WorkQueue.config 控制ForkJoinPool.config和WorkQueue.config属性的常量// 二进制形式0b11111111_11111111_00000000_00000000  获取当前队列mode的掩码,只取config的高16位
static final int MODE_MASK = 0xffff << 16;  // top half of int// 二进制形式0b00000000_00000000_00000000_00000000 先进先出模式(异步模式),将WorkQueue看作(内部任务)队列
static final int LIFO_QUEUE = 0;// 二进制形式0b00000000_00000001_00000000_00000000 后时先出模式(非异步模式),将WorkQueue看作(内部任务)栈
static final int FIFO_QUEUE = 1 << 16;// 二进制形式0b10000000_00000000_00000000_00000000  共离模式,提交外部任务放入的队列
static final int SHARED_QUEUE = 1 << 31;       // must be negative// Lower and upper word masks  分别取ctl的高32位和低32位的掩码
private static final long SP_MASK    = 0xffffffffL;
private static final long UC_MASK    = ~SP_MASK;// Active counts  获取活动线程数的相关掩码
private static final int  AC_SHIFT   = 48;
private static final long AC_UNIT    = 0x0001L << AC_SHIFT;
private static final long AC_MASK    = 0xffffL << AC_SHIFT;// Total counts 获取部线程数的相关掩码
private static final int  TC_SHIFT   = 32;
private static final long TC_UNIT    = 0x0001L << TC_SHIFT;
private static final long TC_MASK    = 0xffffL << TC_SHIFT;
private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign// runState bits: SHUTDOWN must be negative, others arbitrary powers of two
// 执行器运行状态的掩码。
private static final int  RSLOCK     = 1;
private static final int  RSIGNAL    = 1 << 1;
private static final int  STARTED    = 1 << 2;
private static final int  STOP       = 1 << 29;
private static final int  TERMINATED = 1 << 30;
private static final int  SHUTDOWN   = 1 << 31;

上面这些常量,大多都是获取相关属性的各分割位的掩码。SMASK 、SQMASK是获取线程池中线程数相关边界的掩码,SCANNING 、INACTIVE 等又是设置获取 runState 及 ctl 的 sp 子属性相关分割位含义的掩码。

3.1.2. 成员变量

// 用来配置ctl在控制线程数量使用
private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign// 控制线程池数量(ctl & ADD_WORKER) != 0L 时创建线程,
// 也就是当ctl的第16位不为0时,可以继续创建线程
volatile long ctl;                   // main pool control// 全局锁控制,全局运行状态
volatile int runState;               // lockable status// config二进制形式的低16位表示parallelism,
// config二进制形式的第高16位表示mode,1表示异步模式, 使用先进先出队列, 0表示同步模式, 使用先进后出栈
// 低16位表示workerQueue在pool中的索引,高16位表示mode, 有FIFI LIFL 
final int config;  // parallelism, mode   // 生成workerQueue索引的重要依据
int indexSeed;         // to generate worker index  // 工作者队列数组,内部线程ForkJoinWorkerThread启动时会注册一个WorkerQueue对象到这个数组中
volatile WorkQueue[] workQueues;     // main registry // 工作者线程线程工厂,创建ForkJoinWorkerThread的策略
final ForkJoinWorkerThreadFactory factory;  // 在线程因未捕异常而退出时,java虚拟机将回调的异常处理策略
final UncaughtExceptionHandler ueh;  // per-worker UEH // 工作者线程名的前缀
final String workerNamePrefix;       // to create worker name string  // 执行器所有线程窃取的任务总数,也作为监视runState的锁
volatile AtomicLong stealCounter;    // also used as sync monitor// 通用的执行器,它在静态块中初始化
static final ForkJoinPool common; 

3.1.3. ctl(5个部分组成)

    类似于 ThreadPoolExecutor,在 ForkJoinPool 中也有一个 ctl 变量负责表达 ForkJoinPool 的整个生命周期和相关的各种状态。不过 ctl 变量更加复杂,是一个 long 型变量。

ctl 组成:

/** Bits and masks for field ctl, packed with 4 16 bit subfields:* AC: Number of active running workers minus target parallelism* TC: Number of total workers minus target parallelism* SS: version count and status of top waiting thread* ID: poolIndex of top of Treiber stack of waiters*/

ctl 变量的64个比特位被分成五部分:

  1. AC:最高的16个比特位,表示 Active 线程数-parallelism,parallelism 是上面的构造方法传进去的参数;
  2. TC:次高的16个比特位,表示 Total 线程数-parallelism;
  3. ST:1个比特位,如果是1,表示整个 ForkJoinPool 正在关闭;
  4. EC:15个比特位,表示阻塞栈的栈顶线程的 wait count;
  5. ID:16个比特位,表示阻塞栈的栈顶线程对应的 poolIndex。

这个线程池支持的最大线程数为 32767,如果超过会抛出异常。这里只支持 32767 的并行度是因为 ctl 的组成关系。只有16位用来存放线程数,最高位表示正负,所以只有15位来表示,也就是 2^{15} - 1。

阻塞栈(Treiber Stack):

  1. 实现多个线程的阻塞、唤醒,除了 park/unpark 这一对操作原语,还需要一个无锁链表实现的阻塞队列,把所有阻塞的线程串在一起。
  2. 在 ForkJoinPool 中,使用了阻塞栈。把所有空闲的 Worker 线程放在一个栈里面。
  3. ForkJoinWorkerThread 的 poolIndex 变量,记录了自己在 ForkJoinWorkerThread[] 数组中的下标位置,poolIndex 变量就相当于每个 ForkJoinPoolWorkerThread 对象的地址;
  4. ForkJoinWorkerThread 的 nextWait 变量,记录了前一个阻塞线程的 poolIndex,nextWait 变量就相当于链表的 next 指针,把所有的阻塞线程串联在一起,组成一个 TreiberStack。
  5. ctl 变量的最低16位,记录了栈的栈顶线程的 poolIndex;中间的15位,记录了栈顶线程被阻塞的次数,也称为 waitcount。

下图为所有阻塞的 Worker 线程组成的 Treiber Stack:

首先,WorkQueue 有一个 id 变量,记录了自己在 WorkQueue[] 数组中的下标位置,id 变量就相当于每个 WorkQueue 或 ForkJoinWorkerThread 对象的地址;

其次,ForkJoinWorkerThread 还有一个 stackPred 变量,记录了前一个阻塞线程的 id,这个stackPred 变量就相当于链表的 next 指针,把所有的阻塞线程串联在一起,组成一个 Treiber Stack。

最后,ctl 变量的最低16位,记录了栈的栈顶线程的 id;中间的15位,记录了栈顶线程被阻塞的次数,也称为 wait count。

ctl 变量的初始值:

long np = (long)(-parallelism)
this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);

在初始时 ForkJoinPool 中的线程个数为0,AC=0-parallelism,TC=0-parallelism。即只有高32位的 AC、TC 两个部分填充了值,低32位都是0填充。

ForkJoinWorkerThread 状态与个数:

    在 ThreadPoolExecutor 中,有 corePoolSize 和 maxmiumPoolSize 两个参数联合控制总的线程数,而在 ForkJoinPool 中只传入了一个 parallelism 参数,且这个参数并不是实际的线程数。那么,ForkJoinPool 在实际的运行过程中,线程数究竟是由哪些因素决定的呢?

要回答这个问题,先得明白 ForkJoinPool 中的线程都可能有哪几种状态? 

ForkJoinPool 中的线程可能的三种状态:

  1. 空闲状态(放在 Treiber Stack 里面)。
  2. 活跃状态(正在执行某个 ForkJoinTask,未阻塞)。
  3. 阻塞状态(正在执行某个 ForkJoinTask,但阻塞了,于是调用 join,等待另外一个任务的结果返回)。 

ctl 变量很好地反映出了三种状态:
高32位:u=(int) (ctl >>> 32),然后u又拆分成 tc、ac 两个16位;
低32位:c=(int) ctl。

  1. c>0,说明 Treiber Stack 不为空,有空闲线程;c=0,说明没有空闲线程;
  2. ac>0,说明有活跃线程;ac<=0,说明没有空闲线程,并且还未超出 parallelism;
  3. tc>0,说明总线程数 >parallelism。

在提交任务的时候:

在通知工作线程的时候,需要判断 ctl 的状态,如果没有闲置的线程,则开启新线程:

ForkerJoinPool 没有使用 BlockingQueue,也就不利用其阻塞/唤醒机制,而是利用了 park/unpark原语,并自行实现了 Treiber Stack。

3.2. 构造函数

ForkJoinPool 提供了3个构造函数:

public ForkJoinPool(int parallelism,ForkJoinWorkerThreadFactory factory,UncaughtExceptionHandler handler,boolean asyncMode) {this(checkParallelism(parallelism),checkFactory(factory),handler,asyncMode ? FIFO_QUEUE : LIFO_QUEUE,"ForkJoinPool-" + nextPoolId() + "-worker-");checkPermission();
}

此方法没有进行初始化,但是有些字段需要进行说明:

属性类型说明
parallelismint表示ForkJoinPool支持的最大并行度
factoryForkJoinWorkerThreadFactory用于产生线程的工厂方法
handlerUncaughtExceptionHandler用于异常处理的拒绝策略
asyncModeboolean异步模式,如果为ture,则队列将采用FIFO_QUEUE,实现先进先出,反之则LIFO_QUEUE 实现后进先出

这个方法中会对并行度和 factory 进程 check,以确保系统能支持。parallelism 最大不允许大于MAX_CAP。最小必须大于0。factory 则只是判断是否为空,如果为空则这个地方会出现 NPE 异常。 

private ForkJoinPool(int parallelism,ForkJoinWorkerThreadFactory factory,UncaughtExceptionHandler handler,int mode,String workerNamePrefix) {this.workerNamePrefix = workerNamePrefix;this.factory = factory;this.ueh = handler;this.config = (parallelism & SMASK) | mode;long np = (long)(-parallelism); // offset ctl countsthis.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}

这个方法才是真正的初始化的构造方法。上述计算过程,假定 parallelism 为7:

然后再计算 np 和 ctl:

这样就完成了初始化,需要注意的是,这个位运算操作是 ForkJoinPool 的核心,需要扎实理解。

public ForkJoinPool(int parallelism) {this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
}public ForkJoinPool() {this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),defaultForkJoinWorkerThreadFactory, null, false);
}

无参的构造函数,将通过 MAX_CAP 和当前系统允许的最大并行度的最小值来指定并行度。使用默认的 ThreadFactory。handler 位空,默认采用 asyncMode 为 false。

实际这个方法最终还是调用的上面的两个方法,完成了初始化工作。

3.3. ForkJoinPool 的基本组成

    在了解了前面的变量之后,我们可以发现,ForkJoinPool 的实际组成是,由一个 WorkQueue 的数组构成。但是这个数组比较特殊,在偶数位,存放外部调用提交的任务,如通过 execute 和submit 等方法。这种队列称为 SubmissionQueue。而另外一种任务是前者在执行过程种通过 fork方法产生的新任务。这种队列称为 workQueue。

    SubmissionQueue 与 WorkQueue 的区别在于,WorkQueue 的属性“final ForkJoinWorkerThread owner;”是有值的。也就是说,WorkQueue 将有 ForkJoinWorkerThread 线程与之绑定。在运行过程中不断的从 WorkQueue 中获取任务。如果没有可执行的任务,则将从其他的 SubmissionQueue 和 WorkQueue 中窃取任务来执行。

    前面学习过了工作窃取算法,实际上,在 WorkQueue 上的 ForkJoinWorkerThread 就是一个窃取者,它从 SubmissionQueue 中或者去偷的 WorkQueue 中,按 FIFO 的方式窃取任务。之后执行。也从自己的 WorkQueue 中安 LIFO 或者 FIFO 的方式执行任务。这取决于模式的设定。默认情况下是采用 LIFO 的方式在执行。组成如下图所示:

所以工作队列数组的一个分布情况,它的大小一定是2次幂,奇数位和偶数位存放的虽然都是任务队列。但是奇数位是带工作线程的存放 fork 出的子任务的队列,偶数队列存放的是外部提交的任务。

外部任务提交的方法调用过程:

3.4. ForkJoinPool 外部任务的提交

    ForkJoinPool 在初始化之后,支持三种调用方式 invoke 、execute 和 submit。我们来对这三种方式进行分析。

3.4.1. invoke

public <T> T invoke(ForkJoinTask<T> task) {if (task == null)throw new NullPointerException();externalPush(task);return task.join();
}

可以看到 invoke 是支持有返回值的调用方法,之后调用 externalPush 进行 push。

3.4.2. execute

public void execute(ForkJoinTask<?> task) {if (task == null)throw new NullPointerException();externalPush(task);
}

而 execute 方法,则是没有返回值的调用方法。适用于哪些不需要返回结果的计算。此外还有种变体,支持 Runnable。实际上是将 Runnable 转换位 ForkJoinTask。

public void execute(Runnable task) {if (task == null)throw new NullPointerException();ForkJoinTask<?> job;if (task instanceof ForkJoinTask<?>) // avoid re-wrapjob = (ForkJoinTask<?>) task;elsejob = new ForkJoinTask.RunnableExecuteAction(task);externalPush(job);
}

3.4.3. submit

submit 与 invoke 方法基本一致,唯一的不同是,方法内部没有 join 方法,需要自行定义。

public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {if (task == null)throw new NullPointerException();externalPush(task);return task;
}

这样可能更加灵活。

此外也支持 Runnable 和 Callable,而且 Runnable 也可以支持返回值。对于 Callable,通过AdaptedCallable 进行适配。

public <T> ForkJoinTask<T> submit(Callable<T> task) {ForkJoinTask<T> job = new ForkJoinTask.AdaptedCallable<T>(task);externalPush(job);return job;
}

对于 Runnable,如果需要返回值,统一还是返回的 ForkJoinTask,之后在外层 join 得到结果。 

public <T> ForkJoinTask<T> submit(Runnable task, T result) {ForkJoinTask<T> job = new ForkJoinTask.AdaptedRunnable<T>(task, result);externalPush(job);return job;
}

 也可以采用这种方式:

public ForkJoinTask<?> submit(Runnable task) {if (task == null)throw new NullPointerException();ForkJoinTask<?> job;if (task instanceof ForkJoinTask<?>) // avoid re-wrapjob = (ForkJoinTask<?>) task;elsejob = new ForkJoinTask.AdaptedRunnableAction(task);externalPush(job);return job;
}

实际上通过这些提交方法,我们发现,最关键的入口时 externalPush 方法。下面我们就对这个方法进行分析。

3.5. 外部提交过程分析

现在开始跟踪 externalPush 的源码,对外部提交过程进行分析。

3.5.1. externalPush

这是外部提交的唯一入口。

/*** 尝试将任务添加到提交者当前的队列中,此方法只处理大多数情况,实际上是根据随机数指定一个workQueues的槽位,如果这个位置存在WorkQueue,则加入队列,然后调用signalWork通知其他工作线程来窃取。反之,则通过externalSubmit处理。这只适用于提交队列存在的普通情况。更复杂的逻辑参考externalSubmit。** @param task the task. Caller must ensure non-null.*/
final void externalPush(ForkJoinTask<?> task) {WorkQueue[] ws; WorkQueue q; int m;// 通过ThreadLocalRandom产生随机数int r = ThreadLocalRandom.getProbe();// 线程池的状态int rs = runState;// 如果ws已经完成初始化,且根据随机数定位的index存在workQueue,且cas的方式加锁成功if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&// 此处先用随机数和wq的size取&,之后再取SQMASK,这些操作将多余的位的值去除(q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&// cas的方式加锁 将q中位于QLOCK位置的内存的值如果为0,则改为1,采用cas的方式进行U.compareAndSwapInt(q, QLOCK, 0, 1)) {ForkJoinTask<?>[] a; int am, n, s;// 判断q中的task数组是否为空,if ((a = q.array) != null &&// am为q的长度 这是个固定值,如果这个值大于n n就是目前队列中的元素,实际实这里是判断队列是否有空余的位置(am = a.length - 1) > (n = (s = q.top) - q.base)) {// j实际上是计算添加到workQueue中的indexint j = ((am & s) << ASHIFT) + ABASE;// 将task通过cas的方式插入a的index为j的位置U.putOrderedObject(a, j, task);// 将队列q的QTOP位置的内存加1,实际上就是将TOP增加1U.putOrderedInt(q, QTOP, s + 1);// 以可见的方式将q的QLOCK改为0U.putIntVolatile(q, QLOCK, 0);// 此处,如果队列中的任务小于等于1则通知其他worker来窃取。为什么当任务大于1的时候不通知。而且当没有任务的时候发通知岂不是没有意义?此处不太理解if (n <= 1)// 这是个重点方法,通知其他worker来窃取signalWork(ws, q);return;}U.compareAndSwapInt(q, QLOCK, 1, 0);}// 初始化externalSubmit(task);
}

实际上 externalPush 的逻辑只能处理简单的逻辑,对于其他复杂的逻辑,则需要通过externalSubmit 提供,而这些简单的逻辑,实际上就是添加到任务队列。这个任务队列的索引一定是偶数:i = m & r & SQMASK。

这个计算过程,实际上由于 SQMASK 最后一位为0,因此计算的 index 的最后一位一定为0,这样导致这个值为偶数。也就是说,workQueues 的偶数位存放的是外部提交的任务队列。之后提交成功之后,调用 signalWork 方法让其他的 worker 来窃取。

总结下,就是当从外部提交一个任务,会随机的选择一个偶数下标位,然后将任务追加到这个WorkQueue 里的数组的最后。

但是请注意,这个方法是精简版的任务提交,因为它没有处理线程池的初始化等问题,如果随机到的偶数槽位队列可以提交任务,则就会直接将任务推入队列。否则会调用完整版任务提交方法externalSubmit。这种方式很像重入锁的入队,Doug Lea 一贯做法。

3.5.2. externalSubmit

这是提交过程中的分支逻辑处理的方法。完整版外部的任务提交方法:

  • 第一步,如果线程池没有初始化会先进行初始化操作,比如工作队列数组的空间分配还有线程池的状态修改等;
  • 第二步,如果随机的偶数槽位队列不为空,则将任务推入队列并调用signalWork方法唤醒线程;
  • 如果第二步槽位为null,则第三步为这个槽位创建队列后再重复循环。如果发生竞争会重新随机槽位。
/*** externalPush的完整版本,处理哪些不常用的逻辑。如第一次push的时候进行初始化、此外如果索引队列为空或者被占用,那么创建一个新的任务队列。** @param task the task. Caller must ensure non-null.*/
private void externalSubmit(ForkJoinTask<?> task) {// r是随机数,此处双重检测,确保r不为0int r;                                    // initialize caller's probeif ((r = ThreadLocalRandom.getProbe()) == 0) {ThreadLocalRandom.localInit();r = ThreadLocalRandom.getProbe();}// 死循环for (;;) {WorkQueue[] ws; WorkQueue q; int rs, m, k;// move默认为falseboolean move = false;// 如果runstate小于0 则线程池处于SHUTDOWN状态,配合进行终止if ((rs = runState) < 0) {// 终止的方法 并抛出异常,拒绝该任务tryTerminate(false, false);     // help terminatethrow new RejectedExecutionException();}// 如果状态不为STARTED 说明此时线程池可用else if ((rs & STARTED) == 0 ||     // initialize// 如果workQueues为null 或者其length小于1 则说明没用初始化((ws = workQueues) == null || (m = ws.length - 1) < 0)) {int ns = 0;// 对线程池以CAS的方式加锁,从RUNSTATE变为RSLOCK,如果不为RUNSTATE则自旋rs = lockRunState();try {// 如果状态为  RSIGNAL RSLOCK  说明加锁成功if ((rs & STARTED) == 0) {// 用cas的方式初始化STEALCOUNTERU.compareAndSwapObject(this, STEALCOUNTER, null,new AtomicLong());// create workQueues array with size a power of two// 创建workQueues的数组// 根据并行度计算得到config,此处确保p在SMASK范围内,即2个字节int p = config & SMASK; // ensure at least 2 slots// n判断p是否大于1,反之则默认按1处理int n = (p > 1) ? p - 1 : 1;// 下列过程是找到大于n的最小的2的幂 这个过程之前在HashMap中演示过n |= n >>> 1; n |= n >>> 2;  n |= n >>> 4;n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;// 根据并行度计算得到了n,之后根据n确定workQueues的array的大小,这个数组的大小不会超过2^16workQueues = new WorkQueue[n];// 将ns的值修改为STARTEDns = STARTED;}} finally {// 最后将状态解锁 此时改为STARTED状态,这个计算过程有一点绕 unlockRunState(rs, (rs & ~RSLOCK) | ns);}// 实际上这个分支只是创建了外层的workQueues数组,此时数组内的内容还是全部都是空的 }// 如果根据随机数计算出来的槽位不为空,即索引处的队列已经创建,这个地方是外层死循环再次进入的结果// 需要注意的是这个k的计算过程,SQMASK最低的位为0,这样就导致,无论随机数r怎么变化,得到的结果总是偶数。else if ((q = ws[k = r & m & SQMASK]) != null) {// 如果这个槽位的workQueue未被锁定,则用cas的方式加锁 将其改为1if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {// 拿到这个队列中的arrayForkJoinTask<?>[] a = q.array;// s为top索引int s = q.top;// 初始化submitted状态boolean submitted = false; // initial submission or resizingtry {                      // locked version of push// 与上面的externalPush一致,此处push到队列中// 先判断 数组不为空且数组中有空余位置,能够容纳这个taskif ((a != null && a.length > s + 1 - q.base) ||// 或者通过初始化的双端队列的数组不为null (a = q.growArray()) != null) {// 计算数组的indexint j = (((a.length - 1) & s) << ASHIFT) + ABASE;// 在索引index处插入taskU.putOrderedObject(a, j, task);// 将队列的QTOP加1U.putOrderedInt(q, QTOP, s + 1);// 将提交成功状态改为truesubmitted = true;}} finally {// 最终采用cas的方式进行解锁 将队列的锁定状态改为0U.compareAndSwapInt(q, QLOCK, 1, 0);}//如果submitted为true说明数据添加成功,此时调用其他worker来窃取if (submitted) {// 调用窃取的方法signalWork(ws, q);// 退出return;}}// move状态改为truemove = true;                   // move on failure}// 如果状态不为RSLOCK 上面两个分支都判断过了,那么此处说明这个索引位置没有初始化else if (((rs = runState) & RSLOCK) == 0) { // create new queue// new一个新队列q = new WorkQueue(this, null);// hint 记录随机数q.hint = r;// 计算config SHARED_QUEUE 将确保第一位为1 则这个计算出来的config是负数,这与初始化的方法是一致的q.config = k | SHARED_QUEUE;// 将scan状态改为INCATIVEq.scanState = INACTIVE;// 用cas的方式加锁rs = lockRunState();  // 将创建的workQueue push到workQueues的数组中// publish indexif (rs > 0 &&  (ws = workQueues) != null &&k < ws.length && ws[k] == null)// 赋值ws[k] = q;                 // else terminated// 解锁unlockRunState(rs, rs & ~RSLOCK);}else// 将move改为truemove = true;                   // move if busyif (move)// 重新计算rr = ThreadLocalRandom.advanceProbe(r);}
}

三种情况:1、需要初始化。2、该槽位上的 workqueue 不是 null。3、整体数组不为 null 但是其对应的槽位上的 workqueue 是 null。

把任务放进队列是自旋过程,直到成功为止,所以最后就是把任务放入了其对应的槽位中。

这个地方将产生的无 owner 的 workQueue 放置在索引 k 的位置,需要注意的是 k 的计算过程,k= r & m & SQMASK。r 是随机数,m 是数组的长度,而 SQMASK:

SQMASK: 0000 0000 0000 0000 0000 0000 0111 1110

最后一位不为1,这就导致不管 r 如何变化,得到的 k 最后一位都不为1,这就构造了一个偶数 k 最后一位为0,k 不可能是奇数。

3.5.3. signalWork

    任务提交成功后会调用这个方法,它的作用就是激活一个空闲线程或创建一个线程并绑定一个队列在队列数组的奇数槽位。

    如果清楚 ForkJoinPool 中主要成员变量所代表的含义,这个方法就可以很容易的理解。它首先去判断是否有空闲的队列也就是通过 ctl 的低32位,如果没有则会判断是否能再添加线程,可以就会创建。如果有空闲线程则会进行激活。具体实现可以看下面代码:

/*** 此处将激活worker Thread。如果工作线程太少则创建,反之则来进行窃取。** @param ws the worker array to use to find signallees* @param q a WorkQueue --if non-null, don't retry if now empty*/
final void signalWork(WorkQueue[] ws, WorkQueue q) {long c; int sp, i; WorkQueue v; Thread p;// 如果ctl为负数  ctl初始化的时候就会为负数 如果小于0  说明有任务需要处理while ((c = ctl) < 0L) {                       // too few active// c为long,强转int 32位的高位都丢弃,此时如果没有修改过ctl那么低位一定为0 可参考前面ctl的推算过程,所以此处sp 为0 sp为0则说明每月空闲的workerif ((sp = (int)c) == 0) {                  // no idle workers// 还是拿c与ADD_WORKER取& 如果不为0 则说明worker太少,需要新增workerif ((c & ADD_WORKER) != 0L)            // too few workers// 通过tryAddWorker 新增workertryAddWorker(c);break;}// 再次缺认ws有没有被初始化 如果没有 退出if (ws == null)                            // unstarted/terminatedbreak;// 如果ws的length小于sp的最低位 退出if (ws.length <= (i = sp & SMASK))         // terminatedbreak;// 如果index处为空 退出if ((v = ws[i]) == null)                   // terminatingbreak;// 将sp的低32位取出int vs = (sp + SS_SEQ) & ~INACTIVE;        // next scanState// 计算用sp减去 scanStateint d = sp - v.scanState;                  // screen CASlong nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);// 采用cas的方式修改ctl 实际上就是加锁 由于ctl的修改可能会导致while循环退出if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {v.scanState = vs;                      // activate v// 如果p被park wait中if ((p = v.parker) != null)// 将worker唤醒 U.unpark(p);// 退出break;}// 如果此队列为空或者没有task 也退出if (q != null && q.base == q.top)          // no more workbreak;}
}

可以看到,实际上 signalWork 就做了两件事情,第一,判断 worker 是否充足,如果不够,则创建新的 worker。第二,判断 worker 的状态是否被 park了,如果 park 则用 unpark 唤醒。这样worker 就可以去 scan 其他队列进行窃取了。

3.5.4. tryAddWorker

此方法是在 worker 不足的时候,添加一个 worker 来执行的具体类。

/*** 尝试新增一个worker,然后增加ctl中记录的worker的数量** @param c incoming ctl value, with total count negative and no* idle workers.  On CAS failure, c is refreshed and retried if* this holds (otherwise, a new worker is not needed).*/
private void tryAddWorker(long c) {// 传入的c为外层调用方法的ctl add标记为falseboolean add = false;do {long nc = ((AC_MASK & (c + AC_UNIT)) |(TC_MASK & (c + TC_UNIT)));// 如果此时ctl与外层传入的ctl相等 说明没有被修改if (ctl == c) {int rs, stop;                 // check if terminating// 用cas的方式加锁if ((stop = (rs = lockRunState()) & STOP) == 0)// 增加ctl的数量,如果成功 add为tureadd = U.compareAndSwapLong(this, CTL, c, nc);// 解锁unlockRunState(rs, rs & ~RSLOCK);// 如果stop不为0 则说明线程池停止 退出if (stop != 0)break;// 如果前面增加ctl中的数量成功,那么此处开始创建workerif (add) {createWorker();break;}}// 这个while循环, 前半部分与ADD_WORKER取并,最终只会保留第48位,这个位置为1,同时c的低32为为0,} while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
}

实际上这个类中,只是做了一些准备过程,增加 count,以及加锁判断,最终还是通过createWorker 来进行。

3.5.5. createWorker

创建 worker 的具体方法。

// Creating, registering and deregistering workers/*** 创建并启动一个worker,因为前面已经做了增加count,如果此处出现异常,创建worker不成功,则在deregisterWorker中会判断如果ex不为空,且当前为创建状态的话,会重新进入tryAddWorker方法。** @return true if successful*/
private boolean createWorker() {// 创建线程的工厂方法ForkJoinWorkerThreadFactory fac = factory;Throwable ex = null;ForkJoinWorkerThread wt = null;try {// 如果工厂方法不为空,则用这个工厂方法创建线程,之后再启动线程,此时newThread将与workQueue绑定if (fac != null && (wt = fac.newThread(this)) != null) {wt.start();return true;}// 如果创建失败,出现了异常 则ex变量有值} catch (Throwable rex) {ex = rex;}deregisterWorker(wt, ex);return false;
}

此方法只是创建了一个 ForkJoinThread,实际上 worker 还是没有创建。实际上这个创建过程是再newThread(this) 中来进行的。

public class ForkJoinWorkerThread extends Thread {final ForkJoinPool pool;                // the pool this thread works infinal ForkJoinPool.WorkQueue workQueue; // work-stealing mechanicsprotected ForkJoinWorkerThread(ForkJoinPool pool) {// Use a placeholder until a useful name can be set in registerWorkersuper("aForkJoinWorkerThread");this.pool = pool;this.workQueue = pool.registerWorker(this);}
}

再线程创建成功之后调用 registerWorker 与之绑定。如果线程创建失败或者出现异常就要调用deregisterWorker 对 count 进行清理或者解除绑定。

3.5.6. registerWorker

实际上是这个方法,完成 worker 的创建和绑定。

/*** 创建线程,并建立线程与workQueue的关系,此处只会在workQueues数组的奇数位操作** @param wt the worker thread* @return the worker's queue*/
final WorkQueue registerWorker(ForkJoinWorkerThread wt) {UncaughtExceptionHandler handler;// 将线程设置为守护线程wt.setDaemon(true);                           // configure thread// 如果没有handler则抛出异常if ((handler = ueh) != null)wt.setUncaughtExceptionHandler(handler);// 创建一个workQueue,此时owoner为当前输入的ForkJoinThreadWorkQueue w = new WorkQueue(this, wt);// 定义i为0int i = 0;                                    // assign a pool index// 定义mode int mode = config & MODE_MASK;// 加锁int rs = lockRunState();try {WorkQueue[] ws; int n;                    // skip if no array// 如果workQueues存在,且长度大于0if ((ws = workQueues) != null && (n = ws.length) > 0) {// 通过魔数计算int s = indexSeed += SEED_INCREMENT;  // unlikely to collide// m为n-1,而n为数组的初始化长度,第一次创建的时候,n=16,那么m为15int m = n - 1;// 将s左移然后最后一位补上1,之后与奇数m求并集,那么得到的结果必然是奇数i = ((s << 1) | 1) & m;               // odd-numbered indices// 判断i位置是否为空 如果为空,出现了碰撞,则计算步长向后移动 这个步长一定是偶数if (ws[i] != null) {                  // collisionint probes = 0;                   // step by approx half n// 最小步长为2  n是数组长度,比为偶数,那么如果n小于等于4,则步长为2,反之,则将n右移,将偶数最后一位的0清除,之后再和EVENMASK求并,这样就相当于将原来的长度缩小2倍,并确保是偶数。之后再加上2。那么假定n为16的话,此处计算的step就为10int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;// 之后再通过while循环,继续判断增加步长之后是否碰撞,如果碰撞,则继续增加步长while (ws[i = (i + step) & m] != null) {// 如果还是碰撞,且probes增加1之后大于长度n,则会触发扩容,workQueues会扩大2倍 这个probes感觉意义不大if (++probes >= n) {workQueues = ws = Arrays.copyOf(ws, n <<= 1);m = n - 1;// 将probes置为0probes = 0;}}}// 设置随机数seedw.hint = s;                           // use as random seed// 修改configw.config = i | mode;// 修改scanState为iw.scanState = i;                      // publication fence// 将w设置到i处ws[i] = w;}} finally {// cas的方式进行解锁unlockRunState(rs, rs & ~RSLOCK);}// 此处设置线程namewt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1)));return w;
}

在 registerWorker 中,对工作线程和 workQueue 进行绑定,并设置到 workQueues 的奇数索引处。注意这里计算得到奇数索引的算法。

// 将s左移然后最后一位补上1,之后与奇数m求并集,那么得到的结果必然是奇数。
i = ((s << 1) | 1) & m;

这个位操作,是我们非常值得注意的地方。

注册 worker 的逻辑就是在 workqueue 数组里找一个空的槽位给当前的 worker,如果超过了自旋的上限就扩容。

3.5.7. deregisterWorker

如果线程创建没有成功,那么 count 需要回收。以及进行一些清理工作。

/*** 此方法的主要目的是在创建worker或者启动worker失败之后的回调方法,此时将之前的ctl中增加的count减去。** @param wt the worker thread, or null if construction failed* @param ex the exception causing failure, or null if none*/
final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {WorkQueue w = null;// 如果workQueue和thread不为空if (wt != null && (w = wt.workQueue) != null) {WorkQueue[] ws;                           // remove index from array// 根据config计算indexint idx = w.config & SMASK;// 加锁int rs = lockRunState();// 如果ws不为空且length大于idx同时idx处的worker就是workerQueue 则将该idx处的worker移除if ((ws = workQueues) != null && ws.length > idx && ws[idx] == w)ws[idx] = null;// 解锁 修改rs状态 unlockRunState(rs, rs & ~RSLOCK);}// 后续对count减少long c;                                       // decrement counts// 死循环 cas的方式将ctl修改do {} while (!U.compareAndSwapLong(this, CTL, c = ctl, ((AC_MASK & (c - AC_UNIT)) |(TC_MASK & (c - TC_UNIT)) |(SP_MASK & c))));// 入果workQueue不为空  将其中的task取消                              if (w != null) {w.qlock = -1;                             // ensure setw.transferStealCount(this);w.cancelAll();                            // cancel remaining tasks}// 死循环 如果为停止状态则配合停止for (;;) {                                    // possibly replaceWorkQueue[] ws; int m, sp;if (tryTerminate(false, false) || w == null || w.array == null ||(runState & STOP) != 0 || (ws = workQueues) == null ||(m = ws.length - 1) < 0)              // already terminatingbreak;if ((sp = (int)(c = ctl)) != 0) {         // wake up replacementif (tryRelease(c, ws[sp & m], AC_UNIT))break;}else if (ex != null && (c & ADD_WORKER) != 0L) {tryAddWorker(c);                      // create replacementbreak;}else                                      // don't need replacementbreak;}// 异常处理if (ex == null)                               // help clean on way outForkJoinTask.helpExpungeStaleExceptions();else                                          // rethrowForkJoinTask.rethrow(ex);
}

至此,我们通过外部线程 push 之后,将任务提交到 submissionQueue 队列之后,会根据并行度以及工作线程的需要创建 workQueue。唤醒工作线程进行窃取的操作就完成了,外部线程的调用就结束了,会回到 main 方法中去等待结果。

3.5.8. 外部线程提交过程总结

上述代码调用的过程,我们可以形象的用如下图进行表示:

最开始,workQueues 是 null 状态。在第一次执行的时候,externalSubmit 方法中会初始化这个数组。

在这之后,还是在 externalSubmit 方法的 for 循环中,完成对任务队列的创建,将任务队列创建在偶数索引处。之后将任务写入这个队列:

此后,任务添加完成,但是没有工作队列进行工作。因此在这之后调用 signalWork,通知工作队列开始干活。但是在这个方法执行的过程中,由于工作队列并不存在,没有 worker,所以调用tryAddWorker 开始创建 worker。在 createWorker 会创建一个 worker 线程:

但是 workQueue 还没创建。这是在 newThread 之后,通过 registerWorker 创建的,在registerWorker 方法中,会在奇数位创建一个 workQueue,并将此前创建的线程与之绑定。这样一个 worker 就成功创建了。 

这样就完成了 worker 创建的全过程。

3.6. workQueue 的工作过程(任务执行)

    在 workQueue 创建完成之后,下一步,这些线程的 run 方法调用后被启动。之后就进入了worker 线程的生命周期了。实际上 run 方法如下:

public void run() {if (workQueue.array == null) { // only run onceThrowable exception = null;try {onStart();pool.runWorker(workQueue);} catch (Throwable ex) {exception = ex;} finally {try {onTermination(exception);} catch (Throwable ex) {if (exception == null)exception = ex;} finally {pool.deregisterWorker(this, exception);}}}
}

重点执行的是 runWorker,此时一旦出错,可以调用 deregisterWorker 方法进行清理。下面来看看 runWorker 的详细过程。

3.6.1. runWorker

这是 worker 工作线程的执行方法。通过死循环,不断 scan 是否有任务,之后窃取这个任务进行执行。

/*** 通过调用线程的run方法,此时开始最外层的runWorker*/
final void runWorker(WorkQueue w) {// 初始化队列,这个方法会根据任务进行判断是否需要扩容w.growArray();                   // allocate queue// hint是采用的魔数的方式增加int seed = w.hint;               // initially holds randomization hint// 如果seed为0 则使用1int r = (seed == 0) ? 1 : seed;  // avoid 0 for xorShift// 死循环for (ForkJoinTask<?> t;;) {// 调用scan方法 对经过魔数计算的r 之后开始进行窃取过程 如果能够窃取 则task不为空if ((t = scan(w, r)) != null)// 运行窃取之后的taskw.runTask(t);// 反之则当前线程进行等待else if (!awaitWork(w, r))break;r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift}
}

通过 scan 方法从其他队列获得任务。

3.6.2. scan

/*** 通过scan方法进行任务窃取,扫描从一个随机位置开始,如果出现竞争则通过魔数继续随机移动,反之则线性移动,直到所有队列上的相同校验连续两次出现为空,则说明没有任何任务可以窃取,因此worker会停止窃取,之后重新扫描,如果找到任务则重新激活,否则返回null,扫描工作应该尽可能少的占用内存,以减少对其他扫描线程的干扰。** @param w the worker (via its WorkQueue)* @param r a random seed* @return a task, or null if none found*/
private ForkJoinTask<?> scan(WorkQueue w, int r) {WorkQueue[] ws; int m;// 如果workQueues不为空且长度大于1,当前workQueue不为空if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) {// ss为扫描状态int ss = w.scanState;                     // initially non-negative// for循环 这是个死循环  origin将r与m求并,将多余位去除。然后赋值给kfor (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) {WorkQueue q; ForkJoinTask<?>[] a; ForkJoinTask<?> t;int b, n; long c;// 如果k处不为空if ((q = ws[k]) != null) {// 如果task大于0if ((n = (b = q.base) - q.top) < 0 &&(a = q.array) != null) {      // non-empty// 计算ilong i = (((a.length - 1) & b) << ASHIFT) + ABASE;// 得到i处的task if ((t = ((ForkJoinTask<?>)U.getObjectVolatile(a, i))) != null &&q.base == b) {// 如果扫描状态大于0if (ss >= 0) {// 更改a中i的值为空 也就是此处将任务窃取走了if (U.compareAndSwapObject(a, i, t, null)) {// 将底部的指针加1q.base = b + 1;// 如果n小于-1 则通知工作线程工作if (n < -1)       // signal otherssignalWork(ws, q);// 将窃取的task返回return t;}}// 如果 scan状态小于0 则调用tryRelease方法唤醒哪些wait的workerelse if (oldSum == 0 &&   // try to activatew.scanState < 0)// 调用tryRelease方法 后续详细介绍tryRelease(c = ctl, ws[m & (int)c], AC_UNIT);}// 如果ss小于0 if (ss < 0)                   // refresh// 更改ssss = w.scanState;r ^= r << 1; r ^= r >>> 3; r ^= r << 10;origin = k = r & m;           // move and rescanoldSum = checkSum = 0;continue;}checkSum += b;}// 此处判断k,k在此通过+1的方式完成对原有workQueues的遍历if ((k = (k + 1) & m) == origin) {    // continue until stableif ((ss >= 0 || (ss == (ss = w.scanState))) &&oldSum == (oldSum = checkSum)) {if (ss < 0 || w.qlock < 0)    // already inactivebreak;int ns = ss | INACTIVE;       // try to inactivatelong nc = ((SP_MASK & ns) |(UC_MASK & ((c = ctl) - AC_UNIT)));w.stackPred = (int)c;         // hold prev stack topU.putInt(w, QSCANSTATE, ns);if (U.compareAndSwapLong(this, CTL, c, nc))ss = ns;elsew.scanState = ss;         // back out}checkSum = 0;}}}return null;
}

此方法最大的特点就是,根据随机数计算一个k,然后根据 k 去遍历 workQueues,之后看看这个位置是否有数据,如果不为空,则检查 checkSum,根据 checkSum 的状态确认是否从这个队列中取数据。按之前约定的 FIFO 或者 LIFO 取数。这意味着,工作队列对窃取和是否获得本队列中的任务之间并没有优先级,而是根据随机数得到的 index,之后对数组进行遍历。

扫描整个队列连续出现两次扫描的 checkSum 的值相同,说明所有的队列都是空的了,需要去激活当前的队列。因为两次 checkSum 的值相同说明两次都遍历了所有的队列的 base,也就是都是线性的增加 k 的值,如果有的队列有元素发生竞争失败了会随机移动下标,很大概率不会形成两次一样 checkSum 的。

如果 scan 没有扫描到任务会将这个队列失活,并放入将队列的 scanState 字段方法 ctl 的低32位,替换原来的值并将原来的值放入当前队列的 stackPred 字段构成一个栈。scan 没有扫描到任务返回后,runWork 方法会调用 awaitWork 方法阻塞线程。

3.6.3. tryRelease

对于执行完成的 worker,则需要进行释放。

/*** 如果worker处于空闲worker Stack的workQueue的顶部。则发信号对其进行释放。** @param c incoming ctl value* @param v if non-null, a worker* @param inc the increment to active count (zero when compensating)* @return true if successful*/
private boolean tryRelease(long c, WorkQueue v, long inc) {// sp取c的低位,计算vsint sp = (int)c, vs = (sp + SS_SEQ) & ~INACTIVE; Thread p;// 如果v不为空 且v的sancState为sp if (v != null && v.scanState == sp) {          // v is at top of stack// 计算nclong nc = (UC_MASK & (c + inc)) | (SP_MASK & v.stackPred);// 采用cas的方式 将ctl改为ncif (U.compareAndSwapLong(this, CTL, c, nc)) {// 修改scanState的状态v.scanState = vs;// 如果此时这线程为park状态,则调用unparkif ((p = v.parker) != null)U.unpark(p);return true;}}return false;
}

3.6.4. awaitWork

    如果上述 scan 不到任务呢?也就是说,scan 方法没有拿到 task,则会调用 awaitWork。将当前的线程进行阻塞。

/*** 如果不能窃取到任务,那么就将worker阻塞。如果停用导致线程池处于静止状态,则检查是否要关闭,如果这不是唯一的工作线程,则等待给定的持续时间,达到超时时间后,如果ctl没有更改,则将这个worker终止,之后唤醒另外一个其他的worker对这个过程进行重复。** @param w the calling worker* @param r a random seed (for spins)* @return false if the worker should terminate*/
private boolean awaitWork(WorkQueue w, int r) {// 如果w不为空切w的qlock小于0 则直接返回false if (w == null || w.qlock < 0)                 // w is terminatingreturn false;// for循环,这是个死循环,定义predfor (int pred = w.stackPred, spins = SPINS, ss;;) {// 如果ss大于0 则返回if ((ss = w.scanState) >= 0)break;// 如果spins大于0 else if (spins > 0) {// 计算rr ^= r << 6; r ^= r >>> 21; r ^= r << 7;// 如果r大于0 且spins为0if (r >= 0 && --spins == 0) {         // randomize spinsWorkQueue v; WorkQueue[] ws; int s, j; AtomicLong sc;// 如果pred不为0 且ws不为空if (pred != 0 && (ws = workQueues) != null &&// j<ws.length(j = pred & SMASK) < ws.length &&// j位置处不为空(v = ws[j]) != null &&        // see if pred parking// 并且没有park(v.parker == null || v.scanState >= 0))// 继续在for循环中自旋spins = SPINS;                // continue spinning}}// 如果qlock小于0  返回falseelse if (w.qlock < 0)                     // recheck after spinsreturn false;// 如果被中断else if (!Thread.interrupted()) {long c, prevctl, parkTime, deadline;int ac = (int)((c = ctl) >> AC_SHIFT) + (config & SMASK);// 如果线程池停止if ((ac <= 0 && tryTerminate(false, false)) ||(runState & STOP) != 0)           // pool terminatingreturn false;// 最后的等待 获取不到任务 此时采用超时等待  park的方式进行if (ac <= 0 && ss == (int)c) {        // is last waiterprevctl = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & pred);int t = (short)(c >>> TC_SHIFT);  // shrink excess sparesif (t > 2 && U.compareAndSwapLong(this, CTL, c, prevctl))return false;                 // else use timed waitparkTime = IDLE_TIMEOUT * ((t >= 0) ? 1 : 1 - t);deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;}elseprevctl = parkTime = deadline = 0L;// 获取线程Thread wt = Thread.currentThread();// 设置PARKBLOCKERU.putObject(wt, PARKBLOCKER, this);   // emulate LockSupportw.parker = wt;// 调用park方法if (w.scanState < 0 && ctl == c)      // recheck before parkU.park(false, parkTime);U.putOrderedObject(w, QPARKER, null);U.putObject(wt, PARKBLOCKER, null);// 如果scanState大于0 退出if (w.scanState >= 0)break;// 如果已经达到deadline的时间 则返回falseif (parkTime != 0L && ctl == c &&deadline - System.nanoTime() <= 0L &&U.compareAndSwapLong(this, CTL, c, prevctl))return false;                     // shrink pool}}return true;
}

上述这些方法,如果返回了 false,则会导致外层的阻塞不能阻塞住,在 runWorker 方法中,会退出循环,这个线程就会退出。只有返回了 true,那么这个线程就会一直运行,获得任务循环。

3.6.5. workQueue的执行过程总结

    当 workQueue 上的 thread 启动之后,这个线程就会调用 runWorker 方法。之后再 runWorker方法中有一个死循环,不断的通过 scan 方法去扫描任务,实际上就是执行窃取过程。如下图所示,这样通过遍历外层 workQueues 的方式将会从任务队列中窃取任务进行执行。

当执行之后,会通过 fork 产生新的任务,将这些任务添加到工作队列中去。其他线程继续 scan,执行。这个过程不断循环,直到任务全部都执行完成,这样就完成了整个计算过程。

3.7. join

public final V join() {int s;if ((s = doJoin() & DONE_MASK) != NORMAL)reportException(s); // 可能被取消或发生异常return getRawResult();
}private int doJoin() {int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;return (s = status) < 0 ? s :((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?(w = (wt = (ForkJoinWorkerThread)t).workQueue).tryUnpush(this) && (s = doExec()) < 0 ? s :wt.pool.awaitJoin(w, this, 0L) :externalAwaitDone();// 是否已执行完// 是 直接返回任务状态// 否 当前线程是否是ForkJoinWorkerThread// 是 执行workQueue的tryUnPush方法和doExec方法。这里的意思是移除在top的当前任务,然后自己主动执行// 移除成功 返回任务状态// 移除失败 调用awaitJoin方法// 否 执行externalAwaitDone
}final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {int s = 0;if (task != null && w != null) {// 将当前任务变为队列的正在join的任务,先前的放到task的pervJoin,形成一个栈。ForkJoinTask<?> prevJoin = w.currentJoin;U.putOrderedObject(w, QCURRENTJOIN, task);CountedCompleter<?> cc = (task instanceof CountedCompleter) ?(CountedCompleter<?>)task : null;for (;;) {// 小于0说明完成if ((s = task.status) < 0)break;// CountedCompleter任务由helpComplete来完成joinif (cc != null)helpComplete(w, cc, 0);// 如果队列为空或 执行任务没有成功则会去帮助偷窃.// 执行失败说明任务被偷了。// tryRemoveAndExec任务执行成功则会返回false// 在当前队列任务执行完了或者// 或者(在队列中没有找到这个任务且任务没有执行)// 则这个任务是被偷了,偷窃任务的可能是在join。// 所以去帮助偷窃者执行他的任务。else if (w.base == w.top || w.tryRemoveAndExec(task))helpStealer(w, task);// 如果任务执行成功则会跳出循环if ((s = task.status) < 0)break;long ms, ns;if (deadline == 0L)ms = 0L;else if ((ns = deadline - System.nanoTime()) <= 0L)break;else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)ms = 1L;// 尝试补偿,在里面有进行if (tryCompensate(w)) {// 等待task.internalWait(ms);// 活跃线程加1U.getAndAddLong(this, CTL, AC_UNIT);}}// 还原当前队列正在join的任务U.putOrderedObject(w, QCURRENTJOIN, prevJoin);}// 返回任务的状态return s;
}

join 的 task 会形成一个栈。每个 task 都会执行 fork,每次 fork 都会往自己的 workqueue 的 task数组把自己方法进去。

然后在执行 join 的时候又会把自己弹出来 tryUnpush(this) 。依次执行并形成 join 栈。这样当任务拆解到足够小的时候,执行 join 就会返回了,这样它之前的任务也会返回了。

四. 总结

    在 Java 中运行 ForkJoinPool,经过对源码的分析,实际上,需要4个类来配合运行。这四个类分别是:

  1. ForkJoinPool 这是线程池的核心类,也是提供方法供我们使用的入口类。基本上 forkJoin 的核心操作及线程池的管理方法都由这个类提供。
  2. ForkJoinPool.WorkQueue 这是 ForkJoinPool 类的内部类。也是线程池核心的组成部分。ForkJoinPool 线程池将由 WorkQueue 数组组成。为了进一步提高性能,与ThreadPoolExecutor 不一样的是,这没有采用外部传入的任务队列,而是作者自己实现了一个阻塞队列。
  3. ForkJoinWorkerThread 线程池中运行的 thread 也是作者重新定义的。这个类的目的是在于将外部的各种形式的 task 都转换为统一的 ForkJoinTask 格式。
  4. ForkJoinTask 这是 ForkJoinPool 支持运行的 task 抽象类,我们一般使用其子类如RecursiveTask 或者 RecursiveAction。这个在前文有介绍。这个类提供了任务拆分和结果合并的方法。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/292006.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

百模大战中的AI行业:新趋势与未来发展

文章目录 每日一句正能量前言技术进步应用拓展行业变革人才竞争后记 每日一句正能量 人生最重要的价值是心灵的幸福&#xff0c;而不是任何身外之物。 前言 随着科技的迅猛发展&#xff0c;人工智能&#xff08;AI&#xff09;已经成为引领技术革命的重要驱动力之一。在当前的…

企业微信无法给Gmail发邮件问题

问题说明 在使用企业微信给国外客户的Gmail邮箱发信件的时候&#xff0c;邮件一直被退信&#xff0c;退信内容如下&#xff1a; 发件人&#xff08;*******.cn&#xff09;域名的DNS记录未设置或设置错误导致对方拒收此邮件。 host gmail-smtp-in.l.google.com[142.251.175.2…

最小操作次数问题

思路如下&#xff1a; 1.其他颜色变成红色球的情况:蓝色变红色需要2步 所以"2 绿色变红色需要1步 所以 绿色 2.其他颜色变成蓝色球的情况:绿色变蓝色需要2步 红色变蓝色需要1步 3.其他颜色变成绿色球的情况 红色变绿色需要2步 蓝色变绿色需要1步 代码如下&#xff1a…

pip 常用指令 pip config 命令用法介绍

&#x1f4d1;pip 常用命令归类整理 pip config 是一个用于管理本地和全局配置的命令行工具。它允许用户获取和设置所有的 pip 配置值。 命令 pip config 有以下参数 list&#xff1a;列出所有的 pip 配置值。edit&#xff1a;编辑 pip 配置文件。get&#xff1a;获取一个配…

实现linux与windows进行文件共享

目录 一.展现形式 二.场景需求 三.具体操作 1.windows访问sumba 2.Linux访问sumba 一.展现形式 支持以文件夹的形式可视化操作文件 二.场景需求 1.有一台Linux物理机 2.有一台window物理机 3.Linux已配置好sumba服务器 三.具体操作 1.windows访问sumba 首先按下winR…

Ubuntu 常用命令之 cal 命令用法介绍

&#x1f4d1;Linux/Ubuntu 常用命令归类整理 cal命令在Ubuntu系统下用于显示日历。它可以显示任何特定月份或整个年份的日历。 cal命令的参数如下 -1&#xff1a;只显示当前月份的日历。-3&#xff1a;显示前一个月、当前月和下一个月的日历。-s&#xff1a;指定日历的开始…

音视频直播核心技术

直播流程 采集&#xff1a; 是视频直播开始的第一个环节&#xff0c;用户可以通过不同的终端采集视频&#xff0c;比如 iOS、Android、Mac、Windows 等。 前处理&#xff1a;主要就是美颜美型技术&#xff0c;以及还有加水印、模糊、去噪、滤镜等图像处理技术等等。 编码&#…

别再犹豫!一键下载安装Substance3D,在数字世界中创造引人注目的艺术品!

不要再在网上浪费时间寻找Substance3D的安装包了&#xff0c;一键下载安装&#xff0c;你要的一切都可以在这里找到&#xff01; 对于那些热衷于数字艺术和建模的人来说&#xff0c;Substance3D是一个不可或缺的工具。然而&#xff0c;在网上寻找合法且可靠的软件安装包却常常…

DAY5网络

1.用select实现TCP客户端程序 #include <head.h> #define PORT 9999 // 服务器端口号 #define IP "192.168.125.74" // 服务器IP地址#define CIP "192.168.125.74" // 客户端 #define CPORT 6666int main(int argc, const char* argv[]) {// 创建套…

2023 英特尔On技术创新大会直播让我感受到英特尔AI的强大

一、2023 英特尔On技术创新大会简单回顾 直播地址&#xff1a;英特尔On技术创新大会 (intel.cn) 英特尔明确表示其“四年五个制程节点”计划正在稳步推进当中&#xff0c;并展示了其首个基于通用芯粒高速互连开放规范(UCIe)的多芯粒封装。 英特尔公布了下一代英特尔至强可扩…

大数据---34.HBase数据结构

一、HBase简介 HBase是一个开源的、分布式的、版本化的NoSQL数据库&#xff08;即非关系型数据库&#xff09;&#xff0c;依托Hadoop分布式文件系统HDFS提供分布式数据存储&#xff0c;利用MapReduce来处理海量数据&#xff0c;用Zookeeper作为其分布式协同服务&#xff0c;一…

html之如何设置音频和视频

文章目录 前言一、音频标签&#xff1a;audio1.audio简介2.常用属性controlsautoplayloop代码演示&#xff1a; 二、视频标签&#xff1a;video1.video2.常用的视频元素controlsautoplayloop代码演示&#xff1a; 总结视频元素总结音频元素总结 前言 html中插入音频和视频的方…