1 CompletableFuture介绍
平时多线程开发一般就是使用Runnable,Callable,Thread,FutureTask,ThreadPoolExecutor这些内容和并发编程息息相关。相对来对来说成本都不高,多多使用是可以熟悉这些内容。这些内容组合在一起去解决一些并发编程的问题时,很多时候没有办法很方便的去完成异步编程的操作。
Thread + Runnable:执行异步任务,但是没有返回结果
Thread + Callable + FutureTask:完整一个可以有返回结果的异步任务
-
获取返回结果,如果基于get方法获取,线程需要挂起在WaitNode里
-
获取返回结果,也可以基于isDone判断任务的状态,但是这里需要不断轮询
上述的方式都是有一定的局限性的。
比如说任务A,任务B,还有任务C。其中任务B还有任务C执行的前提是任务A先完成,再执行任务B和任务C。
如果任务的执行方式逻辑比较复杂,可能需要业务线程导出阻塞等待,或者是大量的任务线程去编一些任务执行的业务逻辑。对开发成本来说比较高。
CompletableFuture就是帮你处理这些任务之间的逻辑关系,编排好任务的执行方式后,任务会按照规划好的方式一步一步执行,不需要让业务线程去频繁的等待
2 CompletableFuture应用
CompletableFuture应用还是需要一些的成本的。
首先对CompletableFuture提供的函数式编程中三个函数有一个掌握
Supplier<U> // 生产者,没有入参,有返回结果
Consumer<T> // 消费者,有入参,但是没有返回结果
Function<T,U>// 函数,有入参,又有返回结果
1.supplyAsync
CompletableFuture如果不提供线程池的话,默认使用的ForkJoinPool,而ForkJoinPool内部是守护线程,如果main线程结束了,守护线程会跟着一起结束。
public static void main(String[] args) {// 生产者,可以指定返回结果CompletableFuture<String> firstTask = CompletableFuture.supplyAsync(() -> {System.out.println("异步任务开始执行");System.out.println("异步任务执行结束");return "返回结果";});String result1 = firstTask.join();String result2 = null;try {result2 = firstTask.get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}System.out.println(result1 + "," + result2);
}
2.runAsync
当前方式既不会接收参数,也不会返回任何结果,非常基础的任务编排方式
public static void main(String[] args) throws IOException {CompletableFuture.runAsync(() -> {System.out.println("任务go");System.out.println("任务done");});System.in.read();
}
3.thenApply,thenApplyAsync
有任务A,还有任务B。
任务B需要在任务A执行完毕后再执行。
而且任务B需要任务A的返回结果。
任务B自身也有返回结果。
thenApply可以拼接异步任务,前置任务处理完之后,将返回结果交给后置任务,然后后置任务再执行
thenApply提供了带有Async的方法,可以指定每个任务使用的具体线程池。
public static void main(String[] args) throws IOException {ExecutorService executor = Executors.newFixedThreadPool(10);/*CompletableFuture<String> taskA = CompletableFuture.supplyAsync(() -> {String id = UUID.randomUUID().toString();System.out.println("执行任务A:" + id);return id;});CompletableFuture<String> taskB = taskA.thenApply(result -> {System.out.println("任务B获取到任务A结果:" + result);result = result.replace("-", "");return result;});System.out.println("main线程拿到结果:" + taskB.join());*/CompletableFuture<String> taskB = CompletableFuture.supplyAsync(() -> {String id = UUID.randomUUID().toString();System.out.println("执行任务A:" + id + "," + Thread.currentThread().getName());return id;}).thenApplyAsync(result -> {System.out.println("任务B获取到任务A结果:" + result + "," + Thread.currentThread().getName());result = result.replace("-", "");return result;},executor);System.out.println("main线程拿到结果:" + taskB.join());
}
4.thenAccept,thenAcceptAsync
套路和thenApply一样,都是任务A和任务B的拼接
前置任务需要有返回结果,后置任务会接收前置任务的结果,返回后置任务没有返回值
public static void main(String[] args) throws IOException {CompletableFuture.supplyAsync(() -> {System.out.println("任务A");return "abcdefg";}).thenAccept(result -> {System.out.println("任务b,拿到结果处理:" + result);});System.in.read();
}
5.thenRun,thenRunAsync
套路和thenApply,thenAccept一样,都是任务A和任务B的拼接
前置任务没有返回结果,后置任务不接收前置任务结果,后置任务也会有返回结果
public static void main(String[] args) throws IOException {CompletableFuture.runAsync(() -> {System.out.println("任务A!!");}).thenRun(() -> {System.out.println("任务B!!");});System.in.read();
}
6.thenCombine,thenAcceptBoth,runAfterBoth
比如有任务A,任务B,任务C。任务A和任务B并行执行,等到任务A和任务B全部执行完毕后,再执行任务C。
A+B ------ C
基于前面thenApply,thenAccept,thenRun知道了一般情况三种任务的概念
thenCombine以及thenAcceptBoth还有runAfterBoth的区别是一样的。
public static void main(String[] args) throws IOException {CompletableFuture<Integer> taskC = CompletableFuture.supplyAsync(() -> {System.out.println("任务A");try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}return 78;}).thenCombine(CompletableFuture.supplyAsync(() -> {System.out.println("任务B");try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}return 66;}), (resultA, resultB) -> {System.out.println("任务C");int resultC = resultA + resultB;return resultC;});System.out.println(taskC.join());System.in.read();
}
7.applyToEither,acceptEither,runAfterEither
比如有任务A,任务B,任务C。任务A和任务B并行执行,只要任务A或者任务B执行完毕,开始执行任务C
A or B ----- C
applyToEither,acceptEither,runAfterEither三个方法拼接任务的方式都是一样的
区别依然是,可以接收结果并且返回结果,可以接收结果没有返回结果,不接收结果也没返回结果
public static void main(String[] args) throws IOException {CompletableFuture<Integer> taskC = CompletableFuture.supplyAsync(() -> {System.out.println("任务A");return 78;}).applyToEither(CompletableFuture.supplyAsync(() -> {System.out.println("任务B");return 66;}), resultFirst -> {System.out.println("任务C");return resultFirst;});System.out.println(taskC.join());System.in.read();
}
8.exceptionally,thenCompose,handle
exceptionally这个也是拼接任务的方式,但是只有前面业务执行时出现异常了,才会执行当前方法来处理
只有异常出现时,CompletableFuture的编排任务没有处理完时,才会触发thenCompose,handle
这两个也是异常处理的套路,可以根据方法描述发现,他的功能方向比exceptionally要更加丰富
thenCompose可以拿到返回结果同时也可以拿到出现的异常信息,但是thenCompose本身是Consumer不能返回结果。无法帮你捕获异常,但是可以拿到异常返回的结果。
handle可以拿到返回结果同时也可以拿到出现的异常信息,并且也可以指定返回托底数据。可以捕获异常的,异常不会抛出去。
public static void main(String[] args) throws IOException {CompletableFuture<Integer> taskC = CompletableFuture.supplyAsync(() -> {System.out.println("任务A");
// int i = 1 / 0;return 78;}).applyToEither(CompletableFuture.supplyAsync(() -> {System.out.println("任务B");return 66;}), resultFirst -> {System.out.println("任务C");return resultFirst;}).handle((r,ex) -> {System.out.println("handle:" + r);System.out.println("handle:" + ex);return -1;});/*.exceptionally(ex -> {System.out.println("exceptionally:" + ex);return -1;});*//*.whenComplete((r,ex) -> {System.out.println("whenComplete:" + r);System.out.println("whenComplete:" + ex);});*/System.out.println(taskC.join());System.in.read();}
9.allOf,anyOf
allOf的方式是让内部编写多个CompletableFuture的任务,多个任务都执行完后,才会继续执行你后续拼接的任务
allOf返回的CompletableFuture是Void,没有返回结果
public static void main(String[] args) throws IOException {CompletableFuture.allOf(CompletableFuture.runAsync(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("任务A");}),CompletableFuture.runAsync(() -> {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("任务B");}),CompletableFuture.runAsync(() -> {try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("任务C");})).thenRun(() -> {System.out.println("任务D");});System.in.read();}
anyOf是基于多个CompletableFuture的任务,只要有一个任务执行完毕就继续执行后续,最先执行完的任务做作为返回结果的入参
public static void main(String[] args) throws IOException {CompletableFuture.anyOf(CompletableFuture.supplyAsync(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("任务A");return "A";}),CompletableFuture.supplyAsync(() -> {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("任务B");return "B";}),CompletableFuture.supplyAsync(() -> {try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("任务C");return "C";})).thenAccept(r -> {System.out.println("任务D执行," + r + "先执行完毕的");});System.in.read();
}
3 CompletableFuture源码分析
1 当前任务执行方式
将任务和CompletableFuture封装到一起,再执行封装好的具体对象的run方法即可
// 提交任务到CompletableFuture
public static CompletableFuture<Void> runAsync(Runnable runnable) {// asyncPool:执行任务的线程池// runnable:具体任务。return asyncRunStage(asyncPool, runnable);
}// 内部执行的方法
static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) {// 对任务做非空校验if (f == null) throw new NullPointerException();// 直接构建了CompletableFuture的对象,作为最后的返回结果CompletableFuture<Void> d = new CompletableFuture<Void>();// 将任务和CompletableFuture对象封装为了AsyncRun的对象// 将封装好的任务交给了线程池去执行e.execute(new AsyncRun(d, f));// 返回构建好的CompletableFuturereturn d;
}// 封装任务的AsyncRun类信息
static final class AsyncRun extends ForkJoinTask<Void> implements Runnable, AsynchronousCompletionTask {// 声明存储CompletableFuture对象以及任务的成员变量CompletableFuture<Void> dep; Runnable fn;// 将传入的属性赋值给成员变量AsyncRun(CompletableFuture<Void> dep, Runnable fn) {this.dep = dep; this.fn = fn;}// 当前对象作为任务提交给线程池之后,必然会执行当前方法public void run() {// 声明局部变量CompletableFuture<Void> d; Runnable f;// 将成员变量赋值给局部变量,并且做非空判断if ((d = dep) != null && (f = fn) != null) {// help GC,将成员变量置位null,只要当前任务结束后,成员变量也拿不到引用。dep = null; fn = null;// 先确认任务没有执行。if (d.result == null) {try {// 直接执行任务f.run();// 当前方法是针对Runnable任务的,不能将结果置位null// 要给没有返回结果的Runnable做一个返回结果d.completeNull();} catch (Throwable ex) {// 异常结束!d.completeThrowable(ex);}}d.postComplete();}}
}
2 任务编排的存储&执行方式
首先如果要在前继任务处理后,执行后置任务的话。
有两种情况:
-
前继任务如果没有执行完毕,后置任务需要先放在stack栈结构中存储
-
前继任务已经执行完毕了,后置任务就应该直接执行,不需要在往stack中存储了。
如果单独采用thenRun在一个任务后面指定多个后继任务,CompletableFuture无法保证具体的执行顺序,而影响执行顺序的是前继任务的执行时间,以及后置任务编排的时机。
3 任务编排流程
// 编排任务,前继任务搞定,后继任务再执行
public CompletableFuture<Void> thenRun(Runnable action) {// 执行了内部的uniRunStage方法,// null:线程池,现在没给。// action:具体要执行的任务return uniRunStage(null, action);
}// 内部编排任务方法
private CompletableFuture<Void> uniRunStage(Executor e, Runnable f) {// 后继任务不能为null,健壮性判断if (f == null) throw new NullPointerException();// 创建CompletableFuture对象d,与后继任务f绑定CompletableFuture<Void> d = new CompletableFuture<Void>();// 如果线程池不为null,代表异步执行,将任务压栈// 如果线程池是null,先基于uniRun尝试下,看任务能否执行if (e != null || !d.uniRun(this, f, null)) {// 如果传了线程池,这边需要走一下具体逻辑// e:线程池// d:后继任务的CompletableFuture// this:前继任务的CompletableFuture// f:后继任务UniRun<T> c = new UniRun<T>(e, d, this, f);// 将封装好的任务,push到stack栈结构// 只要前继任务没结束,这边就可以正常的将任务推到栈结构中// 放入栈中可能会失败push(c);// 无论压栈成功与否,都要尝试执行以下。c.tryFire(SYNC);}// 无论任务执行完毕与否,都要返回后继任务的CompletableFuturereturn d;
}
4 查看后置任务执行时机
任务在编排到前继任务时,因为前继任务已经结束了,这边后置任务会主动的执行
// 后置任务无论压栈成功与否,都需要执行tryFire方法
static final class UniRun<T> extends UniCompletion<T,Void> {Runnable fn;// executor:线程池// dep:后置任务的CompletableFuture// src:前继任务的CompletableFuture// fn:具体的任务UniRun(Executor executor, CompletableFuture<Void> dep,CompletableFuture<T> src, Runnable fn) {super(executor, dep, src); this.fn = fn;}final CompletableFuture<Void> tryFire(int mode) {// 声明局部变量CompletableFuture<Void> d; CompletableFuture<T> a;// 赋值局部变量// (d = dep) == null:赋值加健壮性校验if ((d = dep) == null ||// 调用uniRun。// a:前继任务的CompletableFuture// fn:后置任务// 第三个参数:传入的是this,是UniRun对象!d.uniRun(a = src, fn, mode > 0 ? null : this))// 进到这,说明前继任务没结束,等!return null;dep = null; src = null; fn = null;return d.postFire(a, mode);}
}// 是否要主动执行任务
final boolean uniRun(CompletableFuture<?> a, Runnable f, UniRun<?> c) {// 方法要么正常结束,要么异常结束Object r; Throwable x;// a == null:健壮性校验// (r = a.result) == null:判断前继任务结束了么?// f == null:健壮性校验if (a == null || (r = a.result) == null || f == null)// 到这代表任务没结束。return false;// 后置任务执行了没? == null,代表没执行if (result == null) {// 如果前继任务的结果是异常结束。如果前继异常结束,直接告辞,封装异常结果if (r instanceof AltResult && (x = ((AltResult)r).ex) != null)completeThrowable(x, r);else// 到这,前继任务正常结束,后置任务正常执行try {// 如果基于tryFire(SYNC)进来,这里的C不为null,执行c.claim// 如果是因为没有传递executor,c就是null,不会执行c.claimif (c != null && !c.claim())// 如果返回false,任务异步执行了,直接return falsereturn false;// 如果claim没有基于线程池运行任务,那这里就是同步执行// 直接f.run了。f.run();// 封装Null结果completeNull();} catch (Throwable ex) {// 封装异常结果completeThrowable(ex);}}return true;
}// 异步的线程池处理任务
final boolean claim() {Executor e = executor;if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {// 只要有线程池对象,不为nullif (e == null)return true;executor = null; // disable// 基于线程池的execute去执行任务e.execute(this);}return false;
}
前继任务执行完毕后,基于嵌套的方式执行后置。
// A:嵌套了B+C, B:嵌套了D+E
// 前继任务搞定,遍历stack执行后置任务
// A任务处理完,解决嵌套的B和C
final void postComplete() {// f:前继任务的CompletableFuture// h:存储后置任务的栈结构CompletableFuture<?> f = this; Completion h;// (h = f.stack) != null:赋值加健壮性判断,要确保栈中有数据while ((h = f.stack) != null ||// 循环一次后,对后续节点的赋值以及健壮性判断,要确保栈中有数据(f != this && (h = (f = this).stack) != null)) {// t:当前栈中任务的后续任务CompletableFuture<?> d; Completion t;// 拿到之前的栈顶h后,将栈顶换数据if (f.casStack(h, t = h.next)) {if (t != null) {if (f != this) {pushStack(h);continue;}h.next = null; // detach}// 执行tryFire方法,f = (d = h.tryFire(NESTED)) == null ? this : d;}}
}// 回来了 NESTED == -1
final CompletableFuture<Void> tryFire(int mode) {CompletableFuture<Void> d; CompletableFuture<T> a;if ((d = dep) == null ||!d.uniRun(a = src, fn, mode > 0 ? null : this))return null;dep = null; src = null; fn = null;// 内部会执行postComplete,运行B内部嵌套的D和Ereturn d.postFire(a, mode);
}
4 CompletableFuture执行流程图
5 CompletableFuture原理
CompletableFuture中包含两个字段:result和stack。result用于存储当前CF的结果,stack(Completion)表示当前CF完成后需要触发的依赖动作(Dependency Actions),去触发依赖它的CF的计算,依赖动作可以有多个(表示有多个依赖它的CF),以栈(Treiber stack)的形式存储,stack表示栈顶元素。
CF基本结构
这种方式类似“观察者模式”,依赖动作(Dependency Action)都封装在一个单独Completion子类中。下面是Completion类关系结构图。CompletableFuture中的每个方法都对应了图中的一个Completion的子类,Completion本身是观察者的基类。
- UniCompletion继承了Completion,是一元依赖的基类,例如thenApply的实现类UniApply就继承自UniCompletion。
- BiCompletion继承了UniCompletion,是二元依赖的基类,同时也是多元依赖的基类。例如thenCombine的实现类BiRelay就继承自BiCompletion。
CF类图
6 CompletableFuture的设计思想
按照类似“观察者模式”的设计思想,原理分析可以从“观察者”和“被观察者”两个方面着手。由于回调种类多,但结构差异不大,所以这里单以一元依赖中的thenApply为例,不再枚举全部回调类型。如下图所示:
thenApply简图
6.1 被观察者
- 每个CompletableFuture都可以被看作一个被观察者,其内部有一个Completion类型的链表成员变量stack,用来存储注册到其中的所有观察者。当被观察者执行完成后会弹栈stack属性,依次通知注册到其中的观察者。上面例子中步骤fn2就是作为观察者被封装在UniApply中。
- 被观察者CF中的result属性,用来存储返回结果数据。这里可能是一次RPC调用的返回值,也可能是任意对象,在上面的例子中对应步骤fn1的执行结果。
6.2 观察者
CompletableFuture支持很多回调方法,例如thenAccept、thenApply、exceptionally等,这些方法接收一个函数类型的参数f,生成一个Completion类型的对象(即观察者),并将入参函数f赋值给Completion的成员变量fn,然后检查当前CF是否已处于完成状态(即result != null),如果已完成直接触发fn,否则将观察者Completion加入到CF的观察者链stack中,再次尝试触发,如果被观察者未执行完则其执行完毕之后通知触发。
- 观察者中的dep属性:指向其对应的CompletableFuture,在上面的例子中dep指向CF2。
- 观察者中的src属性:指向其依赖的CompletableFuture,在上面的例子中src指向CF1。
- 观察者Completion中的fn属性:用来存储具体的等待被回调的函数。这里需要注意的是不同的回调方法(thenAccept、thenApply、exceptionally等)接收的函数类型也不同,即fn的类型有很多种,在上面的例子中fn指向fn2。
7 整体流程
7.1 一元依赖
这里仍然以thenApply为例来说明一元依赖的流程:
- 将观察者Completion注册到CF1,此时CF1将Completion压栈。
- 当CF1的操作运行完成时,会将结果赋值给CF1中的result属性。
- 依次弹栈,通知观察者尝试运行。
执行流程简要说明
初步流程设计如上图所示,这里有几个关于注册与通知的并发问题,大家可以思考下:
Q1:在观察者注册之前,如果CF已经执行完成,并且已经发出通知,那么这时观察者由于错过了通知是不是将永远不会被触发呢 ? A1:不会。在注册时检查依赖的CF是否已经完成。如果未完成(即result == null)则将观察者入栈,如果已完成(result != null)则直接触发观察者操作。
Q2:在”入栈“前会有”result == null“的判断,这两个操作为非原子操作,CompletableFufure的实现也没有对两个操作进行加锁,完成时间在这两个操作之间,观察者仍然得不到通知,是不是仍然无法触发?
入栈校验
A2:不会。入栈之后再次检查CF是否完成,如果完成则触发。
Q3:当依赖多个CF时,观察者会被压入所有依赖的CF的栈中,每个CF完成的时候都会进行,那么会不会导致一个操作被多次执行呢 ?如下图所示,即当CF1、CF2同时完成时,如何避免CF3被多次触发。
多次触发
A3:CompletableFuture的实现是这样解决该问题的:观察者在执行之前会先通过CAS操作设置一个状态位,将status由0改为1。如果观察者已经执行过了,那么CAS操作将会失败,取消执行。
通过对以上3个问题的分析可以看出,CompletableFuture在处理并行问题时,全程无加锁操作,极大地提高了程序的执行效率。我们将并行问题考虑纳入之后,可以得到完善的整体流程图如下所示:
https://pic2.zhimg.com/v2-606323a07fb7e31cb91f46c879d99b8d_b.webp
完整流程
CompletableFuture支持的回调方法十分丰富,但是正如上一章节的整体流程图所述,他们的整体流程是一致的。所有回调复用同一套流程架构,不同的回调监听通过策略模式实现差异化。
知识来源:马士兵教育
CompletableFuture原理与实践-外卖商家端API的异步化 - 知乎