第七部分:Java 并发包中并发队列解析
7.1)ConcurrentLinkedQueue 原理探究
7.1.1)类图结构
ConcurrentLinkedQueue 底层通过单向链表的方式实现,其中有两个 volatile 类型的 Node 节点用来表示队列的首、尾节点。
public ConcurrentLinkedQueue() {head = tail = new Node<E>(null);}
在默认的构造方法中,首和尾指向值为 null 的哨兵节点。新元素会被插入到队列的末尾,出队从队列对头获取第一个元素。
在 Node 节点中,维护着一个使用 volatile 变量修饰的 item 属性来存放节点的值;next 属性存储下一个节点的指针。
private static class Node<E> {volatile E item;volatile Node<E> next;}
其内部使用 UnSafe 提供的 CAS 方法来保证出队和入队操作的原子性。
// 通过 cas 操作设置节点的 itemboolean casItem(E cmp, E val) {return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);}// 通过 cas 操作设置 next 的值void lazySetNext(Node<E> val) {UNSAFE.putOrderedObject(this, nextOffset, val);}// 通过 cas 操作修改 nextboolean casNext(Node<E> cmp, Node<E> val) {return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);}
7.1.2) ConcurrentLinkedQueue 原理介绍
💡 offer 操作介绍
offer 操作是在队列的末尾添加一个元素,由于队列是无界队列,所以一定会返回
true
;当传入的值为 null 的时候,会抛出 NPE 异常。
这里先给出完整的代码和注释:
public boolean offer(E e) {// e 为空则抛出空指针异常checkNotNull(e);// 构造 node 节点final Node<E> newNode = new Node<E>(e);// 从尾节点开始插入for (Node<E> t = tail, p = t;;) {Node<E> q = p.next;// p 的下一个节点为空,说明 p 是最后一个节点if (q == null) {// 通过 CAS 操作设置 p 节点的 nextif (p.casNext(null, newNode)) {// 每插入两个节点的时候会执行这个方法if (p != t) casTail(t, newNode); // Failure is OK.return true;}}// 当 p 节点(指向的是尾部节点,发生自旋的的时候)else if (p == q)// 多线程操作,由于 poll 操作移除元素之后可能会引发链表自旋// 通过这里找到新的 head// 下面这段代码的逻辑是这样的:// 如果 tail 在操作之前被改变了,就将其变为新的 tail// 反之则将其赋值为 head 节点p = (t != (t = tail)) ? t : head;else// 重新寻找尾节点p = (p != t && t != (t = tail)) ? t : q;}}
在进入方法时,首先检测传入的参数是否为 null
,如果是则直接抛出 NPE 异常;检测完节点后就构造一个新的节点 newNode
。
然后构造了一个无限循环
for (Node<E> t = tail, p = t;;)
这个循环唯一的出口是通过 CAS 操作成功插入节点后,这也就解释了上面说的这个方法一定会返回 true,下面来具体看这三个 if
分支分别处理的是什么情况:
第一个 if
分支
当 p.next
为 null 的时候,p 在循环的开始被赋值为了 tail 也就是尾节点,通过这个条件就确定了 p 此时为尾节点;但是多线程情况下会出现改变 tail 的情况,前面提到过 tail 是一个被 volatile
关键字修饰的变量,线程对于它的修改对于其他线程是可见的,每次获取 tail 都是获取最新的值。
然后尝试通过 CAS 操作来设置尾节点的 next
,每插入两个节点会重置一次 tail 的值,但是这里对于 tail 的修改没有要求一定要成功,也就是在多线程的环境下不一定保证 tail 就是真正的尾节点,但是在各种方法中都有充分的安全措施来弥补这个问题,继续看下去就能理解。
最后返回一个 true。
第二个 if
分支
在多线程下执行 poll 出队列的操作的时候,有可能会出现链表自旋的状况,也就是这个分支中出现的 p.next = p
情况;此时需要去重新设置头节点;但多线程情况下仍然很多线程同时操作这个链表,所以在修改之前先去判断链表的 tail 节点是否被重新设置,如果被重新设置了则将 p 赋值为这个新的节点。
p = (t != (t = tail)) ? t : head;
t != (t = tail)
的执行顺序是这样的:
- 首先,
t
的当前值被拿来与新的tail
进行比较。 - 接着,赋值操作
t = tail
执行。这会将t
更新为新的tail
的值。 - 最后,将更新后的
t
的值与原来的t
进行比较。
第三个 if
分支
当链表状态正常且没有插入成功的情况下执行这个分支:
p = (p != t && t != (t = tail)) ? t : q;
在这里会重新去寻找尾节点,当 tail 节点在执行操作的时候被修改了则将 p 赋值为这个新的 tail,反之则将 p 赋值为 q,也就是 p.next
。
最后,让我们来分析一下上面的代码是如何确保线程安全性的:
- 首先,链表的插入是用来原子操作 CAS 来执行,它是原子性的,多个线程同时尝试进行设置时,只有一个线程会成功。
- 对于多线程下可能导致的自旋异常,采用了重新找新的 head 节点来解决。
- 最后,在发现尾节点
t
在操作之前已经被改变时,会将当前节点p
更新为新的尾节点 **t
,**确保了在多线程环境下链表的正确性。
因为上面的方法比较复杂,这里采用画图的形式来模拟两个节点插入的情况:
刚初始化的时候,head 和 next 都指向 item 为 null 的哨兵节点,此时我们执行一个插入的操作
// 从尾节点开始插入for (Node<E> t = tail, p = t;;) {Node<E> q = p.next;// p 的下一个节点为空,说明 p 是最后一个节点if (q == null) {// 通过 CAS 操作设置 p 节点的 nextif (p.casNext(null, newNode)) {// 每插入两个节点的时候会执行这个方法if (p != t) casTail(t, newNode); // Failure is OK.return true;}}
此时队列中没有任何元素,所以此时指向的 q == null 是成立的,此时执行插入操作设置下一个节点为新创建的节点
此时去判断 p 是否等于 t,此时发现是相等的,所以直接返回 true
然后我们来插入第二个节点,此时指针的指向和上面相同,但是此时发现 q 并不是 null,此时执行的就是这个 if 分支:
else// 重新寻找尾节点p = (p != t && t != (t = tail)) ? t : q;
此时 p 会指向 q,在进行下一次循环时,指针的指向是这样的:
此时执行完和第一次一样的插入逻辑之后,t 就不等于 p,此时就更新 tail 为新的节点,所以每当插入两个节点之后会更新一次 tail。
💡 add 操作
在链表的末尾添加一个元素,底层仍然是调用的 offer()
方法:
public boolean add(E e) {return offer(e);}
💡 poll 操作
这个方法的作用是在队列的头部获取并移除一个元素,如果队列为空则会返回 null。
这里给出完整的代码和注释:
public E poll() {// goto 标记restartFromHead:for (;;) {for (Node<E> h = head, p = h, q;;) {// 存储当前循环中节点的值E item = p.item;// 当前节点有值,且通过 CAS 将其变为 nullif (item != null && p.casItem(item, null)) {if (p != h) // 两次 poll 会更新一次头节点updateHead(h, ((q = p.next) != null) ? q : p);return item;} // 队列为空则返回 nullelse if ((q = p.next) == null) {updateHead(h, p);return null;}// 如果当前节点被自引用了,重新寻找头节点else if (p == q)continue restartFromHead;elsep = q;}}}final void updateHead(Node<E> h, Node<E> p) {if (h != p && casHead(h, p))h.lazySetNext(h);}
在本方法中我们要注意 updateHead()
方法的使用时机,这个方法中会更新头节点 head 为传入的新节点,并且使原来的头节点 自旋。第一个 if 分支中的 unpdateHead 方法和上面的 offer 方法相同,也是在两次 poll 才会更新一次 head 节点;但与 offer 方法不同的是线·
方法最前面的是一个 goto 语句,使用 goto 语句可以跳到指定的位置,**goto
**语句在过去的编程语言中曾经被广泛使用,但是它往往导致代码的可读性和可维护性变差,所以在编写代码的时候不建议使用,只需要看懂即可。
这里先来模拟一个线程弹出两个节点的情况
此时将 p 指向的节点的 item 修改为 null,并且返回 p 节点的值
执行第二次弹出的时候,发现 p 指向的 item 为 null,且队列不为空、没有出现自引用的情况,所以此时后移 p 到 next 的位置,此时正常执行前面的逻辑弹出节点,并且更新 head 的值。
最终会达到上图的效果,这是单个线程在正常情况下得到的结果,下面来模拟多个线程同时操控这个队列导致的其他情况。
如果一个线程执行下面这一步,也就是第一个 if 的时候,其他线程获取并且弹出了节点 1(将节点 2的 item 设置为了 null),假设这个线程此时因为某些原因(比如被中断)没有执行 updateHead 方法,那此时的结果如右图,此时其他线程遍历到第二个 null 节点的时候会将头节点放到其正确的位置上,也就是执行 updateHead 并且返回 null。
而当在执行 poll 操作的时候其他线程执行了 updateHead 方法可能会使得此节点指向的节点变为自旋节点,此时需要重新寻找 head 节点:
// 如果当前节点被自引用了,重新寻找头节点else if (p == q)continue restartFromHead;
上面的方法保证了 head 每两次弹出就会更新一次(即使在多线程的情况下),同时采用节点自旋的方法防止节点被重复的获取。
💡 peek 操作
peek 操作是在不移除元素的情况下获取队列头部的一个元素,如果队列为空则返回 null。
public E peek() {restartFromHead:for (;;) {for (Node<E> h = head, p = h, q;;) {E item = p.item;// 可以获取到元素或者这是队列中最后一个位置(可以为 null)if (item != null || (q = p.next) == null) {// 更新头节点updateHead(h, p);return item;}else if (p == q)continue restartFromHead;elsep = q;}}}
看完上面的 poll 方法,本方法还是比较好懂的
方法返回的条件为 当前节点的 item 有值 或者 当前节点为队列中最后一个节点,此时更新头节点的值并且返回 item。
如果出现 p == q 也就是自旋的时候,重新寻找头节点。
如果没有遍历到队列中有值的节点,且还有后续的节点就后移 p 指针继续寻找。
💡 size 操作
通过 size 方法计算队列中的元素个数,这个方法在并发环境下并不是很有用,因为 CAS 没有加锁,所以调用 size 函数的期间可能增加或删除元素,导致统计结果的不准确。
public int size() {int count = 0;// 通过 first 方法来获取队列中的第一个元素,排除哨兵节点// succ 方法获取当前节点的 next 元素,如果自旋的话就返回头节点for (Node<E> p = first(); p != null; p = succ(p))if (p.item != null)if (++count == Integer.MAX_VALUE)break;return count;}final Node<E> succ(Node<E> p) {Node<E> next = p.next;return (p == next) ? head : next;}
方法中首先调用 first 方法排除哨兵节点,获取真正的第一个节点,然后不断后移节点去计算 count 的值;此方法只能统计到 Integer.MAX_VALUE 即使队列是个无界队列;当统计完成后返回 count。
size 方法的实现逻辑比较简单,来看一下里面调用的 first 方法
Node<E> first() {restartFromHead:for (;;) {for (Node<E> h = head, p = h, q;;) {boolean hasItem = (p.item != null);if (hasItem || (q = p.next) == null) {updateHead(h, p);return hasItem ? p : null;}else if (p == q)continue restartFromHead;elsep = q;}}}
方法中会返回第一个不为 null 的节点,如果没有就返回 null。
到这里可以我们已经见到了 updateHead 的所有使用位置,来总结一下它的使用时机
- 当执行 poll 方法的时候调用了两次
- 第一次是头节点没有指向此时弹出节点的时候
- 第二次是发现队列为空的时候
- 在执行 peek 操作的时候调用了一次
- 每次获取元素或者发现队列为空的时候会更新头节点
- 最后就是在上面的 first 方法中调用了一次
- 当找到第一个 item 不为 null 的节点或者发现队列为空的时候会执行一次
通过上面的方法,在多线程的情况下,也能保证 head 时刻在执行读操作的时候处于正确的位置。
💡 remove 操作
删除队列中的元素,如果存在多个则删除第一个,并且返回 true,没有找到则返回 false。
public boolean remove(Object o) {// o 为空直接返回 falseif (o != null) {Node<E> next, pred = null;for (Node<E> p = first(); p != null; pred = p, p = next) {boolean removed = false;E item = p.item;if (item != null) {// 如果相等则使用 CAS 设置为 null// 多线程情况下只有一个线程会成功,其他线程会继续查找// 是否有匹配的其他元素if (!o.equals(item)) {next = succ(p); // 获取 next 元素continue;}removed = p.casItem(item, null);}next = succ(p);// 如果有后续的节点的话,前驱节点链接// p 节点指向的位置会因为无法到达而被销毁if (pred != null && next != null) // unlinkpred.casNext(p, next);if (removed)return true;}}return false;}
💡 contains 操作
这个方法回去判断队列中是否有指定的对象,由于和 size 方法一样是遍历整个队列,所以结果不是那么精确,比如调用的时候元素在队列中,但是在遍历途中元素被删除,会返回 false。
public boolean contains(Object o) {if (o == null) return false;for (Node<E> p = first(); p != null; p = succ(p)) {E item = p.item;if (item != null && o.equals(item))return true;}return false;}
方法的执行逻辑和 size 相同,这里不过多赘述。
7.1.3)总结
ConcurrentLinkedQueue 底层使用单线填表数据结构来保存队列元素,每个元素被封装成一个 Node 节点;队列通过 head 和 tail 来维护的,创建队列的时候头尾节点会指向哨兵节点,第一次执行 peek 或者 first 的时候才会把 head 指向第一个真正的队列元素。
由于没有加锁,所以 size 或者 contains 会导致结果不准确。
出队和入队的操作都是操作 tail 和 head 节点,保证在多线程的情况下的线程安全,只需要保证操作的可见性和原子性即可,由于两个属性都是 volatile 修饰的,保证了可见性,同时方法中使用 CAS 保证了原子性。