CompletableFuture 超时功能有大坑!使用不当直接生产事故!

news/2025/2/2 21:04:22/文章来源:https://www.cnblogs.com/HuaTalkHub/p/18697082

CompletableFuture 超时功能有大坑!使用不当直接生产事故!

本文未经允许禁止转载!

上一篇文章《如何实现超时功能(以CompletableFuture为例)》中我们讨论了 CompletableFuture 超时功能的具体实现,从整体实现来说,JDK21前的版本有着内存泄露的bug,不过很少对实际生产有影响,因为任务的编排涉及的对象并不多,少量内存泄露最终会被回收掉。从单一功能内聚的角度来说,超时功能的实现是没有问题;然而由于并发编程的复杂性,可能会出现 Delayer 线程延迟执行的情况。本文将详细复现与讨论 CompletableFuture 超时功能的大坑,同时提供一些最佳实践指导。

2024年9月8日更新:CFFU 开源项目负责人李鼎(Jerry Lee) 更新了代码示例,点击这里查看。

1. 问题复现

感谢 CFFU 开源项目负责人李鼎(Jerry Lee) 提供代码:

public class CfDelayDysfunctionDemo {public static void main(String[] args) {dysfunctionDemo();System.out.println();cffuOrTimeoutFixDysfunctionDemo();}private static void dysfunctionDemo() {logWithTimeAndThread("dysfunctionDemo begin");final long tick = System.currentTimeMillis();final List<CompletableFuture<?>> sequentCfs = new ArrayList<>();CompletableFuture<Integer> incomplete = new CompletableFuture<>();CompletableFuture<?> cf = incomplete.orTimeout(100, TimeUnit.MILLISECONDS).handle((v, ex) -> {logWithTimeAndThread("[1] timout");sleep(1000);return null;});sequentCfs.add(cf);cf = incomplete.orTimeout(100, TimeUnit.MILLISECONDS).handle((v, ex) -> {logWithTimeAndThread("[2] timout");sleep(1000);return null;});sequentCfs.add(cf);cf = incomplete.orTimeout(100, TimeUnit.MILLISECONDS).handle((v, ex) -> {logWithTimeAndThread("[3] timout");sleep(1000);return null;});sequentCfs.add(cf);CompletableFuture.allOf(sequentCfs.toArray(CompletableFuture[]::new)).join();logWithTimeAndThread("dysfunctionDemo end in " + (System.currentTimeMillis() - tick) + "ms");}private static void cffuOrTimeoutFixDysfunctionDemo() {logWithTimeAndThread("cffuOrTimeoutFixDysfunctionDemo begin");final long tick = System.currentTimeMillis();final List<CompletableFuture<?>> sequentCfs = new ArrayList<>();CompletableFuture<Integer> incomplete = new CompletableFuture<>();CompletableFuture<?> cf = CompletableFutureUtils.cffuOrTimeout(incomplete, 100, TimeUnit.MILLISECONDS).handle((v, ex) -> {logWithTimeAndThread("[1] timout");sleep(1000);return null;});sequentCfs.add(cf);cf = CompletableFutureUtils.cffuOrTimeout(incomplete, 100, TimeUnit.MILLISECONDS).handle((v, ex) -> {logWithTimeAndThread("[2] timout");sleep(1000);return null;});sequentCfs.add(cf);cf = CompletableFutureUtils.cffuOrTimeout(incomplete, 100, TimeUnit.MILLISECONDS).handle((v, ex) -> {logWithTimeAndThread("[3] timout");sleep(1000);return null;});sequentCfs.add(cf);CompletableFuture.allOf(sequentCfs.toArray(CompletableFuture[]::new)).join();logWithTimeAndThread("cffuOrTimeoutFixDysfunctionDemo end in " + (System.currentTimeMillis() - tick) + "ms");}private static void logWithTimeAndThread(String msg) {System.out.printf("%tF %<tT.%<tL [%s] %s%n",System.currentTimeMillis(), Thread.currentThread().getName(), msg);}
}

执行结果如下:

image.png

代码思路是这样的:有3个运行1秒的任务,在超时之后运行,不切线程池(都在 Delayer 线程运行),运行了3秒,不能在设置100ms的超时后运行,因为单线程排队了。handle 方法传入的回调函数在 Delayer 线程中执行了。

示例代码中解决超时线程延迟执行的方法是使用CFFU提供的安全 timeout 方法,本文后面会分析相关源码。

2. 问题分析

为什么handle方法里的回调会在 CompletableFutureDelayScheduler 中执行?

// 这里的代码逐步深入到调用栈内部
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) {return uniHandleStage(null, fn);
}private <V> CompletableFuture<V> uniHandleStage(Executor e, BiFunction<? super T, Throwable, ? extends V> f) {if (f == null) throw new NullPointerException();CompletableFuture<V> d = newIncompleteFuture();Object r;if ((r = result) == null)// 加入回调栈中后续再执行unipush(new UniHandle<T,V>(e, d, this, f));else if (e == null)// 有结果,直接执行d.uniHandle(r, f, null);else {try {e.execute(new UniHandle<T,V>(null, d, this, f));} catch (Throwable ex) {d.result = encodeThrowable(ex);}}return d;
}final <S> boolean uniHandle(Object r,BiFunction<? super S, Throwable, ? extends T> f,UniHandle<S,T> c) {S s; Throwable x;if (result == null) {try {// 此次调用中 c 为空,无需关注UniHandle,甚至不需要知道UniHandle的具体职责if (c != null && !c.claim())return false;if (r instanceof AltResult) {x = ((AltResult)r).ex;s = null;} else {x = null;@SuppressWarnings("unchecked") S ss = (S) r;s = ss;}// 执行回调completeValue(f.apply(s, x));} catch (Throwable ex) {completeThrowable(ex);}}return true;}

我们把出现问题的原因简单总结一下:

CompletionStage 中不带 async 的方法可能会在不同的线程中执行。一般情况下,如果CF的结果已经计算出来,后续的回调在调用线程中执行,如果结果没有计算出来,后续的回调在上一步计算的线程中执行。

以下是一个简化的代码示例:

@Slf4j
public class TimeoutBugDemo {public static void main(String[] args) {new CompletableFuture<Integer>().orTimeout(1, TimeUnit.SECONDS).handle((v, ex) -> {log.info("v: {}", v, ex);return -1;}).join();}
}

handle 方法传入的回调方法会在delayer线程中执行,从执行日志看也确实如此:

Task :TimeoutBugDemo.main()
11:58:53.465 [CompletableFutureDelayScheduler] INFO com.example.demo.cftimeout.TimeoutBugDemo -- v: null
java.util.concurrent.TimeoutException: null
at java.base/java.util.concurrent.CompletableFuture$Timeout.run(CompletableFuture.java:2920)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)

3. CFFU 是如何解决线程传导的?

// CFFU 代码实现
public static <C extends CompletableFuture<?>> C cffuOrTimeout(C cfThis, Executor executorWhenTimeout, long timeout, TimeUnit unit) {requireNonNull(cfThis, "cfThis is null");requireNonNull(executorWhenTimeout, "executorWhenTimeout is null");requireNonNull(unit, "unit is null");return hopExecutorIfAtCfDelayerThread(orTimeout(cfThis, timeout, unit), executorWhenTimeout);
}// 核心实现代码
private static <C extends CompletableFuture<?>> C hopExecutorIfAtCfDelayerThread(C cf, Executor executor) {CompletableFuture<Object> ret = newIncompleteFuture(cf);// use `cf.handle` method(instead of `cf.whenComplete`) and return null in order to// prevent reporting the handled exception argument of this `action` at subsequent `exceptionally`cf.handle((v, ex) -> {if (!atCfDelayerThread()) completeCf(ret, v, ex);// 使用 executor 后,CF的后续回调操作就不会在Dalayer 线程中执行了else executor.execute(() -> completeCf(ret, v, ex));return null;}).exceptionally(ex -> reportUncaughtException("handle of executor hop", ex));return (C) ret;
}private static void completeCf(CompletableFuture<Object> cf, Object value, @Nullable Throwable ex) {try {// 写入到新CF中if (ex == null) cf.complete(value);else cf.completeExceptionally(ex);} catch (Throwable t) {if (ex != null) t.addSuppressed(ex);reportUncaughtException("completeCf", t);throw t; // rethrow exception, report to caller}
}

基本思路将结果写入到新的 CompletableFuture 中,为了避免后续回调使用 Delayer 线程,改用新增的线程,保证线程传导的安全性。

提示:有时我们需要关注链式调用返回的是新值还是原有对象,比如 CompletableFuture#orTimeout 返回的是当前对象this, CFFU中返回的是新的 CompletableFuture。

4. 最佳实践的启示

  1. 使用优秀的 CompletableFuture 类库: CFFU,避免编程出错,减轻开发负担。
  2. 可参考我在《深入理解 Future, CompletableFuture, ListenableFuture,回调机制》一文中所讲的,如果使用CompletableFuture,应该尽量显示使用async*方法,同时显式传入执行器executor参数。
  3. 改为使用 Guava 中的 ListenableFuture。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/878102.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2024.2.2 鲜花

P2305 [NOI2014] 购票aLIEz 決めつけばかり 自惚れを着たチープな hokori で 音荒げても 棚に隠した哀れな 恥に濡れた鏡の中 都合の傷だけひけらかして 手軽な強さで勝取る術を どれだけ磨いでも気はやつれる ふらついた思想通りだ 愛-same-CRIER 愛撫-save-LIAR Eid-聖-Risin…

昆明理工大学25考研冶金工程预计调剂145人

冶金工程考研809冶金物理化学有色冶金学有色金属冶金冶金过程及设备F002钢铁冶金学

8.数据结构

空气在他的呼吸间化作赤红烈焰,烈火在他掌中咆哮翻涌,如同猛兽般肆虐纵横,每一缕火舌都在嘶吼着征服与毁灭。他以战神的姿态掌控炽炎,以焚天煮海之势,在这场杀戮盛宴中肆意狂舞!数据结构 开题顺序: \(WHABCEI\) \(A\) CF2042D Recommendations扫描线维护 \(\le l\) 的最…

ollama mac使用

教程地址:https://www.youtube.com/watch?v=SRroLOci0CA 安装完成后,常用命令。 启动服务:ollama run deepseek-r1:8B 使用:停止服务:本文来自博客园,作者:NeverLateThanBetter,转载请注明原文链接:https://www.cnblogs.com/do-it-520/p/18697037韶华易逝,不能虚度年…

07. 文件操作

一、文件的查找我们可以使用 find 命令 从指定目录向下递归地遍历其各个子目录,将满足的文件显示在终端中。 find [搜索范围] [选项]其中,选项的可选值如下:-name 文件名:按照指定的文件名查找文件,如果不知道文件的全名,可以使用 * 进行模糊匹配。 -user 用户名:查找属…

《计算机网络》笔记——第五章 运输层

计算机网络(第7版)谢希仁目录第5章 运输层概述运输层的两个主要协议端口用户数据报协议UDPUDP的首部格式传输控制协议TCPTCP的连接可靠传输的工作原理停止等待协议连续ARQ协议TCP报文段的首部格式TCP可靠传输的实现滑动窗口超时重传时间(RTO)的选择选择确认SACKTCP的流量控制…

[Paper Reading] DeepSeek-V3 Technical Report

目录DeepSeek-V3 Technical Report解读TL;DR优势训练数据参数量Method架构MLA(Multi-Head Latent Attention)DeepSeekMoEMoEDeepSeekMoEMTP(Multi-Token Prediction)基建FP8训练部署PrefillingDecodingPre-TrainingDataLong Context ExtensionPost-TrainingSFTReinforcement Le…

表单标签3

如何点击用户名来唤醒对应光标 中for id 两者的对象一致

省选模拟4

省选模拟4 A 小丑做法,设 \(f_{S,i,j}\) 为使用边权 \(\le j\) 的边连通了集合 \(S\),里面使用了 \(i\) 个 \(a\) 的最小生成树。 转移朴素枚举,复杂度 \(O(3^nm^3)\) B 是原题。 注意到一个点走过一轮后,从父亲离开后下一次访问会完全访问。 因此可以 dfs 求得一个节点会在…

闲话 25.2.2

the Kernel Method: a collection of examples 读后感闲话 我怎么感觉我读了这个论文,还不知道 kernel method 是啥啊。 没人总结这个,可能未来要读一些新东西。 推歌:时间的彼端 by 暗猫の祝福 et al. the Kernel Method: a collection of examples 读后感 \(1.\) 第一次出…