图解java.util.concurrent并发包源码系列——深入理解AQS,看完可以吊打面试官
- AQS是什么?有什么作用?
- AQS的原理
- 自定义资源
- 资源的获取与释放
- 线程阻塞等待唤醒
- AQS源码
- 核心成员变量
- Node 的内部结构
- waitStatus
- prev、next、thread
- nextWaiter
- predecessor()
- 操作state的方法
- acquire
- addWaiter
- enq
- acquireQueued
- shouldParkAfterFailedAcquire
- parkAndCheckInterrupt
- release
- unparkSuccessor
- 总结
- 用AQS实现一个排他锁
往期文章:
- 人人都能看懂的图解java.util.concurrent并发包源码系列 ThreadPoolExecutor线程池
- 图解java.util.concurrent并发包源码系列,原子类、CAS、AtomicLong、AtomicStampedReference一套带走
- 图解java.util.concurrent并发包源码系列——LongAdder
AQS是什么?有什么作用?
假设现在我们要开发一套并发工具,这一套工具里面可能有可重入排他锁、读写锁、共享锁等等,还包括其他的一些工具,那么大家想想这些工具类里面有什么共同逻辑?
首先这些工具类都有自己自定义的资源,比如排他锁,我给他的资源定义为一个变量1,每个线程来抢锁就看谁把这个1扣减掉,扣成功的我就把这个线程视为成功获取锁,那么这个变量1就是这个排他锁的资源类型。
然后这些工具类都有获取资源、释放资源的方法。
最后,这些工具类都需要实现获取不到资源的线程排队阻塞等待唤醒的逻辑。
以上的这些逻辑,在每个工具类里面都是大同小异的,那为什么不把它们抽成一个公共逻辑呢?比如抽成一个抽象类,每个工具类继承这个抽象类,就可以获得这些功能。
JDK的java.util.concurrent包就是这么干的,java.util.concurrent包里面有很多并发工具类,这些工具类都要实现资源获取与释放、阻塞线程的排队等待唤醒,于是java.util.concurrent里面就把这些公共的逻辑抽到AQS里面。
AQS的全称是AbstractQueuedSynchronizer抽象队列同步器。
- 抽象:它是一个抽象类,需要被子类继承才能发挥作用。
- 队列:它有一个队列,这个队列会用来存储被阻塞的线程。
- 同步器:它是一个同步器,提供线程间同步的功能。
因此很明显,AQS里面通过一个队列取存储那些获取不到资源而被阻塞的线程,那些线程将会在队列中排队等待被唤醒,唤醒后的线程会再次重试获取资源。
AQS的原理
我们再梳理一下AQS需要提供哪些能力:
- 自定义资源
- 资源的获取与释放
- 线程阻塞等待唤醒
自定义资源
AQS提供自定义资源的功能,AQS里面有一个int类型的state变量,我们自己的实现类继承AQS后,就拥有了这个state变量,我们可以自己定义这个state变量不同的变量值对应的语义。比如我实现了一个排他锁,那么我就定义当state等于1时,代表此时排他锁是处于没被获取的状态,线程可以过来获取排他锁,而当state小于等于0时,代表已经有线程获取了排他锁,其他线程必须阻塞等待。
另外,AQS提供了state变量的set、get方法,以及CAS修改state的方法,我们可以直接使用这些方法来操作state变量。
资源的获取与释放
对于资源获取与释放的抽象,AQS 定义了一些获取与释放资源的模板方法(acquire、release…),然后模板方法会调用真正获取资源与释放资源的抽象方法(tryAcquire、tryRelease…),我们只需要实现这些抽象方法即可。就拿AQS定义的获取资源的方法来举例,AQS定义的获取资源的模板方法,会调用我们实现的真正获取资源的方法,而我们实现的真正获取资源的方法,就可以调用AQS提供的操作state变量的方法来获取资源,然后AQS需要我们返回一个boolean类型的结果值,true表示当前线程获取资源成功,false表示当前线程获取锁失败,获取锁失败的线程,AQS会自动把它放入队列中排队并挂起,等待别的线程释放资源后唤醒。
线程阻塞等待唤醒
上面已经说到,在 AQS 定义的获取资源的抽象方法里面,会把获取不到资源的线程放到队列中排队等待,并将它挂起,等待释放资源的线程去唤醒。
同样,在 AQS 定义的释放资源的抽象方法里面,会判断资源是否释放成功,如果释放成功,那么会唤醒队列中的队头线程,让它取获取资源。
然后AQS里面有一个队列,存储着这些获取不到资源被挂起的线程。
这个队列是一个双向队列,有一个头指针和尾指针分表别引用队列中的头节点和尾结点。入队时会从队尾入队,出队时从队头出队。
AQS源码
看完原理,我们进入AQS的源码,看看里面的具体实现。
核心成员变量
首先是AQS里面的三个核心成员变量:
// 头指针private transient volatile Node head;// 尾指针private transient volatile Node tail;// 资源变量private volatile int state;
int state 就是上面说的代表资源的 state变量,我们可以自己定义这个state变量不同的值对应的语义。
Node head 和 Node tail 分表对应 AQS 里面队列的头指针和尾指针。被阻塞的线程会被包装成 Node 对象然后放入队列的尾部。而所谓的队列其实就是 Node 对象通过前驱指针 prev 和后继指针 next 串联起来,是一个虚的队列(CLH队列),并没有真正的队列实体。
Node 的内部结构
接下来看看 Node 的内部结构。
static final class Node {// nextWaiter变量的值static final Node SHARED = new Node();static final Node EXCLUSIVE = null;// waitStatus变量的值static final int CANCELLED = 1;static final int SIGNAL = -1;static final int CONDITION = -2;static final int PROPAGATE = -3;/*** waitStatus的值:* SIGNAL: 等待前驱节点唤醒 * CANCELLED: 已取消* CONDITION: 被放入了条件队列* PROPAGATE: 这个状态的节点被唤醒后,会继续唤醒后面的节点,一直传播下去(前提是可以获取到资源,否则也会阻塞)* 0: 初始状态*/volatile int waitStatus;/*** 前驱指针*/volatile Node prev;/*** 后继指针*/volatile Node next;/*** 阻塞的线程*/volatile Thread thread;/*** 当Node节点处于普通队列时nextWaiter的值为SHARED和EXCLUSIVE二者之一,* 共享模式下为SHARED,独占模式下为EXCLUSIVE。* 当Node节点处于Condition队列时,nextWaiter用于指向Condition队列中的下一个节点*/Node nextWaiter;/*** 返回true表示当前节点处于共享模式*/final boolean isShared() {return nextWaiter == SHARED;}/*** 返回当前节点的前驱节点*/final Node predecessor() throws NullPointerException {Node p = prev;if (p == null)throw new NullPointerException();elsereturn p;}Node() {}Node(Thread thread, Node mode) {this.nextWaiter = mode;this.thread = thread;}Node(Thread thread, int waitStatus) {this.waitStatus = waitStatus;this.thread = thread;}}
waitStatus
Node 类是 AQS 的内部类,首先 Node 类里面有一个 int waitStatus 变量,表示当前 Node 节点的状态。初始状态为0; SIGNAL(-1) 表示后继节点线程需要被唤醒,那么当前节点释放资源后会去唤醒后继节点;CONDITION(-2) 表示当前节点被移入了条件队列(Condition队列),一般是调用了Condition的await方法(相当于是在获取Synchronized锁时调用Object的wait方法)时,当前线程就会是否资源,然后Node节点就会被移入条件队列并挂起; PROPAGATE(-3) 通常是在资源可共享的情况下,当前线程获取资源成功,会唤醒后继节点的线程也去获取资源; CANCELLED(1) 则是已取消状态,通常是当前线程未获取资源成功就被中断。
prev、next、thread
然后 Node prev、Node next、Thread thread 分表就是前驱节点指针,后继节点指针,和当前节点的线程。
nextWaiter
Node 类里面的最后一个属性是Node nextWaiter,这个属性会随着当前节点处于不同队列(普通队列和条件队列),会有不同的取值和作用。普通队列就是上面介绍的AQS里面的队列,而条件队列则是调用Condition的await方法后,节点被移入的队列。
当前节点处于普通队列时,有两种取值,分别是 Node SHARED 和 Node EXCLUSIVE。
如果nextWaiter取值为EXCLUSIVE,表示当前节点的线程希望以独占的方式获取state资源,当该线程成功获取资源后,其他线程只能阻塞等待该线程释放资源。
如果当前节点的nextWaiter取值为SHARED,表示当前节点的线程希望以共享的方式获取资源,当前节点的线程成功获取资源后,其他线程不用阻塞,只要资源充足,可以继续获取资源,比如共享锁就是这种模式。
当前节点处于条件队列,那么nextWaiter 的作用就是用于指向当前节点在条件队列中的后继节点。
以下是 nextWaiter 不同情况的取值。
predecessor()
predecessor() 方法用于获取当前节点的前驱节点。
以上就是Node里面所有的属性和方法的介绍。
操作state的方法
然后就是三个操作state属性的方法,分表是 getState() 获取state,setState(int newStatte) 设置state,compareAndSetState(int expect, int update) 以CAS的方式更新state。
/*** 获取state*/protected final int getState() {return state;}/*** 设置state*/protected final void setState(int newState) {state = newState;}/*** CAS更新state*/protected final boolean compareAndSetState(int expect, int update) {return unsafe.compareAndSwapInt(this, stateOffset, expect, update);}
可以看到这三个方法都是 protected 访问限定符修饰的,可以被继承AQS的子类调用。
acquire
然后来看一下获取资源的方法——acquire(int arg)。AQS定义了一个模板方法acquire(int arg),可以供继承它的子类直接调用。
/*** 获取资源的模板方法,供子类直接调用*/public final void acquire(int arg) {// tryAcquire(arg):调用子类实现的真正获取资源的方法// addWaiter(Node.EXCLUSIVE):如果tryAcquire(arg)获取资源不成功,则调用addWaiter(Node.EXCLUSIVE)把当前线程封装为Node节点进队列,EXCLUSIVE表示当前线程要以独占的方式获取资源,addWaiter(Node.EXCLUSIVE)方法会返回Node对象// acquireQueued(node, arg):节点进队列之后,设置前驱节点的waitStatus属性为SIGNAL以后,挂起等待唤醒,被唤醒后会重试获取资源。if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}/** 真正获取资源的操作,需要由子类去实现*/protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();}
acquire(int arg) 方法是AQS里面非常重要的一个模板方法,它定义了以独占的模式去获取资源的模板操作。
acquire(int arg) 首先会调用 tryAcquire(arg) 尝试获取资源。tryAcquire(int arg)默认实现是抛出一个 UnsupportedOperationException 异常,我们需要继承AQS并重写tryAcquire(int arg)方法,定义我们真正获取资源的逻辑,我们可以使用上面三个操作state的方法。比如我们使用AQS实现了一个排他锁,那么获取资源的逻辑可以使用compareAndSetState方法把state扣减为0,扣减成功表示获取资源成功,那么我们的tryAcquire方法可以返回true,否则就返回false。
如果tryAcquire方法返回true,表示成功获取到资源,那么acquire方法也就结束了。如果tryAcquire方法返回false,那么等于告诉AQS当前线程获取资源不成功,那么AQS会进行下一步处理,也就下面一行代码 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)。
addWaiter(Node.EXCLUSIVE) 方法是把当前线程封装为一个Nodo节点,并把该Node节点放入AQS队列的尾部,参数 Node.EXCLUSIVE 表示当前线程以独占的方式获取资源(也就是如果当前线程成功获取资源了,其他线程都要阻塞等待)。
acquireQueued方法里面是一个自旋的逻辑。但是这个自旋不是一直不停的while循环尝试获取资源,如果当前节点存在前驱节点,并且设置前驱节点的waitStatus属性为SIGNAL以后,当前节点的线程就会挂起,等待前驱节点唤醒,前驱节点唤醒当前线程后,当前线程才会再次尝试获取资源。
addWaiter
addWaiter方法里面是节点入队列的逻辑,因为有可能同一时间可能会有多个线程同时调用addWaiter方法,所以这里需要考虑处理并发的情况。
private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// 如果tail指针不为空,尝试通过CAS的方式修改tail指针的指向,CAS成功则入队列成功Node pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}// 如果tail指针为空,或者CAS失败,则调用enq方法作进一步处理enq(node);return node;}
首先会把当前线程封装为Node节点。
然后检查AQS队列的tail指针是否为空,如果不为空,则当前Node节点的prev前驱节点指针指向当前AQS队列的尾节点。然后尝试通过CAS的方式修改tail指针(尾节点指针)指向当前入队的Node节点。
如果CAS修改成功,则修改原来的尾节点的next指针,指向当前入队的节点,然后addWaiter方法返回当前入队的节点。
如果tail指针为空,或者CAS修改不成功,则调用enq方法,进一步处理。tail指针为空代表当前队列还未初始化。
下面是流程图:
enq
enq方法的内部,就是一个自旋不断尝试入队的逻辑,入队的方式还是通过CAS修改tail指针。但是因为队列未初始化时也会进这个方法,所有enq方法还有一个初始化队列的分支。
private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) {// 如果tail指针为空,表示队列没有初始化// 尝试以CAS的方式初始化头指针,// 初始化成功,则修改tail指针指向与head指针指向同一个节点if (compareAndSetHead(new Node()))tail = head;} else {// tail指针不为空// 先修改node的prev前驱指针指向tail指针所指向的节点,// 然后以CAS的方式修改tail指针,// CAS修改成,则修改tail指针原先指向的节点的next指针指向node节点node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}
首先如果tail指针为空,那么说明AQS的队列还没有初始化,那要先初始化AQS的队列。AQS队列的初始化就是new一个空的Node,然后用CAS的方式设置为头节点,如果设置为头节点成功,那么修tail指针也指向这个空的Node。如果CAS设置头节点失败,说明有另外的线程初始化成功了。
接下来会进入下一轮的for循环,尝试以CAS的方式设置tail指针指向当前node节点。
acquireQueued
节点入队成功后,就要进入acquireQueued方法的处理逻辑。
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {// 获取当前节点的前驱节点final Node p = node.predecessor();// 如果前驱节点是头结点,则尝试获取资源一次if (p == head && tryAcquire(arg)) {// 获取资源成功,设置当前节点为头节点setHead(node);p.next = null;failed = false;return interrupted;}// 判断是否可以挂起当前线程,如果可以则挂起当前线程,等待前驱节点的线程唤醒if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}
整个acquireQueued方法里面主要就是一个for循环进行自旋,自旋会直到当前线程获取到资源为止。每一轮循环,会判断前驱节点是否是头节点,如果前驱节点是头节点,那么当前线程会尝试获取资源一次,如果获取资源成功,则设置当前节点为头节点,然后退出循环;如果获取资源不成功,那么会判断是否可以挂起当前线程,满足挂起当前线程的条件的话,那么就会挂起当前线程,等待被前驱节点的线程唤醒,唤醒后会进入下一轮循环。
下面是流程图:
接下来就要看看如何判断当前线程是否能被挂起的,以及是如何挂起当前线程的。
shouldParkAfterFailedAcquire
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus;if (ws == Node.SIGNAL)/** 前驱节点的waitStats已经被设置为SIGNAL了,当前线程可以挂起*/return true;if (ws > 0) {/** 前驱节点的waitStatus是CANCELLED(只有CANCELLED是大于0的),* 那么要往后寻找waitStatus不是CANCELLED的节点,* 并且中间所有的阶段都剔除出队列*/do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {/** 尝试以CAS的方式修改前驱节点的waitStatus为SIGNAL*/compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}// 当前线程还不能被挂起return false;}
- 如果前驱节点的waitStatus属性为SIGNAL,那么当前线程可以挂起
- 如果前端节点的waitStatus属性为CANCELLED,那么需要跳过后面所有的CANCELLED节点,把它们断连出队列
- 以上两条件不满足,那么尝试通过CAS的方式修改前驱节点的waitStatus属性为SIGNAL,如果修改成功,那么下一次进来就可以返回true,可以挂起当前线程了
下面是流程图:
parkAndCheckInterrupt
parkAndCheckInterrupt方法挂起当前线程,里面会调用LockSupport这个工具类的park方法挂起当前线程。
private final boolean parkAndCheckInterrupt() {// 调用LockSupport这个工具类的park方法挂起当前线程LockSupport.park(this);return Thread.interrupted();}
LockSupport也是java.util.concurrent并发包里面的一个工具类,可以用于挂起线程,里面是调用Unsafe提供的park方法。
public static void park(Object blocker) {Thread t = Thread.currentThread();setBlocker(t, blocker);// 利用Unsafe提供的park方法挂起当前线程UNSAFE.park(false, 0L);setBlocker(t, null);}
以上就是acquire方法的整个核心流程。
release
AQS内部还提供了一个释放资源的模板方法release,相对于用于获取资源的acquire方法,release方法则是提供给子类直接调用的用于释放资源的模板方法。
/*** 释放资源的模板方法,供子类直接调用*/public final boolean release(int arg) {// 调用子类实现的尝试释放资源的方法,该方法需要子类实现释放资源的逻辑if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)// 唤醒后继节点的线程unparkSuccessor(h);// 释放已释放return true;}// 资源未释放return false;}
release方法首先会调用 tryRelease(arg) 方法尝试释放资源,tryRelease(arg)也是默认抛出一个异常,该方法需要继承AQS的子类去实现。
如果tryRelease(arg)返回true,代表释放资源成功,那么接下来就会调用 unparkSuccessor(h) 方法唤醒后继节点的线程。
unparkSuccessor
unparkSuccessor方法的主要逻辑就是找到有效的(waitStatus属性非CANCELLED)后继节点,然后通过LockSupport的unpark方法唤醒后继节点的线程。
private void unparkSuccessor(Node node) {/** 当前节点(头节点)的waitStatus*/int ws = node.waitStatus;if (ws < 0)// 尝试CAS修改waitStatus为0compareAndSetWaitStatus(node, ws, 0);/** 后继节点*/Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;// 如果后继节点的waitStatus是CANCELLED,// 那么要从尾部开始往前遍历,找到离当前节点最近的不是CANCELLED的节点for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)// 唤醒后继节点线程LockSupport.unpark(s.thread);}
而Lockupport的unpark方法也是调用Unsafe的unpark方法唤醒指定的线程。
public static void unpark(Thread thread) {if (thread != null)UNSAFE.unpark(thread);}
unparkSuccessor方法这里有一个问题,为什么当后继节点的waitStatus属性是CANCELLED时,要从后往前找呢?
/** 后继节点*/Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;// 如果后继节点的waitStatus是CANCELLED,// 那么要从尾部开始往前遍历,找到离当前节点最近的不是CANCELLED的节点for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}
那是因为如果不从后往前找,就有可能漏掉节点。我们回顾一下节点入队的代码。
Node pred = tail;if (pred != null) {node.prev = pred; // (A)if (compareAndSetTail(pred, node)) { // (B)pred.next = node; // (C)return node;}}
可以看到是先把当前入队节点的prev指针指向当前队列的尾节点(代码A),然后通过CAS修改队列的尾节点为当前入队节点(代码B),最后才把原来的尾节点的next指针指向当前入队的节点。
那么我们想一想,假设现在有一个节点入队列,执行完代码A和代码B,但是代码C没有执行,如果此时我们通过next指针去遍历(从前往后)会怎么样呢?那就会遍历不到最新的尾节点的,因此unparkSuccessor方法才要从后往前遍历。
总结
我们已经看完了AQS的acquire方法和release方法的具体逻辑,对AQS的内部原理也有了一定的了解。AQS除了这两个方法还有acquireShared方法(以共享模式获取资源)、releaseShared方法(共享模式释放资源)、acquireInterruptibly方法(响应中断的获取资源)、acquireSharedInterruptibly方法(共享模式加响应中断的获取资源)等等,这些就不在这里继续展开了,可能会放到后面再去介绍。
下面放一张源码流程图,总结一下acquire方法和release方法的具体逻辑:
用AQS实现一个排他锁
我们看了那么多的AQS原理和源码,还没真正看到它的具体用途,下面使用AQS来实现一个排他锁,来具体看看AQS的作用:
/*** 排他锁(不可重入)* Created by huangjunyi on 2023/8/3.*/
public class ExclusiveLock {private final Sync sync;public ExclusiveLock() {sync = new Sync();}/*** 内部类,继承AQS*/class Sync extends AbstractQueuedSynchronizer {public Sync() {// state初始化为1,表示锁没有被获取this.setState(1);}@Overrideprotected boolean tryAcquire(int arg) {int state = this.getState();if (state == 0) return false; // 锁已被获取if (this.compareAndSetState(state, 0)) {// CAS成功,当前线程获取到锁,设置当前线程为占有锁的线程this.setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}@Overrideprotected boolean tryRelease(int arg) {// 占用锁的线程不是当前线程,返回falseif (this.getExclusiveOwnerThread() != Thread.currentThread()) return false;// 设置占用锁的线程为空this.setExclusiveOwnerThread(null);// 恢复state为1,表示释放锁this.setState(1);return true;}}public void lock() {this.sync.acquire(1);}public void unlock() {this.sync.release(1);}}
上面就是一个使用AQS实现的简单的排他锁,使用一个内部类Sync继承了AQS并重写tryAcquire放和tryRelease方法。tryAcquire是尝试获取锁的逻辑,会被AQS的acquire方法调用;而tryRelease则是释放锁的逻辑,会被AQS的release方法调用。而ExclusiveLock的lock方法直接调用acquire方法获取锁,ExclusiveLock的unlock方法直接调用release方法释放锁。
接下来我们测试一下,定义一个整形num(因为要使用Lambda表达式,所以定义为了整形数组),开启100个线程,每个线程做10000次num++,正常结果应该是输出1000000。
我们先不使用锁,看看输出的结果。
public static void main(String[] args) throws InterruptedException {int[] num = {0};Thread[] threads = new Thread[100];for (int i = 0; i < 100; i++) {Thread thread = new Thread(() -> {for (int j = 0; j < 10000; j++) {num[0]++;}});thread.start();threads[i] = thread;}for (int i = 0; i < threads.length; i++) {threads[i].join();}System.out.println(num[0]);}
输出结果:
可以看到在不加锁的情况下,发生了更新丢失,导致输出的不是预期的结果。
接下来我们加上排他锁再进行测试。
public static void main(String[] args) throws InterruptedException {int[] num = {0};ExclusiveLock exclusiveLock = new ExclusiveLock();Thread[] threads = new Thread[100];for (int i = 0; i < 100; i++) {Thread thread = new Thread(() -> {for (int j = 0; j < 10000; j++) {exclusiveLock.lock();num[0]++;exclusiveLock.unlock();}});thread.start();threads[i] = thread;}for (int i = 0; i < threads.length; i++) {threads[i].join();}System.out.println(num[0]);}
使用排他锁之后,输出的结果就正确了。
这样就实现了一个排他锁,至于线程间同步,排队,阻塞唤醒等等都由AQS实现了。