Java中CompletableFuture 异步编排的基本使用

一、前言

        在复杂业务场景中,有些数据需要远程调用,导致查询时间缓慢,影响以下代码逻辑运行,并且这些浪费时间的逻辑与以后的请求并没有关系,这样会大大增加服务的时间。

        假如商品详情页的每个查询,需要如下标注的时间才能完成 。那么,用户需要 5.5s 后才能看到商品详情页的内容。很显然是不能接受的。 如果有多个线程同时完成这 6 步操作,也许只需要 1.5s 即可完成响应。

 

        在 Java 8 , 新增加了一个包含 50 个方法左右的类 : CompletableFuture ,提供了非常强大的 Future 的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以 通过回调的方式处理计算结果,并且提供了转换和组合 CompletableFuture 的方法。 CompletableFuture 类实现了 Future 接口,所以你还是可以像以前一样通过 `get` 方法阻塞或 者轮询的方式获得结果,但是这种方式不推荐使用。 CompletableFuture 和 FutureTask 同属于 Future 接口的实现类,都可以获取线程的执行结果。

1、创建异步对象

CompletableFuture 提供了四个静态方法来创建一个异步操作。

 

public static Completab1eFuture runAsync(Runnable runnable)
public static completableFuturecVoid> runAsync(Runnable runnable,Executor executor)
public static CompletableFuture supplyAsync(Suppliersupplier)
public static CompletableFuturecU> supplyAsync(Supplier supplier,Executor executor)
1 runXxxx 都是没有返回结果的, supplyXxx 都是可以获取返回结果的
2 、可以传入自定义的线程池,否则就用默认的线程池;
3、Async代表异步方法

 

1.1 runAsync 不带返回值

public class ThreadTest {//        ExecutorService executorService = Executors.newFixedThreadPool(10);public static ThreadPoolExecutor executor = new ThreadPoolExecutor(  5,200,10,TimeUnit.SECONDS,new LinkedBlockingDeque<>(  100000),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());public static void main(String[] args) {CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> {System.out.println("当前线程:"+Thread.currentThread().getName());int i = 10 / 2;System.out.println("运行结果...."+i);}, executor);}
}

1.2 supplyAsync 带返回值 

public class ThreadTest {//        ExecutorService executorService = Executors.newFixedThreadPool(10);public static ThreadPoolExecutor executor = new ThreadPoolExecutor(  5,200,10,TimeUnit.SECONDS,new LinkedBlockingDeque<>(  100000),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {System.out.println("当前线程:" + Thread.currentThread().getName());int i = 12 / 2;System.out.println("运行结果...." + i);return i;}, executor);Integer integer = supplyAsync.get();System.out.println("返回数据:"+integer);}
}

 2、计算完成时回调方法

public completableFuture whencomplete(BiConsumer<? super T,? super Throwable> action);
public CompletableFuturewhenCompleteAsync(BiConsumer <? super T,? super Throwable> action);
public completableFuture whenCompleteAsync(BiConsumer<? super T,? super Throwable> action,Executor executor);
public completableFutureexceptionally(Function<Throwable,? extends T> fn);
whenComplete可以处理正常和异常的计算结果,exceptionally处理异常情况。
whenComplete 和 whenCompleteAsync 的区别:
whenComplete: 是执行当前任务的线程执行继续执行 whenComplete 的任务。
whenCompleteAsync: 是执行把 whenCompleteAsync 这个任务继续提交给线程池
来进行执行。
方法不以 Async 结尾, 意味着 Action 使用相同的线程执行, 而 Async 可能会使用其他线程执行(如果是使用相同的线程池, 也可能会被同一个线程选中执行)

2.1 whenCompleteAsync 完成回调 (没有异常情况情况)

public class ThreadTest {//        ExecutorService executorService = Executors.newFixedThreadPool(10);public static ThreadPoolExecutor executor = new ThreadPoolExecutor(  5,200,10,TimeUnit.SECONDS,new LinkedBlockingDeque<>(  100000),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {System.out.println("当前线程:" + Thread.currentThread().getName());int i = 12 / 2;System.out.println("运行结果...." + i);return i;}, executor).whenCompleteAsync((res, exception) -> {System.out.println("异步任务完成....感知到返回值为:"+res+"异常:"+exception);},executor);Integer integer = supplyAsync.get();System.out.println("返回数据:"+integer);}
}

 有异常情况

public class ThreadTest {//        ExecutorService executorService = Executors.newFixedThreadPool(10);public static ThreadPoolExecutor executor = new ThreadPoolExecutor(  5,200,10,TimeUnit.SECONDS,new LinkedBlockingDeque<>(  100000),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {System.out.println("当前线程:" + Thread.currentThread().getName());int i = 12 / 0;System.out.println("运行结果...." + i);return i;}, executor).whenCompleteAsync((res, exception) -> {System.out.println("异步任务完成....感知到返回值为:"+res+"异常:"+exception);},executor);Integer integer = supplyAsync.get();System.out.println("返回数据:"+integer);}
}

此处虽然得到了异常信息但是没有办法修改返回数据,使用exceptionally自定义异常时的返回值 

 2.2 exceptionally 异常感知及处理

异常情况

public class ThreadTest {//        ExecutorService executorService = Executors.newFixedThreadPool(10);public static ThreadPoolExecutor executor = new ThreadPoolExecutor(  5,200,10,TimeUnit.SECONDS,new LinkedBlockingDeque<>(  100000),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {System.out.println("当前线程:" + Thread.currentThread().getName());int i = 12 / 0;System.out.println("运行结果...." + i);return i;}, executor).whenCompleteAsync((res, exception) -> {System.out.println("异步任务完成....感知到返回值为:"+res+"异常:"+exception);},executor).exceptionally(throwable -> {return 0;});Integer integer = supplyAsync.get();System.out.println("返回数据:"+integer);}
}

 无异常,情况正常返回不会进exceptionally

public class ThreadTest {//        ExecutorService executorService = Executors.newFixedThreadPool(10);public static ThreadPoolExecutor executor = new ThreadPoolExecutor(  5,200,10,TimeUnit.SECONDS,new LinkedBlockingDeque<>(  100000),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {System.out.println("当前线程:" + Thread.currentThread().getName());int i = 12 / 2;System.out.println("运行结果...." + i);return i;}, executor).whenCompleteAsync((res, exception) -> {System.out.println("异步任务完成....感知到返回值为:"+res+"异常:"+exception);},executor).exceptionally(throwable -> {return 0;});Integer integer = supplyAsync.get();System.out.println("返回数据:"+integer);}
}

2.3 最终处理 handle 方法

和 complete 一样, 可对结果做最后的处理(可处理异常),可改变返回值。

总结:使用R apply(T t, U u); 可以感知异常,和修改返回值的功能。

public completionStage handle(BiFunction<? super T,Throwable,? extends U> fn);
public completionStagehandleAsync(BiFunction<? super T,Throwable,? extends U> fn);
public > CompletionStage handleAsync(BiFunction<? super T,Throwable,? extends U> fn,Executor executor ) ;

 有异常情况

	public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {System.out.println("当前线程:" + Thread.currentThread().getName());int i = 12 / 0;System.out.println("运行结果...." + i);return i;}, executor).handleAsync((res, throwable) -> {if (res!=null){return res*2;}if (throwable!=null){System.out.println("出现异常"+throwable.getMessage());return -1;}return 0;},executor);Integer integer = supplyAsync.get();System.out.println("返回数据:"+integer);}

无异常情况 

	public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {System.out.println("当前线程:" + Thread.currentThread().getName());int i = 12 / 6;System.out.println("运行结果...." + i);return i;}, executor).handleAsync((res, throwable) -> {if (res!=null){return res*2;}if (throwable!=null){System.out.println("出现异常"+throwable.getMessage());return -1;}return 0;},executor);Integer integer = supplyAsync.get();System.out.println("返回数据:"+integer);}
2.3.1总结 

总结:一般用handle,因为whencomplete如果异常不能给定默认返回结果,需要再调用exceptionally,而handle可以

该方法作用:获得前一任务的返回值【自己也可以是异步执行的】,也可以处理上一任务的异常,调用exceptionally修改前一任务的返回值【例如异常情况时给一个默认返回值】而handle方法可以简化操作


以下用法大致相同,只列举具体方法 

 2.4 线程串行化方法

public CompletableFuture thenApply(Function<? super T,? extends U> fn)
public Completab1eFuture thenApplyAsync(Function<? super T,? extends U> fn)
public CompletableFuture thenApplyAsync(Function<? super T,? extends U> fn,Executor executor)public completionstage thenAccept(Consumer<? super T> action);
public completionStage thenAcceptAsync(Consumer<? super T> action);
public CompletionStagecVoid> thenAcceptAsync(Consumer<? super T> action,Executor executor);public Completionstage thenRun(Runnable action);
public Completionstage thenRunAsync(Runnable action);
public completionStage thenRunAsync(Runnable action,Executor executor);

 thenApply:继续执行,感知上一任务的返回结果,并且自己的返回结果也被下一个任务所感知
thenAccept:继续执行,接受上一个任务的返回结果,自己执行完没有返回结果
thenRun:继续执行,不接受上一个任务的返回结果,自己执行完也没有返回结果
以上都要前置任务成功完成。
Function<? super T,? extends U>
T: 上一个任务返回结果的类型
U: 当前任务的返回值类型

 2.5 两任务组合 - 都要完成

public <U,V> CompletableFuture thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);public <U,V> CompletableFuture thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);public <U,V> CompletableFuture thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor);public CompletableFuture thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action);public CompletableFuture thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action);public CompletableFuture thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor);public CompletableFuture runAfterBoth(CompletionStage<?> other, Runnable action);public CompletableFuture runAfterBothAsync(CompletionStage<?> other, Runnable action);public CompletableFuture runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor);

thenCombine:组合两个future,获取前两个future的返回结果,并返回当前任务的返回值
thenAcceptBoth:组合两个future,获取前两个future任务的返回结果,然后处理任务,没有返回值。
runAfterBoth:组合两个future,不需要获取之前任务future的结果,只需两个future处理完任务后,处理该任务。

 2.5.1 runAfterBothAsync
 public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {System.out.println("任务一线程开始:" + Thread.currentThread().getName());int i = 12 / 2;System.out.println("任务一运行结束...." + i);return i;}, executor);CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {System.out.println("任务二线程开始:" + Thread.currentThread().getName());System.out.println("任务二运行结束....");return "hello";}, executor);future01.runAfterBothAsync(future02,() -> {System.out.println("任务三开始...");});System.out.println("返回数据:");}

 2.6 两个任务 - 一个完成

  1.  applyToEither: 两个任务有一个执行完成, 获取它的返回值, 处理任务并有新的返回值。
  2. acceptEither: 两个任务有一个执行完成, 获取它的返回值, 处理任务, 没有新的返回值。
  3. runAfterEither: 两个任务有一个执行完成, 不需要获取 future 的结果, 处理任务, 也没有返回值。

2.7 多任务组合 

//allOf: 等待所有任务完成
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {return andTree(cfs, 0, cfs.length - 1);
}//anyOf: 只要有一个任务完成
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {return orTree(cfs, 0, cfs.length - 1);
}
 2.7.1 allOf
public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {System.out.println("任务一线程开始:" + Thread.currentThread().getName());int i = 12 / 2;System.out.println("任务一运行结束...." + i);return i;}, executor);CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {System.out.println("任务二线程开始:" + Thread.currentThread().getName());try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("任务二运行结束....");return "hello";}, executor);CompletableFuture<Object> future03 = CompletableFuture.supplyAsync(() -> {System.out.println("任务三线程开始:" + Thread.currentThread().getName());try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("任务三运行结束....");return "hello2";}, executor);CompletableFuture<Void> allOf = CompletableFuture.allOf(future01, future02, future03);allOf.get();//等待所有任务完成System.out.println("返回数据:");}
 2.7.2 anyOf
public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {System.out.println("任务一线程开始:" + Thread.currentThread().getName());int i = 12 / 2;System.out.println("任务一运行结束...." + i);return i;}, executor);CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {System.out.println("任务二线程开始:" + Thread.currentThread().getName());try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("任务二运行结束....");return "hello";}, executor);CompletableFuture<Object> future03 = CompletableFuture.supplyAsync(() -> {System.out.println("任务三线程开始:" + Thread.currentThread().getName());try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("任务三运行结束....");return "hello2";}, executor);CompletableFuture<Object> anyOf = CompletableFuture.anyOf(future01, future02, future03);anyOf.get();//等待其中之一任务完成System.out.println("返回数据:");}

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/338074.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【高等数学之泰勒公式】

一、从零开始 1.1、泰勒中值定理1 什么是泰勒公式?我们先看看权威解读: 那么我们从古至今到底是如何创造出泰勒公式的呢? 由上图可知&#xff0c;任一无穷小数均可以表示成用一系列数字的求和而得出的结果&#xff0c;我们称之为“无穷算法”。 那么同理我们想对任一曲线来…

Every Nobody Is Somebody 「每小人物都能成大事」

周星驰 NFT Nobody即将发售&#xff0c;Nobody共创平台 Every Nobody Is Somebody Nobody 关于Nobody&#xff1a;Nobody是一款Web3共创平台&#xff0c;旨在为创作者提供一个交流和合作的场所&#xff0c;促进创意的产生和共享。通过该平台&#xff0c;创作者可以展示自己的作…

程序员试用期转正工作总结

一、试用期工作总结 在公司的三个月试用期中&#xff0c;我完成了以下工作&#xff1a; 完成了XX个功能模块的开发&#xff0c;包括XX模块、XX模块和XX模块。参与了XX个项目的开发和上线&#xff0c;其中XX项目、XX项目和XX项目是我主导的。优化了现有系统的性能&#xff0c;特…

Linux最常用的几个系统管理命令

文章目录 Linux最常用的几个系统管理命令查看网络信息的原初 ifconfig默认无参数使用-s显示短列表配置IP地址修改MTU启动关闭网卡 显示进程状态 ps语法几个实例默认情况显示所有进程查找特定进程信息 任务管理器的 top常规使用显示完整命令设置信息更新次数设置信息更新时间显示…

自行车商城网站网页设计与制作web前端设计html+css+js成品。电脑网站制作代开发。vscodeDrea

【自行车商城网站网页设计与制作web前端设计htmlcssjs成品。电脑网站制作代开发。vscodeDrea】 https://www.bilibili.com/video/BV1wT4y1p7jq/?share_sourcecopy_web&vd_sourced43766e8ddfffd1f1a1165a3e72d7605

知识】分享几个摄像头的选型相关知识

【知识】分享几个摄像头的选型相关知识 目录 【知识】分享几个摄像头的选型相关知识一、前言二、正文1、先了解一下监控摄像头的种类1.1、云台型&#xff08;云台型一体摄像机&#xff09;1.2、枪机型&#xff08;枪型摄像机&#xff09;1.3、球机型&#xff08;球型摄像机&…

二叉树及其实现

二叉树 一.树的概念及结构1.1树的概念1.2相关概念 2.二叉树的概念及结构2.1 概念2.2 特殊的二叉树 3.二叉树的遍历3.1 前序、中序以及后序遍历3.2 层序遍历3.3 判断二叉树是否是完全二叉树3.4 二叉树的高度3.5 二叉树的叶子节点个数3.6 二叉树的第k层的节点个数3.7 二叉树销毁3…

SpringBoot+RocketMQ集群(dledger)部署完整学习笔记

文章目录 前言一、单台集群部署二、多台集群部署1.修改配置2.dashboard修改 三、整合springboot1.引入pom和修改yml2.编写消费者3.编写生产者4.测试效果 总结 前言 RocketMQ集群方式有好几种 官网地址 https://rocketmq.apache.org/zh/docs/4.x/deployment/01deploy 2m-2s-asy…

国产AI工具钉钉AI助理:开启个性化助手服务的新篇章

钉钉AI助理是钉钉平台的一项功能&#xff0c;它可以根据用户的需求提供个性化的AI助手服务。用户可以在AI助理页面一键创建个性化的AI助理&#xff0c;如个人的工作AI助理、旅游AI助理、资讯AI助理等。企业也可以充分使用企业所沉淀的知识库和业务数据&#xff0c;在获得授权后…

C++模板——(4)C++泛型编程与标准模板库简介

归纳编程学习的感悟&#xff0c; 记录奋斗路上的点滴&#xff0c; 希望能帮到一样刻苦的你&#xff01; 如有不足欢迎指正&#xff01; 共同学习交流&#xff01; &#x1f30e;欢迎各位→点赞 &#x1f44d; 收藏⭐ 留言​&#x1f4dd; 勤奋&#xff0c;机会&#xff0c;乐观…

Python基础学习(一)

Python基础语法学习记录 输出 将结果或内容呈现给用户 print("休对故人思故国&#xff0c;且将新火试新茶&#xff0c;诗酒趁年华") # 输出不换行&#xff0c;并且可以指定以什么字符结尾 print("青山依旧在",end ",") print("几度夕阳红…

66.网游逆向分析与插件开发-角色数据的获取-角色类的数据分析与C++还原

内容来源于&#xff1a;易道云信息技术研究院VIP课 ReClass.NET工具下载&#xff0c;它下方链接里的 逆向工具.zip 里的reclass目录下&#xff1a;注意它分x64、x32版本&#xff0c;启动是用管理员权限启动否则附加时有些进程附加不上 链接&#xff1a;https://pan.baidu.com/…