今天想跟大家来聊一聊JDK1.8提供的异步神器CompletableFuture,
最后呢我会结合RocketMQ源码分析一下CompletableFuture的使用。
Future接口以及它的局限性
FutureTask<String> futureTask = new FutureTask<>(() -> "三友");new Thread(futureTask).start();System.out.println(futureTask.get());
或者使用线程池的方式
ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<String> future = executorService.submit(() -> "三友");
System.out.println(future.get());
executorService.shutdown();
线程池底层也是将提交的Callable的实现先封装成FutureTask,然后通过execute方法来提交任务,执行异步逻辑。
Future接口的局限性
ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<String> future = executorService.submit(() -> "三友");
while (!future.isDone()) {//任务有没有完成,没有就继续循环判断
}
System.out.println(future.get());
executorService.shutdown();
什么是CompletableFuture?
CompletableFuture常见api详解
CompletableFuture的方法api多,但主要可以分为以下几类。
1、实例化CompletableFuture
构造方法创建
CompletableFuture<String> completableFuture = new CompletableFuture<>();
System.out.println(completableFuture.get());
此时如果有其它线程执行如下代码,就能执行打印出 三友
completableFuture.complete("三友")
静态方法创建
除了使用构造方法构造,CompletableFuture还提供了静态方法来创建
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);public static CompletableFuture<Void> runAsync(Runnable runnable);
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "三友");
System.out.println(completableFuture.get());
2、获取任务执行结果
public T get();
public T get(long timeout, TimeUnit unit);
public T getNow(T valueIfAbsent);
public T join();
3、主动触发任务完成
public boolean complete(T value);
public boolean completeExceptionally(Throwable ex);
4、对任务执行结果进行下一步处理
只能接收任务正常执行后的回调
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
public CompletableFuture<Void> thenRun(Runnable action);
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> 10).thenApply(v -> ("上一步的执行的结果为:" + v));
System.out.println(completableFuture.join());
执行结果:
上一步的执行的结果为:10
thenRun示例:
CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> 10).thenRun(() -> System.out.println("上一步执行完成"));
执行结果:
上一步执行完成
thenAccept示例:
CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> 10).thenAccept(v -> System.out.println("上一步执行完成,结果为:" + v));
执行结果:
上一步执行完成,结果为:10
thenApply有异常示例:
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {//模拟异常int i = 1 / 0;return 10;
}).thenApply(v -> ("上一步的执行的结果为:" + v));
System.out.println(completableFuture.join());
执行结果:
Exception in thread "main" java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zeroat java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
当有异常时是不会回调的
只能接收任务处理异常后的回调
public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {return 100;
}).exceptionally(e -> {System.out.println("出现异常了,返回默认值");return 110;
});
System.out.println(completableFuture.join());
执行结果:
100
有异常情况下:
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {int i = 1 / 0;return 100;
}).exceptionally(e -> {System.out.println("出现异常了,返回默认值");return 110;
});
System.out.println(completableFuture.join());
执行结果:
出现异常了,返回默认值
110
能同时接收任务执行正常和异常的回调
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> actin);
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {int i = 1 / 0;return 10;
}).whenComplete((r, e) -> {System.out.println("whenComplete被调用了");
});
System.out.println(completableFuture.join());
执行结果:
whenComplete被调用了
Exception in thread "main" java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zeroat java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
5、对任务结果进行合并
public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
这个方法的意思是,当前任务和other任务都执行结束后,拿到这两个任务的执行结果,回调 BiFunction ,然后返回新的结果。
thenCombine的例子请往下继续看。
6、以Async结尾的方法
上面说的一些方法,比如说thenAccept方法,他有两个对应的Async结尾的方法,如下:
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
CompletableFuture在RocketMQ中的使用
CompletableFuture在RocketMQ中的使用场景比较多,这里我举一个消息存储的场景。
在RocketMQ中,Broker接收到生产者产生的消息的时候,会将消息持久化到磁盘和同步到从节点中。
持久化到磁盘和消息同步到从节点是两个独立的任务,互不干扰,可以相互独立执行。
当消息持久化到磁盘和同步到从节点中任务完成之后,需要统计整个存储消息消耗的时间,所以统计整个存储消息消耗的时间是依赖前面两个任务的完成。
实现代码如下
消息存储刷盘任务和主从复制任务:
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
// 提交刷盘的请求
CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
//提交主从复制的请求
CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);//刷盘 和 主从复制 两个异步任务通过thenCombine联合
return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {// 当两个刷盘和主从复制任务都完成的时候,就会回调// 如果刷盘没有成功,那么就将消息存储的状态设置为失败if (flushStatus != PutMessageStatus.PUT_OK) {putMessageResult.setPutMessageStatus(flushStatus);}// 如果主从复制没有成功,那么就将消息存储的状态设置为失败if (replicaStatus != PutMessageStatus.PUT_OK) {putMessageResult.setPutMessageStatus(replicaStatus);}// 最终返回消息存储的结果return putMessageResult;
});
对上面两个合并的任务执行结果通过thenAccept方法进行监听,统计消息存储的耗时:
//消息存储的开始时间
long beginTime = this.getSystemClock().now();
// 存储消息,然后返回 CompletableFuture,也就是上面一段代码得返回值
CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);//监听消息存储的结果
putResultFuture.thenAccept((result) -> {// 消息存储完成之后会回调long elapsedTime = this.getSystemClock().now() - beginTime;if (elapsedTime > 500) {log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);}this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);if (null == result || !result.isOk()) {this.storeStatsService.getPutMessageFailedTimes().add(1);}
});
最后说一句(求关注!别白嫖!)
如果这篇文章对您有所帮助,或者有所启发的话,求一键三连:点赞、转发、在看。
关注公众号:woniuxgg,在公众号中回复:笔记 就可以获得蜗牛为你精心准备的java实战语雀笔记,回复面试、开发手册、有超赞的粉丝福利!