并发包中的CompletableFuture介绍及使用示例

一、抛砖引玉——优化一个计数任务

1.1、需求:计算从1加累加到100的结果

1.1.1、普通的实现代码示例
package com.example.mavendemo.completablefuture;public class NormalTest {public static void main(String[] args) {long startTime = System.currentTimeMillis();long calculate100 = calculateNumber(1, 100);System.out.println("calculate100 =" + calculate100);System.out.println("耗时:" + (System.currentTimeMillis() - startTime) );}public static long calculateNumber(int startNumber, int endNumber){int result = 0;for (int i = startNumber; i <= endNumber; i++) {try {//睡眠5ms(为了演示效果)Thread.sleep(5L);} catch (InterruptedException e) {throw new RuntimeException(e);}result +=i;}return result;}
}
//执行结果:
calculate1000 =5050
耗时:623
1.1.2、使用java并发包中的类实现
1.1.2.1、工具类代码准备
package com.example.mavendemo.completablefuture;import java.util.StringJoiner;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;public class CommonUtils {/*** 初始化线程池* @return*/public static ThreadPoolExecutor initThreadPoolExecutor() {int corePoolSize = Runtime.getRuntime().availableProcessors();ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2 * corePoolSize, 2 * corePoolSize, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(50), new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(null, r,"custom-thread-pool",0);return t;}}, new ThreadPoolExecutor.CallerRunsPolicy());return threadPoolExecutor;}/*** 带当前时间、线程信息打印的日志方法* @param info*/public static void printThreadLog(String info){String result = new StringJoiner(" | ").add(String.valueOf(System.currentTimeMillis())).add(String.format("%s-%s", Thread.currentThread().getId(), Thread.currentThread().getName())).add(info).toString();System.out.println(result);}/*** 休眠毫秒* @param mills*/public static void sleepMills(long mills){try {Thread.sleep(mills);} catch (InterruptedException e) {throw new RuntimeException(e);}}/*** 计算数字开始和结束之前的数字累加之和* @param startNumber* @param endNumber* @return*/public static long calculateNumber(int startNumber, int endNumber){int result = 0;for (int i = startNumber; i <= endNumber; i++) {try {//睡眠5ms(为了演示效果)Thread.sleep(5L);} catch (InterruptedException e) {throw new RuntimeException(e);}result +=i;}return result;}
}
1.1.2.2、使用Future结合线程池实现的代码示例
package com.example.mavendemo.completablefuture;import com.google.common.collect.Lists;import java.util.List;
import java.util.concurrent.*;public class FutureTest {public static void main(String[] args) throws ExecutionException, InterruptedException {ThreadPoolExecutor threadPoolExecutor = CommonUtils.initThreadPoolExecutor();long startTime = System.currentTimeMillis();Long total = 0L;
//        Future<Long> calculate20Future = threadPoolExecutor.submit(()-> calculateNumber(1, 20));
//        Future<Long> calculate40Future = threadPoolExecutor.submit(()-> calculateNumber(21, 40));
//        Future<Long> calculate60Future = threadPoolExecutor.submit(()-> calculateNumber(41, 60));
//        Future<Long> calculate80Future = threadPoolExecutor.submit(()-> calculateNumber(61, 80));
//        Future<Long> calculate100Future = threadPoolExecutor.submit(()-> calculateNumber(81, 100));
//
//        Long calculate20 = calculate20Future.get();
//        Long calculate40 = calculate40Future.get();
//        Long calculate60 = calculate60Future.get();
//        Long calculate80 = calculate80Future.get();
//        Long calculate100 = calculate100Future.get();
//        total = (calculate20 + calculate40 + calculate60 + calculate80 + calculate100) ;List<Future<Long>> futureList = Lists.newArrayList();int i = 100 /20;for (int j = 1; j <= i; j++) {int finalJ = j;futureList.add(threadPoolExecutor.submit(()-> CommonUtils.calculateNumber((finalJ -1) * 20 + 1, finalJ * 20)));}CommonUtils.printThreadLog("正在获取结果...");for (Future<Long> longFuture : futureList) {total += longFuture.get();}CommonUtils.printThreadLog("calculate100 =" + total + ", 耗时:" + (System.currentTimeMillis() - startTime) );threadPoolExecutor.shutdown();}}
//执行结果:
1712386717326 | 1-main | 正在获取结果...
1712386717449 | 1-main | calculate1000 =5050, 耗时:242
1.1.2.3、使用CompletableFuture实现的代码示例
package com.example.mavendemo.completablefuture;import com.google.common.collect.Lists;import java.util.List;
import java.util.concurrent.*;public class CompleteTableFutureTest
{public static void main(String[] args) throws ExecutionException, InterruptedException {ThreadPoolExecutor threadPoolExecutor = CommonUtils.initThreadPoolExecutor();long startTime = System.currentTimeMillis();List<CompletableFuture<Long>> futureList = Lists.newArrayList();int i = 100 /20;for (int j = 1; j <= i; j++) {int finalJ = j;futureList.add(CompletableFuture.supplyAsync(()-> CommonUtils.calculateNumber((finalJ -1) * 20 + 1, finalJ * 20)));}CompletableFuture.allOf(futureList.toArray(new CompletableFuture[futureList.size()]));CommonUtils.printThreadLog("正在获取结果...");long total = 0L;for (Future<Long> longFuture : futureList) {total += longFuture.get();}CommonUtils.printThreadLog("calculate100 =" + total + ", 耗时:" + (System.currentTimeMillis() - startTime) );//销毁线程threadPoolExecutor.shutdown();}}
//执行结果:
1712386733592 | 1-main | 正在获取结果...
1712386733715 | 1-main | calculate1000 =5050, 耗时:221

一、 Future和CompletableFuture的对比

1.1、 Future的使用局限性

通过上面的代码,我们发现,Future的不足之处:

  • 在没有阻塞的情况下,无法对Future的结果执行进一步的操作。Future不会告诉你它什么时候执行完成,你如果想要得到结果,只能调用get()方法,该方法会阻塞直到拿到结果。
  • 不能将多个Future合并到一起。假设有多种不同的Future任务,你想在它们全部执行完成后再运行某个任务, Future很难独立完成这样的Case。

1.2、CompletableFuture的优势

在这里插入图片描述
在这里插入图片描述

CompletableFuture实现了Future和CompletionStage接口,诞生于jdk1.8。

CompletableFuture相对于Future具有以下优势:

  • 为快速创建、链接以来和组合多个Future提供了大量的便利方法。

  • 提供了适用于各种开发场景的回调函数,它还提供了非常全面的异常处理支持。

  • 无缝衔接Lambda表达式和Stream - API。

    备注:Completion /kəmˈpliːʃ(ə)n/: n,完成,结束

二、 CompletableFuture的使用

2.1、创建异步任务

2.1.1、runAsync

CompletableFuture.runAsync(…)方法,它接受一个Runnable接口的实现类对象,方法返回CompletableFuture对象, 也就是没有返回值,同时还支持自定义线程池传入。

public static CompletableFuture<Void> runAsync(Runnable runnable)public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);

需求:完成一个从1加到100的任务,然后将这个任务的结果打印出来。

方法的使用代码示例:

package com.example.mavendemo.completablefuture;import java.util.concurrent.*;public class RunAsyncTest {public static void main(String[] args) throws InterruptedException {ThreadPoolExecutor threadPoolExecutor = CommonUtils.initThreadPoolExecutor();//不指定线程池,默认使用内部线程池-forjoinPool
//        CompletableFuture.runAsync(new Runnable() {
//            @Override
//            public void run() {
//                try {
//                    CommonUtils.printThreadLog("线程拿到结果:" + CommonUtils.calculateNumber(1, 100));
//                } catch (InterruptedException e) {
//                    throw new RuntimeException(e);
//                }
//            }
//        });//自定义线程池传入
//        CompletableFuture.runAsync(new Runnable() {
//            @Override
//            public void run() {
//                try {
//                    CommonUtils.printThreadLog("线程拿到结果:" + CommonUtils.calculateNumber(1, 100));
//                } catch (InterruptedException e) {
//                    throw new RuntimeException(e);
//                }
//            }
//        }, threadPoolExecutor);//指定线程池 && lambda表达式写法CompletableFuture.runAsync(() -> {try {CommonUtils.printThreadLog("线程拿到结果:" + CommonUtils.calculateNumber(1, 100));} catch (Exception e) {throw new RuntimeException(e);}}, threadPoolExecutor);//主线程休眠4秒,等待CompletableTable内部线程池执行完毕。CommonUtils.printThreadLog("等待CompletableTable内部线程池执行完毕...");//手动关闭线程池threadPoolExecutor.shutdown();}}//执行结果:
1712300943769 | 1-main | 等待CompletableTable内部线程池执行完毕...
1712300949109 | 11-custom-thread-pool | 线程拿到结果:5050

使用CompletableFuture.runAsync(…)方法时,如果我想等待结果,怎么办?可以结合CountDownLatch加锁,示例如下:

package com.example.mavendemo.completablefuture;import java.util.concurrent.*;public class RunAsyncTest {public static void main(String[] args) throws InterruptedException {ThreadPoolExecutor threadPoolExecutor = CommonUtils.initThreadPoolExecutor();CountDownLatch countDownLatch = new CountDownLatch(1);//指定线程池 && lambda写法 && 使用CountDownLatch加锁CompletableFuture.runAsync(() -> {try {CommonUtils.printThreadLog("CompletableFuture拿到结果:" + CommonUtils.calculateNumber(1, 100));} catch (Exception e) {throw new RuntimeException(e);} finally {countDownLatch.countDown();}}, threadPoolExecutor);//等待CompletableTable线程池执行完毕。CommonUtils.printThreadLog("等待CompletableTable内部线程池执行完毕...");countDownLatch.await();//手动关闭线程池threadPoolExecutor.shutdown();}}
//执行结果:
1712301073706 | 1-main | 等待CompletableTable内部线程池执行完毕...
1712301079087 | 11-custom-thread-pool | CompletableFuture拿到结果:5050
2.1.2、supplyAsync

​ CompletableFuture.supplyAsync(…)方法,返回CompletableFuture对象, 也就是有返回值(这里的U是泛型),同时还支持自定义线程池传入。

  public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);

注意:入参Supplier表示结果的提供者, 这是一个函数式接口,里面只有一个get(), 用于给带返回值的异步任务使用。

需求:完成一个从1加到100的任务,然后将这个任务的结果打印出来。

方法的使用代码示例:

package com.example.mavendemo.completablefuture;import java.util.concurrent.*;
import java.util.function.Supplier;public class SupplyAsyncTest {public static void main(String[] args) throws InterruptedException, ExecutionException {ThreadPoolExecutor threadPoolExecutor = CommonUtils.initThreadPoolExecutor();//        CompletableFuture<Long> completableFuture = CompletableFuture.supplyAsync(new Supplier<Long>() {
//            @Override
//            public Long get() {
//                try {
//                    long result = calculateNumber(100);
//                    return result;
//                } catch (InterruptedException e) {
//                    throw new RuntimeException(e);
//                }
//            }
//        });//指定线程池 && lambda写法CompletableFuture<Long> completableFuture = CompletableFuture.supplyAsync(() -> {try {long result = CommonUtils.calculateNumber(1, 100);return result;} catch (Exception e) {throw new RuntimeException(e);}}, threadPoolExecutor);System.out.println("等待CompletableTable执行完毕...");//等待CompletableTable线程池执行完毕(get()会阻塞直到拿到结果)Long summary = completableFuture.get();System.out.println("CompletableTable行完毕的结果: " + summary);//手动关闭线程池threadPoolExecutor.shutdown();}}
//执行结果:
等待CompletableTable内部线程池执行完毕...
CompletableTable行完毕的结果: 5050
2.1.3、异步任务中的线程池

​ CompletableFuture默认会使用自己的线程池来执行任务,默认线程池由ForkJoinPool.commonPool()创建的。它的一些方法中也支持自定义线程池传入替换默认的线程池。

部分源码如下:

private static final Executor asyncPool = useCommonPool ?ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();public static CompletableFuture<Void> runAsync(Runnable runnable) {//asyncPool是默认执行器——ForkJoinPool.commonPool()创建的return asyncRunStage(asyncPool, runnable);
}public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {//asyncPool是默认执行器——ForkJoinPool.commonPool()创建的return asyncSupplyStage(asyncPool, supplier);
}//runAsync和supplyAsync方法支持自定义线程传入的重载方法
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor);
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);

2.2、异步任务回调

Comp le tableFuture.get()方法是阻塞的,调用时它会阻塞等待直到这个Future完成,并在完成后返回结果。但是,很多时候这不是我们想要的。

对于构建异步系统,我们应该能够将回调附加到CompletableFuture上,当这个Future完成时,该回调应自动被调用。这样,我们就不必等待结果了,然后在Future的回调函数内便携完成Future之后需要执行的逻辑。

2.2.1、thenApply

使用thenApply(…)方法可以处理和转换CompletableFuture的结果。它以Function<T, R>作为参数。Function<T, R>是一个函数式接口,表示一个转换操作,它接受类型T的参数并产生类型R的结果。

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn);
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn, Executor executor)

需求:完成一个从1加到101的任务,然后判断任务的结果是不是偶数。

使用代码示例:

package com.example.mavendemo.completablefuture;import java.util.concurrent.*;public class ThenApplyTest {public static void main(String[] args) throws ExecutionException, InterruptedException {ThreadPoolExecutor threadPoolExecutor = CommonUtils.initThreadPoolExecutor();//指定线程池 && lambda写法CompletableFuture<Long> completableFuture = CompletableFuture.supplyAsync(() -> {try {long result = CommonUtils.calculateNumber(1, 101);return result;} catch (Exception e) {throw new RuntimeException(e);}}, threadPoolExecutor);CompletableFuture<Boolean> evenNumberFuture = completableFuture.thenApply(content -> content % 2 == 0);CommonUtils.printThreadLog("等待CompletableTable执行完毕...");//等待CompletableTable线程池执行完毕(get()会阻塞直到拿到结果)boolean evenNumber = evenNumberFuture.get();CommonUtils.printThreadLog("CompletableTable执行完毕的结果, 是偶数: " + evenNumber);//手动关闭线程池threadPoolExecutor.shutdown();}}//执行结果:
1712388667746 | 1-main | 等待CompletableTable执行完毕...
1712388668370 | 1-main | CompletableTable执行完毕的结果, 是偶数: false

​ thenApply(Function<T, R>)可以对异步任务的结果进一步应用Function转换,转换后的结果可以在主线程获取,也可以进行下一步的转换。下面使用链式操作写个demo, 代码示例如下:

package com.example.mavendemo.completablefuture;import java.util.concurrent.*;public class ThenApplyTest {public static void main(String[] args) throws ExecutionException, InterruptedException {ThreadPoolExecutor threadPoolExecutor = CommonUtils.initThreadPoolExecutor();//指定线程池 && lambda写法CompletableFuture<Boolean> evenNumberFuture = CompletableFuture.supplyAsync(() -> {try {long result = CommonUtils.calculateNumber(1, 101);return result;} catch (Exception e) {throw new RuntimeException(e);}}, threadPoolExecutor).thenApply(content -> content % 2 == 0);System.out.println("等待CompletableTable执行完毕...");//等待CompletableTable线程池执行完毕(get()会阻塞直到拿到结果)boolean evenNumber = evenNumberFuture.get();System.out.println("CompletableTable执行完毕的结果, 是偶数: " + evenNumber);//手动关闭线程池threadPoolExecutor.shutdown();}}
//执行结果:
等待CompletableTable执行完毕...
CompletableTable执行完毕的结果, 是偶数: false
2.2.2、thenAccept

如果你不想回调函数返回结果,且也想在Future完成后运行一些代码,那么thenAccept(…)方法很适合。

public CompletableFuture<Void> thenAccept(Consumer<? super T> action);

该方法只有一个入参Consumer, 它可以对上一个异步任务的执行结果进行消费使用,方法返回CompletableFuture。

该方法通常用作回调链中的最后一个回调。

需求:完成从1加到100,当是奇数的时候,才累加,并判断累加后的数字是偶数还是奇数。

代码示例:

package com.example.mavendemo.completablefuture;import com.google.common.collect.Lists;import java.util.List;
import java.util.concurrent.*;public class ThenAcceptTest {public static void main(String[] args) throws ExecutionException, InterruptedException {ThreadPoolExecutor threadPoolExecutor = CommonUtils.initThreadPoolExecutor();//指定线程池 && lambda写法CompletableFuture.supplyAsync(() -> calculateNumber(1, 100), threadPoolExecutor).thenApply(content -> {long total = 0L;for (Integer integer : content) {total += integer;}CommonUtils.printThreadLog("最后相加的结果值是:" + total);return total;}).thenAccept(total -> {boolean bool = total % 2 == 0;CommonUtils.printThreadLog("最后相加的结果值是偶数吗:" + bool);});System.out.println("等待CompletableTable内部线程池执行完毕...");//手动关闭线程池threadPoolExecutor.shutdown();}public static List<Integer> calculateNumber(int startNumber, int endNumber) {List<Integer> resultList = Lists.newArrayList();for (int i = startNumber; i <= endNumber; i++) {//奇数才收集if(i % 2 != 0) {resultList.add(i);}}return resultList;}
}
//执行结果:
等待CompletableTable内部线程池执行完毕...
1712388999405 | 11-custom-thread-pool | 最后相加的结果值是:2500
1712388999436 | 11-custom-thread-pool | 最后相加的结果值是偶数吗:true
2.2.3、thenRun

如果我们只是想从CompletableFuture的链式操作得到一个完成的通知,甚至都不使用上一步链式的操作的结果,那么CompletableFuture.thenRun()会是你最佳的选择,它需要一个Runnable并返回CompletableFuture.

public CompletableFuture<Void> thenRun(Runnable action) ;

代码示例:

package com.example.mavendemo.completablefuture;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;public class ThenRunTest {public static void main(String[] args) {ThreadPoolExecutor threadPoolExecutor = CommonUtils.initThreadPoolExecutor();CompletableFuture.runAsync(()->{CommonUtils.printThreadLog("开始计算...");long count = CommonUtils.calculateNumber(1, 100);CommonUtils.printThreadLog("结果值是" + count);}, threadPoolExecutor).thenRun(()->{CommonUtils.printThreadLog("得到计算完成的通知。");});threadPoolExecutor.shutdown();}
}
//执行结果:
1712303391158 | 11-custom-thread-pool | 开始计算...
1712303396562 | 11-custom-thread-pool | 结果值是5050
1712303396564 | 11-custom-thread-pool | 得到计算完成的通知。
2.2.4、异步提升优化——异步回调方法

CompletableFuture提供的所有回调方法都有异步调用方法(以Async结尾命名的方法), 如下:

public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor);
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor);

未使用优化方法的代码示例:

package com.example.mavendemo.completablefuture;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadPoolExecutor;public class thenApplySyncTest {public static void main(String[] args) throws ExecutionException, InterruptedException {ThreadPoolExecutor threadPoolExecutor = CommonUtils.initThreadPoolExecutor();//指定线程池 && lambda写法CompletableFuture<Boolean> evenNumberFuture = CompletableFuture.supplyAsync(() -> {try {long result = CommonUtils.calculateNumber(1, 101);CommonUtils.printThreadLog("累加结果:" + result);return result;} catch (Exception e) {throw new RuntimeException(e);}}, threadPoolExecutor).thenApply(content -> {boolean result = content % 2 == 0;CommonUtils.printThreadLog("是偶数:" + result);return result;});System.out.println("等待CompletableTable执行完毕...");//等待CompletableTable线程池执行完毕(get()会阻塞直到拿到结果)boolean evenNumber = evenNumberFuture.get();System.out.println("CompletableTable执行完毕的结果, 是偶数: " + evenNumber);//手动关闭线程池threadPoolExecutor.shutdown();}
}
//执行结果(使用同一个线程):
等待CompletableTable执行完毕...
1712327657967 | 11-custom-thread-pool | 累加结果:5151
1712327657998 | 11-custom-thread-pool | 是偶数:false
CompletableTable执行完毕的结果, 是偶数: false

使用优化后的异步方法:

package com.example.mavendemo.completablefuture;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadPoolExecutor;public class thenApplySyncTest {public static void main(String[] args) throws ExecutionException, InterruptedException {ThreadPoolExecutor threadPoolExecutor = CommonUtils.initThreadPoolExecutor();//指定线程池 && lambda写法CompletableFuture<Boolean> evenNumberFuture = CompletableFuture.supplyAsync(() -> {try {long result = CommonUtils.calculateNumber(1, 101);CommonUtils.printThreadLog("累加结果:" + result);return result;} catch (Exception e) {throw new RuntimeException(e);}}, threadPoolExecutor).thenApplyAsync(content -> {boolean result = content % 2 == 0;CommonUtils.printThreadLog("是偶数:" + result);return result;}, threadPoolExecutor);System.out.println("等待CompletableTable执行完毕...");//等待CompletableTable线程池执行完毕(get()会阻塞直到拿到结果)boolean evenNumber = evenNumberFuture.get();System.out.println("CompletableTable执行完毕的结果, 是偶数: " + evenNumber);//手动关闭线程池threadPoolExecutor.shutdown();}
}
//执行结果(使用的是不同线程):
等待CompletableTable执行完毕...
1712327753847 | 11-custom-thread-pool | 累加结果:5151
1712327753876 | 12-custom-thread-pool | 是偶数:false
CompletableTable执行完毕的结果, 是偶数: false

2.3、异步任务编排

2.3.1、thenCombine(…)编排两个没有依赖关系的异步任务

Combine /kəmˈbaɪn/ v.(使)结合,组合;(使)化合;合并.

当有两个个Future独立运行并在运行完之后执行回调操作,那么CompletableFuture.thenCombine(…)方法是非常适合这样的场景。

public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,//T是第一个任务的结果,U是第二任务的结果,V是经BiFunction应用转换后的结果BiFunction<? super T,? super U,? extends V> fn)

注意:当2个Future都完成时,才将2个异步任务的结果传递给thenCombine()的回调函数做进一步处理。

需求:一个任务计算从1到100的累加,一个任务计算从1到1000的累加,最后将两个结果累加起来并打印。

代码示例:

package com.example.mavendemo.completablefuture;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadPoolExecutor;public class ThenCombineTest {public static void main(String[] args) throws ExecutionException, InterruptedException {ThreadPoolExecutor threadPoolExecutor = CommonUtils.initThreadPoolExecutor();CompletableFuture<Long> calculateNumber100Future = CompletableFuture.supplyAsync(() -> CommonUtils.calculateNumber(1, 100), threadPoolExecutor);CompletableFuture<Long> calculateNumber1000Future = CompletableFuture.supplyAsync(() -> CommonUtils.calculateNumber(1, 1000), threadPoolExecutor);CompletableFuture<Long> totalFuture = calculateNumber100Future.thenCombine(calculateNumber1000Future, (calculateNumber100, calculateNumber1000) -> calculateNumber100 + calculateNumber1000);CommonUtils.printThreadLog("正在获取结果...");CommonUtils.printThreadLog("结果值:" + totalFuture.get());threadPoolExecutor.shutdown();}
}
//执行结果
1712306901623 | 1-main | 正在获取结果...
1712306901648 | 1-main | 结果值:505550

thenCombine也存在异步回调变体版本:

public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn, Executor executor)
2.3.2、allOf合并多个异步任务

如果要编排任意数量的CompletableFuture怎么办?CompletableFuture.allOf(…)可以做到。

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);

CompletableFuture.allOf(…)适用多个需要独立运行的Future, 并在所有这些Future都完成后执行的操作。

需求:一个任务计算从1到100的累加,一个任务计算从1到1000的累加,一个任务计算从1到10000的累加,最后将三个结果累加起来并打印。

代码示例:

package com.example.mavendemo.completablefuture;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadPoolExecutor;public class AllOfTest {public static void main(String[] args) throws ExecutionException, InterruptedException {ThreadPoolExecutor threadPoolExecutor = CommonUtils.initThreadPoolExecutor();List<CompletableFuture<Long>> futureList = new ArrayList<>();CompletableFuture<Long> calculateNumber100Future = CompletableFuture.supplyAsync(() -> CommonUtils.calculateNumber(1, 100), threadPoolExecutor);CompletableFuture<Long> calculateNumber1000Future = CompletableFuture.supplyAsync(() -> CommonUtils.calculateNumber(1, 1000), threadPoolExecutor);CompletableFuture<Long> calculateNumber10000Future = CompletableFuture.supplyAsync(() -> CommonUtils.calculateNumber(1, 10000), threadPoolExecutor);futureList.add(calculateNumber100Future);futureList.add(calculateNumber1000Future);futureList.add(calculateNumber10000Future);//转成数组CompletableFuture[] completableFutureArray = futureList.toArray(new CompletableFuture[futureList.size()]);//调用allOf方法CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(completableFutureArray);CompletableFuture<Object> totalFuture = allOfFuture.thenApply(v -> futureList.stream().map(future -> future.join()).reduce(0L, Long::sum));CommonUtils.printThreadLog("正在获取结果...");CommonUtils.printThreadLog("结果值:" + totalFuture.get());threadPoolExecutor.shutdown();}
}
//执行结果:
1712389791446 | 1-main | 正在获取结果...
1712389852987 | 1-main | 结果值:50510550
2.3.2、anyOf获取多个异步任务中最先执行完的结果

CompletableFuture.anyOf(…)适用多个需要独立运行的Future, 在这些Future任务中的任何一个完成后执行的操作。

public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);

注意:使用anyOf()需要注意的是,Future返回的结果类型是Object。

需求:一个任务计算从1到100的累加,一个任务计算从1到1000的累加,一个任务计算从1到10000的累加,最后将三个结果累加起来并打印。

代码示例:

package com.example.mavendemo.completablefuture;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadPoolExecutor;public class AnyOfTest {public static void main(String[] args) throws ExecutionException, InterruptedException {ThreadPoolExecutor threadPoolExecutor = CommonUtils.initThreadPoolExecutor();List<CompletableFuture<Long>> futureList = new ArrayList<>();CompletableFuture<Long> calculateNumber100Future = CompletableFuture.supplyAsync(() -> {long count = CommonUtils.calculateNumber(1, 100);//休眠5秒钟CommonUtils.sleepMills(5000L);return count;}, threadPoolExecutor);CompletableFuture<Long> calculateNumber1000Future = CompletableFuture.supplyAsync(() -> {long count = CommonUtils.calculateNumber(1, 1000);//休眠1秒钟CommonUtils.sleepMills(1000L);return count;}, threadPoolExecutor);CompletableFuture<Long> calculateNumber10000Future = CompletableFuture.supplyAsync(() -> {long count = CommonUtils.calculateNumber(1, 10000);//休眠10秒钟CommonUtils.sleepMills(10000L);return count;}, threadPoolExecutor);futureList.add(calculateNumber100Future);futureList.add(calculateNumber1000Future);futureList.add(calculateNumber10000Future);//调用anyOf方法CompletableFuture<Object> allOfFuture = CompletableFuture.anyOf(calculateNumber100Future, calculateNumber1000Future, calculateNumber10000Future);CommonUtils.printThreadLog("正在获取结果...");CommonUtils.printThreadLog("结果值:" + allOfFuture.get());threadPoolExecutor.shutdown();}
}
//执行结果:
1712389929613 | 1-main | 正在获取结果...
1712389935231 | 1-main | 结果值:5050

2.4、异步任务的异常处理

如果我们在使用CompletableFuture过程中出现了异常,如何打印?

2.4.1、exceptionally(…)用于处理回调链上的异常

exceptionally/ɪkˈsepʃənəli/adv. 异常地;特殊地;例外地

当CompletableFuture链式使用时,如果出现任何异常,都不会继续向下执行,这时候方法exceptionally就可以捕获异常处理。

public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)

正常捕获异常:

package com.example.mavendemo.completablefuture;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadPoolExecutor;public class ExceptionallyTest {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(()-> 1 + 2).thenApply(result -> result + 3).thenApply(result -> 1/0);try {CommonUtils.printThreadLog(completableFuture.get().toString());} catch (Exception e) {CommonUtils.printThreadLog("发现异常" +  e.getMessage());}}
}
//执行结果:
1712390409902 | 1-main | 发现异常java.lang.ArithmeticException: / by zero

使用链式异常捕获代码示例:

package com.example.mavendemo.completablefuture;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadPoolExecutor;public class ExceptionallyTest {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(()-> 1 + 2).thenApply(result -> result + 3).thenApply(result -> 1/0)//异常捕获方法.exceptionally(e ->{e.printStackTrace();System.err.println(e.getMessage());return 0;});CommonUtils.printThreadLog(completableFuture.get().toString());}
}
//执行结果:
java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zeroat java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:618)at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628)at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996)at com.example.mavendemo.completablefuture.ExceptionallyTest.main(ExceptionallyTest.java:13)
Caused by: java.lang.ArithmeticException: / by zeroat com.example.mavendemo.completablefuture.ExceptionallyTest.lambda$main$2(ExceptionallyTest.java:13)at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)... 3 more
java.lang.ArithmeticException: / by zero

2.5、异步任务交互获得结果

2.5.1、get()方法和join()方法

get()和join()都是CompletableFuture提供的以阻塞方式获得结果的方式,其中get()方法抛出检查时异常,必须要处理。

public T get() throws InterruptedException, ExecutionException;
public T join();

代码示例如下:

package com.example.mavendemo.completablefuture;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;public class GetOrJoinTest {public static void main(String[] args) {CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "hello completabelFuture");String ret = null;try {//get()方法抛出检查时异常,必须要处理ret = completableFuture.get();} catch (InterruptedException e) {throw new RuntimeException(e);} catch (ExecutionException e) {throw new RuntimeException(e);}CommonUtils.printThreadLog("get()结果:" + ret);//join()犯法抛出运行时异常,可以不需要处理ret = completableFuture.join();CommonUtils.printThreadLog("join()结果:" + ret);}
}
//执行结果:
1712321201059 | 1-main | get()结果:hello completabelFuture
1712321201107 | 1-main | join()结果:hello completabelFuture

get()方法还有个设置超时时间的参数, 如下:

public T get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException

代码示例:

package com.example.mavendemo.completablefuture;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;public class GetOrJoinTest {public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {CommonUtils.sleepMills(10000L);return "hello completabelFuture";});String ret = null;try {//get()方法抛出检查时异常,必须要处理ret = completableFuture.get(3, TimeUnit.SECONDS);} catch (Exception e) {if(e instanceof TimeoutException){CommonUtils.printThreadLog( "超时了");ret = "超时了";} else {throw e;}}CommonUtils.printThreadLog("get()结果:" + ret);}
}
//执行结果:
1712321892289 | 1-main | 超时了
1712321892349 | 1-main | get()结果:超时了

2.6、ParallelStream VS CompletableFuture

2.6.1、并行流的局限性

java8提供了并行流的操作,默认使用的是ForkjoinPool线程池。并行流的运行线程数量是由机器的逻辑内核数决定的。

代码示例:

package com.example.mavendemo.completablefuture;import com.google.common.collect.Lists;import java.util.List;
import java.util.stream.Collectors;public class ParallelStreamTaskTest {public static void main(String[] args) {List<Task> taskList = Lists.newArrayList();for (int i = 0; i < 10; i++) {Task task = new Task();taskList.add(task);}long startTime = System.currentTimeMillis();List<String> result = taskList.parallelStream().map(task -> task.doWork()).collect(Collectors.toList());CommonUtils.printThreadLog("result.size: "+ result.size()+", 耗时:" + (System.currentTimeMillis() - startTime));}static class Task{public String doWork(){CommonUtils.sleepMills(1000L);CommonUtils.printThreadLog("doWork");return "doWork";}}
}
//执行结果(前面8个结果先打印出来,后面两个结果再打印出来):
1712326487964 | 12-ForkJoinPool.commonPool-worker-2 | doWork
1712326487964 | 13-ForkJoinPool.commonPool-worker-3 | doWork
1712326487964 | 17-ForkJoinPool.commonPool-worker-7 | doWork
1712326487964 | 11-ForkJoinPool.commonPool-worker-1 | doWork
1712326487964 | 1-main | doWork
1712326487964 | 15-ForkJoinPool.commonPool-worker-4 | doWork
1712326487964 | 16-ForkJoinPool.commonPool-worker-6 | doWork
1712326487964 | 14-ForkJoinPool.commonPool-worker-5 | doWork1712326489037 | 12-ForkJoinPool.commonPool-worker-2 | doWork
1712326489038 | 17-ForkJoinPool.commonPool-worker-7 | doWork1712326489039 | 1-main | result.size: 10, 耗时:2187

从结果可以得出并行流的运行线程数量是由机器的逻辑内核数决定的(查看mac逻辑内核数量命令:sysctl hw.logicalcpu)。

在这里插入图片描述

2.6.2、CompletableFuture在流式操作中的优势

CompletableFuture支持自定义的线程池传入, 线程池的参数可以根据业务类型进行调整, 线程池的使用更灵活。

package com.example.mavendemo.completablefuture;import com.google.common.collect.Lists;import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Collectors;
import java.util.stream.Stream;public class CompleteTableFutureTaskTest {public static void main(String[] args) {ThreadPoolExecutor threadPoolExecutor = CommonUtils.initThreadPoolExecutor();List<CompleteTableFutureTaskTest.Task> taskList = Lists.newArrayList();for (int i = 0; i < 10; i++) {CompleteTableFutureTaskTest.Task task = new CompleteTableFutureTaskTest.Task();taskList.add(task);}long startTime = System.currentTimeMillis();List<CompletableFuture<String>> completableFutureList = taskList.stream().map(task -> CompletableFuture.supplyAsync(()-> task.doWork(), threadPoolExecutor)).collect(Collectors.toList());List<String> result = completableFutureList.stream().map(CompletableFuture::join).collect(Collectors.toList());CommonUtils.printThreadLog("result.size: "+ result.size()+", 耗时:" + (System.currentTimeMillis() - startTime));//手动销毁线程池threadPoolExecutor.shutdown();}static class Task{public String doWork(){CommonUtils.sleepMills(1000L);CommonUtils.printThreadLog("doWork");return "doWork";}}
}
//执行结果(10条结果,一次性打出来):
1712326568638 | 17-custom-thread-pool | doWork
1712326568638 | 15-custom-thread-pool | doWork
1712326568635 | 16-custom-thread-pool | doWork
1712326568635 | 12-custom-thread-pool | doWork
1712326568636 | 11-custom-thread-pool | doWork
1712326568635 | 19-custom-thread-pool | doWork
1712326568638 | 18-custom-thread-pool | doWork
1712326568635 | 14-custom-thread-pool | doWork
1712326568636 | 20-custom-thread-pool | doWork
1712326568636 | 13-custom-thread-pool | doWork
1712326568746 | 1-main | result.size: 10, 耗时:1734

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/596033.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C#开发中一些常用的工具类分享

一、配置文件读写类 用于在开发时候C#操作配置文件读写信息 1、工具类 ReadIni 代码 using System; using System.Collections.Generic; using System.IO; using System.Linq; using System.Runtime.InteropServices; using System.Text; using System.Threading.Tasks;namesp…

LeetCode 1017. 负二进制转换

解题思路 相关代码 class Solution {public String baseNeg2(int n) {if(n0) return "0";String s"";while(n!0)if(Math.abs(n)%20){nn/(-2);ss0;}else{ss1; n (n-1)/(-2);}String t reverse(s);return t;}public String reverse(String s){Str…

ZYNQ学习Linux 基础外设的使用

基本都是摘抄正点原子的文章&#xff1a;《领航者 ZYNQ 之嵌入式Linux 开发指南 V3.2.pdf》&#xff0c;因初次学习&#xff0c;仅作学习摘录之用&#xff0c;有不懂之处后续会继续更新~ 工程的创建参考&#xff1a;《ZYNQ学习之Petalinux 设计流程实战》 一、GPIO 之 LED 的使…

自定义实现shell/bash

文章目录 函数和进程之间的相似性shell打印提示符&#xff0c;以及获取用户输入分割用户的输入判断是否是内建命令执行相关的命令 全部代码 正文开始前给大家推荐个网站&#xff0c;前些天发现了一个巨牛的 人工智能学习网站&#xff0c; 通俗易懂&#xff0c;风趣幽默&#…

(二)小案例银行家应用程序-创建DOM元素

● 上图的数据很明显是从我们账户数组中拿到了&#xff0c;我们刚刚学习了forEach&#xff0c;所以我们使用forEach来创建我们的DOM元素&#xff1b; const displayMovements function (movements) {movements.forEach((mov, i) > {const type mov > 0 ? deposit : w…

通用开发技能系列:Scrum、Kanban等敏捷管理策略

云原生学习路线导航页&#xff08;持续更新中&#xff09; 本文是 通用开发技能系列 文章&#xff0c;主要对编程通用技能 Scrum、Kanban等敏捷管理策略 进行学习 1.什么是敏捷开发 敏捷是一个描述软件开发方法的术语&#xff0c;它强调增量交付、团队协作、持续规划和持续学习…

github本地仓库push到远程仓库

1.从远程仓库clone到本地 2.生成SSH秘钥&#xff0c;为push做准备 在Ubuntu命令行输入一下内容 [rootlocalhost ~]# ssh-keygen -t rsa < 建立密钥对&#xff0c;-t代表类型&#xff0c;有RSA和DSA两种 Generating public/private rsa key pair. Enter file in whi…

HTTP详解及代码实现

HTTP详解及代码实现 HTTP超文本传输协议 URL简述状态码常见的状态码 请求方法请求报文响应报文HTTP常见的HeaderHTTP服务器代码 HTTP HTTP的也称为超文本传输协议。解释HTTP我们可以将其分为三个部分来解释&#xff1a;超文本&#xff0c;传输&#xff0c;协议。 超文本 加粗样…

上线部署流程

音频地址&#xff1a;上线部署流程_小蒋聊技术在线播放免费听 - 喜马拉雅手机版 时间&#xff1a;2024年04月06日 作者&#xff1a;小蒋聊技术 邮箱&#xff1a;wei_wei10163.com 微信&#xff1a;wei_wei10 背景 大家好&#xff0c;欢迎来到小蒋聊技术&#xff0c;小蒋准…

正排索引 vs 倒排索引 - 搜索引擎具体原理

阅读导航 一、正排索引1. 概念2. 实例 二、倒排索引1. 概念2. 实例 三、正排 VS 倒排1. 正排索引优缺点2. 倒排索引优缺点3. 应用场景 三、搜索引擎原理1. 宏观原理2. 具体原理 一、正排索引 1. 概念 正排索引是一种索引机制&#xff0c;它将文档或数据记录按照某种特定的顺序…

Python 基于列表实现的通讯录管理系统(有完整源码)

目录 通讯录管理系统 PersonInformation类 ContactList类 menu函数 main函数 程序的运行流程 完整代码 运行示例 通讯录管理系统 这是一个基于文本的界面程序&#xff0c;用户可以通过命令行与之交互&#xff0c;它使用了CSV文件来存储和读取联系人信息&#xff0c;这…

开源数学计算软件Maxima基础学习

在Maxima中计算四则运算可以直接使用数学符号&#xff0c;在输入完公式后使用 EnterShift 快捷键进行计算 (%i1)11 输出 (%o1)2 这里面的 (%i1) 代表 input1 第1号输入&#xff0c;(%o1) 代表 output1 第1号输出。在执行计算后&#xff0c;(%i1)11 这一行命令后会出现一个…