线程池 ForkJoinPool 的任务 `RecursiveTask` 和 `RecursiveAction`

news/2025/3/21 23:16:13/文章来源:https://www.cnblogs.com/bestjosephine/p/18786063

线程池 ForkJoinPool 的任务 RecursiveTaskRecursiveAction

核心区别(一句话)

  • RecursiveTask:分治任务后需要返回计算结果(如求数组总和)
  • RecursiveAction:分治任务后不需要返回结果(如修改数组元素)

底层继承关系

ForkJoinTask
├─ RecursiveTask<V>  // 带泛型返回值
└─ RecursiveAction   // 无返回值

1. RecursiveTask (带返回值的分治任务)

特点

  • 需要实现 compute() 方法并返回结果
  • 适合需要合并子任务结果的场景(如求和、排序)

示例代码

class SumTask extends RecursiveTask<Long> {private final int[] array;private final int start, end;@Overrideprotected Long compute() {if (任务足够小) {return 直接计算; // 例如求array[start..end]的和}// 拆分任务SumTask leftTask = new SumTask(array, start, mid);SumTask rightTask = new SumTask(array, mid, end);// 并行执行子任务leftTask.fork(); rightTask.fork();// 合并结果return leftTask.join() + rightTask.join();}
}

使用场景

ForkJoinPool pool = new ForkJoinPool();
Long result = pool.invoke(new SumTask(array, 0, array.length));

2. RecursiveAction (无返回值的分治任务)

特点

  • 实现 compute() 方法但不返回结果
  • 适合直接修改共享数据结构的场景(如数组元素+1)

代码模板

class ModifyAction extends RecursiveAction {private final int[] array;private final int start, end;@Overrideprotected void compute() {if (任务足够小) {for (int i = start; i < end; i++) {array[i] += 1; // 直接修改原数组}} else {// 拆分任务ModifyAction left = new ModifyAction(array, start, mid);ModifyAction right = new ModifyAction(array, mid, end);// 并行执行invokeAll(left, right); // 更高效的触发方式}}
}

使用场景

ForkJoinPool pool = new ForkJoinPool();
pool.invoke(new ModifyAction(array, 0, array.length)); // 无返回值

关键原理(Fork/Join 框架如何运作)

  1. 任务拆分:通过 fork()invokeAll() 将任务推入工作队列
  2. 工作窃取:线程空闲时会从其他队列"偷"任务执行
  3. 结果合并RecursiveTask 通过 join() 等待子任务结果

选择依据表格

RecursiveTask RecursiveAction
返回值 有 (V 类型)
典型场景 计算总和、归并排序 修改数组、文件处理
结果处理 需要合并子任务结果 直接修改共享资源
方法签名 compute(): V compute(): void

性能陷阱提醒

  • 阈值选择:任务拆分到一定粒度后应直接计算(避免过多线程开销)
  • 避免阻塞:ForkJoinPool 的线程数有限,阻塞操作会导致池瘫痪
  • 共享数据RecursiveAction 直接修改数据结构时需注意线程安全

理解这两者的区别后,你可以更精准地根据任务是否需要结果返回,选择最适合的抽象类来提升并行效率。

高难度面试题

以下是基于 RecursiveTaskRecursiveAction 和 Fork/Join 框架设计的 10 道高难度面试题,涵盖原理、源码、性能优化和实际场景。

这些问题不仅考察 API 使用,更深入:

  • 源码理解(如 fork() 的任务推送机制)
  • 性能优化(阈值、任务粒度、线程安全)
  • 系统设计(混合任务类型、资源管理)
  • 陷阱分析(异常处理、共享状态)

1. RecursiveTaskRecursiveAction 的设计分别体现了什么设计模式?结合 ForkJoinTaskfork()join() 源码,说明它们如何实现任务的拆分与合并。

答案

  • 设计模式
    RecursiveTaskRecursiveAction 均基于 模板方法模式。抽象类 ForkJoinTask 定义了 exec() 方法,具体实现由子类的 compute() 完成。用户只需实现 compute(),框架通过模板方法控制任务执行流程。
  • 源码关键点
    • fork():将任务异步推入当前线程的工作队列(通过 ForkJoinPool#externalPush)。
    • join():阻塞等待任务完成,内部调用 doJoin(),根据任务状态(是否正常完成、是否有异常)返回结果或抛出异常。
    // ForkJoinTask 源码片段
    public final V join() {int s;if ((s = doJoin() & DONE_MASK) != NORMAL)reportException(s);return getRawResult();
    }
    

2. 以下 RecursiveTask 代码在计算大数组总和时性能极差,请指出问题并优化

protected Long compute() {if (end - start <= 1) {return (long) array[start];}int mid = (start + end) / 2;SumTask left = new SumTask(array, start, mid);SumTask right = new SumTask(array, mid, end);left.fork();right.fork();return left.join() + right.join();
}

答案
问题:代码中任务拆分到 end - start <= 1,导致任务粒度过小,产生过多子任务,线程上下文切换开销远大于计算本身。
优化

  • 设置合理的阈值(如 end - start <= 1000),直接计算小任务。
  • 使用 invokeAll() 代替显式 fork(),减少任务调度次数:
    protected Long compute() {if (end - start <= 1000) {long sum = 0;for (int i = start; i < end; i++) sum += array[i];return sum;}int mid = (start + end) >>> 1; // 避免整数溢出SumTask left = new SumTask(array, start, mid);SumTask right = new SumTask(array, mid, end);invokeAll(left, right); // 并行执行,优化调度return left.join() + right.join();
    }
    

3. 在图像处理中,需要对一个 2D 像素矩阵的每个区域并行应用滤镜。如果使用 RecursiveAction,如何设计任务拆分策略以优化内存局部性(Cache Locality)?

答案

  • 目标:减少缓存行失效,提高内存局部性。
  • 策略
    1. 将 2D 矩阵按行或块(Tile)拆分,而非逐像素拆分。
    2. 每个子任务处理连续的内存块(如 32x32 像素),减少跨缓存行的访问。
    3. 使用 RecursiveActioncompute() 递归拆分块,直到达到合理粒度。
    protected void compute() {if (blockSize <= 32) {applyFilterToBlock();} else {splitIntoFourSubBlocks();invokeAll(subBlocks);}
    }
    

4. 假设 ForkJoinPool 的某个工作线程的任务队列为空,而其他队列有任务。详细描述该线程如何通过工作窃取(Work-Stealing)获取任务,并解释为何使用双端队列(Deque)而不是普通队列。

答案

  • 窃取流程
    1. 空闲线程从其他线程的双端队列(Deque)的尾部窃取任务(pollLast())。
    2. 原线程从队列头部取任务(pop()),保证自己的任务顺序执行。
  • 为何用 Deque
    • 本地任务 LIFO:线程优先处理最新任务(栈行为),提高局部性。
    • 窃取任务 FIFO:其他线程从队列头部窃取旧任务(队列行为),平衡负载。
    // 工作线程运行逻辑(简化)
    while (task = getTaskFromLocalQueueOrSteal()) {execute(task);
    }
    

5. 以下 RecursiveAction 尝试并行初始化数组为随机值,但结果不稳定。请分析原因并修复:

class InitAction extends RecursiveAction {private final int[] array;private final int start, end;private final Random random = new Random(); // 每个任务创建独立 Random@Overrideprotected void compute() {if (end - start <= 100) {for (int i = start; i < end; i++) {array[i] = random.nextInt();}} else {// 拆分任务...}}
}

答案
问题Random 实例虽然是线程隔离的,但其种子生成算法(AtomicLong)在多线程下可能导致性能竞争,且 Random 的线性同余算法在多线程下可能生成重复序列。
修复

  • 使用 ThreadLocalRandom,每个线程独立生成随机数,无竞争。
    class InitAction extends RecursiveAction {// ... @Overrideprotected void compute() {if (end - start <= 100) {ThreadLocalRandom random = ThreadLocalRandom.current();for (int i = start; i < end; i++) {array[i] = random.nextInt();}}// ...}
    }
    

6. 如何动态调整 Fork/Join 任务拆分的阈值(Threshold),使其根据当前系统的 CPU 负载和任务类型自适应?给出一种可能的算法思路。

答案

  • 动态阈值思路
    1. 基于历史执行时间:记录子任务执行时间,若平均时间过短,增大阈值;反之减小。
    2. 基于系统负载:通过 Runtime.getRuntime().availableProcessors() 或操作系统指标(如 CPU 使用率)调整阈值。
    3. 自适应算法:类似 TCP 拥塞控制,动态试探最优阈值。
    // 示例:根据历史时间调整阈值
    class DynamicThresholdTask extends RecursiveTask<...> {private static volatile long threshold = 1000;@Overrideprotected Long compute() {long startTime = System.nanoTime();// ... 计算逻辑 ...long duration = System.nanoTime() - startTime;if (duration < 100_000) { // 时间过短,增大阈值threshold = (long)(threshold * 1.5);}// ...}
    }
    

7. 如果在 RecursiveTask 的子任务中抛出未捕获异常,主任务调用 join() 时会如何表现?结合 ForkJoinTask 的源码说明异常传播机制。

答案

  • 异常传播
    • 子任务异常会被封装为 ForkJoinTask#getException(),主任务调用 join() 时抛出 ExecutionException
    • ForkJoinTask 内部通过 setExceptionalCompletion() 标记异常状态。
  • 源码关键点
    // ForkJoinTask#reportException
    private void reportException(int s) {if (s == CANCELLED)throw new CancellationException();if (s == EXCEPTIONAL)rethrow(getThrowableException());
    }
    
  • 最佳实践:在 compute() 内捕获异常,或通过 try-catch 包裹 join()

8. Fork/Join 框架的 RecursiveTaskCompletableFuture 都可以实现并行计算。从任务编排、资源管理和适用场景的角度,分析两者的优劣。

答案

维度 ForkJoinPool (RecursiveTask) CompletableFuture
任务编排 递归分治,父任务依赖子任务结果 链式组合,支持异步回调
资源管理 固定线程数,适合 CPU 密集型 可指定线程池,适合混合任务类型
适用场景 计算密集型任务(如排序、数值计算) I/O 密集型或需要异步编排的任务
线程阻塞 避免阻塞池内线程 允许阻塞(需用自定义线程池)

9. 假设有一个计算斐波那契数的递归函数 fib(n),若将其改造为 RecursiveTask 实现,直接递归拆分会导致指数级子任务。如何优化任务拆分策略以减少任务数量?

答案

  • 直接递归的问题fib(n) 拆分为 fib(n-1)fib(n-2),导致指数级任务数。
  • 优化策略
    1. 记忆化(Memoization):缓存已计算结果,避免重复计算。
    2. 迭代式拆分:按斐波那契数列特性拆分(如矩阵快速幂),减少任务数。
    class FibTask extends RecursiveTask<Long> {private final int n;@Overrideprotected Long compute() {if (n <= 1) return (long) n;FibTask f1 = new FibTask(n - 1);f1.fork();FibTask f2 = new FibTask(n - 2);return f2.compute() + f1.join(); // 显式计算 f2,减少任务数}
    }
    

10. 在一个日志处理系统中,需要并行完成以下操作:

1) 读取并解析日志文件(I/O 密集型)
2) 对解析后的数据过滤(CPU 密集型)
3) 将结果写入数据库(I/O 密集型)
如何结合 RecursiveActionRecursiveTaskForkJoinPool 的特性设计高效并行流水线?需考虑线程阻塞和资源竞争。

答案

  • 设计要点
    1. 分离 I/O 和 CPU 任务
      • RecursiveAction 用于文件读取和数据库写入(需异步回调或结合 NIO)。
      • RecursiveTask 用于数据过滤和计算。
    2. 避免阻塞 ForkJoinPool
      • I/O 操作使用单独线程池(如 Executors.newCachedThreadPool())。
    3. 流水线设计
      // 示例伪代码
      CompletableFuture.supplyAsync(() -> readFile(), ioPool).thenApplyAsync(data -> filterData(data), forkJoinPool) // 使用 RecursiveTask.thenAcceptAsync(result -> writeToDB(result), ioPool);
      

总结

这些问题深入考察对 Fork/Join 框架的掌握程度,回答时需清晰区分场景,结合代码和设计模式展开。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/902705.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Echarts-普通地图和3D地图实现

效果图实现代码 <template><div class="app"><h1>普通地图----------------</h1><div class="map-container" ref="map_ref"></div><h1>3D地图----------------</h1><div class="map-c…

Navicat Premium 16 For Mac 激活,无限试用,非破解,官网安装程序,Mac版Navicat无限试用

Navicat Premium 16 For Mac 激活,无限试用,非破解,官网安装程序,Mac版Navicat无限试用Navicat Premium是一个可多重连线资料库的管理工具,它可以让你以单一程式同时连线到 MySQL、SQLite、Oracle、MariaDB、Mssql、及 PostgreSQL 资料库,让管理不同类型的资料库更加的方…

Vue3 slot

6.9. 【slot】 1. 默认插槽父组件中:<Category title="今日热门游戏"><ul><li v-for="g in games" :key="g.id">{{ g.name }}</li></ul></Category> 子组件中:<template><div class="item&…

英语四级计划第三天

第三天 单词阅读 Smaller Museums From Niche to Mainstream “小而精”的小众博物馆,正在出圈出彩 Chinas museum boom has continued to rise, leading to increased attention and visibility for smaller but more specialized museums. 中国的博物馆热潮持续升温,导致规…

今日总结(app链接数据库的简单实现以及AI训练学习)

所花时间:145min 代码量(行):120 博客量:14 了解到的知识点: 今天又到了周五了,如同往常一样下午进行自学测试,今天是实现一个app数据库连接 在手机上进行查询显示的任务。 因为对于AS开发我并没有进行系统的学习,只是知道大体的框架和对AI进行训练,但是 我发现了一个…

学嵌入式C语言,看这一篇就够了(6)

C语言的语句和块 C语言标准中一共提供6种语句 注意:C语言中的语句要指明执行的操作,并且没有特殊情况,语句是按照顺序执行的一般把实现某些功能的语句整合在一起,构成一个语法单元,C语言标准的语法单元也被称为块,也被称为块语句 复合语句 复合语句可以限制语句的作用范围…

P3375 【模板】KMP

P3375 【模板】KMP 题目描述 给出两个字符串 \(s_1\) 和 \(s_2\),若 \(s_1\) 的区间 \([l, r]\) 子串与 \(s_2\) 完全相同,则称 \(s_2\) 在 \(s_1\) 中出现了,其出现位置为 \(l\)。 现在请你求出 \(s_2\) 在 \(s_1\) 中所有出现的位置。 定义一个字符串 \(s\) 的 border 为 …

Cursor 使用教程

目录核心功能TabAICHATCOMPOSER@DocsWebGitNotepadCodebase技巧和思路如果Cursor乱改代码怎么办? 核心功能 Tab 这个是Cursor最为强大的功能,使用也很简单,在编写代码的时候,如果出现了灰色的提示词,直接按Tab就可以自动补全了最后的灰色的就是提示词 Cursor的光标预测也很…

C语言学习打卡第二天(2025.3.21)

时间有点少,今天也没学多少,只把指针基础概念学了一下(例如指针变量,下标法等),题也只做了三道。

3.21 学习记录

设计科技查询手机端系统,实现了政策关键字进行模糊匹配查询,点击下方的政策名称,可以打开新窗口,显示政策全文,实现分页查询,分类查询 采用springboot+vue3实现

P8436 【模板】边双连通分量

P8436 【模板】边双连通分量 题目描述 对于一个 \(n\) 个节点 \(m\) 条无向边的图,请输出其边双连通分量的个数,并且输出每个边双连通分量。 输入格式 第一行,两个整数 \(n\) 和 \(m\)。 接下来 \(m\) 行,每行两个整数 \(u, v\),表示一条无向边。 不保证图为简单图,图中可…