SynchronousQueue底层实现原理剖

news/2025/2/21 17:05:42/文章来源:https://www.cnblogs.com/jock766/p/18725717

一、SynchronousQueue底层实现原理剖

SynchronousQueue(同步移交队列),队列长度为0。作用就是一个线程往队列放数据的时候,必须等待另一个线程从队列中取走数据。同样,从队列中取数据的时候,必须等待另一个线程往队列中放数据


二、SynchronousQueue用法

先看一个SynchronousQueue的简单用例:

    public class SynchronousQueueDemo {public static void main(String[] args) throws InterruptedException {// 1. 创建SynchronousQueue队列BlockingQueue<Integer> synchronousQueue = new SynchronousQueue<>();// 2. 启动一个线程,往队列中放3个元素new Thread(() -> {try {System.out.println(Thread.currentThread().getName() + " 入队列 1");synchronousQueue.put(1);Thread.sleep(1);System.out.println(Thread.currentThread().getName() + " 入队列 2");synchronousQueue.put(2);Thread.sleep(1);System.out.println(Thread.currentThread().getName() + " 入队列 3");synchronousQueue.put(3);} catch (InterruptedException e) {e.printStackTrace();}}).start();// 3. 等待1000毫秒Thread.sleep(1000L);// 4. 再启动一个线程,从队列中取出3个元素new Thread(() -> {try {System.out.println(Thread.currentThread().getName() + " 出队列 " + synchronousQueue.take());Thread.sleep(1);System.out.println(Thread.currentThread().getName() + " 出队列 " + synchronousQueue.take());Thread.sleep(1);System.out.println(Thread.currentThread().getName() + " 出队列 " + synchronousQueue.take());} catch (InterruptedException e) {e.printStackTrace();}}).start();}}


输出结果:

Thread-0 入队列 1
Thread-1 出队列 1Thread-0 入队列 2
Thread-1 出队列 2Thread-0 入队列 3
Thread-1 出队列 3

从输出结果中可以看到,第一个线程Thread-0往队列放入一个元素1后,就被阻塞了。直到第二个线程Thread-1从队列中取走元素1后,Thread-0才能继续放入第二个元素2


三、SynchronousQueue应用场景

  • SynchronousQueue的特点:

    • 队列长度是0,一个线程往队列放数据,必须等待另一个线程取走数据。同样,一个线程从队列中取数据,必须等待另一个线程往队列中放数据。


这种特殊的实现逻辑有什么应用场景呢 ?


我的理解就是,如果你希望你的任务需要被快速处理,就可以使用这种队列。


Java线程池中的 newCachedThreadPool(带缓存的线程池)底层就是使用SynchronousQueue实现的。

public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
}


newCachedThreadPool线程池的核心线程数是0,最大线程数是Integer的最大值,线程存活时间是60秒。


如果你使用 newCachedThreadPool 线程池,你提交的任务会被更快速的处理,因为你每次提交任务,都会有一个空闲的线程等着处理任务。如果没有空闲的线程,也会立即创建一个线程处理你的任务。


当然也有弊端,如果你提交了太多的任务,导致创建了大量的线程,这些线程都在竞争CPU时间片,等待CPU调度,处理任务速度也会变慢,所以在使用过程中也要综合考虑。


四、SynchronousQueue源码解析


1、SynchronousQueue类属性

  public class SynchronousQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> {// 转换器,取数据和放数据的核心逻辑都在这个类里面private transient volatile Transferer<E> transferer;// 默认的构造方法(使用非公平队列)public SynchronousQueue() {this(false);}// 有参构造方法,可以指定是否使用公平队列public SynchronousQueue(boolean fair) {transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();}// 转换器实现类abstract static class Transferer<E> {abstract E transfer(E e, boolean timed, long nanos);}// 基于栈实现的非公平队列static final class TransferStack<E> extends Transferer<E> {}// 基于队列实现的公平队列static final class TransferQueue<E> extends Transferer<E> {}}

可以看到SynchronousQueue默认的无参构造方法,内部使用的是基于栈实现的非公平队列,当然也可以调用有参构造方法,传参是true,使用基于队列实现的公平队列。

// 使用非公平队列(基于栈实现)
BlockingQueue<Integer> synchronousQueue = new SynchronousQueue<>();// 使用公平队列(基于队列实现)
BlockingQueue<Integer> synchronousQueue = new SynchronousQueue<>(true);


五、常用的栈实现来剖析SynchronousQueue的底层实现原理


1、栈底层结构

栈结构,是非公平的,遵循先进后出。


使用个case测试一下

  public class SynchronousQueueDemo {public static void main(String[] args) throws InterruptedException {// 1. 创建SynchronousQueue队列SynchronousQueue<Integer> synchronousQueue = new SynchronousQueue<>();// 2. 启动一个线程,往队列中放1个元素new Thread(() -> {try {System.out.println(Thread.currentThread().getName() + " 入队列 0");synchronousQueue.put(0);} catch (InterruptedException e) {e.printStackTrace();}}).start();// 3. 等待1000毫秒Thread.sleep(1000L);// 4. 启动一个线程,往队列中放1个元素new Thread(() -> {try {System.out.println(Thread.currentThread().getName() + " 入队列 1");synchronousQueue.put(1);} catch (InterruptedException e) {e.printStackTrace();}}).start();// 5. 等待1000毫秒Thread.sleep(1000L);// 6. 再启动一个线程,从队列中取出1个元素new Thread(() -> {try {System.out.println(Thread.currentThread().getName() + " 出队列 " + synchronousQueue.take());} catch (InterruptedException e) {e.printStackTrace();}}).start();// 7. 等待1000毫秒Thread.sleep(1000L);// 8. 再启动一个线程,从队列中取出1个元素new Thread(() -> {try {System.out.println(Thread.currentThread().getName() + " 出队列 " + synchronousQueue.take());} catch (InterruptedException e) {e.printStackTrace();}}).start();}}


输出结果:

Thread-0 入队列 0
Thread-1 入队列 1Thread-2 出队列 1
Thread-3 出队列 0

从输出结果中可以看出,符合栈结构先进后出的顺序。


2、栈节点源码

栈中的数据都是由一个个的节点组成的,先看一下节点类的源码:

// 节点
static final class SNode {// 节点值(取数据的时候,该字段为null)Object item;// 存取数据的线程volatile Thread waiter;// 节点模式int mode;// 匹配到的节点volatile SNode match;// 后继节点volatile SNode next;
}
  • item:节点值,只在存数据的时候用。取数据的时候,这个值是null。

  • waiter:存取数据的线程,如果没有对应的接收线程,这个线程会被阻塞。

  • mode:节点模式,共有3种类型:


3、put/take流程


放数据和取数据的逻辑,在底层复用的是同一个方法,以put/take方法为例,另外两个放数据的方法,add和offer方法底层实现是一样的


还是以上面的case为例:

1、Thread0先往SynchronousQueue队列中放入元素 0

2、Thread1再往SynchronousQueue队列中放入元素 1

3、Thread2从SynchronousQueue队列中取出一个元素


第一步:Thread0先往SynchronousQueue队列中放入元素 0


把本次操作组装成SNode压入栈顶,item是元素0,waiter是当前线程Thread0,mode是1表示放入数据。

第二步:Thread1再往SynchronousQueue队列放入元素 1


把本次操作组装成SNode压入栈顶,item是元素1,waiter是当前线程Thread1,mode是1表示放入数据,next是SNode0。

第三步:Thread2从SynchronousQueue队列中取出一个元素


这次的操作比较复杂,也是先把本次的操作包装成SNode压入栈顶。


item是null(取数据的时候,这个字段没有值),waiter是null(当前线程Thread2正在操作,所以不用赋值了),mode是2表示正在操作(即将跟后继节点进行匹配),next是SNode1。

然后,Thread2开始把栈顶的两个节点进行匹配,匹配成功后,就把SNode2赋值给SNode1的match属性,唤醒SNode1中的Thread1线程,然后弹出SNode2节点和SNode1节点。


4、put/take源码实现


看完 了put/take流程,再来看源码就简单多了


先看一下put方法源码:

// 放数据
public void put(E e) throws InterruptedException {// 不允许放null元素if (e == null)throw new NullPointerException();// 调用转换器实现类,放元素if (transferer.transfer(e, false, 0) == null) {// 如果放数据失败,就中断当前线程,并抛出异常Thread.interrupted();throw new InterruptedException();}
}

核心逻辑都在transfer方法中,代码很长,理清逻辑后,也很容易理解。

  // 取数据和放数据操作,共用一个方法E transfer(E e, boolean timed, long nanos) {SNode s = null;// e为空,说明是取数据,否则是放数据int mode = (e == null) ? REQUEST : DATA;for (; ; ) {SNode h = head;// 1. 如果栈顶节点为空,或者栈顶节点类型跟本次操作相同(都是取数据,或者都是放数据)if (h == null || h.mode == mode) {// 2. 判断节点是否已经超时if (timed && nanos <= 0) {// 3. 如果栈顶节点已经被取消,就删除栈顶节点if (h != null && h.isCancelled())casHead(h, h.next);elsereturn null;// 4. 把本次操作包装成SNode,压入栈顶} else if (casHead(h, s = snode(s, e, h, mode))) {// 5. 挂起当前线程,等待被唤醒SNode m = awaitFulfill(s, timed, nanos);// 6. 如果这个节点已经被取消,就删除这个节点if (m == s) {clean(s);return null;}// 7. 把s.next设置成headif ((h = head) != null && h.next == s)casHead(h, s.next);return (E) ((mode == REQUEST) ? m.item : s.item);}// 8. 如果栈顶节点类型跟本次操作不同,并且不是FULFILLING类型} else if (!isFulfilling(h.mode)) {// 9. 再次判断如果栈顶节点已经被取消,就删除栈顶节点if (h.isCancelled())casHead(h, h.next);// 10. 把本次操作包装成SNode(类型是FULFILLING),压入栈顶else if (casHead(h, s = snode(s, e, h, FULFILLING | mode))) {// 11. 使用死循环,直到匹配到对应的节点for (; ; ) {// 12. 遍历下个节点SNode m = s.next;// 13. 如果节点是null,表示遍历到末尾,设置栈顶节点是null,结束。if (m == null) {casHead(s, null);s = null;break;}SNode mn = m.next;// 14. 如果栈顶的后继节点跟栈顶节点匹配成功,就删除这两个节点,结束。if (m.tryMatch(s)) {casHead(s, mn);return (E) ((mode == REQUEST) ? m.item : s.item);} else// 15. 如果没有匹配成功,就删除栈顶的后继节点,继续匹配s.casNext(m, mn);}}} else {// 16. 如果栈顶节点类型跟本次操作不同,并且是FULFILLING类型,// 就再执行一遍上面第11步for循环中的逻辑(很少概率出现)SNode m = h.next;if (m == null)casHead(h, null);else {SNode mn = m.next;if (m.tryMatch(h))casHead(h, mn);elseh.casNext(m, mn);}}}}

transfer方法逻辑也很简单,就是判断本次操作类型是否跟栈顶节点相同,如果相同,就把本次操作压入栈顶。否则就跟栈顶节点匹配,唤醒栈顶节点线程,弹出栈顶节点。


transfer方法中调用了awaitFulfill方法,作用是挂起当前线程。

  // 等待被唤醒SNode awaitFulfill(SNode s, boolean timed, long nanos) {// 1. 计算超时时间final long deadline = timed ? System.nanoTime() + nanos : 0L;Thread w = Thread.currentThread();// 2. 计算自旋次数int spins = (shouldSpin(s) ?(timed ? maxTimedSpins : maxUntimedSpins) : 0);for (;;) {if (w.isInterrupted())s.tryCancel();// 3. 如果已经匹配到其他节点,直接返回SNode m = s.match;if (m != null)return m;if (timed) {// 4. 超时时间递减nanos = deadline - System.nanoTime();if (nanos <= 0L) {s.tryCancel();continue;}}// 5. 自旋次数减一if (spins > 0)spins = shouldSpin(s) ? (spins-1) : 0;else if (s.waiter == null)s.waiter = w;// 6. 开始挂起当前线程else if (!timed)LockSupport.park(this);else if (nanos > spinForTimeoutThreshold)LockSupport.parkNanos(this, nanos);}}

awaitFulfill方法的逻辑也很简单,就是挂起当前线程。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/887008.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[2025.2.18 JavaWeb学习]Mybatis

入门MyBatis是一款优秀的持久层(dao)框架,用于简化JDBC的开发 使用:IDEA创建SpringBoot模块,而后勾选MySQL Driver和MyBatis Framework pom.xml引入了mybatis-spring-boot-starter起步依赖、com.mysql依赖包、org.springframework.boot单元测试依赖 pojo中,创建实体对象,…

ROS2-PX4学习笔记

搞点小研究,欢迎对ROS2、PX4感兴趣的同学浏览。希望Mujica能好起来。ROS2-PX4学习笔记 前言 做毕设其实是个有点痛苦的过程。 毕竟所有东西都得现学,然后探索,时间又有限。 研究没接触过的新东西,总会失败,没有产出,搞久了人难受。 所以还是写一点东西吧。 我挺想把这个坑…

MessagePipe 中文文档

MessagePipe 是一个专为 .NET 和 Unity 设计的高性能内存/分布式消息管道。支持所有 Pub/Sub 场景、CQRS 中介模式、Prism 的 EventAggregator(视图与视图模型解耦)、IPC(进程间通信)-RPC 等。MessagePipeMessagePipe 是一个专为 .NET 和 Unity 设计的高性能内存/分布式消息…

Avalonia系列文章之样式与主题

随着社会的发展,大家对软件的要求,从最初的命令行输入输出,到可视化输入输出,如报表,图表等;从最初的可用性,稳定性为主,到现代软件理念中的便捷易用性转变,在保证稳定可用外,对软件的交互易用要求越来越高,而这些则离不开UI设计以及样式的应用。今天以一些简单的小…

SynchronousQueue的put方法底层源码

一、SynchronousQueue的put方法底层源码 SynchronousQueue 的 put 方法用于将元素插入队列。由于 SynchronousQueue 没有实际的存储空间,put 方法会阻塞,直到有消费者线程调用 take 方法移除元素 1、put 方法的作用将元素插入队列。如果没有消费者线程等待,当前线程会阻塞,…

【Tomcat文件上传】绕WAF姿势深入研究

环境 本地环境tomcat8.5.93,无黑白名单限制getSubmittedFileName()函数 tomcat可以通过filePart.getSubmittedFileName();获取上传文件的原始名filename获取Content-Disposition头后,判断值是否form-data或attachment开头 然后112行将form-data; name="file"; file…

关于一个手机控制电脑执行特定任务的解决方案探索【1】

【前言】 说来话长,关于这个手机控制电脑执行特定任务的想法早在几年前就有,但因为对安卓平台开发经验实在不足,就一直拖到了现在。不过好在没有忘记初衷,接下来我们一起来看我的思路和方法。 【思路】 想要通过手机作为控制端,来发送指令给同一网络下的电脑端,执行特定任…

如何升级 PowerShell 到最新版本

前言最近,需要大量使用PowerShell,然后有需要PowerShell 7正文升级的步骤也比较简单,按照下面的步骤就好了文字版本的,方便大家复制粘贴。PS C:\WINDOWS\system32> $PSVersionTable.PSVersionMajor Minor Build Revision ----- ----- ----- -------- 5 1 …

百万架构师第四十课:RabbitMq:RabbitMq-工作模型与JAVA编程|JavaGuide

来源:https://javaguide.net RabbitMQ 1-工作模型与Java编程 课前准备 预习资料 Windows安装步骤 Linux安装步骤 官网文章中文翻译系列 环境说明 操作系统:CentOS 7 JDK:1.8 Erlang:19.0.4或最新版 RabbitMQ:3.6.12或最新版 版本对应关系 典型应用场景跨系统的异步通信。人…

1月16日java假期学习读书笔记

一、学习目标 掌握HTML的基本结构和常用标签。 了解CSS的基本选择器和样式规则。 通过实际代码练习,构建一个简单的网页。 二、学习内容 (一)HTML基础 HTML简介 HTML(HyperText Markup Language,超文本标记语言)是用于构建网页的标准标记语言。 它通过一系列的标签(如、…

MapStruct使用指南并结合Lombok

MapStruct使用指南并结合Lombokhttps://juejin.cn/post/6956190395319451679#heading-1 2024-01-11 18:34:06如何结合 lombok 也就说说如果代码中使用了 lombok 注解来生成代码,mapstruct 的 getter/setter 方法也使用了 lombok 的 api,那就需要额外的配置,因为这两个工具都是使…