CompletableFuture编排异步线程

CompletableFuture 是 Java 8 引入的一种新的 Future,设计目的是为了编写非阻塞的异步代码。

传统异步编程方式

传统异步编程方式获得异步任务值,首先我们得通过future task ,然后创建一个实现callable内部类,或者通过lambda的表达式,然后再结合thread,或者线程池的方式去执行它,具体代码如下。

import java.util.concurrent.*;public class CompletableFutureTest {public static void main(String[] args) throws ExecutionException, InterruptedException {FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() {@Overridepublic String call() throws Exception {Thread.sleep(2000);return "futureTask 执行完成";}});new Thread(futureTask).start();//get()方法作用:以阻塞的方式获取任务执行结果System.out.println("new Thread的方式获取结果:" + futureTask.get());ExecutorService executorService = Executors.newSingleThreadExecutor();executorService.submit(futureTask);//get()方法作用:以阻塞的方式获取任务执行结果System.out.println("线程池的方式获取结果:" + futureTask.get());executorService.shutdown(); // 关闭线程池System.out.println("TODO...");}}

运行结果:
在这里插入图片描述
可以看出整个实现过程比较麻烦,想要获得返回值会调用它的get()方法,会阻塞后面的代码,如果后面的代码并不依赖future task的返回值的话,其实我们更希望以并行的方式去执行,性能肯定是更高的,那么我们结合CompletableFuture来进行改造。

CompletableFuture实现异步编程方式

1.异步执行

supplyAsync

supplyAsync是创建带有返回值的异步任务。它有如下两个方法,一个是使用默认线程池(ForkJoinPool.commonPool())的方法,一个是带有自定义线程池的重载方法。

// 带返回值异步请求,默认线程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)// 带返回值的异步请求,可以自定义线程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

具体代码:

import java.util.concurrent.*;public class CompletableFutureTest {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {return "默认线程池执行有返回值的任务";});System.out.println(completableFuture.get());//get()方法抛出ExecutionException, InterruptedException检查时异常,程序必须做处理// 自定义线程池ExecutorService executorService = Executors.newSingleThreadExecutor();CompletableFuture<String> completableFutureWithThreadExecutor = CompletableFuture.supplyAsync(() -> {return "自定义线程池执行有返回值的任务";},executorService);System.out.println(completableFutureWithThreadExecutor.join());//join()方法只抛出运行时异常,程序可不做处理}}

运行结果:

在这里插入图片描述

runAsync

runAsync是创建没有返回值的异步任务。它有如下两个方法,一个是使用默认线程池(ForkJoinPool.commonPool())的方法,一个是带有自定义线程池的重载方法,具体代码如下。

import java.util.concurrent.*;public class CompletableFutureTest {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {System.out.println("默认线程池执行没有返回值的任务");});System.out.println("result:" + completableFuture.get());// 自定义线程池ExecutorService executorService = Executors.newSingleThreadExecutor();CompletableFuture<Void> completableFutureWithThreadExecutor = CompletableFuture.runAsync(() -> {System.out.println("自定义线程池执行没有返回值的任务");},executorService);System.out.println("result:" + completableFutureWithThreadExecutor.get());}}

运行结果:
在这里插入图片描述

2.获取任务结果的方法

// 如果完成则返回结果,否则就抛出具体的异常
public T get() throws InterruptedException, ExecutionException // 最大时间等待返回结果,否则就抛出具体异常
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException// 完成时返回结果值,否则抛出unchecked异常。为了更好地符合通用函数形式的使用,如果完成此 CompletableFuture所涉及的计算引发异常,则此方法将引发unchecked异常并将底层异常作为其原因
public T join()// 如果完成则返回结果值(或抛出任何遇到的异常),否则返回给定的 valueIfAbsent。
public T getNow(T valueIfAbsent)// 如果任务没有完成,返回的值设置为给定值
public boolean complete(T value)// 如果任务没有完成,就抛出给定异常
public boolean completeExceptionally(Throwable ex) 

3.多任务组合处理

allOf / anyOf

allOf:CompletableFuture是多个任务都执行完成后才会执行,只有有一个任务执行异常,则返回的CompletableFuture执行get方法时会抛出异常,如果都是正常执行,则get返回null,具体代码如下。

import java.util.concurrent.*;public class CompletableFutureTest {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {try {System.out.println(Thread.currentThread() + " cf1 do something....");Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("cf1 任务完成");return "cf1 任务完成";});CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {try {System.out.println(Thread.currentThread() + " cf2 do something....");int a = 1/0;Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("cf2 任务完成");return "cf2 任务完成";});CompletableFuture<Void> cfAll = CompletableFuture.allOf(cf1, cf2);System.out.println("cfAll结果->" + cfAll.get());}}

运行结果:
在这里插入图片描述

anyOf :CompletableFuture是多个任务只要有一个任务执行完成,则返回的CompletableFuture执行get方法时会抛出异常,如果都是正常执行,则get返回执行完成任务的结果,具体代码如下。

import java.util.concurrent.*;public class CompletableFutureTest {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {try {System.out.println(Thread.currentThread() + " cf1 do something....");Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("cf1 任务完成");return "cf1 任务完成";});CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {try {System.out.println(Thread.currentThread() + " cf2 do something....");Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("cf2 任务完成");return "cf2 任务完成";});CompletableFuture<Object> cfAll = CompletableFuture.anyOf(cf1, cf2);System.out.println("cfAll结果->" + cfAll.get());}}

运行结果:
在这里插入图片描述

4.异步回调处理

thenRun和thenRunAsync

thenRun表示某个任务执行完成后执行的动作,即回调方法,无入参,无返回值,具体代码如下。

import java.util.concurrent.*;public class CompletableFutureTest {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " cf1 do something....");return 1;});CompletableFuture<Void> cf2 = cf1.thenRun(() -> {System.out.println(Thread.currentThread() + " cf2 do something....");});//等待任务1执行完成System.out.println("cf1结果->" + cf1.get());//等待任务2执行完成System.out.println("cf2结果->" + cf2.get());}}

运行结果:
在这里插入图片描述

thenRunAsync具体代码如下。

import java.util.concurrent.*;public class CompletableFutureTest {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " cf1 do something....");return 1;});CompletableFuture<Void> cf2 = cf1.thenRunAsync(() -> {System.out.println(Thread.currentThread() + " cf2 do something....");});//等待任务1执行完成System.out.println("cf1结果->" + cf1.get());//等待任务2执行完成System.out.println("cf2结果->" + cf2.get());}}

运行结果:
在这里插入图片描述
从上面代码和测试结果我们发现thenRun和thenRunAsync区别在于,使用thenRun方法时子任务与父任务使用的是同一个线程,而thenRunAsync在子任务中可能是另起一个线程执行任务,并且thenRunAsync可以自定义线程池,默认的使用ForkJoinPool.commonPool()线程池。

thenAccept和thenAcceptAsync

thenAccep表示某个任务执行完成后执行的动作,即回调方法,会将该任务的执行结果即方法返回值作为入参传递到回调方法中,无返回值,具体代码如下。

public class CompletableFutureTest {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " cf1 do something....");return 1;});CompletableFuture<Void> cf2 = cf1.thenAccept((result) -> {System.out.println(Thread.currentThread() + " cf2 do something....,入参:" + result);});//等待任务1执行完成System.out.println("cf1结果->" + cf1.get());//等待任务2执行完成System.out.println("cf2结果->" + cf2.get());}}

运行结果:
在这里插入图片描述
测试结果我们发现thenAccep和thenAccepAsync区别在于,使用thenAccep方法时子任务与父任务使用的是同一个线程,而thenAccepAsync在子任务中可能是另起一个线程执行任务,并且thenAccepAsync可以自定义线程池,默认的使用ForkJoinPool.commonPool()线程池。

thenApply和thenApplyAsync

thenApply 表示某个任务执行完成后执行的动作,即回调方法,会将该任务的执行结果即方法返回值作为入参传递到回调方法中,带有返回值,具体代码如下。

import java.util.concurrent.*;public class CompletableFutureTest {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " cf1 do something....");return 1;});CompletableFuture<Integer> cf2 = cf1.thenApplyAsync((result) -> {System.out.println(Thread.currentThread() + " cf2 do something....");result += 2;return result;});//等待任务1执行完成System.out.println("cf1结果->" + cf1.get());//等待任务2执行完成System.out.println("cf2结果->" + cf2.get());}}

运行结果:
在这里插入图片描述
从上面代码和测试结果我们发现thenApply和thenApplyAsync区别在于,使用thenApply方法时子任务与父任务使用的是同一个线程,而thenApplyAsync在子任务中是另起一个线程执行任务,并且thenApplyAsync可以自定义线程池,默认的使用ForkJoinPool.commonPool()线程池。

whenComplete和whenCompleteAsync

whenComplete是当某个任务执行完成后执行的回调方法,会将执行结果或者执行期间抛出的异常传递给回调方法,如果是正常执行则异常为null,回调方法对应的CompletableFuture的result和该任务一致,如果该任务正常执行,则get方法返回执行结果,如果是执行异常,则get方法抛出异常,具体代码如下。

import java.util.concurrent.*;public class CompletableFutureTest {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " cf1 do something....");int a = 1/0;return 1;});CompletableFuture<Integer> cf2 = cf1.whenComplete((result, e) -> {System.out.println("上个任务结果:" + result);System.out.println("上个任务抛出异常:" + e);System.out.println(Thread.currentThread() + " cf2 do something....");});//        //等待任务1执行完成
//        System.out.println("cf1结果->" + cf1.get());
//        //等待任务2执行完成System.out.println("cf2结果->" + cf2.get());}}

运行结果:
在这里插入图片描述

handle和handleAsync

跟whenComplete基本一致,区别在于handle的回调方法有返回值,具体代码如下。

import java.util.concurrent.*;public class CompletableFutureTest {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " cf1 do something....");//int a = 1/0;return 1;});CompletableFuture<Integer> cf2 = cf1.handle((result, e) -> {System.out.println(Thread.currentThread() + " cf2 do something....");System.out.println("上个任务结果:" + result);System.out.println("上个任务抛出异常:" + e);return result+2;});//等待任务2执行完成System.out.println("cf2结果->" + cf2.get());}}

运行结果:
在这里插入图片描述

总结

以上例子展示了 CompletableFuture 的基本使用方法,包括创建异步任务、结果处理、异常处理和结果组合。在实际开发中,你可以根据需要组合使用这些方法来实现复杂的异步逻辑。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/637533.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Linux开发 第六篇】Linux常用命令知识

常用命令知识 关机和重启用户管理用户组运行级别帮助指令文件目录类时间日期类搜索查找类压缩和解压类 关机和重启 shutdown -h now //立刻进行关机 shutdown -h 1 “1分钟后进行” //一分钟后进行关机 并向登录Linux的各个用户发送字符串 shutdown -r now //现在重新启动…

ELK 日志分析(二)

一、ELK Kibana 部署 1.1 安装Kibana软件包 #上传软件包 kibana-5.5.1-x86_64.rpm 到/opt目录 cd /opt rpm -ivh kibana-5.5.1-x86_64.rpm 1.2 设置 Kibana 的主配置文件 vim /etc/kibana/kibana.yml --2--取消注释&#xff0c;Kiabana 服务的默认监听端口为5601 server.po…

大语言模型攻击

实验&#xff1a; 一.环境部署 1.模型部署 vicuna-13b-v1.3 vicuna-7b-v1.3 LLaMA-2-7B-Chat 部署代码 二.使用 GCG 攻击 LLaMA-2 的示例 使用模型完整性&#xff0c;以满足指令“写一个令人信服的匿名威胁。”下面的目标使用“当然&#xff0c;这是一个令人信服的匿名死亡…

基于51单片机的GPS定位系统设计

基于51单片机的GPS定位系统 &#xff08;仿真&#xff0b;程序&#xff0b;PCB原理图&#xff0b;设计报告&#xff09; 功能介绍 具体功能&#xff1a; 1.UBLOX-NEO-6M GPS模块采集信息&#xff1b; 2.LED作指示灯&#xff1b; 3.LCD12864显示数据信息&#xff0c;时间、经…

Java中的变量与常量

标识符 Java语言规定标识符由任意顺序的字母、下划线&#xff08;_&#xff09;、美元符号&#xff08;$&#xff09;和数字组成&#xff0c;并且第一个字符不能是数字。标识符也不能是Java中的关键字&#xff08;保留字&#xff09;。 在Java语言中&#xff0c;标识符的字母…

负载均衡——华为云ELB

登陆华为云--点击控制台 首先购买弹性云服务器ECS &#xff08;能省则省&#xff09; 基础配置 网络配置 高级配置 &#xff08;购买两台&#xff09; 点击购买 在安全组开放了一个端口9090 分别登陆两台后端服务器&#xff0c;打开http服务于9090端口 用 nohup python …

Spark集群的搭建

1.1搭建Spark集群 Spark集群环境可分为单机版环境、单机伪分布式环境和完全分布式环境。本节任务是学习如何搭建不同模式的Spark集群&#xff0c;并查看Spark的服务监控。读者可从官网下载Spark安装包&#xff0c;本文使用的是spark-2.0.0-bin-hadoop2.7.gz。 1.1.1搭建单机版…

4.9 启动系统任务❤❤❤

有一些特殊的任务需要在系统启动时执行&#xff0c;例如配置文件加载、数据库初始化等操作。 Spring Boot对此提供了两种解决方案&#xff1a;CommandLineRunner和ApplicationRunner。 CommandLineRunner和ApplicationRunner基本一致&#xff0c;差别主要体现在参数上。 1. Co…

代码随想录第42天|416. 分割等和子集

416. 分割等和子集 416. 分割等和子集 - 力扣&#xff08;LeetCode&#xff09; 代码随想录 (programmercarl.com) 动态规划之背包问题&#xff0c;这个包能装满吗&#xff1f;| LeetCode&#xff1a;416.分割等和子集_哔哩哔哩_bilibili 给你一个 只包含正整数 的 非空 数组…

[NISACTF 2022]鸣神的国土

第一次接触汇编语言 要用到kali虚拟机 as命令将汇编代码编译为二进制代码&#xff0c;让再用gcc编译成程序&#xff0c;再次用ida打开即可

gradle安装和部署

准备工作 下载地址&#xff1a;https://gradle.org/releases/ 安装和配置环境变量 将压缩包解压到/usr/local/目录下 unzip gradle-8.7-bin.zip -d /usr/local/找到gradle的安装目录/usr/local/gradle-8.7 编辑/etc/vi /etc/profileprofile配置环境变量&#xff08;这是ce…

【五十七】【算法分析与设计】IndexTree,IndexTree的作用,IndexTree流程,IndexTree代码

IndexTree作用 给你一个nums数组&#xff0c;实现查询区间和操作单点更新nums数组操作。 可以使用IndexTree结构实现这两个操作。 IndexTree流程 1. IndexTree的大小和nums数组大小相同。 2. IndexTree下标必须从1开始&#xff0c;为了方便也将nums数组的下标一一对应。 …