SynchronousQueue的put方法底层源码

news/2025/2/21 15:42:31/文章来源:https://www.cnblogs.com/jock766/p/18725696

一、SynchronousQueue的put方法底层源码

SynchronousQueue 的 put 方法用于将元素插入队列。由于 SynchronousQueue 没有实际的存储空间,put 方法会阻塞,直到有消费者线程调用 take 方法移除元素


1、put 方法的作用

  • 将元素插入队列。

  • 如果没有消费者线程等待,当前线程会阻塞,直到有消费者线程移除元素。

  • 该方法不会返回任何值,也不会抛出异常(除非线程被中断)。


2、put 方法的源码

以下是 SynchronousQueue 中 put 方法的源码(基于 JDK 17):

可以看到,put 方法的核心逻辑是通过 transferer.transfer 方法实现的。transferer 是 SynchronousQueue 的内部组件,负责实际的数据传输


3、transferer.transfer 方法

transferer 是一个抽象类,有两个实现:

  • TransferStack:用于非公平模式。

  • TransferQueue:用于公平模式。


以下是 TransferStack 和 TransferQueue 中 transfer 方法的通用逻辑:


(1)TransferStack.transfer 方法

  E transfer(E e, boolean timed, long nanos) {SNode s = null; // 创建一个新节点int mode = (e == null) ? REQUEST : DATA; // 判断是生产者还是消费者for (;;) {SNode h = head; // 获取栈顶节点if (h == null || h.mode == mode) { // 如果栈为空或模式匹配if (timed && nanos <= 0) { // 如果超时if (h != null && h.isCancelled()) // 如果节点已取消casHead(h, h.next); // 移除已取消的节点elsereturn null; // 返回 null} else if (casHead(h, s = snode(s, e, h, mode))) { // 尝试插入新节点SNode m = awaitFulfill(s, timed, nanos); // 等待匹配if (m == s) { // 如果节点被取消clean(s); // 清理节点return null; // 返回 null}if ((h = head) != null && h.next == s) // 如果匹配成功casHead(h, s.next); // 移除匹配的节点return (E) ((mode == REQUEST) ? m.item : s.item); // 返回数据}} else if (!isFulfilling(h.mode)) { // 如果栈顶节点未完成匹配if (h.isCancelled()) // 如果节点已取消casHead(h, h.next); // 移除已取消的节点else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { // 尝试插入新节点for (;;) {SNode m = s.next; // 获取下一个节点if (m == null) { // 如果下一个节点为空casHead(s, null); // 重置栈顶s = null; // 重置节点break;}SNode mn = m.next;if (m.tryMatch(s)) { // 尝试匹配casHead(s, mn); // 移除匹配的节点return (E) ((mode == REQUEST) ? m.item : s.item); // 返回数据} elses.casNext(m, mn); // 移除未匹配的节点}}} else { // 如果栈顶节点已完成匹配SNode m = h.next; // 获取下一个节点if (m == null) // 如果下一个节点为空casHead(h, null); // 重置栈顶else {SNode mn = m.next;if (m.tryMatch(h)) // 尝试匹配casHead(h, mn); // 移除匹配的节点elseh.casNext(m, mn); // 移除未匹配的节点}}}}


(2)TransferQueue.transfer 方法

  E transfer(E e, boolean timed, long nanos) {QNode s = null; // 创建一个新节点boolean isData = (e != null); // 判断是生产者还是消费者for (;;) {QNode t = tail;QNode h = head;if (t == null || h == null) // 如果队列未初始化continue;if (h == t || t.isData == isData) { // 如果队列为空或模式匹配QNode tn = t.next;if (t != tail) // 如果 tail 已更新continue;if (tn != null) { // 如果 tail 未更新advanceTail(t, tn); // 更新 tailcontinue;}if (timed && nanos <= 0) // 如果超时return null; // 返回 nullif (s == null) // 如果节点未初始化s = new QNode(e, isData); // 创建新节点if (!t.casNext(null, s)) // 尝试插入新节点continue;advanceTail(t, s); // 更新 tailObject x = awaitFulfill(s, e, timed, nanos); // 等待匹配if (x == s) { // 如果节点被取消clean(t, s); // 清理节点return null; // 返回 null}if (!s.isOffList()) { // 如果节点未移除advanceHead(t, s); // 更新 headif (x != null) // 如果匹配成功s.item = s; // 标记节点s.waiter = null; // 清除等待线程}return (x != null) ? (E)x : e; // 返回数据} else { // 如果模式不匹配QNode m = h.next;if (t != tail || m == null || h != head) // 如果队列已更新continue;Object x = m.item;if (isData == (x != null) || x == m || !m.casItem(x, e)) { // 如果匹配失败advanceHead(h, m); // 移除未匹配的节点continue;}advanceHead(h, m); // 更新 headLockSupport.unpark(m.waiter); // 唤醒等待线程return (x != null) ? (E)x : e; // 返回数据}}}


4、关键点总结

  • 无存储空间:SynchronousQueue 没有容量,插入和移除操作必须一一对应。

  • 阻塞行为:如果没有配对的插入或移除操作,线程会一直阻塞。

  • 公平性:公平模式下,等待时间最长的线程优先获得执行机会。


二、SynchronousQueue的类结构

先看一下SynchronousQueue类里面有哪些属性:

public class SynchronousQueue<E>extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {/*** 转接器(栈和队列的父类)*/abstract static class Transferer<E> {/*** 转移(put和take都用这一个方法)** @param e     元素* @param timed 是否超时* @param nanos 纳秒*/abstract E transfer(E e, boolean timed, long nanos);}/*** 栈实现类*/static final class TransferStack<E> extends Transferer<E> {}/*** 队列实现类*/static final class TransferQueue<E> extends Transferer<E> {}}

SynchronousQueue底层是基于Transferer抽象类实现的,放数据和取数据的逻辑都耦合在transfer()方法中。而Transferer抽象类又有两个实现类,分别是基于栈结构实现和基于队列实现


1、初始化

SynchronousQueue常用的初始化方法有两个:

  • 1、无参构造方法

  • 2、指定容量大小的有参构造方法

       /*** 无参构造方法*/BlockingQueue<Integer> blockingQueue1 = new SynchronousQueue<>();/*** 有参构造方法,指定是否使用公平锁(默认使用非公平锁)*/BlockingQueue<Integer> blockingQueue2 = new SynchronousQueue<>(true);
    

再看一下对应的源码实现:

  /*** 无参构造方法*/public SynchronousQueue() {this(false);}/*** 有参构造方法,指定是否使用公平锁*/public SynchronousQueue(boolean fair) {transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();}

可以看出SynchronousQueue的无参构造方法默认使用的非公平策略,有参构造方法可以指定使用公平策略。 操作策略:

  • 1、公平策略,基于队列实现的是公平策略,先进先出。

  • 2、非公平策略,基于栈实现的是非公平策略,先进后出。


2、栈的类结构

/*** 栈实现*/
static final class TransferStack<E> extends Transferer<E> {/*** 头节点(也是栈顶节点)*/volatile SNode head;/*** 栈节点类*/static final class SNode {/*** 当前操作的线程*/volatile Thread waiter;/*** 节点值(取数据的时候,该字段为null)*/Object item;/*** 节点模式(也叫操作类型)*/int mode;/*** 后继节点*/volatile SNode next;/*** 匹配到的节点*/volatile SNode match;}
}

节点模式有以下三种:


3、栈的transfer方法实现

transfer()方法中,把放数据和取数据的逻辑耦合在一块了,逻辑有点绕,不过核心逻辑就四点,把握住就能豁然开朗。其实就是从栈顶压入,从栈顶弹出。


详细流程如下:


1、首先判断当前线程的操作类型与栈顶节点的操作类型是否一致,比如都是放数据,或者都是取数据。


2、如果是一致,把当前操作包装成SNode节点,压入栈顶,并挂起当前线程。


3、如果不一致,表示相互匹配(比如当前操作是放数据,而栈顶节点是取数据,或者相反)。然后也把当前操作包装成SNode节点压入栈顶,并使用tryMatch()方法匹配两个节点,匹配成功后,弹出两个这两个节点,并唤醒栈顶节点线程,同时把数据传递给栈顶节点线程,最后返回。


4、栈顶节点线程被唤醒,继续执行,然后返回传递过来的数据。

  /*** 转移(put和take都用这一个方法)** @param e     元素(取数据的时候,元素为null)* @param timed 是否超时* @param nanos 纳秒*/E transfer(E e, boolean timed, long nanos) {SNode s = null;// 1. e为null,表示要取数据,否则是放数据int mode = (e == null) ? REQUEST : DATA;for (; ; ) {SNode h = head;// 2. 如果本次操作跟栈顶节点模式相同(都是取数据,或者都是放数据),就把本次操作包装成SNode,压入栈顶if (h == null || h.mode == mode) {if (timed && nanos <= 0) {if (h != null && h.isCancelled()) {casHead(h, h.next);} else {return null;}// 3. 把本次操作包装成SNode,压入栈顶,并挂起当前线程} else if (casHead(h, s = snode(s, e, h, mode))) {// 4. 挂起当前线程SNode m = awaitFulfill(s, timed, nanos);if (m == s) {clean(s);return null;}// 5. 当前线程被唤醒后,如果栈顶有了新节点,就删除当前节点if ((h = head) != null && h.next == s) {casHead(h, s.next);}return (E) ((mode == REQUEST) ? m.item : s.item);}// 6. 如果栈顶节点类型跟本次操作不同,并且模式不是FULFILLING类型} else if (!isFulfilling(h.mode)) {if (h.isCancelled()) {casHead(h, h.next);}// 7. 把本次操作包装成SNode(类型是FULFILLING),压入栈顶else if (casHead(h, s = snode(s, e, h, FULFILLING | mode))) {// 8. 使用死循环,直到匹配到对应的节点for (; ; ) {// 9. 遍历下个节点SNode m = s.next;// 10. 如果节点是null,表示遍历到末尾,设置栈顶节点是null,结束。if (m == null) {casHead(s, null);s = null;break;}SNode mn = m.next;// 11. 如果栈顶的后继节点跟栈顶节点匹配成功,就删除这两个节点,结束。if (m.tryMatch(s)) {casHead(s, mn);return (E) ((mode == REQUEST) ? m.item : s.item);} else {// 12. 如果没有匹配成功,就删除栈顶的后继节点,继续匹配s.casNext(m, mn);}}}} else {// 13. 如果栈顶节点类型跟本次操作不同,并且是FULFILLING类型,// 就再执行一遍上面第8步for循环中的逻辑(很少概率出现)SNode m = h.next;if (m == null) {casHead(h, null);} else {SNode mn = m.next;if (m.tryMatch(h)) {casHead(h, mn);} else {h.casNext(m, mn);}}}}}

不用关心细枝末节,把握住代码核心逻辑即可。 再看一下第4步,挂起线程的代码逻辑: 核心逻辑就两条:

  • 第6步,挂起当前线程

  • 第3步,当前线程被唤醒后,直接返回传递过来的match节点

     /*** 等待执行** @param s     节点* @param timed 是否超时* @param nanos 超时时间*/SNode awaitFulfill(SNode s, boolean timed, long nanos) {// 1. 计算超时时间final long deadline = timed ? System.nanoTime() + nanos : 0L;Thread w = Thread.currentThread();// 2. 计算自旋次数int spins = (shouldSpin(s) ?(timed ? maxTimedSpins : maxUntimedSpins) : 0);for (; ; ) {if (w.isInterrupted())s.tryCancel();// 3. 如果已经匹配到其他节点,直接返回SNode m = s.match;if (m != null)return m;if (timed) {// 4. 超时时间递减nanos = deadline - System.nanoTime();if (nanos <= 0L) {s.tryCancel();continue;}}// 5. 自旋次数减一if (spins > 0)spins = shouldSpin(s) ? (spins - 1) : 0;else if (s.waiter == null)s.waiter = w;// 6. 开始挂起当前线程else if (!timed)LockSupport.park(this);else if (nanos > spinForTimeoutThreshold)LockSupport.parkNanos(this, nanos);}}
    

再看一下匹配节点的tryMatch()方法逻辑: 作用就是唤醒栈顶节点,并当前节点传递给栈顶节点。

  /*** 匹配节点** @param s 当前节点*/boolean tryMatch(SNode s) {if (match == null &&UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {Thread w = waiter;if (w != null) {waiter = null;// 1. 唤醒栈顶节点LockSupport.unpark(w);}return true;}// 2. 把当前节点传递给栈顶节点return match == s;}


4、队列的类结构

  /*** 队列实现*/static final class TransferQueue<E> extends Transferer<E> {/*** 头节点*/transient volatile QNode head;/*** 尾节点*/transient volatile QNode tail;/*** 队列节点类*/static final class QNode {/*** 当前操作的线程*/volatile Thread waiter;/*** 节点值*/volatile Object item;/*** 后继节点*/volatile QNode next;/*** 当前节点是否为数据节点*/final boolean isData;}}

可以看出TransferQueue队列是使用带有头尾节点的单链表实现的。 还有一点需要提一下,TransferQueue默认构造方法,会初始化头尾节点,默认是空节点。

/*** TransferQueue默认的构造方法*/
TransferQueue() {QNode h = new QNode(null, false);head = h;tail = h;
}


队列的transfer方法实现


队列使用的公平策略,体现在,每次操作的时候,都是从队尾压入,从队头弹出。 详细流程如下:


1、首先判断当前线程的操作类型与队尾节点的操作类型是否一致,比如都是放数据,或者都是取数据。


2、如果是一致,把当前操作包装成QNode节点,压入队尾,并挂起当前线程。


3、如果不一致,表示相互匹配(比如当前操作是放数据,而队尾节点是取数据,或者相反)。然后在队头节点开始遍历,找到与当前操作类型相匹配的节点,把当前操作的节点值传递给这个节点,并弹出这个节点,唤醒这个节点的线程,最后返回。


4、队头节点线程被唤醒,继续执行,然后返回传递过来的数据。

    /*** 转移(put和take都用这一个方法)** @param e     元素(取数据的时候,元素为null)* @param timed 是否超时* @param nanos 超时时间*/E transfer(E e, boolean timed, long nanos) {QNode s = null;// 1. e不为null,表示要放数据,否则是取数据boolean isData = (e != null);for (; ; ) {QNode t = tail;QNode h = head;if (t == null || h == null) {continue;}// 2. 如果本次操作跟队尾节点模式相同(都是取数据,或者都是放数据),就把本次操作包装成QNode,压入队尾if (h == t || t.isData == isData) {QNode tn = t.next;if (t != tail) {continue;}if (tn != null) {advanceTail(t, tn);continue;}if (timed && nanos <= 0) {return null;}// 3. 把本次操作包装成QNode,压入队尾if (s == null) {s = new QNode(e, isData);}if (!t.casNext(null, s)) {continue;}advanceTail(t, s);// 4. 挂起当前线程Object x = awaitFulfill(s, e, timed, nanos);// 5. 当前线程被唤醒后,返回返回传递过来的节点值if (x == s) {clean(t, s);return null;}if (!s.isOffList()) {advanceHead(t, s);if (x != null) {s.item = s;}s.waiter = null;}return (x != null) ? (E) x : e;} else {// 6. 如果本次操作跟队尾节点模式不同,就从队头结点开始遍历,找到模式相匹配的节点QNode m = h.next;if (t != tail || m == null || h != head) {continue;}Object x = m.item;// 7. 把当前节点值e传递给匹配到的节点mif (isData == (x != null) || x == m ||!m.casItem(x, e)) {advanceHead(h, m);continue;}// 8. 弹出队头节点,并唤醒节点madvanceHead(h, m);LockSupport.unpark(m.waiter);return (x != null) ? (E) x : e;}}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/887000.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Tomcat文件上传】绕WAF姿势深入研究

环境 本地环境tomcat8.5.93,无黑白名单限制getSubmittedFileName()函数 tomcat可以通过filePart.getSubmittedFileName();获取上传文件的原始名filename获取Content-Disposition头后,判断值是否form-data或attachment开头 然后112行将form-data; name="file"; file…

关于一个手机控制电脑执行特定任务的解决方案探索【1】

【前言】 说来话长,关于这个手机控制电脑执行特定任务的想法早在几年前就有,但因为对安卓平台开发经验实在不足,就一直拖到了现在。不过好在没有忘记初衷,接下来我们一起来看我的思路和方法。 【思路】 想要通过手机作为控制端,来发送指令给同一网络下的电脑端,执行特定任…

如何升级 PowerShell 到最新版本

前言最近,需要大量使用PowerShell,然后有需要PowerShell 7正文升级的步骤也比较简单,按照下面的步骤就好了文字版本的,方便大家复制粘贴。PS C:\WINDOWS\system32> $PSVersionTable.PSVersionMajor Minor Build Revision ----- ----- ----- -------- 5 1 …

百万架构师第四十课:RabbitMq:RabbitMq-工作模型与JAVA编程|JavaGuide

来源:https://javaguide.net RabbitMQ 1-工作模型与Java编程 课前准备 预习资料 Windows安装步骤 Linux安装步骤 官网文章中文翻译系列 环境说明 操作系统:CentOS 7 JDK:1.8 Erlang:19.0.4或最新版 RabbitMQ:3.6.12或最新版 版本对应关系 典型应用场景跨系统的异步通信。人…

1月16日java假期学习读书笔记

一、学习目标 掌握HTML的基本结构和常用标签。 了解CSS的基本选择器和样式规则。 通过实际代码练习,构建一个简单的网页。 二、学习内容 (一)HTML基础 HTML简介 HTML(HyperText Markup Language,超文本标记语言)是用于构建网页的标准标记语言。 它通过一系列的标签(如、…

MapStruct使用指南并结合Lombok

MapStruct使用指南并结合Lombokhttps://juejin.cn/post/6956190395319451679#heading-1 2024-01-11 18:34:06如何结合 lombok 也就说说如果代码中使用了 lombok 注解来生成代码,mapstruct 的 getter/setter 方法也使用了 lombok 的 api,那就需要额外的配置,因为这两个工具都是使…

史上最全桌面级CPU天梯图-2024年10月更新(包含13/14代Intel/7000系列锐龙)

史上最全桌面级CPU天梯图-2024年10月更新(包含13/14代Intel/7000系列锐龙) 原文:https://www.zhihu.com/tardis/bd/art/499783467?source_id=1001

large_bin_attack

large_bin的结构如下 /*This struct declaration is misleading (but accurate and necessary).It declares a "view" into memory allowing access to necessaryfields at known offsets from a given base. See explanation below. */ struct malloc_chunk {INTERN…

体验用ai做了个python小游戏

写在前面:最近ai确实比较火。各种生成式AI,包括文字、图片、视频。之前听说ai生产代码能力比较强,一直想试试。所以及就有了本问。使用的工具deepinseek :用来生成python代码即梦:用来生成图片素材Remove.bg:用来对生成的图片素材去除背景pixabay.com:用来下载音乐素材游…

2.1.5 节省内存

首先来介绍一下可变对象和不可变对象可变对象:整数,浮点数,字符串,元组等 不可变对象:列表,字典,集合等然后看一下Python中内存分配的方式 执行x=1会发生什么?此时,内存会分配一个地址给1,1是一个整型对象,而x是一个引用(不是对象!),指向1所在的位置,并不占用实…

ABC392E翻译

AT_abc392_e [ABC392E] Cables and Servers 题目描述 有编号从 \(1\) 到 \(N\) 的 \(N\) 台服务器和编号从 \(1\) 到 \(M\) 的 \(M\) 根电缆。 电缆 \(i\) 双向连接服务器 \(A_i\) 和服务器 \(B_i\)。 通过进行以下操作(可以是 \(0\) 次),使得所有服务器之间都能通过电缆相互…