初探 JUC 并发编程:Java 中的并发队列 ConcurrentLinkedQueue 源码级解析

第七部分:Java 并发包中并发队列解析

7.1)ConcurrentLinkedQueue 原理探究

7.1.1)类图结构

ConcurrentLinkedQueue 底层通过单向链表的方式实现,其中有两个 volatile 类型的 Node 节点用来表示队列的首、尾节点。

    public ConcurrentLinkedQueue() {head = tail = new Node<E>(null);}

在默认的构造方法中,首和尾指向值为 null 的哨兵节点。新元素会被插入到队列的末尾,出队从队列对头获取第一个元素。

在 Node 节点中,维护着一个使用 volatile 变量修饰的 item 属性来存放节点的值;next 属性存储下一个节点的指针。

    private static class Node<E> {volatile E item;volatile Node<E> next;}

其内部使用 UnSafe 提供的 CAS 方法来保证出队和入队操作的原子性。

        // 通过 cas 操作设置节点的 itemboolean casItem(E cmp, E val) {return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);}// 通过 cas 操作设置 next 的值void lazySetNext(Node<E> val) {UNSAFE.putOrderedObject(this, nextOffset, val);}// 通过 cas 操作修改 nextboolean casNext(Node<E> cmp, Node<E> val) {return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);}

7.1.2) ConcurrentLinkedQueue 原理介绍

💡 offer 操作介绍

offer 操作是在队列的末尾添加一个元素,由于队列是无界队列,所以一定会返回 true ;当传入的值为 null 的时候,会抛出 NPE 异常。

这里先给出完整的代码和注释:

    public boolean offer(E e) {// e 为空则抛出空指针异常checkNotNull(e);// 构造 node 节点final Node<E> newNode = new Node<E>(e);// 从尾节点开始插入for (Node<E> t = tail, p = t;;) {Node<E> q = p.next;// p 的下一个节点为空,说明 p 是最后一个节点if (q == null) {// 通过 CAS 操作设置 p 节点的 nextif (p.casNext(null, newNode)) {// 每插入两个节点的时候会执行这个方法if (p != t) casTail(t, newNode);  // Failure is OK.return true;}}// 当 p 节点(指向的是尾部节点,发生自旋的的时候)else if (p == q)// 多线程操作,由于 poll 操作移除元素之后可能会引发链表自旋// 通过这里找到新的 head// 下面这段代码的逻辑是这样的:// 如果 tail 在操作之前被改变了,就将其变为新的 tail// 反之则将其赋值为 head 节点p = (t != (t = tail)) ? t : head;else// 重新寻找尾节点p = (p != t && t != (t = tail)) ? t : q;}}

在进入方法时,首先检测传入的参数是否为 null ,如果是则直接抛出 NPE 异常;检测完节点后就构造一个新的节点 newNode

然后构造了一个无限循环

for (Node<E> t = tail, p = t;;)

这个循环唯一的出口是通过 CAS 操作成功插入节点后,这也就解释了上面说的这个方法一定会返回 true,下面来具体看这三个 if 分支分别处理的是什么情况:

第一个 if 分支

p.next 为 null 的时候,p 在循环的开始被赋值为了 tail 也就是尾节点,通过这个条件就确定了 p 此时为尾节点;但是多线程情况下会出现改变 tail 的情况,前面提到过 tail 是一个被 volatile 关键字修饰的变量,线程对于它的修改对于其他线程是可见的,每次获取 tail 都是获取最新的值。

然后尝试通过 CAS 操作来设置尾节点的 next ,每插入两个节点会重置一次 tail 的值,但是这里对于 tail 的修改没有要求一定要成功,也就是在多线程的环境下不一定保证 tail 就是真正的尾节点,但是在各种方法中都有充分的安全措施来弥补这个问题,继续看下去就能理解。

最后返回一个 true。


第二个 if 分支

在多线程下执行 poll 出队列的操作的时候,有可能会出现链表自旋的状况,也就是这个分支中出现的 p.next = p 情况;此时需要去重新设置头节点;但多线程情况下仍然很多线程同时操作这个链表,所以在修改之前先去判断链表的 tail 节点是否被重新设置,如果被重新设置了则将 p 赋值为这个新的节点。

p = (t != (t = tail)) ? t : head;

t != (t = tail) 的执行顺序是这样的:

  1. 首先,t 的当前值被拿来与新的 tail 进行比较。
  2. 接着,赋值操作 t = tail 执行。这会将 t 更新为新的 tail 的值。
  3. 最后,将更新后的 t 的值与原来的 t 进行比较。

第三个 if 分支

当链表状态正常且没有插入成功的情况下执行这个分支:

p = (p != t && t != (t = tail)) ? t : q;

在这里会重新去寻找尾节点,当 tail 节点在执行操作的时候被修改了则将 p 赋值为这个新的 tail,反之则将 p 赋值为 q,也就是 p.next


最后,让我们来分析一下上面的代码是如何确保线程安全性的:

  • 首先,链表的插入是用来原子操作 CAS 来执行,它是原子性的,多个线程同时尝试进行设置时,只有一个线程会成功。
  • 对于多线程下可能导致的自旋异常,采用了重新找新的 head 节点来解决。
  • 最后,在发现尾节点 t 在操作之前已经被改变时,会将当前节点 p 更新为新的尾节点 **t ,**确保了在多线程环境下链表的正确性。

因为上面的方法比较复杂,这里采用画图的形式来模拟两个节点插入的情况:

刚初始化的时候,head 和 next 都指向 item 为 null 的哨兵节点,此时我们执行一个插入的操作

				// 从尾节点开始插入for (Node<E> t = tail, p = t;;) {Node<E> q = p.next;// p 的下一个节点为空,说明 p 是最后一个节点if (q == null) {// 通过 CAS 操作设置 p 节点的 nextif (p.casNext(null, newNode)) {// 每插入两个节点的时候会执行这个方法if (p != t) casTail(t, newNode);  // Failure is OK.return true;}}

在这里插入图片描述

此时队列中没有任何元素,所以此时指向的 q == null 是成立的,此时执行插入操作设置下一个节点为新创建的节点

在这里插入图片描述

此时去判断 p 是否等于 t,此时发现是相等的,所以直接返回 true

然后我们来插入第二个节点,此时指针的指向和上面相同,但是此时发现 q 并不是 null,此时执行的就是这个 if 分支:

 						else// 重新寻找尾节点p = (p != t && t != (t = tail)) ? t : q;

此时 p 会指向 q,在进行下一次循环时,指针的指向是这样的:

在这里插入图片描述

此时执行完和第一次一样的插入逻辑之后,t 就不等于 p,此时就更新 tail 为新的节点,所以每当插入两个节点之后会更新一次 tail。

在这里插入图片描述

💡 add 操作

在链表的末尾添加一个元素,底层仍然是调用的 offer() 方法:

    public boolean add(E e) {return offer(e);}

💡 poll 操作

这个方法的作用是在队列的头部获取并移除一个元素,如果队列为空则会返回 null。

这里给出完整的代码和注释:

    public E poll() {// goto 标记restartFromHead:for (;;) {for (Node<E> h = head, p = h, q;;) {// 存储当前循环中节点的值E item = p.item;// 当前节点有值,且通过 CAS 将其变为 nullif (item != null && p.casItem(item, null)) {if (p != h) // 两次 poll 会更新一次头节点updateHead(h, ((q = p.next) != null) ? q : p);return item;}              // 队列为空则返回 nullelse if ((q = p.next) == null) {updateHead(h, p);return null;}// 如果当前节点被自引用了,重新寻找头节点else if (p == q)continue restartFromHead;elsep = q;}}}final void updateHead(Node<E> h, Node<E> p) {if (h != p && casHead(h, p))h.lazySetNext(h);}

在本方法中我们要注意 updateHead() 方法的使用时机,这个方法中会更新头节点 head 为传入的新节点,并且使原来的头节点 自旋。第一个 if 分支中的 unpdateHead 方法和上面的 offer 方法相同,也是在两次 poll 才会更新一次 head 节点;但与 offer 方法不同的是线·

方法最前面的是一个 goto 语句,使用 goto 语句可以跳到指定的位置,**goto**语句在过去的编程语言中曾经被广泛使用,但是它往往导致代码的可读性和可维护性变差,所以在编写代码的时候不建议使用,只需要看懂即可。

这里先来模拟一个线程弹出两个节点的情况

此时将 p 指向的节点的 item 修改为 null,并且返回 p 节点的值

在这里插入图片描述

执行第二次弹出的时候,发现 p 指向的 item 为 null,且队列不为空、没有出现自引用的情况,所以此时后移 p 到 next 的位置,此时正常执行前面的逻辑弹出节点,并且更新 head 的值。

在这里插入图片描述

最终会达到上图的效果,这是单个线程在正常情况下得到的结果,下面来模拟多个线程同时操控这个队列导致的其他情况。

如果一个线程执行下面这一步,也就是第一个 if 的时候,其他线程获取并且弹出了节点 1(将节点 2的 item 设置为了 null),假设这个线程此时因为某些原因(比如被中断)没有执行 updateHead 方法,那此时的结果如右图,此时其他线程遍历到第二个 null 节点的时候会将头节点放到其正确的位置上,也就是执行 updateHead 并且返回 null。

在这里插入图片描述

而当在执行 poll 操作的时候其他线程执行了 updateHead 方法可能会使得此节点指向的节点变为自旋节点,此时需要重新寻找 head 节点:

                // 如果当前节点被自引用了,重新寻找头节点else if (p == q)continue restartFromHead;

上面的方法保证了 head 每两次弹出就会更新一次(即使在多线程的情况下),同时采用节点自旋的方法防止节点被重复的获取。

💡 peek 操作

peek 操作是在不移除元素的情况下获取队列头部的一个元素,如果队列为空则返回 null。

    public E peek() {restartFromHead:for (;;) {for (Node<E> h = head, p = h, q;;) {E item = p.item;// 可以获取到元素或者这是队列中最后一个位置(可以为 null)if (item != null || (q = p.next) == null) {// 更新头节点updateHead(h, p);return item;}else if (p == q)continue restartFromHead;elsep = q;}}}

看完上面的 poll 方法,本方法还是比较好懂的

方法返回的条件为 当前节点的 item 有值 或者 当前节点为队列中最后一个节点,此时更新头节点的值并且返回 item。

如果出现 p == q 也就是自旋的时候,重新寻找头节点。

如果没有遍历到队列中有值的节点,且还有后续的节点就后移 p 指针继续寻找。

💡 size 操作

通过 size 方法计算队列中的元素个数,这个方法在并发环境下并不是很有用,因为 CAS 没有加锁,所以调用 size 函数的期间可能增加或删除元素,导致统计结果的不准确。

    public int size() {int count = 0;// 通过 first 方法来获取队列中的第一个元素,排除哨兵节点// succ 方法获取当前节点的 next 元素,如果自旋的话就返回头节点for (Node<E> p = first(); p != null; p = succ(p))if (p.item != null)if (++count == Integer.MAX_VALUE)break;return count;}final Node<E> succ(Node<E> p) {Node<E> next = p.next;return (p == next) ? head : next;}

方法中首先调用 first 方法排除哨兵节点,获取真正的第一个节点,然后不断后移节点去计算 count 的值;此方法只能统计到 Integer.MAX_VALUE 即使队列是个无界队列;当统计完成后返回 count。

size 方法的实现逻辑比较简单,来看一下里面调用的 first 方法

    Node<E> first() {restartFromHead:for (;;) {for (Node<E> h = head, p = h, q;;) {boolean hasItem = (p.item != null);if (hasItem || (q = p.next) == null) {updateHead(h, p);return hasItem ? p : null;}else if (p == q)continue restartFromHead;elsep = q;}}}

方法中会返回第一个不为 null 的节点,如果没有就返回 null。

到这里可以我们已经见到了 updateHead 的所有使用位置,来总结一下它的使用时机

  • 当执行 poll 方法的时候调用了两次
    • 第一次是头节点没有指向此时弹出节点的时候
    • 第二次是发现队列为空的时候
  • 在执行 peek 操作的时候调用了一次
    • 每次获取元素或者发现队列为空的时候会更新头节点
  • 最后就是在上面的 first 方法中调用了一次
    • 当找到第一个 item 不为 null 的节点或者发现队列为空的时候会执行一次

通过上面的方法,在多线程的情况下,也能保证 head 时刻在执行读操作的时候处于正确的位置。

💡 remove 操作

删除队列中的元素,如果存在多个则删除第一个,并且返回 true,没有找到则返回 false。

    public boolean remove(Object o) {// o 为空直接返回 falseif (o != null) {Node<E> next, pred = null;for (Node<E> p = first(); p != null; pred = p, p = next) {boolean removed = false;E item = p.item;if (item != null) {// 如果相等则使用 CAS 设置为 null// 多线程情况下只有一个线程会成功,其他线程会继续查找// 是否有匹配的其他元素if (!o.equals(item)) {next = succ(p); // 获取 next 元素continue;}removed = p.casItem(item, null);}next = succ(p);// 如果有后续的节点的话,前驱节点链接// p 节点指向的位置会因为无法到达而被销毁if (pred != null && next != null) // unlinkpred.casNext(p, next);if (removed)return true;}}return false;}

💡 contains 操作

这个方法回去判断队列中是否有指定的对象,由于和 size 方法一样是遍历整个队列,所以结果不是那么精确,比如调用的时候元素在队列中,但是在遍历途中元素被删除,会返回 false。

    public boolean contains(Object o) {if (o == null) return false;for (Node<E> p = first(); p != null; p = succ(p)) {E item = p.item;if (item != null && o.equals(item))return true;}return false;}

方法的执行逻辑和 size 相同,这里不过多赘述。

7.1.3)总结

ConcurrentLinkedQueue 底层使用单线填表数据结构来保存队列元素,每个元素被封装成一个 Node 节点;队列通过 head 和 tail 来维护的,创建队列的时候头尾节点会指向哨兵节点,第一次执行 peek 或者 first 的时候才会把 head 指向第一个真正的队列元素。

由于没有加锁,所以 size 或者 contains 会导致结果不准确。

出队和入队的操作都是操作 tail 和 head 节点,保证在多线程的情况下的线程安全,只需要保证操作的可见性和原子性即可,由于两个属性都是 volatile 修饰的,保证了可见性,同时方法中使用 CAS 保证了原子性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/700824.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【平衡二叉树】AVL树(双旋)

&#x1f389;博主首页&#xff1a; 有趣的中国人 &#x1f389;专栏首页&#xff1a; C进阶 &#x1f389;其它专栏&#xff1a; C初阶 | Linux | 初阶数据结构 小伙伴们大家好&#xff0c;本片文章将会讲解AVL树的左双选和右双旋的相关内容。 如果看到最后您觉得这篇文章写…

HTML常用标签-表格标签

表格标签 1 常规表格2 单元格跨行3 单元格跨行 1 常规表格 table标签 代表表格 thead标签 代表表头 可以省略不写 tbody标签 代表表体 可以省略不写 tfoot标签 代表表尾 可以省略不写 tr标签 代表一行 td标签 代表行内的一格 th标签 自带加粗和居中效果的td 代码 <h…

操作系统-页式存储(淘汰页面问题)

进程P有5个页面&#xff0c;页号为0~4&#xff0c;页面变换表及状态位、访问位和修改位的含义如下图所示。若系统给进程P分配了3个存储块&#xff0c;当访问的页面3不在内存时&#xff0c;应该淘汰表中页号为&#xff08;&#xff09;的页面。 解答&#xff1a; 被淘汰的页面首…

网上有哪些赚钱的方法能一天赚二三十?盘点7个靠谱的搞钱副业和赚钱软件

想在家里躺着就能把钱赚&#xff1f;这不再是遥不可及的梦想&#xff01;随着互联网的飞速发展&#xff0c;网上赚钱的方式层出不穷&#xff0c;总有一款适合你。 今天&#xff0c;就让我们一起揭开这些神秘面纱&#xff0c;看看哪些网上赚钱秘诀能让你轻松实现月入过万&#x…

Python 操作数据库

十、Python3 操作数据库 1、Python3 操作 MySQL 1、基本介绍 Python3 操作 MySQL 数据库 可以使用的模块是 pymysql 和 MySQLdb。 这个两个模块都是通过自己的 API 执行原生的 SQL 语句实现的。 MySQLdb 是最早出现的一个操作 MySQL 数据库的模块&#xff0c;核心由C语言编…

LLM Agent智能体综述(万字长文)

前言 &#x1f3c6;&#x1f3c6;&#x1f3c6;在上一篇文章中&#xff0c;我们介绍了如何部署MetaGPT到本地&#xff0c;获取OpenAI API Key并配置其开发环境&#xff0c;并通过一个开发小组的多Agent案例感受了智能体的强大&#xff0c;在本文中&#xff0c;我们将对AI Agent…

Linux的命名管道 共享内存

目录 命名管道 mkfifo函数 unlink函数 命名管道类 服务端 客户端 共享内存 shmget函数 ftok函数 key和shmid的区别 snprintf函数 ipcs指令 ipcrm指令 shmctl函数 shmat函数 void*做返回值 创建共享内存空间 服务端 客户端 命名管道 基本概念&#xff1…

【Shell脚本】Shell编程之数组

目录 一.数组 1.基本概念 2.定义数组的方法 2.1.方法一 2.2.方法二 2.3.方法三 2.4.方法四 2.5.查看数组长度 2.6.查看数组元素下标 3.数组分片 4.数组字符替换 4.1.临时替换 4.2.永久替换 5.数组删除 5.1.删除某个下标 5.2.删除整组 6.数组遍历和重新定义 7…

【js】获取媒体流实现拍照功能,摄像头切换

<script setup>import {onMounted,reactive,ref} from vueconst videoConstraints reactive({width: 500,height: 300});let picArr reactive([])let videoNode ref(null)let show ref(true)let stream reactive({})onMounted(async () > {// 获取视频流&#xf…

Java面试题:ConcurrentHashMap

ConcurrentHashMap 一种线程安全的高效Map集合 jdk1.7之前 底层采用分段的数组链表实现 一个不可扩容的数组:segment[] 数组中的每个元素都对应一个HashEntry数组用以存放数据 当放入数据时,根据key的哈希值找到对应的segment数组下标 找到下标后就会添加一个reentrantlo…

自动化测试框架怎么选?Robot Framework怎么搭建环境?

本系列文章跟大家分享的内容是Robot Framework从入门到实践的整个过程&#xff0c;首先会简单为大家介绍一下自动化测试框架&#xff0c;包括框架选择、环境搭建、接口自动化等&#xff0c;最后会带大家实际操作一遍&#xff0c;本文我们主要为大家介绍自动化测试框架的不同以及…

c++ 各版本特性介绍

c C是一种高级编程语言&#xff0c;以其强大的功能、灵活性和高效性而闻名。它是由Bjarne Stroustrup在20世纪80年代初期在贝尔实验室开发的&#xff0c;作为C语言的一个扩展。C不仅包含了C语言的所有特性&#xff0c;还引入了面向对象编程&#xff08;OOP&#xff09;的概念&…