多线程系列(二十一) -ForkJoin使用详解

一、摘要

从 JDK 1.7 开始,引入了一种新的 Fork/Join 线程池框架,它可以把一个大任务拆成多个小任务并行执行,最后汇总执行结果。

比如当前要计算一个数组的和,最简单的办法就是用一个循环在一个线程中完成,但是当数组特别大的时候,这种执行效率比较差,例如下面的示例代码。

long sum = 0;
for (int i = 0; i < array.length; i++) {sum += array[i];
}
System.out.println("汇总结果:" + sum);

还有一种办法,就是将数组进行拆分,比如拆分成 4 个部分,用 4 个线程并行执行,分别计算,最后进行汇总,这样执行效率会显著提升。

如果拆分之后的部分还是很大,可以继续拆,直到满足最小颗粒度,再进行计算,这个过程可以反复“裂变”成一系列小任务,这个就是 Fork/Join 的工作原理。

Fork/Join 采用的是分而治之的基本思想,分而治之就是将一个复杂的任务,按照规定的阈值划分成多个简单的小任务,然后将这些小任务的执行结果再进行汇总返回,得到最终的执行结果。分而治之的思想在大数据领域应用非常广泛。

下面我们一起来看看 Fork/Join 的具体用法。

二、ForkJoin 用法介绍

以计算 2000 个数字组成的数组为例,进行并行求和, Fork/Join 简单的应用示例如下:

public class ForkJoinTest {public static void main(String[] args) throws Exception {// 创建2000个数组成的数组long[] array = new long[2000];// 记录for循环汇总计算的值long sourceSum = 0;for (int i = 0; i < array.length; i++) {array[i] = i;sourceSum += array[i];}System.out.println("for循环汇总计算的值: " + sourceSum);System.out.println("---------------");// fork/join汇总计算的值ForkJoinPool forkJoinPool = new ForkJoinPool();ForkJoinTask<Long> taskFuture = forkJoinPool.submit(new SumTask(array, 0, array.length));System.out.println("fork/join汇总计算的值: " + taskFuture.get());}
}
public class SumTask extends RecursiveTask<Long> {/*** 最小任务数组最大容量*/private static final int THRESHOLD = 500;private long[] array;private int start;private int end;public SumTask(long[] array, int start, int end) {this.array = array;this.start = start;this.end = end;}@Overrideprotected Long compute() {// 检查任务是否足够小,如果任务足够小,直接计算if (end - start <= THRESHOLD) {long sum = 0;for (int i = start; i < end; i++) {sum += this.array[i];}return sum;}// 任务太大,一分为二int middle = (end + start) / 2;// 拆分执行SumTask leftTask = new SumTask(this.array, start, middle);leftTask.fork();SumTask rightTask = new SumTask(this.array, middle, end);rightTask.fork();System.out.println("进行任务拆分,leftTask数组区间:" + start + "," + middle + ";rightTask数组区间:" + middle + "," + end);// 汇总结果return leftTask.join() +  rightTask.join();}
}

输出结果如下:

for循环汇总计算的值: 1999000
---------------
进行任务拆分,leftTask数组区间:0,1000;rightTask数组区间:1000,2000
进行任务拆分,leftTask数组区间:1000,1500;rightTask数组区间:1500,2000
进行任务拆分,leftTask数组区间:0,500;rightTask数组区间:500,1000
fork/join汇总计算的值: 1999000

从日志上可以清晰的看到,for 循环方式汇总计算的结果与Fork/Join方式汇总计算的结果一致。

因为最小任务数组最大容量设置为500,所以Fork/Join对数组进行了三次拆分,过程如下:

  • 第一次拆分,将0 ~ 2000数组拆分成0 ~ 10001000 ~ 2000数组
  • 第二次拆分,将0 ~ 1000数组拆分成0 ~ 500500 ~ 1000数组
  • 第三次拆分,将1000 ~ 2000数组拆分成1000 ~ 15001500 ~ 2000数组
  • 最后合并计算,将拆分后的最小任务计算结果进行合并处理,并返回最终结果

当数组量越大的时候,采用Fork/Join这种方式来计算,程序执行效率优势非常明显。

三、ForkJoin 框架原理

从上面的用例可以看出,Fork/Join框架的使用包含两个核心类ForkJoinPoolForkJoinTask,它们之间的分工如下:

  • ForkJoinPool是一个负责执行任务的线程池,内部使用了一个无限队列来保存需要执行的任务,而执行任务的线程数量则是通过构造函数传入,如果没有传入指定的线程数量,默认取当前计算机可用的 CPU 核心量
  • ForkJoinTask是一个负责任务的拆分和合并计算结果的抽象类,通过它可以完成将大任务分解成多个小任务计算,最后将各个任务执行结果进行汇总处理

正如上文所说,Fork/Join框架采用的是分而治之的思想,会将一个超大的任务进行分解,按照设定的阈值分解成多个小任务计算,最后将各个计算结果进行汇总。它的应用场景非常多,比如大整数乘法、二分搜索、大数组快速排序等等。

有个地方可能需要注意一下,ForkJoinPool线程池和ThreadPoolExecutor线程池,两者实现原理是不一样的。

两者最明显的区别在于:ThreadPoolExecutor中的线程无法向任务队列中再添加一个任务并在等待该任务完成之后再继续执行;而ForkJoinPool可以实现这一点,它能够让其中的线程创建新的任务添加到队列中,并挂起当前的任务,此时线程继续从队列中选择子任务执行。

因此在 JDK 1.7 中,ForkJoinPool线程池的实现是一个全新的类,并没有复用ThreadPoolExecutor线程池的实现逻辑,两者用途不同。

3.1、ForkJoinPool

ForkJoinPoolFork/Join框架中负责任务执行的线程池,核心构造方法源码如下:

/*** 核心构造方法* @param parallelism   可并行执行的线程数量* @param factory       创建线程的工厂   * @param handler       异常捕获处理器* @param asyncMode     任务队列模式,true:先进先出的工作模式,false:先进后出的工作模式*/
public ForkJoinPool(int parallelism,ForkJoinWorkerThreadFactory factory,UncaughtExceptionHandler handler,boolean asyncMode) {this(checkParallelism(parallelism),checkFactory(factory),handler,asyncMode ? FIFO_QUEUE : LIFO_QUEUE,"ForkJoinPool-" + nextPoolId() + "-worker-");checkPermission();
}

默认无参的构造方法,源码如下:

public ForkJoinPool() {this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),defaultForkJoinWorkerThreadFactory, null, false);
}

默认构造方法创建ForkJoinPool线程池,关键参数设置如下:

  • parallelism:取的是当前计算机可用的 CPU 数量
  • factory:采用的是默认DefaultForkJoinWorkerThreadFactory类,其中ForkJoinWorkerThreadFork/Join框架中负责真正执行任务的线程
  • asyncMode:参数设置的是false,也就是说存在队列的任务采用的是先进后出的方式工作

其次,也可以使用Executors工具类来创建ForkJoinPool,例如下面这种方式:

// 创建一个 ForkJoinPool 线程池
ExecutorService forkJoinPool = Executors.newWorkStealingPool();

ThreadPoolExecutor线程池一样,ForkJoinPool也实现了ExecutorExecutorService接口,支持通过execute()submit()等方式提交任务。

不过,正如上面所说,ForkJoinPoolThreadPoolExecutor在实现上是不一样的:

  • ThreadPoolExecutor中,多个线程都共有一个阻塞任务队列
  • ForkJoinPool中每一个线程都有一个自己的任务队列,当线程发现自己的队列里没有任务了,就会到别的线程的队列里获取任务执行。

这样设计的目的主要是充分利用线程实现并行计算的效果,减少线程之间的竞争。

比如线程 A 负责处理队列 A 里面的任务,线程 B 负责处理队列 B 里面的任务,两者如果队列里面的任务数差不多,执行的时候互相不干扰,此时的计算性能是最佳的;假如线程 A 的任务执行完毕,发现线程 B 中的队列数还有一半没有执行,线程 A 会主动从线程 B 的队列里获取任务执行。

在这时它们会同时访问同一个队列,为了减少线程 A 和线程 B 之间的竞争,通常会使用双端队列,线程 B 从双端队列的头部拿任务执行,而线程 A 从双端队列的尾部拿任务执行,确保两者不会从同一端获取任务,可以显著加快任务的执行速度。

Fork/Join框架中负责执行任务的线程ForkJoinWorkerThread,部分源码如下:

public class ForkJoinWorkerThread extends Thread {// 所在的线程池final ForkJoinPool pool;// 当前线程下的任务队列final ForkJoinPool.WorkQueue workQueue;// 初始化时的构造方法protected ForkJoinWorkerThread(ForkJoinPool pool) {// Use a placeholder until a useful name can be set in registerWorkersuper("aForkJoinWorkerThread");this.pool = pool;this.workQueue = pool.registerWorker(this);}
}
3.2、ForkJoinTask

ForkJoinTaskFork/Join框架中负责任务分解和合并计算的抽象类,它实现了Future接口,因此可以直接作为任务类提交到线程池中。

同时,它还包括两个主要方法:fork()join(),分别表示任务的分拆与合并。

可以使用下图来表示这个过程。

ForkJoinTask部分方法,源码如下:

public abstract class ForkJoinTask<V> implements Future<V>, Serializable {// 将任务推送到任务队列public final ForkJoinTask<V> fork() {Thread t;if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)((ForkJoinWorkerThread)t).workQueue.push(this);elseForkJoinPool.common.externalPush(this);return this;}// 等待任务的执行结果public final V join() {int s;if ((s = doJoin() & DONE_MASK) != NORMAL)reportException(s);return getRawResult();}
}

在 JDK 中,ForkJoinTask有三个常用的子类实现,分别如下:

  • RecursiveAction:用于没有返回结果的任务
  • RecursiveTask:用于有返回结果的任务
  • CountedCompleter:在任务完成执行后,触发自定义的钩子函数

我们最上面介绍的用例,使用的就是RecursiveTask子类,通常用于有返回值的任务计算。

ForkJoinTask其实是利用了递归算法来实现任务的拆分,将拆分后的子任务提交到线程池的任务队列中进行执行,最后将各个拆分后的任务计算结果进行汇总,得到最终的任务结果。

四、小结

整体上,ForkJoinPool可以看成是对ThreadPoolExecutor线程池的一种补充,在工作线程中存放了任务队列,充分利用线程进行并行计算,进一步提升了线程的并发执行性能。

通过ForkJoinPoolForkJoinTask搭配使用,将超大计算任务拆分成多个互不干扰的小任务,提交给线程池进行计算,最后将各个任务计算结果进行汇总处理,得到跟单线程执行一致的结果,当计算任务越大,Fork/Join框架执行任务的效率,优势更突出。

但是并不是所有的任务都适合采用Fork/Join框架来处理,比如读写数据文件这种 IO 密集型的任务就不合适,因为磁盘 IO、网络 IO 的操作特点就是等待,容易造成线程阻塞。

五、参考

1.https://www.liaoxuefeng.com/wiki/1252599548343744/1306581226487842

2.https://juejin.cn/post/6986899215163064333

3.https://developer.aliyun.com/article/806887

六、写到最后

最近无意间获得一份阿里大佬写的技术笔记,内容涵盖 Spring、Spring Boot/Cloud、Dubbo、JVM、集合、多线程、JPA、MyBatis、MySQL 等技术知识。需要的小伙伴可以点击如下链接获取,资源地址:技术资料笔记。

不会有人刷到这里还想白嫖吧?点赞对我真的非常重要!在线求赞。加个关注我会非常感激!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/534975.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

字节跳动的 SDXL-LIGHTNING : 体验飞一般的文生图

TikTok 的母公司字节跳动推出了最新的文本到图像生成人工智能模型&#xff0c;名为SDXL-Lightning。顾名思义&#xff0c;这个新模型只需很轻量的推理步骤&#xff08;1&#xff0c;4 或 8 步&#xff09;即可实现极其快速且高质量的文本到图像生成功能。与原始 SDXL 模型相比&…

vue项目因内存溢出启动报错

前端能正常启动&#xff0c;但只要一改动就报错启动出错。 解决办法&#xff1a; 安装依赖 npm install cross-env increase-memory-limit 然后再做两件事&#xff1a;在node 在package.json 里的 script 里进行配置 LIMIT是你想分配的内存大小&#xff0c;这里的8192单位…

基于最小二乘递推算法的系统参数辨识matlab仿真

目录 1.程序功能描述 2.测试软件版本以及运行结果展示 3.核心程序 4.本算法原理 5.完整程序 1.程序功能描述 基于最小二乘递推算法的系统参数辨识。对系统的参数a1&#xff0c;b1&#xff0c;a2&#xff0c;b2分别进行估计&#xff0c;计算估计误差以及估计收敛曲线&#…

人力资源管理软件好处:提升效率利器 !为什么选择合适软件很重要

高效运用合适的人力资源管理软件对企业发展大有裨益&#xff0c;下面我将详解运用适宜的人力资源管理软件对企业发展有什么好处&#xff0c;以及企业挑选适宜的人力资源软件应考虑的关键步骤。 一&#xff0e;使用合适的人力资源管理软件好处分析 招聘流程的优化 通过人力资…

Python常用图片数据方法

文章目录 1. 常用图片数据类型2. 图片的显示2.1 plt.imshow()2.2 使用 turtle 来绘制图片 3.图片ndarray数据的常用切片操作使用 cv2 来读取图片打印数据R G B 通道的获取BGR 转成 RGBcv2 不支持中文路径的解决方法 4 PIL.Image 转成 QImage 或 QPixmap 1. 常用图片数据类型 使…

基于Redis实现分布式锁、限流操作(基于SpringBoot)的实现

基于Redis实现分布式锁、限流操作——基于SpringBoot实现 本文总结了一种利用Redis实现分布式锁、限流的较优雅的实现方式本文原理介绍较为通俗&#xff0c;希望能帮到有需要的人本文的demo地址&#xff1a;https://gitee.com/rederxu/lock_distributed.git 一、本文基本实现…

迪杰斯特拉算法 代码

参考链接&#xff1a; 【路径规划】全局路径规划算法——Dijkstra算法&#xff08;含python实现 | c实现&#xff09;-CSDN博客 算法图解&#xff1a; 代码 def dijkstra(matrix, source):"""迪杰斯特拉算法实现Args:matrix (_type_): 用邻接矩阵表示带权图s…

SpringBoot(源码解析 + 实现底层机制)

文章目录 1.搭建SpringBoot底层机制开发环境1.创建maven项目2.使用Git管理项目&#xff08;可以略过&#xff09;1.创建一个github存储库2.克隆到本地&#xff0c;复制文件夹的内容3.粘贴到idea项目文件夹&#xff0c;将其作为本地仓库与远程仓库关联 3.pom.xml 引入父工程和场…

AI壁纸号一周增加上千粉丝,轻松变现的成功案例分享

前言 随着AI绘画技术的发展&#xff0c;传统的互联网副业壁纸号在新的技术加持下迎来了第二春。本文将分享一位壁纸号创作者的成功案例&#xff0c;并为大家提供创作门槛和硬件要求等相关信息。 该项目的创作门槛极低&#xff0c;基本上可以由AI完成内容创作。不过&#xff0…

LM358P/LM358DR/LM358DT/LM358DR2G运算放大器中文资料PDF数据手册引脚图功能

产品概述&#xff1a; LM358B 和 LM2904B 器件是行业标准运算放大器 LM358 和 LM2904 的下一代版本&#xff0c;其中包括两个高压 (36V) 运算放大器。这些器件为成本敏感型应用提供了卓越的价值&#xff0c;其特性包括低偏移&#xff08;300V&#xff0c;典型值&#xff09;、…

C++11新特性【右值引用】

文章目录 1. 什么是左值2. 什么是右值3. 左值引用4. 左值引用使用场景5. 右值引用6. 右值引用使用场景6.1 场景16.2 场景2 7. 完美转发 1. 什么是左值 左值不能根据字面意思来理解&#xff0c;不是说在左边的就是左值&#xff0c;例如&#xff1a; int main() {int a 0;int …

Windows11安装NodeJS18并配置环境变量

从官网下载&#xff0c;或者从百度网盘下载 解压下载的zip包&#xff1a; 重命名为nodejs&#xff1a; 在nodejs中添加cache和global两个目录&#xff1a; 将nodejs和nodejs\global添加到环境变量&#xff1a; 打开终端&#xff0c;输入&#xff1a; node -v接着配置…