如何安全发布 CompletableFuture ?Java9新增方法分析

news/2025/1/30 10:53:44/文章来源:https://www.cnblogs.com/HuaTalkHub/p/18695088

如何安全发布 CompletableFuture ?Java9新增方法分析

本文未经允许禁止转载。

JDK9 中对于CompletableFuture做了新的增强,除了超时功能(orTimeout),还有面向继承、安全发布等相关方法。本文中,我们将详细分析各个新增方法,同时说明其安全发布的重要性,最后提出相关的实践原则。

1. newIncompleteFuture

public <U> CompletableFuture<U> newIncompleteFuture() {return new CompletableFuture<U>();
}

这是一个面向继承的方法,Java支持方法返回值协变,子类可以返回自己的实现。如果你不需要继承 CompletableFuture,对于使用来说,这个方法没有新增新的功能。

2. 默认执行器

public Executor defaultExecutor() {return ASYNC_POOL;
}

子类可以指定默认执行器,不过需要注意调用不显式指定执行器的async相关方法时,其执行回调所在的线程有两种可能:当前调用线程或者子类指定的执行器线程。如果使用 CompletableFuture,推荐显式指定执行器;如果使用 CFFU,无需每次都显式指定执行器。

3. 复制方法

public CompletableFuture<T> copy() {return uniCopyStage(this);
}

此方法等同于调用 cf.thenApply(x -> x), 目的是实现保护性复制,避免后续操作对于原实例的修改。

4. 只读形式与安全发布

// 返回结果仅支持 CompletionStage 操作
public CompletionStage<T> minimalCompletionStage() {return uniAsMinimalStage();
}private MinimalStage<T> uniAsMinimalStage() {Object r;// 性能优化1:若已有结果,当前线程下直接返回stageif ((r = result) != null)return new MinimalStage<T>(encodeRelay(r));// 性能优化2:若还未有结果,注册回调MinimalStage<T> d = new MinimalStage<T>();unipush(new UniRelay<T,T>(d, this));return d;
}// 实际实现,CompletableFuture 独有方法均不支持
static final class MinimalStage<T> extends CompletableFuture<T> {MinimalStage() { }MinimalStage(Object r) { super(r); }@Override public <U> CompletableFuture<U> newIncompleteFuture() {return new MinimalStage<U>(); }@Override public T get() {throw new UnsupportedOperationException(); }@Override public T get(long timeout, TimeUnit unit) {throw new UnsupportedOperationException(); }@Override public T getNow(T valueIfAbsent) {throw new UnsupportedOperationException(); }@Override public T join() {throw new UnsupportedOperationException(); }@Override public T resultNow() {throw new UnsupportedOperationException(); }@Override public Throwable exceptionNow() {throw new UnsupportedOperationException(); }@Override public boolean complete(T value) {throw new UnsupportedOperationException(); }@Override public boolean completeExceptionally(Throwable ex) {throw new UnsupportedOperationException(); }@Override public boolean cancel(boolean mayInterruptIfRunning) {throw new UnsupportedOperationException(); }@Override public void obtrudeValue(T value) {throw new UnsupportedOperationException(); }@Override public void obtrudeException(Throwable ex) {throw new UnsupportedOperationException(); }@Override public boolean isDone() {throw new UnsupportedOperationException(); }@Override public boolean isCancelled() {throw new UnsupportedOperationException(); }@Override public boolean isCompletedExceptionally() {throw new UnsupportedOperationException(); }@Override public State state() {throw new UnsupportedOperationException(); }@Override public int getNumberOfDependents() {throw new UnsupportedOperationException(); }@Override public CompletableFuture<T> completeAsync(Supplier<? extends T> supplier, Executor executor) {throw new UnsupportedOperationException(); }@Override public CompletableFuture<T> completeAsync(Supplier<? extends T> supplier) {throw new UnsupportedOperationException(); }@Override public CompletableFuture<T> orTimeout(long timeout, TimeUnit unit) {throw new UnsupportedOperationException(); }@Override public CompletableFuture<T> completeOnTimeout(T value, long timeout, TimeUnit unit) {throw new UnsupportedOperationException(); }// 返回新的CompletableFuture, 等效于thenApply(x -> x)@Override public CompletableFuture<T> toCompletableFuture() {Object r;if ((r = result) != null)return new CompletableFuture<T>(encodeRelay(r));else {CompletableFuture<T> d = new CompletableFuture<>();unipush(new UniRelay<T,T>(d, this));return d;}}}

此方法返回只读的 CompletionStage,可以实现安全发布。可以类比于一个方法返回了ImmutableList,优点是可以防止后续的错误操作,比如其他线程强制写结果(类比List#add)。

有这样一个安全发布的保证至关重要,特别是面临复杂场景时,可以减轻编程负担,放心地使用 CompletableFuture 返回的结果。

我们知道CompletionStage接口定义了链式异步回调相关方法, CompletableFuture 相关读写操作(join, complete, obtrudeValue 等),不在CompletionStage的定义下,可以通过方法toCompletableFuture进行中转。

class InventoryServiceDemo {final Executor executor = Executors.newCachedThreadPool();// 获取库存信息// 方法可以返回 CompletionStage,相比于CompletableFuture更安全CompletionStage<Integer> getInventoryAsync(String productId) {return CompletableFuture.supplyAsync(() -> {try {Thread.sleep(Duration.ofSeconds(2));} catch (InterruptedException e) {throw new RuntimeException(e);}return 42;}, executor).minimalCompletionStage();}
}

笔者曾在《深入理解 Future, CompletableFuture, ListenableFuture,回调机制》一文中质疑 CompletionStage#toCompletableFuture 方法破坏了接口设计的相关原则,CompletableFuture#minimalCompletionStage 不如直接使用接口。

  1. CompletionStage#toCompletableFuture 表明 CompletableFuture 可以作为 CompletionStage 的默认实现,实现两者类型的快速转换。很多情况下,两者的区别不大。
  2. CompletableFuture#minimalCompletionStage 底层思想是提供只读功能。
  3. 这里的实现有点取巧,minimalStage虽然继承自CompleatableFuture,而实际上只是实现了CompletionStage 接口方法。minimalStage 不应该是 CompletableFuture的示例,更好的实现方式是使用委托方式(组合)。
  4. 如果子类进行类型检查,使用 instanceOf 或者新版JDK支持的模式匹配,minimalStage 需要进行额外的处理。
  5. CFFU 的实现对于minimalStage做了兼容处理,你可以放心使用。

5. 异步写操作

public CompletableFuture<T> completeAsync(Supplier<? extends T> supplier) {return completeAsync(supplier, defaultExecutor());
}public CompletableFuture<T> completeAsync(Supplier<? extends T> supplier,Executor executor) {if (supplier == null || executor == null)throw new NullPointerException();executor.execute(new AsyncSupply<T>(this, supplier));return this;
}

这两个方法很容易理解,适用于子类返回自己的实现。CompletableFuture#supplyAsync 工厂方法返回的实例是 ComplteableFuture 无法更改。子类可以自己定义自己的工厂方法;用户也可以使用这里的异步写方法。

6. CFFU的优化拓展

开源项目CFFU(功夫未来)对于CompletableFuture 进行了拓展,以下仅列举和Java9新增功能相关的拓展点:

  1. 可以在对象创建时指定执行器,后续操作无需重复指定
  2. Java8 版本下可以使用后续版本的功能
  3. 提供了相关安全发布功能,比如安全的超时功能
  4. 内部的很多便利方法应用了保护性复制方法
  5. CffuFactoryBuilder 支持以参数形式配置是否允许强制重写结果

7. 实践原则

  1. 异步方法可以返回 CompletionStage,其相比于CompletableFuture更为安全,同时可以提示用户不要使用阻塞相关方法
  2. 就像推荐使用不可变对象一样,推荐默认使用 CompletableFuture#minimalCompletionStage 方法
  3. 无论是对于方法入参还是返回结果,使用接口一般都优于实体类
  4. 理想的 Future 设计应该是读写分离的,不可变的。虽然 CompletableFuture 并没有完全遵循以上原则,但是我们在使用时应当注意遵循。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/877019.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

大模型

目录大模型的演变大模型的使用与训练大模型的特点与分类大模型的工作流程大模型的应用 大模型的演变机器学习:深度学习:大模型的使用与训练 大模型的特点与分类 大模型的工作流程 大模型的应用本文来自博客园,作者:chuangzhou,转载请注明原文链接:https://www.cnblogs.co…

“星门计划对AI未来的意义——以及谁将掌控它”

“星门计划对AI未来的意义——以及谁将掌控它”图片由DALL-E 3生成就在几天前,唐纳德特朗普宣布了“星门计划”,OpenAI随即跟进,分享了更多细节。他们明确表示,计划在未来四年内投资5000亿美元,在美国为OpenAI构建一个全新的AI基础设施。这让我颇感意外,尤其是考虑到埃隆…

A Critique of ANSI SQL Isolation Levels.18695069

原文:A critique of ANSI SQL isolation levels摘要:ANSI SQL-92[MS, ANSI]使用脏读、不可重复读以及幻读现象(phenomena)定义了隔离级别,本论文展示了这些现象,以及ANSI SQL定义并无法合适的描述众多流行的隔离级别,包括(ANSI标准)所涵盖的级别的标准锁实现。我们还介…

HTML, CSS

什么是 HTML、CSS HTML (HyperText Markup Language): 超文本标记语言. 超文本: 超越了普通文本的限制, 比普通文本更加强大. 除了文字信息, 还可以定义图片、音频、视频等内容. 标记语言: 由标签构成的语言. HTML 标签都是预定义好的。例如: 使用 <a> 展示超链接,使用 &…

Cisco NX-OS System Software - ACI 16.1(2f) - 适用于 ACI 模式下的 Nexus 9000 系列交换机系统软件

Cisco NX-OS System Software - ACI 16.1(2f) - 适用于 ACI 模式下的 Nexus 9000 系列交换机系统软件Cisco NX-OS System Software - ACI 16.1(2f) 适用于 ACI 模式下的 Cisco Nexus 9000 系列交换机系统软件 请访问原文链接:https://sysin.org/blog/cisco-aci-16/ 查看最新版…

Cisco APIC 6.1(2f)F - 应用策略基础设施控制器

Cisco APIC 6.1(2f)F - 应用策略基础设施控制器Cisco APIC 6.1(2f)F - 应用策略基础设施控制器 Application Policy Infrastructure Controller (APIC) 请访问原文链接:https://sysin.org/blog/cisco-apic-6/ 查看最新版。原创作品,转载请保留出处。 作者主页:sysin.org思科…

[搬运自 qq 空间] 19 北大冬令营小结

PKU 冬令营19北大冬令营小结 北大冬令营刚刚结束 , 以下是这两天以来笔者的经历。 Day1 比赛日 上午开营仪式 , 整个过程大概就是讲了一下北大计算机学科有哪些优势 , 比较无趣。 12 : 40的时候来到机房准备考试 , 1 : 00钟时 , 比赛正式开始。 首先浏览了一下A题 , 是吉…

X3ctf 比赛 Write Up

X3ctf Write Up 1. Misc p11n-trophy(签到题): 题目描述:我们首先会得到这样一份证书:第一题签到题的答案就是证书下面正中间的“This certificate does not grant the rank of Master"。 trophy-plus + trophy-plus64: 这两道目描述一模一样其中一个flag是藏在certif…

python--用户意见

https://www.python.org/about/quotes/

虚拟记账系统之三种结算模式

虚拟记账系统作为近年来支付领域的创新产品,正成为企业资金管理和支付结算的重要工具。本文从支付断直连的背景出发,详细介绍了虚拟记账系统的三种结算模式:收单结算、归集直清和归集调拨,并深入探讨了这些模式在直播电商、企业资金管理等场景中的应用。从这篇文章开始,我…

RocketMQ实战—2.RocketMQ集群生产部署

大纲 1.什么是消息中间件 2.消息中间件的技术选型 3.RocketMQ的架构原理和使用方式 4.消息中间件路由中心的架构原理 5.Broker的主从架构原理 6.高可用的消息中间件生产部署架构 7.部署一个小规模的RocketMQ集群进行压测 8.如何对RocketMQ集群进行可视化的监控和管理 9.进行OS内…