CompletableFuture
的前世今生
一、背景:Java 异步编程的演进
在 CompletableFuture
之前,Java 的异步编程主要通过线程、Future 等实现。
flowchart LRA[Before Java 5] -->|单线程| B[Main Thread]B --> C[手动创建线程]D[Java 5 Future] -->|线程池| E[异步任务]E --> F[阻塞获取结果]G[Java 8 CompletableFuture] -->|链式调用| H[非阻塞流水线]H --> I[结果转换]I --> J[异常处理]J --> K[组合多个任务]
1. Future
接口(Java 5+)
- 作用:通过
ExecutorService.submit()
提交任务,返回Future
对象,用于获取异步任务的结果。 - 局限性:
- 无法手动设置结果或异常。
- 无法组合多个异步操作(如任务A完成后触发任务B)。
- 只能通过阻塞的
get()
方法等待结果,缺乏非阻塞回调机制。
ExecutorService executor = Executors.newFixedThreadPool(2);// 1. 基本 Future 示例
Future<Integer> future = executor.submit(() -> {Thread.sleep(1000);return 42;
});// 阻塞获取结果(无法实现非阻塞回调)
Integer result = future.get(); // 2. 组合多个 Future 的困境
Future<String> f1 = executor.submit(task1);
Future<String> f2 = executor.submit(task2);// 需要手动协调多个 Future(复杂且容易出错)
String combined = f1.get() + f2.get();
2. 回调地狱(Callback Hell)
- 开发者需要通过嵌套回调处理多个异步任务,代码可读性和维护性差。
- 例如:数据库查询完成后触发网络请求,再触发文件写入操作。
// 传统 Future 实现(回调地狱)
Future<Order> orderFuture = queryOrder();
orderFuture.get(); // 阻塞
Future<Payment> paymentFuture = processPayment(orderFuture.get());
paymentFuture.get(); // 再次阻塞
Future<Notification> notifyFuture = sendNotification(paymentFuture.get());
3. 第三方库的尝试
- 如 Guava 的
ListenableFuture
,支持添加回调,但需要额外依赖。
二、CompletableFuture
的诞生(Java 8+)
Java 8 引入了 CompletableFuture
,目标是提供更灵活的异步编程模型,支持非阻塞操作和链式组合。
架构继承:兼容性设计
classDiagramclass Future {<<interface>>+get()+isDone()+cancel()}class CompletionStage {<<interface>>+thenApply()+thenCompose()+thenCombine()}class CompletableFuture {+supplyAsync()+thenApplyAsync()+complete()}Future <|.. CompletableFutureCompletionStage <|.. CompletableFuture
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {// 维护异步计算结果volatile Object result; // 具体结果或 AltResult// 实现 Future 接口的阻塞获取public T get() throws InterruptedException, ExecutionException {Object r;if ((r = result) == null)r = waitingGet(true);return (T) reportJoin(r);}// 实现 CompletionStage 的链式调用public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) {return uniApplyStage(screenExecutor(executor), fn);}
}
关键设计考量:
- 向后兼容:使现有基于
Future
的代码无需修改即可使用 CompletableFuture - 接口契约:保持异步计算结果的获取/取消等基础能力
- 类型系统:通过接口继承实现多态特性
核心设计思想
- 基于
Future
接口和CompletionStage
接口:Future
:保留基础的异步结果获取能力。CompletionStage
:定义异步操作的链式组合(如thenApply
,thenCombine
等)。
- 借鉴了函数式编程和 Promise 模式(类似 JavaScript 的
Promise
)。
核心升级点
- 从 被动拉取 到 主动推送 的结果处理
- 从 单任务 到 流水线 的异步操作
- 新增 异常传播机制(
exceptionally()
/handle()
)
示例代码
// 1. 异步任务创建
CompletableFuture.supplyAsync(() -> "Hello").thenApply(s -> s + " World") // 转换结果.thenAccept(System.out::println); // 消费结果// 2. 组合多个 Future
CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> 10);
CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(() -> 20);cf1.thenCombine(cf2, (a, b) -> a + b).thenAccept(sum -> System.out.println("Sum: " + sum));// 3. 异常处理
CompletableFuture.supplyAsync(() -> {if (new Random().nextBoolean()) throw new RuntimeException("Oops");return "Success";}).exceptionally(ex -> "Fallback: " + ex.getMessage()).thenAccept(System.out::println);
三、核心特性
CompletableFuture
的核心优势在于其灵活的组合能力与异步流程控制:
mindmaproot((CompletableFuture))创建方法supplyAsyncrunAsynccompletedFuture转换方法thenApplythenCompose消费方法thenAcceptthenRun组合方法thenCombineallOfanyOf异常处理exceptionallyhandle
1. 手动完成任务
- 可以显式设置结果或异常:
CompletableFuture<String> future = new CompletableFuture<>(); future.complete("Result"); // 手动设置结果 future.completeExceptionally(new RuntimeException()); // 手动设置异常
2. 链式组合操作
- 转换结果:
thenApply()
future.thenApply(result -> result + " processed");
- 消费结果:
thenAccept()
,thenRun()
future.thenAccept(System.out::println);
- 组合多个 Future:
- 顺序组合:
thenCompose()
future.thenCompose(result -> anotherFuture);
- 并行组合:
thenCombine()
future1.thenCombine(future2, (a, b) -> a + b);
- 顺序组合:
- 等待多个任务:
allOf()
,anyOf()
CompletableFuture.allOf(future1, future2).join();
3. 异常处理
- 通过
exceptionally()
或handle()
捕获异常:future.exceptionally(ex -> "Fallback value");
4. 异步执行器(Executor)
- 默认使用
ForkJoinPool.commonPool()
,也支持自定义线程池:future.supplyAsync(() -> "Task", customExecutor);
四、典型应用场景
- 并行执行多个独立任务:
CompletableFuture<User> userFuture = fetchUserAsync(); CompletableFuture<Order> orderFuture = fetchOrderAsync(); userFuture.thenCombine(orderFuture, (user, order) -> buildResponse(user, order));
- 服务调用链(微服务场景):
CompletableFuture<Response> future = login().thenCompose(token -> fetchData(token)).thenApply(data -> process(data)).exceptionally(ex -> handleError(ex));
- 超时控制与降级:
future.completeOnTimeout("default", 1, TimeUnit.SECONDS);
五、最佳实践建议
- 线程池选择:默认使用 ForkJoinPool.commonPool(),密集任务建议使用自定义线程池
- 异常处理:始终使用 exceptionally() 或 handle() 处理异常
- 超时控制:Java 9+ 使用 orTimeout() 方法
- 资源释放:使用 whenComplete() 进行资源清理
- 组合优先:使用 thenCompose() 代替嵌套的 thenApply()
CompletableFuture 的演进体现了 Java 并发编程从命令式到声明式的转变,通过函数式编程实现了更优雅的异步处理流水线。