1. 前言
在实际做项目中,我们经常使用多线程、异步的来帮我们做一些事情。
比如用户抽取奖品,异步的给他发一个push。
又比如一段前后不相关的业务逻辑,原本是顺序执行,耗时=(A + B + C),现在使用多线程加快执行速度,耗时=Max(A, B, C)。
这时候很多时候为了方便,我们就直接使用CompletableFuture
来处理,但它真的好多坑,让我们一一细说。
2. CompletableFuture原理
2.1 CompletableFuture API
在CompletableFuture中提交任务有以下几种方式
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
这四个方法都是用来提交任务的,不同的是supplyAsync提交的任务有返回值,runAsync提交的任务没有返回值。两个接口都有一个重载的方法,第二个入参为指定的线程池,如果不指定,则默认使用ForkJoinPool.commonPool()线程池。
2.2 ForkJoinPool
Fork/Join 框架是一种并行计算框架,设计目的是提高具有递归性质任务的执行速度。典型的任务是将问题逐步分解成较小的任务,直到每一个子任务足够简单可以直接解决,然后再将结果聚合起来。
Fork/Join 框架基于"工作窃取"算法 (Work Stealing Algorithm),该算法的核心思想是每个工作线程有自己的任务队列(双端队列, Deque
)。当一个线程完成了自己队列中的任务时,便会窃取其他线程队列中的任务执行,这样就不会因为某个线程在等待而浪费 CPU 资源。
Fork/Join 框架非常适合以下这些工作负载:
- 递归任务:如斐波那契数列、归并排序等分治算法。
- 大规模数据处理:快速对集合、数组等进行并行操作。
- 图像处理:图像处理等数据量大的任务可以被分成多个小任务并行处理。
也就是说ForkJoinPool比较适用于CPU密集型,而不太适合于IO密集型。但是我们业务中大多数都是IO密集型,比如等待数据库的返回,等待下游RPC的返回,等待子方法的返回等等
2.3 ForkJoinPool在CompletableFuture中的应用
先说结论:
-
如果你在使用
CompletableFuture
没有指定线程池,就会使用默认的ForkJoinPool
-
CompletableFuture是否使用默认线程池的依据,和机器的CPU核心数有关。当CPU核心数-1大于1时,才会使用默认的线程池,否则将会为每个CompletableFuture的任务创建一个新线程去执行。
-
如果你的CPU核心数为4核,那么也就是最多也只有3个核心线程(3个线程,你确定够用?)
3. CompletableFuture坑
3.1 ForkJoinPool线程不够用,处于等待状态
小明为了加快代码的运行,将原来的A+B+C的运行逻辑,改成了(A,B,C)的运行逻辑,使用了3个CompletableFuture
来执行,耗时从原本的900ms,缩短到了300ms,简单代码如下:
public void test1() {a();b();c();
}public void test2() {CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> a());CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> b());CompletableFuture<Integer> f3 = CompletableFuture.supplyAsync(() -> c());
}
上线后,小明心情良好,坐等升职加薪,没想到第二天却遇到了线上告警,接口频繁的超时,比之前还要慢,有些达到了10s+,小明实在想不明白。
后来排查发现,在项目中有大量使用到CompletableFuture.supplyAsync
的地方,而每台机器8核,也就是7个线程,根本不够用,因此大量都是在等待线程中度过,因此耗时越来越严重,最终形成了雪崩,接口直接无限超时。
回答:使用CompletableFuture
必须做到线程池隔离,不能使用默认的ForkJoinPool
线程池
3.2 CompletableFuture反而更慢了?
小明经过这次事件学聪明了,使用CompletableFuture
都自己写一个线程池。过了几天线上又告警出来了,大量接口超时,小明又蒙逼了,小明的代码如下:
ExecutorService es = Executors.newFixedThreadPool(5);
public void test1() {CompletableFuture.runAsync(a(1), es);CompletableFuture.runAsync(b(1), es);CompletableFuture.runAsync(c(1), es);
}
后来排查发现,springmvc tomcat默认线程池是200,而你的线程池只有5个,也就是说,当接口请求了攀升。
比如现在有200个请求过来,执行到test1的时候,如果不使用线程池,反而没任何问题。但是使用到了线程池,5个线程池根本不够用,等待线程的释放,那么会越来越慢,最终拖垮整个服务。
3.3 CompletableFuture死锁?
小明说我再也不使用CompletableFuture
了,小明说我直接调大线程池到200,那肯定没问题了,读者们思考下是否可行。答案是绝对不可行的,核心线程设置那么大,对cpu消耗非常严重,一定要设置合理的范围内。
再来看一个死锁问题,终于不是小明的锅了,这次轮到了小红,以下是死锁的代码:
ExecutorService es = Executors.newFixedThreadPool(5);
public void test() {for (int i = 0; i < 5; i++) {CompletableFuture.runAsync(() -> a(), es); }
}public void a() {CompletableFuture<Integer> f = CompletableFuture.supplyAsync(() -> 1, es);try {f.get();} catch (Exception e) {}
}
这里不卖关子了。由于是5个线程,在test方法中,将5个线程全部使用,然后test调用子方法a的时候。由于共用同一个线程池es,a方法永远获取不到线程池,a方法永远不可能执行成功,那么test方法也永远执行不了成功,那么就会处于永远阻塞死锁的这么一个线程。
因此解决办法就是,不同的业务尽量不要使用同一个线程池,为自己业务定制自己的线程池,而不是为了方便,共用一个commonPool。
4. 最后
通过以上对CompletableFuture
的分析,以及一些实际踩坑的案例,相信你对CompletableFuture
用法更加的了解了。
最后还是想说明一点,在业务代码中,能不使用多线程就不使用多线程,因为它带来的副作用远远比带来的好处要多的多得多,除非你非常清楚其中的原理。
CompletableFuture
你真的懂了么?欢迎评论区留言讨论。