一、抛砖引玉——优化一个计数任务
1.1、需求:计算从1加累加到100的结果
1.1.1、普通的实现代码示例
package com.example.mavendemo.completablefuture;public class NormalTest {public static void main(String[] args) {long startTime = System.currentTimeMillis();long calculate100 = calculateNumber(1, 100);System.out.println("calculate100 =" + calculate100);System.out.println("耗时:" + (System.currentTimeMillis() - startTime) );}public static long calculateNumber(int startNumber, int endNumber){int result = 0;for (int i = startNumber; i <= endNumber; i++) {try {//睡眠5ms(为了演示效果)Thread.sleep(5L);} catch (InterruptedException e) {throw new RuntimeException(e);}result +=i;}return result;}
}
//执行结果:
calculate1000 =5050
耗时:623
1.1.2、使用java并发包中的类实现
1.1.2.1、工具类代码准备
package com.example.mavendemo.completablefuture;import java.util.StringJoiner;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;public class CommonUtils {/*** 初始化线程池* @return*/public static ThreadPoolExecutor initThreadPoolExecutor() {int corePoolSize = Runtime.getRuntime().availableProcessors();ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2 * corePoolSize, 2 * corePoolSize, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(50), new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(null, r,"custom-thread-pool",0);return t;}}, new ThreadPoolExecutor.CallerRunsPolicy());return threadPoolExecutor;}/*** 带当前时间、线程信息打印的日志方法* @param info*/public static void printThreadLog(String info){String result = new StringJoiner(" | ").add(String.valueOf(System.currentTimeMillis())).add(String.format("%s-%s", Thread.currentThread().getId(), Thread.currentThread().getName())).add(info).toString();System.out.println(result);}/*** 休眠毫秒* @param mills*/public static void sleepMills(long mills){try {Thread.sleep(mills);} catch (InterruptedException e) {throw new RuntimeException(e);}}/*** 计算数字开始和结束之前的数字累加之和* @param startNumber* @param endNumber* @return*/public static long calculateNumber(int startNumber, int endNumber){int result = 0;for (int i = startNumber; i <= endNumber; i++) {try {//睡眠5ms(为了演示效果)Thread.sleep(5L);} catch (InterruptedException e) {throw new RuntimeException(e);}result +=i;}return result;}
}
1.1.2.2、使用Future结合线程池实现的代码示例
package com.example.mavendemo.completablefuture;import com.google.common.collect.Lists;import java.util.List;
import java.util.concurrent.*;public class FutureTest {public static void main(String[] args) throws ExecutionException, InterruptedException {ThreadPoolExecutor threadPoolExecutor = CommonUtils.initThreadPoolExecutor();long startTime = System.currentTimeMillis();Long total = 0L;
// Future<Long> calculate20Future = threadPoolExecutor.submit(()-> calculateNumber(1, 20));
// Future<Long> calculate40Future = threadPoolExecutor.submit(()-> calculateNumber(21, 40));
// Future<Long> calculate60Future = threadPoolExecutor.submit(()-> calculateNumber(41, 60));
// Future<Long> calculate80Future = threadPoolExecutor.submit(()-> calculateNumber(61, 80));
// Future<Long> calculate100Future = threadPoolExecutor.submit(()-> calculateNumber(81, 100));
//
// Long calculate20 = calculate20Future.get();
// Long calculate40 = calculate40Future.get();
// Long calculate60 = calculate60Future.get();
// Long calculate80 = calculate80Future.get();
// Long calculate100 = calculate100Future.get();
// total = (calculate20 + calculate40 + calculate60 + calculate80 + calculate100) ;List<Future<Long>> futureList = Lists.newArrayList();int i = 100 /20;for (int j = 1; j <= i; j++) {int finalJ = j;futureList.add(threadPoolExecutor.submit(()-> CommonUtils.calculateNumber((finalJ -1) * 20 + 1, finalJ * 20)));}CommonUtils.printThreadLog("正在获取结果...");for (Future<Long> longFuture : futureList) {total += longFuture.get();}CommonUtils.printThreadLog("calculate100 =" + total + ", 耗时:" + (System.currentTimeMillis() - startTime) );threadPoolExecutor.shutdown();}}
//执行结果:
1712386717326 | 1-main | 正在获取结果...
1712386717449 | 1-main | calculate1000 =5050, 耗时:242
1.1.2.3、使用CompletableFuture实现的代码示例
package com.example.mavendemo.completablefuture;import com.google.common.collect.Lists;import java.util.List;
import java.util.concurrent.*;public class CompleteTableFutureTest
{public static void main(String[] args) throws ExecutionException, InterruptedException {ThreadPoolExecutor threadPoolExecutor = CommonUtils.initThreadPoolExecutor();long startTime = System.currentTimeMillis();List<CompletableFuture<Long>> futureList = Lists.newArrayList();int i = 100 /20;for (int j = 1; j <= i; j++) {int finalJ = j;futureList.add(CompletableFuture.supplyAsync(()-> CommonUtils.calculateNumber((finalJ -1) * 20 + 1, finalJ * 20)));}CompletableFuture.allOf(futureList.toArray(new CompletableFuture[futureList.size()]));CommonUtils.printThreadLog("正在获取结果...");long total = 0L;for (Future<Long> longFuture : futureList) {total += longFuture.get();}CommonUtils.printThreadLog("calculate100 =" + total + ", 耗时:" + (System.currentTimeMillis() - startTime) );//销毁线程threadPoolExecutor.shutdown();}}
//执行结果:
1712386733592 | 1-main | 正在获取结果...
1712386733715 | 1-main | calculate1000 =5050, 耗时:221
一、 Future和CompletableFuture的对比
1.1、 Future的使用局限性
通过上面的代码,我们发现,Future的不足之处:
- 在没有阻塞的情况下,无法对Future的结果执行进一步的操作。Future不会告诉你它什么时候执行完成,你如果想要得到结果,只能调用get()方法,该方法会阻塞直到拿到结果。
- 不能将多个Future合并到一起。假设有多种不同的Future任务,你想在它们全部执行完成后再运行某个任务, Future很难独立完成这样的Case。
1.2、CompletableFuture的优势
CompletableFuture实现了Future和CompletionStage接口,诞生于jdk1.8。
CompletableFuture相对于Future具有以下优势:
-
为快速创建、链接以来和组合多个Future提供了大量的便利方法。
-
提供了适用于各种开发场景的回调函数,它还提供了非常全面的异常处理支持。
-
无缝衔接Lambda表达式和Stream - API。
备注:Completion /kəmˈpliːʃ(ə)n/: n,完成,结束
二、 CompletableFuture的使用
2.1、创建异步任务
2.1.1、runAsync
CompletableFuture.runAsync(…)方法,它接受一个Runnable接口的实现类对象,方法返回CompletableFuture对象, 也就是没有返回值,同时还支持自定义线程池传入。
public static CompletableFuture<Void> runAsync(Runnable runnable);
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);
需求:完成一个从1加到100的任务,然后将这个任务的结果打印出来。
方法的使用代码示例:
package com.example.mavendemo.completablefuture;import java.util.concurrent.*;public class RunAsyncTest {public static void main(String[] args) throws InterruptedException {ThreadPoolExecutor threadPoolExecutor = CommonUtils.initThreadPoolExecutor();//不指定线程池,默认使用内部线程池-forjoinPool
// CompletableFuture.runAsync(new Runnable() {
// @Override
// public void run() {
// try {
// CommonUtils.printThreadLog("线程拿到结果:" + CommonUtils.calculateNumber(1, 100));
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
// }
// });//自定义线程池传入
// CompletableFuture.runAsync(new Runnable() {
// @Override
// public void run() {
// try {
// CommonUtils.printThreadLog("线程拿到结果:" + CommonUtils.calculateNumber(1, 100));
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
// }
// }, threadPoolExecutor);//指定线程池 && lambda表达式写法CompletableFuture.runAsync(() -> {try {CommonUtils.printThreadLog("线程拿到结果:" + CommonUtils.calculateNumber(1, 100));} catch (Exception e) {throw new RuntimeException(e);}}, threadPoolExecutor);//主线程休眠4秒,等待CompletableTable内部线程池执行完毕。CommonUtils.printThreadLog("等待CompletableTable内部线程池执行完毕...");//手动关闭线程池threadPoolExecutor.shutdown();}}//执行结果:
1712300943769 | 1-main | 等待CompletableTable内部线程池执行完毕...
1712300949109 | 11-custom-thread-pool | 线程拿到结果:5050
使用CompletableFuture.runAsync(…)方法时,如果我想等待结果,怎么办?可以结合CountDownLatch加锁,示例如下:
package com.example.mavendemo.completablefuture;import java.util.concurrent.*;public class RunAsyncTest {public static void main(String[] args) throws InterruptedException {ThreadPoolExecutor threadPoolExecutor = CommonUtils.initThreadPoolExecutor();CountDownLatch countDownLatch = new CountDownLatch(1);//指定线程池 && lambda写法 && 使用CountDownLatch加锁CompletableFuture.runAsync(() -> {try {CommonUtils.printThreadLog("CompletableFuture拿到结果:" + CommonUtils.calculateNumber(1, 100));} catch (Exception e) {throw new RuntimeException(e);} finally {countDownLatch.countDown();}}, threadPoolExecutor);//等待CompletableTable线程池执行完毕。CommonUtils.printThreadLog("等待CompletableTable内部线程池执行完毕...");countDownLatch.await();//手动关闭线程池threadPoolExecutor.shutdown();}}
//执行结果:
1712301073706 | 1-main | 等待CompletableTable内部线程池执行完毕...
1712301079087 | 11-custom-thread-pool | CompletableFuture拿到结果:5050
2.1.2、supplyAsync
CompletableFuture.supplyAsync(…)方法,返回CompletableFuture对象, 也就是有返回值(这里的U是泛型),同时还支持自定义线程池传入。
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);
注意:入参Supplier表示结果的提供者, 这是一个函数式接口,里面只有一个get(), 用于给带返回值的异步任务使用。
需求:完成一个从1加到100的任务,然后将这个任务的结果打印出来。
方法的使用代码示例:
package com.example.mavendemo.completablefuture;import java.util.concurrent.*;
import java.util.function.Supplier;public class SupplyAsyncTest {public static void main(String[] args) throws InterruptedException, ExecutionException {ThreadPoolExecutor threadPoolExecutor = CommonUtils.initThreadPoolExecutor();// CompletableFuture<Long> completableFuture = CompletableFuture.supplyAsync(new Supplier<Long>() {
// @Override
// public Long get() {
// try {
// long result = calculateNumber(100);
// return result;
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
// }
// });//指定线程池 && lambda写法CompletableFuture<Long> completableFuture = CompletableFuture.supplyAsync(() -> {try {long result = CommonUtils.calculateNumber(1, 100);return result;} catch (Exception e) {throw new RuntimeException(e);}}, threadPoolExecutor);System.out.println("等待CompletableTable执行完毕...");//等待CompletableTable线程池执行完毕(get()会阻塞直到拿到结果)Long summary = completableFuture.get();System.out.println("CompletableTable行完毕的结果: " + summary);//手动关闭线程池threadPoolExecutor.shutdown();}}
//执行结果:
等待CompletableTable内部线程池执行完毕...
CompletableTable行完毕的结果: 5050
2.1.3、异步任务中的线程池
CompletableFuture默认会使用自己的线程池来执行任务,默认线程池由ForkJoinPool.commonPool()创建的。它的一些方法中也支持自定义线程池传入替换默认的线程池。
部分源码如下:
private static final Executor asyncPool = useCommonPool ?ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();public static CompletableFuture<Void> runAsync(Runnable runnable) {//asyncPool是默认执行器——ForkJoinPool.commonPool()创建的return asyncRunStage(asyncPool, runnable);
}public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {//asyncPool是默认执行器——ForkJoinPool.commonPool()创建的return asyncSupplyStage(asyncPool, supplier);
}//runAsync和supplyAsync方法支持自定义线程传入的重载方法
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor);
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);
2.2、异步任务回调
Comp le tableFuture.get()方法是阻塞的,调用时它会阻塞等待直到这个Future完成,并在完成后返回结果。但是,很多时候这不是我们想要的。
对于构建异步系统,我们应该能够将回调附加到CompletableFuture上,当这个Future完成时,该回调应自动被调用。这样,我们就不必等待结果了,然后在Future的回调函数内便携完成Future之后需要执行的逻辑。
2.2.1、thenApply
使用thenApply(…)方法可以处理和转换CompletableFuture的结果。它以Function<T, R>作为参数。Function<T, R>是一个函数式接口,表示一个转换操作,它接受类型T的参数并产生类型R的结果。
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn);
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn, Executor executor)
需求:完成一个从1加到101的任务,然后判断任务的结果是不是偶数。
使用代码示例:
package com.example.mavendemo.completablefuture;import java.util.concurrent.*;public class ThenApplyTest {public static void main(String[] args) throws ExecutionException, InterruptedException {ThreadPoolExecutor threadPoolExecutor = CommonUtils.initThreadPoolExecutor();//指定线程池 && lambda写法CompletableFuture<Long> completableFuture = CompletableFuture.supplyAsync(() -> {try {long result = CommonUtils.calculateNumber(1, 101);return result;} catch (Exception e) {throw new RuntimeException(e);}}, threadPoolExecutor);CompletableFuture<Boolean> evenNumberFuture = completableFuture.thenApply(content -> content % 2 == 0);CommonUtils.printThreadLog("等待CompletableTable执行完毕...");//等待CompletableTable线程池执行完毕(get()会阻塞直到拿到结果)boolean evenNumber = evenNumberFuture.get();CommonUtils.printThreadLog("CompletableTable执行完毕的结果, 是偶数: " + evenNumber);//手动关闭线程池threadPoolExecutor.shutdown();}}//执行结果:
1712388667746 | 1-main | 等待CompletableTable执行完毕...
1712388668370 | 1-main | CompletableTable执行完毕的结果, 是偶数: false
thenApply(Function<T, R>)可以对异步任务的结果进一步应用Function转换,转换后的结果可以在主线程获取,也可以进行下一步的转换。下面使用链式操作写个demo, 代码示例如下:
package com.example.mavendemo.completablefuture;import java.util.concurrent.*;public class ThenApplyTest {public static void main(String[] args) throws ExecutionException, InterruptedException {ThreadPoolExecutor threadPoolExecutor = CommonUtils.initThreadPoolExecutor();//指定线程池 && lambda写法CompletableFuture<Boolean> evenNumberFuture = CompletableFuture.supplyAsync(() -> {try {long result = CommonUtils.calculateNumber(1, 101);return result;} catch (Exception e) {throw new RuntimeException(e);}}, threadPoolExecutor).thenApply(content -> content % 2 == 0);System.out.println("等待CompletableTable执行完毕...");//等待CompletableTable线程池执行完毕(get()会阻塞直到拿到结果)boolean evenNumber = evenNumberFuture.get();System.out.println("CompletableTable执行完毕的结果, 是偶数: " + evenNumber);//手动关闭线程池threadPoolExecutor.shutdown();}}
//执行结果:
等待CompletableTable执行完毕...
CompletableTable执行完毕的结果, 是偶数: false
2.2.2、thenAccept
如果你不想回调函数返回结果,且也想在Future完成后运行一些代码,那么thenAccept(…)方法很适合。
public CompletableFuture<Void> thenAccept(Consumer<? super T> action);
该方法只有一个入参Consumer, 它可以对上一个异步任务的执行结果进行消费使用,方法返回CompletableFuture。
该方法通常用作回调链中的最后一个回调。
需求:完成从1加到100,当是奇数的时候,才累加,并判断累加后的数字是偶数还是奇数。
代码示例:
package com.example.mavendemo.completablefuture;import com.google.common.collect.Lists;import java.util.List;
import java.util.concurrent.*;public class ThenAcceptTest {public static void main(String[] args) throws ExecutionException, InterruptedException {ThreadPoolExecutor threadPoolExecutor = CommonUtils.initThreadPoolExecutor();//指定线程池 && lambda写法CompletableFuture.supplyAsync(() -> calculateNumber(1, 100), threadPoolExecutor).thenApply(content -> {long total = 0L;for (Integer integer : content) {total += integer;}CommonUtils.printThreadLog("最后相加的结果值是:" + total);return total;}).thenAccept(total -> {boolean bool = total % 2 == 0;CommonUtils.printThreadLog("最后相加的结果值是偶数吗:" + bool);});System.out.println("等待CompletableTable内部线程池执行完毕...");//手动关闭线程池threadPoolExecutor.shutdown();}public static List<Integer> calculateNumber(int startNumber, int endNumber) {List<Integer> resultList = Lists.newArrayList();for (int i = startNumber; i <= endNumber; i++) {//奇数才收集if(i % 2 != 0) {resultList.add(i);}}return resultList;}
}
//执行结果:
等待CompletableTable内部线程池执行完毕...
1712388999405 | 11-custom-thread-pool | 最后相加的结果值是:2500
1712388999436 | 11-custom-thread-pool | 最后相加的结果值是偶数吗:true
2.2.3、thenRun
如果我们只是想从CompletableFuture的链式操作得到一个完成的通知,甚至都不使用上一步链式的操作的结果,那么CompletableFuture.thenRun()会是你最佳的选择,它需要一个Runnable并返回CompletableFuture.
public CompletableFuture<Void> thenRun(Runnable action) ;
代码示例:
package com.example.mavendemo.completablefuture;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;public class ThenRunTest {public static void main(String[] args) {ThreadPoolExecutor threadPoolExecutor = CommonUtils.initThreadPoolExecutor();CompletableFuture.runAsync(()->{CommonUtils.printThreadLog("开始计算...");long count = CommonUtils.calculateNumber(1, 100);CommonUtils.printThreadLog("结果值是" + count);}, threadPoolExecutor).thenRun(()->{CommonUtils.printThreadLog("得到计算完成的通知。");});threadPoolExecutor.shutdown();}
}
//执行结果:
1712303391158 | 11-custom-thread-pool | 开始计算...
1712303396562 | 11-custom-thread-pool | 结果值是5050
1712303396564 | 11-custom-thread-pool | 得到计算完成的通知。
2.2.4、异步提升优化——异步回调方法
CompletableFuture提供的所有回调方法都有异步调用方法(以Async结尾命名的方法), 如下:
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor);
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor);
未使用优化方法的代码示例:
package com.example.mavendemo.completablefuture;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadPoolExecutor;public class thenApplySyncTest {public static void main(String[] args) throws ExecutionException, InterruptedException {ThreadPoolExecutor threadPoolExecutor = CommonUtils.initThreadPoolExecutor();//指定线程池 && lambda写法CompletableFuture<Boolean> evenNumberFuture = CompletableFuture.supplyAsync(() -> {try {long result = CommonUtils.calculateNumber(1, 101);CommonUtils.printThreadLog("累加结果:" + result);return result;} catch (Exception e) {throw new RuntimeException(e);}}, threadPoolExecutor).thenApply(content -> {boolean result = content % 2 == 0;CommonUtils.printThreadLog("是偶数:" + result);return result;});System.out.println("等待CompletableTable执行完毕...");//等待CompletableTable线程池执行完毕(get()会阻塞直到拿到结果)boolean evenNumber = evenNumberFuture.get();System.out.println("CompletableTable执行完毕的结果, 是偶数: " + evenNumber);//手动关闭线程池threadPoolExecutor.shutdown();}
}
//执行结果(使用同一个线程):
等待CompletableTable执行完毕...
1712327657967 | 11-custom-thread-pool | 累加结果:5151
1712327657998 | 11-custom-thread-pool | 是偶数:false
CompletableTable执行完毕的结果, 是偶数: false
使用优化后的异步方法:
package com.example.mavendemo.completablefuture;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadPoolExecutor;public class thenApplySyncTest {public static void main(String[] args) throws ExecutionException, InterruptedException {ThreadPoolExecutor threadPoolExecutor = CommonUtils.initThreadPoolExecutor();//指定线程池 && lambda写法CompletableFuture<Boolean> evenNumberFuture = CompletableFuture.supplyAsync(() -> {try {long result = CommonUtils.calculateNumber(1, 101);CommonUtils.printThreadLog("累加结果:" + result);return result;} catch (Exception e) {throw new RuntimeException(e);}}, threadPoolExecutor).thenApplyAsync(content -> {boolean result = content % 2 == 0;CommonUtils.printThreadLog("是偶数:" + result);return result;}, threadPoolExecutor);System.out.println("等待CompletableTable执行完毕...");//等待CompletableTable线程池执行完毕(get()会阻塞直到拿到结果)boolean evenNumber = evenNumberFuture.get();System.out.println("CompletableTable执行完毕的结果, 是偶数: " + evenNumber);//手动关闭线程池threadPoolExecutor.shutdown();}
}
//执行结果(使用的是不同线程):
等待CompletableTable执行完毕...
1712327753847 | 11-custom-thread-pool | 累加结果:5151
1712327753876 | 12-custom-thread-pool | 是偶数:false
CompletableTable执行完毕的结果, 是偶数: false
2.3、异步任务编排
2.3.1、thenCombine(…)编排两个没有依赖关系的异步任务
Combine /kəmˈbaɪn/ v.(使)结合,组合;(使)化合;合并.
当有两个个Future独立运行并在运行完之后执行回调操作,那么CompletableFuture.thenCombine(…)方法是非常适合这样的场景。
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,//T是第一个任务的结果,U是第二任务的结果,V是经BiFunction应用转换后的结果BiFunction<? super T,? super U,? extends V> fn)
注意:当2个Future都完成时,才将2个异步任务的结果传递给thenCombine()的回调函数做进一步处理。
需求:一个任务计算从1到100的累加,一个任务计算从1到1000的累加,最后将两个结果累加起来并打印。
代码示例:
package com.example.mavendemo.completablefuture;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadPoolExecutor;public class ThenCombineTest {public static void main(String[] args) throws ExecutionException, InterruptedException {ThreadPoolExecutor threadPoolExecutor = CommonUtils.initThreadPoolExecutor();CompletableFuture<Long> calculateNumber100Future = CompletableFuture.supplyAsync(() -> CommonUtils.calculateNumber(1, 100), threadPoolExecutor);CompletableFuture<Long> calculateNumber1000Future = CompletableFuture.supplyAsync(() -> CommonUtils.calculateNumber(1, 1000), threadPoolExecutor);CompletableFuture<Long> totalFuture = calculateNumber100Future.thenCombine(calculateNumber1000Future, (calculateNumber100, calculateNumber1000) -> calculateNumber100 + calculateNumber1000);CommonUtils.printThreadLog("正在获取结果...");CommonUtils.printThreadLog("结果值:" + totalFuture.get());threadPoolExecutor.shutdown();}
}
//执行结果
1712306901623 | 1-main | 正在获取结果...
1712306901648 | 1-main | 结果值:505550
thenCombine也存在异步回调变体版本:
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn, Executor executor)
2.3.2、allOf合并多个异步任务
如果要编排任意数量的CompletableFuture怎么办?CompletableFuture.allOf(…)可以做到。
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);
CompletableFuture.allOf(…)适用多个需要独立运行的Future, 并在所有这些Future都完成后执行的操作。
需求:一个任务计算从1到100的累加,一个任务计算从1到1000的累加,一个任务计算从1到10000的累加,最后将三个结果累加起来并打印。
代码示例:
package com.example.mavendemo.completablefuture;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadPoolExecutor;public class AllOfTest {public static void main(String[] args) throws ExecutionException, InterruptedException {ThreadPoolExecutor threadPoolExecutor = CommonUtils.initThreadPoolExecutor();List<CompletableFuture<Long>> futureList = new ArrayList<>();CompletableFuture<Long> calculateNumber100Future = CompletableFuture.supplyAsync(() -> CommonUtils.calculateNumber(1, 100), threadPoolExecutor);CompletableFuture<Long> calculateNumber1000Future = CompletableFuture.supplyAsync(() -> CommonUtils.calculateNumber(1, 1000), threadPoolExecutor);CompletableFuture<Long> calculateNumber10000Future = CompletableFuture.supplyAsync(() -> CommonUtils.calculateNumber(1, 10000), threadPoolExecutor);futureList.add(calculateNumber100Future);futureList.add(calculateNumber1000Future);futureList.add(calculateNumber10000Future);//转成数组CompletableFuture[] completableFutureArray = futureList.toArray(new CompletableFuture[futureList.size()]);//调用allOf方法CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(completableFutureArray);CompletableFuture<Object> totalFuture = allOfFuture.thenApply(v -> futureList.stream().map(future -> future.join()).reduce(0L, Long::sum));CommonUtils.printThreadLog("正在获取结果...");CommonUtils.printThreadLog("结果值:" + totalFuture.get());threadPoolExecutor.shutdown();}
}
//执行结果:
1712389791446 | 1-main | 正在获取结果...
1712389852987 | 1-main | 结果值:50510550
2.3.2、anyOf获取多个异步任务中最先执行完的结果
CompletableFuture.anyOf(…)适用多个需要独立运行的Future, 在这些Future任务中的任何一个完成后执行的操作。
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);
注意:使用anyOf()需要注意的是,Future返回的结果类型是Object。
需求:一个任务计算从1到100的累加,一个任务计算从1到1000的累加,一个任务计算从1到10000的累加,最后将三个结果累加起来并打印。
代码示例:
package com.example.mavendemo.completablefuture;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadPoolExecutor;public class AnyOfTest {public static void main(String[] args) throws ExecutionException, InterruptedException {ThreadPoolExecutor threadPoolExecutor = CommonUtils.initThreadPoolExecutor();List<CompletableFuture<Long>> futureList = new ArrayList<>();CompletableFuture<Long> calculateNumber100Future = CompletableFuture.supplyAsync(() -> {long count = CommonUtils.calculateNumber(1, 100);//休眠5秒钟CommonUtils.sleepMills(5000L);return count;}, threadPoolExecutor);CompletableFuture<Long> calculateNumber1000Future = CompletableFuture.supplyAsync(() -> {long count = CommonUtils.calculateNumber(1, 1000);//休眠1秒钟CommonUtils.sleepMills(1000L);return count;}, threadPoolExecutor);CompletableFuture<Long> calculateNumber10000Future = CompletableFuture.supplyAsync(() -> {long count = CommonUtils.calculateNumber(1, 10000);//休眠10秒钟CommonUtils.sleepMills(10000L);return count;}, threadPoolExecutor);futureList.add(calculateNumber100Future);futureList.add(calculateNumber1000Future);futureList.add(calculateNumber10000Future);//调用anyOf方法CompletableFuture<Object> allOfFuture = CompletableFuture.anyOf(calculateNumber100Future, calculateNumber1000Future, calculateNumber10000Future);CommonUtils.printThreadLog("正在获取结果...");CommonUtils.printThreadLog("结果值:" + allOfFuture.get());threadPoolExecutor.shutdown();}
}
//执行结果:
1712389929613 | 1-main | 正在获取结果...
1712389935231 | 1-main | 结果值:5050
2.4、异步任务的异常处理
如果我们在使用CompletableFuture过程中出现了异常,如何打印?
2.4.1、exceptionally(…)用于处理回调链上的异常
exceptionally/ɪkˈsepʃənəli/adv. 异常地;特殊地;例外地
当CompletableFuture链式使用时,如果出现任何异常,都不会继续向下执行,这时候方法exceptionally就可以捕获异常处理。
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)
正常捕获异常:
package com.example.mavendemo.completablefuture;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadPoolExecutor;public class ExceptionallyTest {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(()-> 1 + 2).thenApply(result -> result + 3).thenApply(result -> 1/0);try {CommonUtils.printThreadLog(completableFuture.get().toString());} catch (Exception e) {CommonUtils.printThreadLog("发现异常" + e.getMessage());}}
}
//执行结果:
1712390409902 | 1-main | 发现异常java.lang.ArithmeticException: / by zero
使用链式异常捕获代码示例:
package com.example.mavendemo.completablefuture;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadPoolExecutor;public class ExceptionallyTest {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(()-> 1 + 2).thenApply(result -> result + 3).thenApply(result -> 1/0)//异常捕获方法.exceptionally(e ->{e.printStackTrace();System.err.println(e.getMessage());return 0;});CommonUtils.printThreadLog(completableFuture.get().toString());}
}
//执行结果:
java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zeroat java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:618)at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628)at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996)at com.example.mavendemo.completablefuture.ExceptionallyTest.main(ExceptionallyTest.java:13)
Caused by: java.lang.ArithmeticException: / by zeroat com.example.mavendemo.completablefuture.ExceptionallyTest.lambda$main$2(ExceptionallyTest.java:13)at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)... 3 more
java.lang.ArithmeticException: / by zero
2.5、异步任务交互获得结果
2.5.1、get()方法和join()方法
get()和join()都是CompletableFuture提供的以阻塞方式获得结果的方式,其中get()方法抛出检查时异常,必须要处理。
public T get() throws InterruptedException, ExecutionException;
public T join();
代码示例如下:
package com.example.mavendemo.completablefuture;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;public class GetOrJoinTest {public static void main(String[] args) {CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "hello completabelFuture");String ret = null;try {//get()方法抛出检查时异常,必须要处理ret = completableFuture.get();} catch (InterruptedException e) {throw new RuntimeException(e);} catch (ExecutionException e) {throw new RuntimeException(e);}CommonUtils.printThreadLog("get()结果:" + ret);//join()犯法抛出运行时异常,可以不需要处理ret = completableFuture.join();CommonUtils.printThreadLog("join()结果:" + ret);}
}
//执行结果:
1712321201059 | 1-main | get()结果:hello completabelFuture
1712321201107 | 1-main | join()结果:hello completabelFuture
get()方法还有个设置超时时间的参数, 如下:
public T get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException
代码示例:
package com.example.mavendemo.completablefuture;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;public class GetOrJoinTest {public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {CommonUtils.sleepMills(10000L);return "hello completabelFuture";});String ret = null;try {//get()方法抛出检查时异常,必须要处理ret = completableFuture.get(3, TimeUnit.SECONDS);} catch (Exception e) {if(e instanceof TimeoutException){CommonUtils.printThreadLog( "超时了");ret = "超时了";} else {throw e;}}CommonUtils.printThreadLog("get()结果:" + ret);}
}
//执行结果:
1712321892289 | 1-main | 超时了
1712321892349 | 1-main | get()结果:超时了
2.6、ParallelStream VS CompletableFuture
2.6.1、并行流的局限性
java8提供了并行流的操作,默认使用的是ForkjoinPool线程池。并行流的运行线程数量是由机器的逻辑内核数决定的。
代码示例:
package com.example.mavendemo.completablefuture;import com.google.common.collect.Lists;import java.util.List;
import java.util.stream.Collectors;public class ParallelStreamTaskTest {public static void main(String[] args) {List<Task> taskList = Lists.newArrayList();for (int i = 0; i < 10; i++) {Task task = new Task();taskList.add(task);}long startTime = System.currentTimeMillis();List<String> result = taskList.parallelStream().map(task -> task.doWork()).collect(Collectors.toList());CommonUtils.printThreadLog("result.size: "+ result.size()+", 耗时:" + (System.currentTimeMillis() - startTime));}static class Task{public String doWork(){CommonUtils.sleepMills(1000L);CommonUtils.printThreadLog("doWork");return "doWork";}}
}
//执行结果(前面8个结果先打印出来,后面两个结果再打印出来):
1712326487964 | 12-ForkJoinPool.commonPool-worker-2 | doWork
1712326487964 | 13-ForkJoinPool.commonPool-worker-3 | doWork
1712326487964 | 17-ForkJoinPool.commonPool-worker-7 | doWork
1712326487964 | 11-ForkJoinPool.commonPool-worker-1 | doWork
1712326487964 | 1-main | doWork
1712326487964 | 15-ForkJoinPool.commonPool-worker-4 | doWork
1712326487964 | 16-ForkJoinPool.commonPool-worker-6 | doWork
1712326487964 | 14-ForkJoinPool.commonPool-worker-5 | doWork1712326489037 | 12-ForkJoinPool.commonPool-worker-2 | doWork
1712326489038 | 17-ForkJoinPool.commonPool-worker-7 | doWork1712326489039 | 1-main | result.size: 10, 耗时:2187
从结果可以得出并行流的运行线程数量是由机器的逻辑内核数决定的(查看mac逻辑内核数量命令:sysctl hw.logicalcpu)。
2.6.2、CompletableFuture在流式操作中的优势
CompletableFuture支持自定义的线程池传入, 线程池的参数可以根据业务类型进行调整, 线程池的使用更灵活。
package com.example.mavendemo.completablefuture;import com.google.common.collect.Lists;import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Collectors;
import java.util.stream.Stream;public class CompleteTableFutureTaskTest {public static void main(String[] args) {ThreadPoolExecutor threadPoolExecutor = CommonUtils.initThreadPoolExecutor();List<CompleteTableFutureTaskTest.Task> taskList = Lists.newArrayList();for (int i = 0; i < 10; i++) {CompleteTableFutureTaskTest.Task task = new CompleteTableFutureTaskTest.Task();taskList.add(task);}long startTime = System.currentTimeMillis();List<CompletableFuture<String>> completableFutureList = taskList.stream().map(task -> CompletableFuture.supplyAsync(()-> task.doWork(), threadPoolExecutor)).collect(Collectors.toList());List<String> result = completableFutureList.stream().map(CompletableFuture::join).collect(Collectors.toList());CommonUtils.printThreadLog("result.size: "+ result.size()+", 耗时:" + (System.currentTimeMillis() - startTime));//手动销毁线程池threadPoolExecutor.shutdown();}static class Task{public String doWork(){CommonUtils.sleepMills(1000L);CommonUtils.printThreadLog("doWork");return "doWork";}}
}
//执行结果(10条结果,一次性打出来):
1712326568638 | 17-custom-thread-pool | doWork
1712326568638 | 15-custom-thread-pool | doWork
1712326568635 | 16-custom-thread-pool | doWork
1712326568635 | 12-custom-thread-pool | doWork
1712326568636 | 11-custom-thread-pool | doWork
1712326568635 | 19-custom-thread-pool | doWork
1712326568638 | 18-custom-thread-pool | doWork
1712326568635 | 14-custom-thread-pool | doWork
1712326568636 | 20-custom-thread-pool | doWork
1712326568636 | 13-custom-thread-pool | doWork
1712326568746 | 1-main | result.size: 10, 耗时:1734