再见了Future,图解JDK21虚拟线程的结构化并发

Java为我们提供了许多启动线程和管理线程的方法。在本文中,我们将介绍一些在Java中进行并发编程的选项。我们将介绍结构化并发的概念,然后讨论Java 21中一组预览类——它使将任务拆分为子任务、收集结果并对其进行操作变得非常容易,而且不会不小心留下任何挂起的任务。

1 基础方法

通过Lambda表达式启动平台线程的这种创建线程的方法最简单,适用于简单情况。

// Lambda表达式启动平台线程的一种方法。
Thread.ofPlatform().start(() -> {// 在这里执行在独立线程上运行的操作});

问题

  • 创建平台线程是昂贵的
  • 若应用程序用户量很大,平台线程数量可能增长到超出JVM支持的限制

显然,大多数应用程序服务器不鼓励这种行为。因此,继续下一种方法——Java Futures。

2 Java Future类

JDK 5引入,开发者需要改变思考方式。不再考虑启动新线程,而考虑将“任务”提交到线程池以供执行。JDK 5还引入ExecutorService,任务将提交到该服务。ExecutorService是一个定义了提交任务并返回Java Future的机制的接口。提交的任务需实现Runnable或Callable接口。

任务提交给表示单线程线程池

// 将Callable任务提交给表示单线程线程池的ExecutorServiceExecutorService service = Executors.newSingleThreadExecutor();
Future<String> future = service.submit(() -> {// 进行一些工作并返回数据return "Done";
});
// 在这里执行其他任务// 阻塞直到提交的任务完成
String output = future.get();// 打印 "Done"
System.out.println(output);// 继续执行后续任务

多个任务提交到ExecutorService

try (ExecutorService service = Executors.newFixedThreadPool(3)) {Future<TaskResult> future1 = service.submit(() -> { // 执行任务1并返回TaskResult });Future<TaskResult> future2 = service.submit(() -> { // 执行任务2并返回TaskResult });  Future<TaskResult> future3 = service.submit(() -> { // 执行任务3并返回TaskResult });/* 所有异常上抛 */// get()将阻塞直到任务1完成TaskResult result1 = future1.get();// get()将阻塞直到任务2完成TaskResult result2 = future2.get();// get()将阻塞直到任务3完成TaskResult result3 = future3.get();// 处理result1、result2、result3handleResults(result1, result2, result3);
}

所有这些任务将并行运行,然后父线程可用future.get()方法检索每个任务的结果。

3 上述实现的问题

如在上面代码中用Platform线程,则存在一个问题。获取TaskResult的get()方法将阻塞线程,由于与阻塞Platform线程相关的可扩展性问题,这代价可能很昂贵。然而,使用Java 21——如用Virtual Threads,则在get()期间,底层的平台线程不会被阻塞。

若task2、task3在task1前完成,须等到task1完成,然后处理task2和task3结果。

若task2或task3执行过程失败,则问题更糟。假设整个用例应在任何任务失败时就失败,代码将等到task1完成,然后抛异常。这不是我们的期望,它将为最终用户创建一个非常迟钝的体验。

3.1 基本问题

ExecutorService类对提交给它的各种任务之间关系一无所知。因此,它不知道若一个任务失败,该发生点啥。即示例中提交的三个任务被视为独立任务,而非用例的一部分。这并不是ExecutorService类的失败,因为它没有设计为处理提交的任务之间的任何关系。

3.2 另一个问题

ExecutorService周围使用try-with-resources块,确保在try块退出时调用ExecutorService的close方法。close方法确保所有提交给执行器服务的任务在继续执行之前终止。

若用例要求在任何任务失败时立即失败,那我们运气不好。close方法将等待所有提交的任务完成。

但若不用try-with-resources块,则不能保证在块退出前三个任务都结束。将保留未清理终止的“未明确终止的线程”。任何其他自定义实现都须确保在失败时立即取消其他任务。

因此,尽管用Java Future是处理可拆分为子任务的任务的一种不错方法,但还不够完美。开发须将用例的“感知”编码到逻辑中,但这很难!

注意,对Platform线程存在于Java Futures的问题之一即阻塞问题——Java 21使用Virtual线程时,这问题不再存在。因为使用Virtual Threads时,使用future.get()方法阻塞线程将释放底层的Platform线程。

使用CompletableFuture Pipelines也可解决阻塞问题,但这里不深入探讨。有更简单的方法来解决Java 21阻塞问题,没错就是Virtual Threads!但我们需要找到一种更好解决方案,以处理可拆分为多个子任务且“知道”用例的任务。这就引出结构化并发的基本思想。

4 结构化并发

想象,从方法内部向ExecutorService提交的任务,然后方法退出。现在更难推断代码,因为不知道此提交的任务可能的副作用,且这可能导致难以调试的问题。该问题的图解:

结构化并发基本思想是从一个块(方法或块)内启动的所有任务应在该块结束前终止。即:

  • 代码的结构边界(块)

  • 和该块内提交的任务的运行时边界

重合。这使应用程序代码更容易理解,因为一个块内提交的所有任务的执行效果都被限制在该块内。块外查看代码时,不必担心任务是否仍在运行。

ExecutorService的try-with-resources块是对结构化并发的一次良好尝试,其中从块内提交的所有任务在块退出时完成。但它还不够,因为它可能导致父线程等待时间超过必要时间。其改进版——StructuredTaskScope

5 StructuredTaskScope

Java 21 Virtual Thread作为一项功能被引入,它在大多情况下实际上消除了阻塞问题。但即使使用Virtual线程和Futures,仍存在“不干净终止任务”和“等待时间比必要时间长”的问题。

StructuredTaskScope类在Java 21中作为预览功能提供,旨在解决这问题。它试图提供比Executor Service的try-with-resources块更干净的结构化并发模型。StructuredTaskScope类知道提交的任务之间的关系,因此它可对它们进行更智能假设。

使用StructuredTaskScope的示例

在任一任务失败时,立即返回用例。

StructuredTaskScope.ShutdownOnFailure()返回一个StructuredTaskScope的引用,该引用知道若一个任务失败,那其他任务也须终止,因为它“知道”提交的任务之间的关系。

 try(var scope = new StructuredTaskScope.ShutdownOnFailure()) {          // 想象一下LongRunningTask实现Suppliervar dataTask = new LongRunningTask("dataTask", ...);  var restTask = new LongRunningTask("restTask", ...); // 并行运行任务Subtask<TaskResponse> dataSubTask = scope.fork(dataTask);           Subtask<TaskResponse> restSubTask = scope.fork(restTask);           // 等待所有任务成功完成或第一个子任务失败。 // 如果一个失败,向所有其他子任务发送取消请求// 在范围上调用join方法,等待两个任务都完成或如果一个任务失败scope.join();                                                       scope.throwIfFailed();                                              // 处理成功的子任务结果                                System.out.println(dataSubTask.get());                              System.out.println(restSubTask.get());                              }                                                                       

企业用例

其中两个任务可并行运行:

  • 一个DB任务
  • 一个Rest API任务

目标是并行运行这些任务,然后将结果合并到单个对象中并返回。

调用ShutdownOnFailure()静态方法创建一个StructuredTaskScope类。然后使用StructuredTaskScope对象fork方法(将fork方法考虑为submit方法)并行运行两个任务。幕后,StructuredTaskScope类默认使用Virtual线程来运行任务。每次fork一个任务,都创建一个新Virtual线程(Virtual线程永不会被池化)并运行任务。

然后在范围上调用join方法,等待两个任务都完成或如果一个任务失败。更重要的——若一个任务失败,join()方法将自动向其他任务(剩余运行任务)发送取消请求并等待其终止。这很重要,因为取消请求将确保在块退出时没有不必要的悬挂任务。

若其他线程向父线程发取消请求,也是如此。在最后,若块内部任何位置抛异常——StructuredTaskScope的close方法将确保向子任务发送取消请求并终止任务。StructuredTaskScope美妙在于——若子线程创建自己的StructuredTaskScope(子任务本身有自己的子任务),取消时它们都会得到干净处理。

开发在这里的一个职责是确保它们编写的任务须正确处理在取消期间设置在线程上的中断标志。任务有责任读取此中断标志并干净终止自己。若任务未正确处理中断标志,那用例的响应性将受影响。

6 使用StructuredTaskScope

当一个用例需要将任务分解为子任务,可能还需将子任务进一步分解为更多子任务时,使用StructuredTaskScope是合适的。本文看到的示例是用例需在任一子任务失败时立即返回。但StructuredTaskScope远不止如此。

  • 在第一个任务成功时返回
  • 在所有任务完成时返回(成功或失败)
  • 制作自己的StructuredTaskScope版本

6.1 StructuredTaskScope优点

  • 代码易阅读,因为无论哪种用例,代码看着都一样
  • 子线程失败时会在适当时被干净终止。没有不必要的悬挂线程
  • 使用StructuredTaskScope与Virtual Threads一起,意味与阻塞相关可扩展性问题不存在。这也难怪,默认情况下,StructuredTaskScope在底层使用Virtual Threads

7 总结

总的来说,StructuredTaskScope类是Java中处理将任务拆分为多个子任务的用例的良好补充。子线程在失败时自动取消,不同用例的代码一致性以及更好地理解代码的能力,使其成为在Java中实现Structured Concurrency的理想选择。

Virtual Threads和StructuredTaskScope类共同组成了一个完美的组合。Virtual Threads使我们能够在JVM中创建数十万个线程,而StructuredTaskScope类使我们能够有效地管理这些线程。

让我们等待它退出预览并成为一个正式特性!

本文由博客一文多发平台 OpenWrite 发布!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/263420.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

c语言希尔排序总结(详解)

希尔排序&#xff1a; 1&#xff1a;分组插入排序两两分组降低元素个数提高插入的效率&#xff0c;先分组对每一组分别进行插入排序 希尔排序是插入排序的一种改进算法&#xff0c;也称为缩小增量排序。其基本原理是通过将待排序的序列分成若干个子序列&#xff0c;对每个子序…

udp多播组播

import socket ,struct,time# 组播地址和端口号 MCAST_GRP 239.0.0.1 MCAST_PORT 8888 # 创建UDP socket对象 sock socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) # 绑定socket对象到本地端口号 # sock.bind((MCAST_GRP, MCAST_PORT)) …

sentinel整合nacos配置中心持久化

在网上找了很多的资料&#xff0c;发现sentinel整合nacos持久化的博文和视频大多数都只有改造限流部分的教程&#xff0c;并且都需要修改前端&#xff0c;略显麻烦&#xff0c;至于剩下的熔断、热点流控、授权的更是没有相关的改造教程&#xff0c;最后在知乎的看到一篇文章后让…

【目标检测从零开始】torch实现yolov3数据加载

文章目录 数据简介Dataset读取Step1&#xff1a;类别定义Step2&#xff1a;解析xmlStep3&#xff1a;实现DatasetStep4&#xff1a;数据增强Step5&#xff1a;添加dataset_collateStep6&#xff1a;测试 小结 数据简介 林业病虫害防治项目用到的AI识虫数据集&#xff0c;该数据…

Base64编码解码

一、Base64编码技术简介 Base64编码是一种广泛应用于网络传输和数据存储的编码方式。它将原始数据转换为可打印的字符形式&#xff0c;以便于传输和存储。Base64编码后的数据长度是原始数据长度的约3/4&#xff0c;具有一定的压缩效果。 Base64编码解码 -- 一个覆盖广泛主题工…

【蜗牛到家】获南明电子信息产业引导基金战略投资

智慧社区生活服务平台「蜗牛到家」已于近期获得贵阳南明电子信息产业引导基金、华科明德战略投资。 贵阳南明电子信息产业引导基金属于政府旗下产业引导基金&#xff0c;贵州华科明德基金管理有限公司擅长电子信息产业、高科技产业、城市建设及民生保障领域的投资&#xff0c;双…

主窗体、QFile、编码转换、事件、禁止输入特殊字符

主窗体 部件构成 菜单栏、工具栏、主窗体、状态栏。 UI 编辑器设计主窗体 &#x1f4a1; 简易记事本的实现&#xff08;part 1&#xff09; 菜单栏 工具栏&#xff08;图标&#xff09; 主窗体 完善菜单栏&#xff1a; mainwindow.cpp #include "mainwindow.h"…

《PySpark大数据分析实战》-01.关于数据

&#x1f4cb; 博主简介 &#x1f496; 作者简介&#xff1a;大家好&#xff0c;我是wux_labs。&#x1f61c; 热衷于各种主流技术&#xff0c;热爱数据科学、机器学习、云计算、人工智能。 通过了TiDB数据库专员&#xff08;PCTA&#xff09;、TiDB数据库专家&#xff08;PCTP…

PHP 二维码内容解析、二维码识别

目录 1.首先是一些错误的示例 2.正确示例 3.二维码解析 4.完整示例&#xff0c;含生成 5.代码执行结果 6.参考文档 1.首先是一些错误的示例 本示例使用的是php7.3 通过搜索各种结果逐个尝试以后&#xff0c;得出一个可使用版本 解析错误经历&#xff1a;vendor核心报错 …

[C++]:10.vector使用

vector使用 一.vector使用1.构造函数&#xff1a;2.迭代器遍历数据&#xff1a;3.空间问题&#xff1a;1.size():返回有效数据个数&#xff1a;2.capacity()&#xff1a;返回容量大小&#xff1a;3.容量检测&#xff1a;4.emptr()&#xff1a;判断顺序表是否为空&#xff1a;5.…

Linux6-配置网络、源码包的编译和安装

配置 linux 网络 配置主机名 修改/etc/hostname 配置文件&#xff0c;永久配置主机名 [rootlocalhost ~]# vim /etc/hostname svr7.tedu.cn [rootlocalhost ~]# cat /etc/hostname svr7.tedu.cn [rootlocalhost ~]# reboot #重启生效命令行永久修改主机名 [rootlocalhost ~…

Vue3使用Tailwind CSS

安装 Tailwind 以及其它依赖项 npm install -D tailwindcsslatest postcsslatest autoprefixerlatest生成配置文件&#xff1a; npx tailwindcss init -p.修改配置文件 tailwind.config.js 2.6版本 &#xff1a; module.exports {purge: [./index.html, ./src/**/*.{vue,j…