Java虚拟线程探索

在Java 21中,引入了虚拟线程,这是一个非常非常重要的特性,之前一直苦苦寻找的Java协程,终于问世了。在高并发以及IO密集型的应用中,虚拟线程能极大的提高应用的性能和吞吐量。

什么是虚拟线程

先来看一下虚拟线程的概念。

虚拟线程概念

DK 21 引入了虚拟线程的支持,这是为了改善 Java 应用程序在高并发场景下的性能。虚拟线程是一种轻量级线程,具有较小的内存占用,能够更高效地进行上下文切换,适用于 I/O 密集型的应用程序

虚拟线程的工作原理

当应用程序启动一个虚拟线程时,JVM会将这个虚拟线程交给JVM底层的线程池去执行,这个底层的线程池是一个传统线程池,并且真正执行虚拟线程中任务的线程,也是传统线程(操作系统线程)。当虚拟线程遇到阻塞时,JVM会立刻将虚拟线程挂起,让其它虚拟线程执行。也就是说,开启一个虚拟线程,并不需要启用一个传统线程,一般一个传统线程,可以执行多个虚拟线程的任务。在执行过程中,可以把虚拟线程理解成任务task。

这里举一个列子,假设用户创建了1000个虚拟线程,JVM的执行虚拟线程的线程池线程数是10,那么当第一个虚拟线程V1需要执行时,JVM会将V1调度到传统线程T1上,以此类推,虚拟线程V2会被调度到传统线程T2上,那么V3->T3,V4->T4,… V10->T10。当执行到V11时,这里有三种情况:

  • 如果V1~V10中有任何一个线程遇到阻塞,我们这里假设V3遇到阻塞,那么JVM会将V3挂起,此时T3线程可用,那么V11被T3执行。

  • 如果V1~V10没有线程被阻塞,那么JVM根据划分的时间片,假设每个虚拟线程允许执行100ns,那么过了100ns后,这里V1最新执行,JVM则将V1挂起,让T1去执行V11。

  • 如果以上两种情况都不满足,那么先将V11挂起,等待有可用的传统线程时,再执行V11。

对于被阻塞的线程,如V3,当IO结束后,操作系统会通过事件,如epoll通知JVM,V3的IO操作已结束,此时JVM重新唤醒V3,选择可用的传统线程,来执行V3的任务。

这里需要注意两点:

  • 虚拟线程IO执行完成后,会通过操作系统的事件通知机制,如epoll来通知JVM。这一点对于虚拟线程的高效调度至关重要,因为它确保了 阻塞的 I/O 操作 不会占用操作系统线程的时间片,避免了传统线程池的高资源消耗和效率低下。。

  • JVM在对虚拟线程进行上下文切换时,因为不涉及到操作系统级别的线程上下文切换,代价非常低,速度也非常快。

虚拟线程的调度

一般来说,程序员不需要对虚拟线程的调度进行管理,在JDK 21中,JVM默认启用了虚拟线程,并且会使用默认的ForkJoinPool线程池来执行虚拟线程,并且线程池的大小,也会根据虚拟线程的数量,进行动态调整。如果需要手动管理执行虚拟线程的线程池大小,那么需要自定义线程池,并将虚拟线程交给自定义的线程池来执行,这样虽然可行,通常没有必要。

虚拟线程与传统线程区别

虚拟线程与传统线程的区别主要在于:

  • 创建虚拟线程时,JVM不会创建一个操作系统线程,创建一个传统线程时,JVM会创建一个操作系统线程。一个传统线程,可以轮询执行多个虚拟线程。

  • 虚拟线程是由传统线程来执行的,虚拟线程的调度由JVM控制,传统线程的执行和调度,由操作系统来控制。

  • 虚拟线程的上下文切换是由JVM控制的,因为不涉及到操作系统级别线程的上下文切换,虚拟线程上下文切换速度非常快,可以满足高并发需求。

  • 创建一个虚拟线程占用的内存非常小,相对而言,创建一个传统线程,占用的内存空间大。在应用中,可以创建大量的虚拟线程,一般支持到百万级,而创建传统线程,一般只能到几千,我们一般也不建议创建这么多传统线程。

虚拟线程类似于task,传统系统与操作系统线程对应,一个传统线程可以执行多个虚拟线程。虚拟线程与task的区别是,当传统线程执行虚拟线程时,遇到阻塞会挂起虚拟线程,当传统线程执行task时,遇到阻塞就真的阻塞了。当然传统中的task继承自runnable,虚拟线程继承自Thread,他们属于不同的类,可调用的方法也不一样。

JDK也提供了虚拟线程池,可以通过下面方式得到一个虚拟线程池。

import java.util.concurrent.*;public class VirtualThreadPoolExample {public static void main(String[] args) {// 创建一个虚拟线程池ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();// 提交多个任务到线程池for (int i = 0; i < 10; i++) {final int taskId = i;executor.submit(() -> {System.out.println("Task " + taskId + " running in " + Thread.currentThread());});}// 关闭线程池executor.shutdown();}
}

上面代码中,提交给线程池的任务,JVM都会为其创建一个虚拟线程,然后以虚拟线程的方式执行。

与传统的线程池相比,虚拟线程池无法设置核心线程数、最大线程数、线程池大小、任务队列等参数,也不需要设置这些参数。

虚拟线程与传统线程的相同之处:

  • 他们都继承自Thread,用法一摸一样。也都支持线程池。

  • 与传统一样,虚拟线程也有new,runnable,waiting,blocked,terminated等状态。

  • 所有的锁,同步机制,对虚拟线程都适用,并且与传统线程一样,虚拟线程也会有资源争夺以及状态同步问题。并且也有上下文切换,虽然虚拟线程的上下文切换,代价非常小。

  • 异常处理机制一样,如果遇到异常不处理,虚拟线程也会终止执行。

虚拟线程与协程的区别

协程是python中的异步编程技术,对于IO密集型应用,协程可以发挥很大的优势。协程的异步工作原理与虚拟线程相似,也是遇到IO就阻塞,让主线程继续执行其它任务,当IO完成时,操作系统通过事件机制,如epoll,通知python进程,产生一个事件,放到event loop队列中,最后由主线程执行。

虚拟线程与协程的主要区别在于:

区别 虚拟线程 协程
并发/并行 虚拟线程是并行的,多个虚拟线程可以同时在多个CPU上运行,同一时刻,可以运行多个虚拟线程。从这个角度将,虚拟线程能支持更高的并发。 协程不是并行的,因为只有一个主线程执行任务事件,同一时刻,只有一个任务被处理。
资源争夺 虚拟线程中,存在资源争夺问题,以及状态同步问题,在编写代码时,需要考虑并发控制。甚至需要做合理的并发设计。 因为只有一个主线程在执行任务事件,没有并发问题,编程时也不需要考虑并发问题。
框架支持 虚拟线程是JDK 21的新特性,不需要任何框架支持。 需要框架支持,写异步代码和同步代码,使用的是两个完全不同的框架,另外学习异步编程,增加了学习成本。并且异步编程有些难度,debug也变得复杂些。

怎样使用虚拟线程

在JDK 21中,使用虚拟线程有两种方式:

  • 直接创建并启动虚拟线程。
public class VirtualThreadExample {public static void main(String[] args) {Thread virtualThread = Thread.ofVirtual().start(() -> {System.out.println("Hello virtual thread ");});try {virtualThread.join();  // 等待虚拟线程完成} catch (InterruptedException e) {e.printStackTrace();}}
}
  • 通过线程池执行虚拟线程。
import java.util.concurrent.*;public class VirtualThreadPoolExample {public static void main(String[] args) {// 创建一个虚拟线程池ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();// 提交多个任务到线程池for (int i = 0; i < 10; i++) {final int taskId = i;executor.submit(() -> {System.out.println("Task " + taskId + " running in " + Thread.currentThread());});}// 关闭线程池executor.shutdown();}
}

通过线程池执行任务时,无法对并发实现控制,容易造成OOM,或耗尽服务方资源,可以自定义以下虚拟线程池,实现资源控制:

package com.zengbiaobiao.demo.vitrualthreaddemo;import org.springframework.lang.NonNull;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;/****** 虚拟线程池,支持配置任务队列数和最大并发任务数*/
public class VirtualThreadExecutorService extends AbstractExecutorService {private volatile boolean shouldStop = false;private final ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();private final Semaphore semaphore;private final BlockingQueue<Runnable> taskQueue;/******* 构造函数* @param taskQueueSize,任务队列大小,任务队列是一个阻塞队列,如果任务队列满了,那么调用execute方法会阻塞* @param concurrencySize,并发任务大小,同时执行的IO任务个数,防止并发过重,或者资源不够*/public VirtualThreadExecutorService(int taskQueueSize, int concurrencySize) {this.semaphore = new Semaphore(concurrencySize);taskQueue = new LinkedBlockingQueue<>(taskQueueSize);this.loopEvent();}private void loopEvent() {Thread.ofVirtual().name("VirtualThreadExecutor").start(() -> {while (!shouldStop) {try {Runnable task = taskQueue.take();semaphore.acquire();executor.execute(() -> {try {try {task.run();} finally {semaphore.release();}} catch (Exception e) {Thread.currentThread().interrupt();throw new RuntimeException(e);}});} catch (InterruptedException e) {Thread.currentThread().interrupt();if (shouldStop) break;}}});}@Overridepublic void shutdown() {shouldStop = true;executor.shutdown();}/*** @return The task not executed*/@Overridepublic List<Runnable> shutdownNow() {shouldStop = true;List<Runnable> remainingTasks = new ArrayList<>(taskQueue);taskQueue.clear();executor.shutdownNow();return remainingTasks;}@Overridepublic boolean isShutdown() {return shouldStop;}@Overridepublic boolean isTerminated() {return shouldStop && executor.isTerminated();}@Overridepublic boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {return executor.awaitTermination(timeout, unit);}@Overridepublic void execute(Runnable command) {try {taskQueue.put(command); // 阻塞直到队列有空间} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new RejectedExecutionException("Task submission interrupted.", e);}}
}

测试代码如下:

package com.zengbiaobiao.demo.vitrualthreaddemo;import org.apache.tomcat.util.threads.VirtualThreadExecutor;public class VirtualThreadExecutorServiceDemo {public static void main(String[] args) throws InterruptedException {VirtualThreadExecutorService executorService = new VirtualThreadExecutorService(10, 2);for (int i = 0; i < 100000; i++) {final String threadName = "thread-" + i;System.out.println(Thread.currentThread() + ": try to create task " + threadName);executorService.submit(() -> {System.out.println(Thread.currentThread() + ": " + threadName + " created!");try {Thread.sleep(2000);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println(Thread.currentThread() + ": " + threadName + " finished!");});}Thread.sleep(5000000);}
}

哪些场景下可以应用虚拟线程

虚拟线程在IO密集型的高并发应用中能发挥出巨大的威力,在所有IO密集型应用中,具体来说,下列场景中,使用虚拟线程是比较合适的:

  • 短时间需要完成的任务,且没有资源争夺或乱序问题,比如数据库写入,服务器 HTTP 请求处理,远程 RESTful API 调用,RabbitMQ 消息处理等应用场景。。

  • 长时间运行的任务,但是对消息处理由顺序要求的任务。比如在电梯监控系统中,需要对每台电梯的数据进行处理,但是需要保证消息被处理的顺序。这时可以为每台电梯创建一个虚拟线程,这台电梯的数据交给专门的虚拟线程处理。因为应用中可以创建大量虚拟线程,并且虚拟线程一般都是异步处理任务,所以这个场景中,使用虚拟线程,可以满足高性能和高并发的要求。

  • API网关中,对多个上游API数据进行查询,组装合并,使用虚拟线程,相比传统线程,效果更佳。虚拟线程,也支持CountDownLatch,Semaphore等工具类。

  • 事件驱动的架构中,使用虚拟线程,效果也很好。比如spring boot中的异步事件,默认使用的是传统线程池,如果将其改成虚拟线程池,并发处理能力可以极大提高。

那么哪些场景下不合适使用虚拟线程呢?

  • CPU密集型应用,比如大数据处理、图像处理、矩阵运算等。

  • 如果应用有很高的并发资源争夺,或者状态同步,并且造成系统吞吐量低,需要考虑优化并发模型,这种场景下,不但传统线程不合适,虚拟线程也不合适。

虚拟线程实际应用场景举例

在一个spring boot项目中,有时候因为异步事件处理不过来,造成吞吐量下降,在JDK 21中,可以将事件改成虚拟线程来执行,代码如下:

package com.zengbiaobiao.demo.vitrualthreaddemo;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;@Configuration
@EnableAsync
public class AsyncConfig {@Bean(name = "taskExecutor")public Executor taskExecutor() {// 最大并行任务数Semaphore semaphore = new Semaphore(100);ExecutorService virtualThreadPool = Executors.newVirtualThreadPerTaskExecutor();return runnable -> {try {// 控制并行任务数semaphore.acquire();virtualThreadPool.submit(() -> {try {runnable.run();} finally {semaphore.release();}});} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new RuntimeException("Task submission interrupted", e);}};}
}

事件发送和处理代码如下:

package com.zengbiaobiao.demo.vitrualthreaddemo;import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
@RequestMapping("/home")
public class HomeController {private final ApplicationEventPublisher eventPublisher;public HomeController(ApplicationEventPublisher eventPublisher) {this.eventPublisher = eventPublisher;}@GetMapping("/index")public String index() {for (int i = 0; i < 1000; i++) {eventPublisher.publishEvent("event " + i);}return "success";}@EventListener@Asyncpublic void handleEvent(String event) {System.out.println(Thread.currentThread() + ": " + event);try {Thread.sleep(100);} catch (InterruptedException e) {throw new RuntimeException(e);}}
}

输出结果如下:

VirtualThread[#2031]/runnable@ForkJoinPool-1-worker-4: event 976
VirtualThread[#2039]/runnable@ForkJoinPool-1-worker-1: event 980
VirtualThread[#1064]/runnable@ForkJoinPool-1-worker-1: event 983
VirtualThread[#2047]/runnable@ForkJoinPool-1-worker-2: event 984
VirtualThread[#2049]/runnable@ForkJoinPool-1-worker-9: event 985
VirtualThread[#2057]/runnable@ForkJoinPool-1-worker-2: event 989
VirtualThread[#2059]/runnable@ForkJoinPool-1-worker-3: event 990
VirtualThread[#2061]/runnable@ForkJoinPool-1-worker-6: event 991
VirtualThread[#2063]/runnable@ForkJoinPool-1-worker-10: event 992
VirtualThread[#2065]/runnable@ForkJoinPool-1-worker-10: event 993
VirtualThread[#2071]/runnable@ForkJoinPool-1-worker-3: event 996
VirtualThread[#2069]/runnable@ForkJoinPool-1-worker-2: event 995
VirtualThread[#2075]/runnable@ForkJoinPool-1-worker-7: event 998
VirtualThread[#2077]/runnable@ForkJoinPool-1-worker-10: event 999

上面输出结果中,每次并发执行100个任务,当虚拟线程池任务达到100之后,执行eventPublisher.publishEvent("event " + i)代码时,代码阻塞,过100ms之后,100个任务执行完成,下一批任务被执行。

虚拟线程使用注意事项

  • 搞清楚任务类型,是IO密集型,还是CPU密集型

  • 与传统线程结合使用

  • 关注性能和资源,使用虚拟线程无法通过线程池等工具控制并发,需要借助Semepha,CountdownLatch等工具才能限流,如果不限流,容易造成OOM,或对目标系统造成巨大流量冲击。

  • 在异步框架中,关注隐藏的传统线程,比如在HttpClient的异步请求中,每次异步请求都会创建一个HttpClient回调线程。大量的传统线程被间接创建,也容易引起OOM。

  • 由synchronized关键字引起的pinned问题,看起来在JDK 21中,做了一些优化,即便虚拟线程pinned到传统线程,也只是性能退回到传统线程,无非是慢一点,反而不是太大问题。经过大量测试,发现基本只出现一次,之后不会再出现。不过使用ReentrantLock,效果确实会好很多,将synchronized关键字改成lock.()和lock.unlock(),ForkJoinPool中的线程数量会降低,并且任务分配均衡。

  • 不要忽略软件设计,尤其在需要大量同步的应用中。

经过验证,虚拟线程在遇到IO时,确实会让步,并且不消耗太多资源,核心特点是,让异步编程变得简单,并且不需要框架支持。但是容易因大的并发,造成OOM,或者对目标系统造成冲击,追求高并发可用,但一定要做测试和验证。对于需要做状态同步,如需要加锁,或需要使用synchronize关键字的代码,需要优化设计,如果无法规避,那么,使用虚拟线程,和使用线程池,效果差不多。

虚拟线程存在的问题:

Java Virtual Threads — some early gotchas to look out for

Two Pitfalls by moving to Java Virtual Threads

Java 21 Virtual Threads - Dude, Where’s My Lock?

Pitfalls to avoid when switching to Virtual threads

Do Java 21 virtual threads address the main reason to switch to reactive single-thread frameworks?

Pinning: A pitfall to avoid when using virtual threads in Java

Taming the Virtual Threads: Embracing Concurrency With Pitfall Avoidance

Pitfalls you encounter with virtual threads


示例代码在Gitee上同步,你也可以访问曾彪彪的个人博客点击查看作者更多文章

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/880012.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

回归本质:第一性原理引领团队项目管理新方向

第一性原理作为一种极具颠覆性和创新性的思维工具,在团队项目管理中发挥着重要作用。通过归零思考、解构现象和重构模式等步骤,团队能够更清晰地认识项目的本质需求,发现潜在问题并制定针对性的解决方案。同时,借助专业的项目管理软件如板栗看板,团队可以进一步提高管理效…

详细教程 | 如何使用DolphinScheduler调度Flink实时任务

Apache DolphinScheduler 非常适用于实时数据处理场景,尤其是与 Apache Flink 的集成。DolphinScheduler 提供了丰富的功能,包括任务依赖管理、动态调度、实时监控和日志管理,能够有效简化 Flink 实时任务的管理和部署。通过 DolphinScheduler 的可视化界面,用户可以轻松创…

htb Topology walkthrough

80端口在这里有个链接点击会跳转到子域名输入公式他能生成图片上网搜索 发现一个比较有用的 https://exexute.github.io/2019/04/24/how-hacking-with-LaTex/点击查看代码 \newread\file \openin\file=/etc/passwd \read\file to\1 \text{\1} \closein\file让chatgpt修改一下点…

window10本地搭建DeepSeek R1(二)

本章介绍在window上部署 DeepSeek R1-8B + Open WebUI :需要安装的有:Ollama,python 3.11,DeepSeek ,Open WebUI。 一:环境:我的window10的环境是:12核32G GPU8+16(目前看可以运行8B的模型) 二:上一张介绍了Ollama+DeepSeek的安装,这里介绍 OpenWebUI的安装。 1:可…

windows 系统上搭建 Phpstudy 集成环境 + DVWA 靶场

一、前言 网站是由中间件、网站程序、数据库等组成的。中间件可以选择 Apache、Nginx、IIS 等,网站程序可以选择 PHP、JSP、ASP等,数据库可以选择MySQL等。有时候可单独安装,有时也可使用便捷的集成环境来安装。 网站搭建常见集成环境:Phpstudy 、XMAPP、Wamp 二、Phpstudy…

vscode golang test单元测试缓存问题

打开设置搜索 Go: Test Flags 添加"-count=1"

必看:DeepSeek-R1本地部署!超详细教程~

最近国产AI大模型DeepSeek爆火出圈,登顶中美App Store下载榜,还在性能、性价比上碾压了ChatGPT和Google Gemini等硅谷巨头,直接杀入科技圈C位,成为现象级应用! 然而,不少小伙伴在使用的时候,经常会出现响应迟缓甚至宕机的情况👇这可怎么办? 万幸的是,DeepSeek是一个…

鸿蒙页面开发 - 组件复用样式 @Styles

这篇文章介绍一个装饰器 @Styles 他的主要作用是:当多个组件都有相同的样式,如果每个组件单独设置,会造成大量重复的代码冗余。这时我们可以使用 @Styles 将这些相同样式封装成一个方法,供这些组件调用,达到复用样式的目的 使用方法 @Styles 使用分为两种情况,在组件内使…

春节档票房背后:电影院如何通过项目管理软件提升运营效率?

春节档的火爆票房为电影院带来了前所未有的机遇和挑战。为了抓住这一机遇并应对挑战,电影院需要加强项目管理,并借助数字化转型的力量实现高效运营。在未来的发展中,电影院应积极探索数字化转型的新路径,以更加智能化、自动化的管理方式迎接市场的不断变化和观众的多样化需…

SAP ABAP 中的 STOP EXIT CHECK 的区别

SAP ABAP 中的 STOP EXIT CHECK 的区别PARAMETERS p_mode(5).START-OF-SELECTION.CASE p_mode.WHEN STOP.WRITE / testing stop.STOP.WHEN EXIT.WRITE / test exit.EXIT.WHEN OTHERS.CHECK p_mode = CHECK.WRITE / test check.ENDCASE.WRITE / END OF START-OF-SELECTION.END-O…

当代码哲学遇上AI革命:重读《软件设计的哲学》的启示

引言:软件复杂性的永恒之困 John Ousterhout的《软件设计的哲学》并非一本传统意义上的“编码规范手册”,而是一部探讨软件系统本质性矛盾的哲学著作。书中对“复杂性”的剖析直指软件工程的阿喀琉斯之踵——如何驯服系统熵增,让代码在迭代中保持生命力?在AI工具重构编程范…

window10本地搭建DeepSeek R1(一)

本章介绍在window上部署 DeepSeek R1-8B + Open WebUI :需要安装的有:Ollama,python 3.11,DeepSeek ,Open WebUI。 一:环境:我的window10的环境是:12核32G GPU8+16(目前看可以运行8B的模型) 二:安装Ollama :https://ollama.com/ 直接下载window的。下载完成后直…