CompletableFuture在RocketMQ中的使用实战!

今天想跟大家来聊一聊JDK1.8提供的异步神器CompletableFuture,

最后呢我会结合RocketMQ源码分析一下CompletableFuture的使用。

Future接口以及它的局限性

图片

 FutureTask<String> futureTask = new FutureTask<>(() -> "三友");new Thread(futureTask).start();System.out.println(futureTask.get());

或者使用线程池的方式

ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<String> future = executorService.submit(() -> "三友");
System.out.println(future.get());
executorService.shutdown();

线程池底层也是将提交的Callable的实现先封装成FutureTask,然后通过execute方法来提交任务,执行异步逻辑。

Future接口的局限性

图片

ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<String> future = executorService.submit(() -> "三友");
while (!future.isDone()) {//任务有没有完成,没有就继续循环判断
}
System.out.println(future.get());
executorService.shutdown();

图片

什么是CompletableFuture?

图片

CompletableFuture常见api详解

CompletableFuture的方法api多,但主要可以分为以下几类。

1、实例化CompletableFuture

构造方法创建
CompletableFuture<String> completableFuture = new CompletableFuture<>();
System.out.println(completableFuture.get());

此时如果有其它线程执行如下代码,就能执行打印出 三友

completableFuture.complete("三友")
静态方法创建

除了使用构造方法构造,CompletableFuture还提供了静态方法来创建

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);public static CompletableFuture<Void> runAsync(Runnable runnable);
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);

图片

CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "三友");
System.out.println(completableFuture.get());

图片

2、获取任务执行结果

public T get();
public T get(long timeout, TimeUnit unit);
public T getNow(T valueIfAbsent);
public T join();

图片

3、主动触发任务完成

public boolean complete(T value);
public boolean completeExceptionally(Throwable ex);

图片

4、对任务执行结果进行下一步处理

只能接收任务正常执行后的回调
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
public CompletableFuture<Void> thenRun(Runnable action);
public CompletionStage<Void> thenAccept(Consumer<? super T> action);

图片

CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> 10).thenApply(v -> ("上一步的执行的结果为:" + v));
System.out.println(completableFuture.join());

执行结果:

上一步的执行的结果为:10

thenRun示例:

CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> 10).thenRun(() -> System.out.println("上一步执行完成"));

执行结果:

上一步执行完成

thenAccept示例:

CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> 10).thenAccept(v -> System.out.println("上一步执行完成,结果为:" + v));

执行结果:

上一步执行完成,结果为:10

thenApply有异常示例:

CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {//模拟异常int i = 1 / 0;return 10;
}).thenApply(v -> ("上一步的执行的结果为:" + v));
System.out.println(completableFuture.join());

执行结果:

Exception in thread "main" java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zeroat java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)

当有异常时是不会回调的

只能接收任务处理异常后的回调
public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);

图片

CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {return 100;
}).exceptionally(e -> {System.out.println("出现异常了,返回默认值");return 110;
});
System.out.println(completableFuture.join());

执行结果:

100

有异常情况下:

CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {int i = 1 / 0;return 100;
}).exceptionally(e -> {System.out.println("出现异常了,返回默认值");return 110;
});
System.out.println(completableFuture.join());

执行结果:

出现异常了,返回默认值
110
能同时接收任务执行正常和异常的回调
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> actin);

图片

CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {int i = 1 / 0;return 10;
}).whenComplete((r, e) -> {System.out.println("whenComplete被调用了");
});
System.out.println(completableFuture.join());

执行结果:

whenComplete被调用了
Exception in thread "main" java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zeroat java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)

5、对任务结果进行合并

public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);

这个方法的意思是,当前任务和other任务都执行结束后,拿到这两个任务的执行结果,回调 BiFunction ,然后返回新的结果。

thenCombine的例子请往下继续看。

6、以Async结尾的方法

上面说的一些方法,比如说thenAccept方法,他有两个对应的Async结尾的方法,如下:

public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);

图片

CompletableFuture在RocketMQ中的使用

CompletableFuture在RocketMQ中的使用场景比较多,这里我举一个消息存储的场景。

在RocketMQ中,Broker接收到生产者产生的消息的时候,会将消息持久化到磁盘和同步到从节点中。

持久化到磁盘和消息同步到从节点是两个独立的任务,互不干扰,可以相互独立执行。

当消息持久化到磁盘和同步到从节点中任务完成之后,需要统计整个存储消息消耗的时间,所以统计整个存储消息消耗的时间是依赖前面两个任务的完成。

图片

实现代码如下

消息存储刷盘任务和主从复制任务:

PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
// 提交刷盘的请求
CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
//提交主从复制的请求
CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);//刷盘 和 主从复制 两个异步任务通过thenCombine联合
return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {// 当两个刷盘和主从复制任务都完成的时候,就会回调// 如果刷盘没有成功,那么就将消息存储的状态设置为失败if (flushStatus != PutMessageStatus.PUT_OK) {putMessageResult.setPutMessageStatus(flushStatus);}// 如果主从复制没有成功,那么就将消息存储的状态设置为失败if (replicaStatus != PutMessageStatus.PUT_OK) {putMessageResult.setPutMessageStatus(replicaStatus);}// 最终返回消息存储的结果return putMessageResult;
});

对上面两个合并的任务执行结果通过thenAccept方法进行监听,统计消息存储的耗时:

//消息存储的开始时间
long beginTime = this.getSystemClock().now();
// 存储消息,然后返回 CompletableFuture,也就是上面一段代码得返回值‍
CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);//监听消息存储的结果
putResultFuture.thenAccept((result) -> {// 消息存储完成之后会回调long elapsedTime = this.getSystemClock().now() - beginTime;if (elapsedTime > 500) {log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);}this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);if (null == result || !result.isOk()) {this.storeStatsService.getPutMessageFailedTimes().add(1);}
});

最后说一句(求关注!别白嫖!)

如果这篇文章对您有所帮助,或者有所启发的话,求一键三连:点赞、转发、在看。

关注公众号:woniuxgg,在公众号中回复:笔记  就可以获得蜗牛为你精心准备的java实战语雀笔记,回复面试、开发手册、有超赞的粉丝福利!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/475089.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

半导体光伏光电行业为什么选用PFA酸缸浸泡清洗硅片

PFA清洗槽是即四氟清洗桶后的升级款&#xff0c;专为半导体光伏光电等行业设计&#xff0c;一体成型&#xff0c;无需担心漏液。主要用于浸泡、清洗带芯片硅片电池片的花篮。由于PFA的特点它能耐受清洗溶液的腐蚀性&#xff0c;同时金属元素空白值低&#xff0c;无溶出无析出&a…

python自学...

一、稍微高级一点的。。。 1. 闭包&#xff08;跟js差不多&#xff09; 2. 装饰器 就是spring的aop 3. 多线程

搭建智能调度系统:同城代驾小程序的开发教学

当下&#xff0c;同城代驾服务越来越受到人们的青睐。为了满足市场需求&#xff0c;许多企业开始开发智能调度系统&#xff0c;以提高服务效率和用户体验。本文将介绍如何搭建一个智能调度系统&#xff0c;并以同城代驾小程序的开发为例进行详细教学。 第一步&#xff1a;需求…

【大厂AI课学习笔记】【2.2机器学习开发任务实例】(1)搭建一个机器学习模型

今天学习的是&#xff0c;如何搭建一个机器学习模型。 主要有以上的步骤&#xff1a; 原始数据采集特征工程 数据预处理特征提取特征转换&#xff08;构造&#xff09;预测识别&#xff08;模型训练和测试&#xff09; 在实际工作中&#xff0c;特征比模型更重要。 数据和特征…

基于51单片机的智能台灯的设计与实现

摘 要:针对青少年因坐姿不正确、灯光亮度不合适、用眼过度等原因易导致的近视问题,文中提出使用51单片机作为主控制单元,选用红外检测、光敏检测、蓝牙通信、蜂鸣器和模数转换等模块,设计了一款智能台灯。该智能台灯具有节能、预防近视等功能。经测试,该台灯具有保护视力的…

MySQL DQL 基本查询

一.概念 数据查询不应只是简单返回数据库中存储的数据&#xff0c;还应该根据需要对数据进行筛选以及确定数据以什么样的格式显示。 二.语法格式 select 列名 from 表 where 条件 1.查询所有的商品 select * from product; 2.查询商品名和商品价格 select pname,price from…

介绍7款免费的最佳地图/导航/定位/GIS开源项目

文章目录 1、xdh-map新德汇地图应用类库1.1、独立引用1.2、与MyUI结合使用1.3、快速上手1.3.1、采用项目工程模板创建项目【推荐】1.3.2、 调用组件库功能 2、蚂蚁金服AntV-L7地理空间数据可视分析引擎2.1、AntV-L7简介2.2、核心特性2.3、支持丰富的图表类型2.4、如何使用2.4.1…

林浩然与杨凌芸的Java集合奇遇记

林浩然与杨凌芸的Java集合奇遇记 The Java Collection Chronicles of Lin Haoran and Yang Lingyun 在一个充满代码香气的午后&#xff0c;程序员男主角林浩然正在他的编程世界里挥舞着键盘剑&#xff0c;探索Java王国中的神秘宝藏——集合。而我们的女主角杨凌芸&#xff0c;作…

深入浅出了解谷歌「Gemini大模型」发展历程

Google在2023年12月官宣了Gemini模型&#xff0c;随后2024年2月9日才宣布Gemini 1.0 Ultra正式对公众服务&#xff0c;并且开始收费。现在2024年2月14日就宣布了Gemini 1.5 Pro&#xff0c;史诗级多模态最强MoE首破100万极限上下文纪录&#xff01;&#xff01;&#xff01;Gem…

基于SpringBoot+WebSocket+Spring Task的前后端分离外卖项目-订单管理(十七)

订单管理 1. Spring Task1.1 介绍1.2 cron表达式1.3 入门案例1.3.1 Spring Task使用步骤1.3.2 代码开发1.3.3 功能测试 2.订单状态定时处理2.1 需求分析2.2 代码开发2.3 功能测试 3. WebSocket3.1 介绍3.2 入门案例3.2.1 案例分析3.2.2 代码开发3.2.3 功能测试 4. 来单提醒4.1 …

【Vision Pro 应用分享】Make It Spatial——将普通照片转化为Spatial空间照片,以在Vision Pro视界眼镜上观看3D效果

该应用目前在Mac App Store上免费提供 下载地址:‎Make It Spatial on the Mac App Store Read reviews, compare customer ratings, see screenshots, and learn more about Make It Spatial. Download Make It Spatial for macOS 14.0 or later and enjoy it on your Mac.h…

SG5032EAN规格书

SG5032EAN 晶体振荡器结合了相位锁定环&#xff08;PLL&#xff09;技术和AT切割晶体单元&#xff0c;提供了73.5 MHz至700 MHz的广泛频率范围&#xff0c;以满足高速数字应用的需求。高性能的LV-PECL输出&#xff0c;2.5V和3.3V电源电压&#xff0c;可灵活适配不同设计的电源需…