AQS
- 简介
- 基本框架
- 1、AQS框架介绍
- 2、AQS核心成员变量和方法
- 源码分析
- 1、CLH队列(FIFO)
- 2、独占模式获取资源
- 2.1、acquire(int)
- 2.1.1、tryAcquire(int arg)
- 2.1.2、addWaiter(Node mode)
- 2.1.3、acquireQueued(final Node node, int arg)
- 2.1.3.1、shouldParkAfterFailedAcquire(p, node)
- 2.1.3.2、parkAndCheckInterrupt()
- 2.1.4、selfInterrupt()
- 2.2、独占式获取资源小结
- 3、独占模式释放资源
- 3.1、release(int arg)
- 3.1.1、tryRelease(int arg)
- 3.1.2、unparkSuccessor(Node node)
- 3.2、独占式释放资源小结
- 4、共享模式获取资源
- 4.1、acquireShared(int arg)
- 4.1.1、tryAcquireShared(int arg)
- 4.1.2、doAcquireShared(int arg)
- 4.1.2.1、setHeadAndPropagate(Node node, int propagate)
- 4.2、共享模式获取资源小结
- 5、共享模式释放资源
- 5.1、共享模式释放资源
- 5.1.1、releaseShared(int arg)
- 5.1.1.1、doReleaseShared()
- 5.2、共享模式释放资源小结
- 总结
简介
AbstractQueuedSynchronizer(AQS)字面意思是抽象队列同步器,。AQS定义了一套多线程访问共享资源的同步器框架,许多我们使用的同步器都是基于它来实现的,如常用的ReentrantLock、Semaphore、CountDownLatch、CyclicBarrie并发类都是通过实现AQS里面的模板方法来实现内部的组件。
原文链接:https://blog.csdn.net/u011047968/article/details/107007010/
基本框架
1、AQS框架介绍
原图地址:https://www.processon.com/view/link/5ef89c477d9c08442039b8c5
AQS实现原理依赖内部state(同步状态)和CHL队列(FIFO双向队列),如果当前线程获取state同步状态失败AQS会将该线程以及状态等信息构造一个Node节点,并将这个Node节点添加到队尾,同时阻塞当前线程,当同步状态释放时,唤醒队列头节点。
2、AQS核心成员变量和方法
AQS核心的三个成员变量如下:
private transient volatile Node head;//CHL队列的头部节点,延迟初始化。除了初始化,它只通过setHead()方法进行修改。如果head节点存在,head节点的waitStatus保证不会被CANCELLEDprivate transient volatile Node tail;//CHL队列的尾部节点,延迟初始化。仅通过enq()方法新增等待的节点。private volatile int state; //同步状态
我们可以看出来这三个成员变量都是使用volatile关键字来修饰的,volatile代表变量内存可见。
state有以下三种访问方式:
getState():获取同步状态。
setState(int newState):设置同步状态。
compareAndSetState(int expect, int update):通过CAS方式修改同步状态。
三种方法源码实现如下:
private volatile int state;// 具有内存读可见性语义protected final int getState() {return state;}// 具有内存写可见性语义protected final void setState(int newState) {state = newState;}// 具有内存读/写可见性语义protected final boolean compareAndSetState(int expect, int update) {// See below for intrinsics setup to support thisreturn unsafe.compareAndSwapInt(this, stateOffset, expect, update);}
资源共享方式分为两种:
独占式(Exclusive):只有单个线程能够成功呢获取资源并执行,如ReentrantLock。
共享式(Shared):多个线程可成功获取资源并执行,如Semaphore、CountDownLatch等。
AQS将大部分的同步逻辑均已经实现好了,继承的自定义同步器只需要实现state的获取(acquire)和释放(release)的逻辑代码就可以了。
独占
tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
AQS需要子类复写的方法均没有声明为abstract,目的是避免子类需要强制性覆写多个方法,因为一般自定义同步器要么是独占要么是共享方式,只需实现tryAcquire-tryRelease或tryAcquireShared-tryReleaseShared中的一种组合即可。当然,AQS也支持子类同时实现独占和共享两种模式,如ReentrantReadWriteLock。
源码分析
1、CLH队列(FIFO)
AQS是通过内部类Node来实现FIFO队列的,源码如下
static final class Node {//表明节点在共享模式下等待的标记static final Node SHARED = new Node();//表情节点在独占模式下等待的标记static final Node EXCLUSIVE = null;//指示等待线程已取消static final int CANCELLED = 1;//指示需要唤醒后续线程static final int SIGNAL = -1;//指示线程在等待触发条件(condition)static final int CONDITION = -2;//指示下一个acquireShared应无条件传播的waitStatus值static final int PROPAGATE = -3;/*** CANCELLED(1) :表示当前节点因timeout和interrupt而放弃竞争state,进入该状态后的节点将不会再变化。* SIGNAL(-1) :表示后继节点等待当前节点唤醒。后继节点入队列是,会将前继节点的状态变更为SIGNAL。 * CONDITION(-2):表示节点等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的节点将从等待队列转移到同步队列中,等待获取同步锁* PROPAGATE(-3):共享模式下,前继节点不仅会唤醒其后继节点,同时也可能唤醒后继的后继节点。* 0 :以上情况都不是。*/volatile int waitStatus;//前继节点volatile Node prev;//后继节点volatile Node next;//持有的线程volatile Thread thread;//下一个等待条件出发的节点Node nextWaiter;//返回节点是否处于Shared状态下final boolean isShared() {return nextWaiter == SHARED;}//返回前继节点final Node predecessor() throws NullPointerException {Node p = prev;if (p == null)throw new NullPointerException();elsereturn p;}//Shared模式下的Node构造方法Node() { }//用于addWaiter的构造方法Node(Thread thread, Node mode) {this.nextWaiter = mode;this.thread = thread;}//用于Condition下的构造方法Node(Thread thread, int waitStatus) { // Used by Conditionthis.waitStatus = waitStatus;this.thread = thread;}}
当waitStatus非负的时候,表示不可用,正数代表处于等待状态,所以waitStatus只需要检查其正负符号即可,不用太多关注特定值。
2、独占模式获取资源
2.1、acquire(int)
独占模式(Exclusive)获取资源的入口方法为:
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//获取失败,则加入等待队列selfInterrupt();}
从上面的源代码我们可以看出执行方法的顺序依次为:
(1)tryAcquire(arg) 尝试获取资源,如果获取成功返回true则acquire()直接返回。如果返回fasle,则进入(2);
(2)addWaiter(Node.EXCLUSIVE), arg) 将该线程加入CHL等待队列的尾部,并标记为独占模式,完成后进入(3);
(3)acquireQueued()以独占模式不间断获取队列中已存在的线程直到获取元素。获取元素成功后若线程未中断过则返回false然后acquire()直接返回,如果等待过程中被中断过则返回true,然后进入(4);
(4)selfInterrupt() 这个方法翻译过来就是自我在中断,注意这个中断方法必须是在获取元素成功之后才会执行的,就是说获取资源成功了才会执行的,不是立即响应中断的。
2.1.1、tryAcquire(int arg)
这个目的是尝试获取独占资源的方法,成功直接返回true,失败直接返回false,这个地方体现了非公平锁,因为调用的线程直接获取,完全不考虑CHL队列中还有可能有线程在等待获取资源。源码如下
//(1)尝试获取资源protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();}
注意,这里的tryAcquire() 是一个由protected修饰的空方法,AQS只是一个框架具体的资源获取-释放则是由自定义的同步器去实现的。这里本可以定义成为abstract方法,前面我们说过如果独占模式下只用实现tryAcquire-tryRelease而共享模式下只用实现tryAcquireShared-tryReleaseShared,如果这里我们定义成abstract方法的话,我们在实现独占模式的情况下还要去考虑实现共享模式的两个方法。作者Doug Lea使用这种方式可以让我们该去实现独占模式的时候不去考虑共享模式的方法,如果未自己实现就用则会抛UnsupportedOperationException异常。
2.1.2、addWaiter(Node mode)
这个方法就是在上面获取资源失败的情况下,将当前线程加入到CHL队列的队尾,并返回当前线程所在的Node节点。源码如下:
//(2)将获取资源失败的线程放入队尾private Node addWaiter(Node mode) {//(2.1)用给定模式构造Node节点。mode取值有两种:EXCLUSIVE(独占)、SHARED(共享)Node node = new Node(Thread.currentThread(), mode);//(2.2)尝试快速插入等待队列,如果失败则执行常规插入操作enq(node);Node pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}//(2.3)上面插入失败,则使用此方法插入enq(node);return node;}
其中enq(node)方法如下:
private Node enq(final Node node) {//(2.3.1)CAS自旋,直到成功加入到队尾for (;;) {Node t = tail;if (t == null) { //(2.3.2)如果队列为空,则创建一个空的Node节点作为head节点,并将tail指向headif (compareAndSetHead(new Node()))tail = head;} else {//(2.3.3)正常流程,放入队尾node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}
其中compareAndSetHead和compareAndSetTail执行的是unsafe里面的compareAndSwapObject方法,这个方法是native方法,属于原子操作。想了解这个方法的可以查阅CAS相关的知识。
//CAS自旋赋值head节点private final boolean compareAndSetHead(Node update) {return unsafe.compareAndSwapObject(this, headOffset, null, update);}//CAS自旋赋值tail节点private final boolean compareAndSetTail(Node expect, Node update) {return unsafe.compareAndSwapObject(this, tailOffset, expect, update);}
2.1.3、acquireQueued(final Node node, int arg)
通过上面的tryAcquire() 和addWaiter()这个 线程已经获取资源失败了,并且已经被放到等待队列的尾部了。acquireQueued()方法是以自旋方式获取独占模式获取队列中已存在的线程。举个例子:例如我们去12306网站买票,刚开页面显示无票,我们就一直刷新页面直到有车票资源。
//(3)进入等待状态直到head节点线程释放资源,当前线程获取资源并返回是否被中断标识final boolean acquireQueued(final Node node, int arg) {//(3.1)标记是否成功拿到资源boolean failed = true;try {//(3.2)标记等待过程中是否被中断过boolean interrupted = false;//自旋for (;;) {//(3.3)拿到前驱节点final Node p = node.predecessor();//(3.4)如果前驱节点是head,即当前是第2个节点,那么符合条件尝试获取资源。可能是head节点释放完资源释放了当前节点,也有可能被interrupt中断了。if (p == head && tryAcquire(arg)) {//(3.4.1)剩下的这两步就是把当前节点设置为head节点,并且释放原来的head节点setHead(node);//(3.4.2)这里p.next指向null就是交给GC回收了p.next = null;//(3.4.3)成功获取资源failed = false;//(3.4.4)返回等待过程中是否被中断过return interrupted;}//(3.5)走到这里代表当前元素不是第2个节点则继续判断是否满足下面2个条件。//1.shouldParkAfterFailedAcquire方法检查线程是否应该阻塞//2.parkAndCheckInterrupt方法调用park()当前线程,直到unpack()被唤醒,判断当前线程是否被中断了if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())//如果等待过程中被红断过,就将interrupted设置为trueinterrupted = true;}} finally {//(3.6)代表等待过程中没有成功获取到资源(timeout,或者被中断),则放弃争抢资源if (failed)cancelAcquire(node);}}
2.1.3.1、shouldParkAfterFailedAcquire(p, node)
上面的shouldParkAfterFailedAcquire方法的实现如下:
//(3.5.1)shouldParkAfterFailedAcquire方法检查线程是否应该阻塞private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {//(3.5.1.1)获取前继节点的waitStatus值int ws = pred.waitStatus;//(3.5.1.2)如果ws值为SIGNAL(-1),代表前继节点完成资源释放或者中断后,会通知当前节点,因此当前节点可以安全的parkif (ws == Node.SIGNAL)return true;//(3.5.1.3)如果ws>0,其实就是CANCELLED(1)代表前继节点处于放弃状态,//那就继续遍历直到前继节点的ws为0或者为-1if (ws > 0) {do {//这一句的意思就是节点指针向前移动,直到前继节点满足条件node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {//(3.5.1.4)如果当前ws<=0则设置当前节点为SIGNAL(-1),以保证外层方法自旋的时候返回truecompareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;}
2.1.3.2、parkAndCheckInterrupt()
parkAndCheckInterrupt主要是调用LockSupport类的park()方法阻塞当前线程,并返回线程是否被中断过。
private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();}
2.1.4、selfInterrupt()
通过上面的分析,能走到这一步代表此线程在等待过程中被中断了。
//(4)中断当前线程static void selfInterrupt() {Thread.currentThread().interrupt();}
2.2、独占式获取资源小结
再一次,拿出独占模式(Exclusive)获取资源的入口方法:
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//获取失败,则加入等待队列selfInterrupt();}
从上面的源代码我们可以看出执行方法的顺序依次为:
(1)tryAcquire(arg)尝试获取资源,如果获取成功返回true则acquire()直接返回。如果返回fasle,则进入(2);
(2)addWaiter(Node.EXCLUSIVE), arg)将该线程加入CHL等待队列的尾部,并标记为独占模式,完成后进入(3);
(3)acquireQueued()以独占模式不间断获取队列中已存在的线程直到获取元素。获取元素成功后若线程未中断过则返回false然后acquire()直接返回,如果等待过程中被中断过则返回true,然后进入(4);
(4)selfInterrupt()这个方法翻译过来就是自我在中断,注意这个中断方法必须是在获取元素成功之后才会执行的,就是说获取资源成功了才会执行的,不是立即响应中断的。
我们再补一个流程图便于理解:
这也就是ReentrantLock.lock()的流程,其整个函数就是一条acquire(1)
3、独占模式释放资源
3.1、release(int arg)
独占模式释放资源的过程其也就是unlock()过程,其实就是赋值state=0,此时线程AQS会唤醒队列其他线程获取资源。
public final boolean release(int arg) {//(1)尝试释放资源if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)//(2)唤醒队列里其他线程unparkSuccessor(h);return true;}return false;}
3.1.1、tryRelease(int arg)
尝试释放资源,根据他的返回值判断是否释放资源成功。
protected boolean tryRelease(int arg) {throw new UnsupportedOperationException();}
这是个空方法,这个需要根据自己的情况自定义同步器去实现。如果彻底释放资源返回true,否则返回false。
3.1.2、unparkSuccessor(Node node)
此方法是用来唤醒等待队列中的下一个线程:
//(2)唤醒队列里其他线程private void unparkSuccessor(Node node) {//(2.1)获取当前Node节点的waitStateint ws = node.waitStatus;//(2.2)如果当前的状态为SIGNAL(-1),则尝试置为0if (ws < 0)compareAndSetWaitStatus(node, ws, 0);//(2.3)找到下一个节点Node s = node.next;//(2.4)如果节点为空或者CANCELLED(1)if (s == null || s.waitStatus > 0) {s = null;//(2.5)这就是从尾部tail节点遍历队列,直到获取状态<=0的节点for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)//(2.6)如果s不为空唤醒线程LockSupport.unpark(s.thread);}
后继节点的阻塞线程被唤醒后,就进入到acquireQueued()的if (p == head && tryAcquire(arg))的判断中,此时被唤醒的线程将尝试获取资源。如果被唤醒的线程所在节点的前继节点不是头结点,经过shouldParkAfterFailedAcquire的调整,也会移动到等待队列的前面,直到其前继节点为头结点。
3.2、独占式释放资源小结
release()是独占模式下释放共享资源的入口方法,它会释放指定量的资源,如果彻底释放了(即state=0),此时它将唤醒等待队列的线程来获取资源。一共分为两个步骤:
1)使用tryRelease(arg)尝试释放资源
2)释放成功则使用unpack()唤醒等待队列里面的下一个线程
4、共享模式获取资源
4.1、acquireShared(int arg)
共享模式获取共享资源的入口就是acquireShared方法,方法的代码如下:
public final void acquireShared(int arg) {//(1)尝试获取共享资源if (tryAcquireShared(arg) < 0)//(2)获取资源失败,进入等待队列doAcquireShared(arg);}
共享模式获取锁分为以下两步:
(1)tryAcquireShared()方法尝试获取共享资源,如果获取成功了就是返回的结果大于等于0,那恭喜你,直接返回了。如果返回值小于0,代表获取共享资源失败了,则进入(2);
(2)通过doAcquireShared()方法将获取锁失败的线程放入到队列中。
这里tryAcquireShared返回值负值代表获取失败;0代表获取成功,但没有剩余资源;正数表示获取成功,还有剩余资源,其他线程还可以去获取。
4.1.1、tryAcquireShared(int arg)
这个tryAcquireShared() 也是一个空方法跟我们之前的独占式的一样,需要我们自定义的同步器去实现。
//(1)尝试获取共享资源protected int tryAcquireShared(int arg) {throw new UnsupportedOperationException();}
4.1.2、doAcquireShared(int arg)
将线程加入等待队尾,直到其他线程释放资源释放资源,并成功大道相应的共享资源才 返回。
//(2)获取资源失败,进入等待队列private void doAcquireShared(int arg) {//(2.1)将SHARED模式的节点添加到队尾final Node node = addWaiter(Node.SHARED);boolean failed = true;//是否失败标识默认truetry {boolean interrupted = false;//是否被中断标识,默认fasle//(2.2)使用自旋方式获取资源for (;;) {final Node p = node.predecessor();//获取前驱节点if (p == head) {//如果当前线程的前驱节点是首节点,此时当前节点就是第2个节点,head执行完就该唤醒自己了//(2.2)尝试获取资源int r = tryAcquireShared(arg);if (r >= 0) {//代表获取资源成功了//(2.2.1)获取资源成功后重新设置head节点并且释放;老的head节点setHeadAndPropagate(node, r);p.next = null; //释放老head节点,交给GC//(2.2.2)如果此时发现线程已经被中断,则中断自己,这个跟独占方式一样if (interrupted)//(2.2.2)中断自己selfInterrupt();failed = false;//成功标识return;}}//(2.3)走到这里代表获取资源失败了,判断是否可以park,如果可以调用给你park()方法,然后等待unpark或者interrupt。如果线程被中断过,则将中断标识修改为trueif (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {//(2.4)代表等待过程中没有成功获取到资源(timeout,或者被中断),则放弃争抢资源if (failed)cancelAcquire(node);}}
我们可以发现doAcquireShared() 的实现和acquireQueued() ,十分相似,流程没有太大差别。只是把selfInterrupt() 方法是在doAcquireShared() 内部,独占模式是在acquireQueued() 外面,其实结果都差不多。
4.1.2.1、setHeadAndPropagate(Node node, int propagate)
//(2.2.1)获取资源成功后重新设置head节点private void setHeadAndPropagate(Node node, int propagate) {Node h = head;//记录老节点下面会检查setHead(node);//将头结点指向当前节点//如果共享资源还有剩余量,则继续唤醒下一个线程if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {Node s = node.next;if (s == null || s.isShared())doReleaseShared();}}
这里面其实和独占模式基本一样的,就是多了一步如果有剩余共享资源则主动唤醒下一个节点这一步。下面释放资源的时候分析doReleaseShared() 。
4.2、共享模式获取资源小结
其实共享模式获取资源和独占模式获取资源很相似,它的流程如下:
(1)tryAcquireShared() 方法尝试获取共享资源,如果获取成功了就是返回的结果大于等于0,那恭喜你,直接返回了。如果返回值小于0,代表获取共享资源失败了,则进入(2);
(2)通过doAcquireShared() 方法将获取锁失败的线程放入队列,并调用park() 方法,直到被unpark() 或者interrupt() 。这里多了一步,在当先线程拿到资源后,还会去唤醒后继线程的操作。
5、共享模式释放资源
5.1、共享模式释放资源
5.1.1、releaseShared(int arg)
releaseShared方法是释放资源的入口方法。这个方法会释放定量的资源,如果成功释放且允许唤醒等待线程,则会唤醒等待队列里的其他线程来获取资源。
public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;}//这是一个空方法等待自定义同步器去实现protected boolean tryReleaseShared(int arg) {throw new UnsupportedOperationException();}
这个流程原理概括起来就是:释放资源后,唤醒后继。举个例子理解以下这个场景:
假如一共有10个共享资源,线程A、B、C分别需要5、4、3个资源
- A线程获取到5个资源,发现剩余5个资源,然后醒B线程;
- B线程获取到4个资源,发现剩余1个资源,然后唤醒C线程;
- C线程尝试获取3个资源,现在只有1个资源不够用,则继续阻塞;
- A线程释放1个资源,现在剩余2个资源了,然后唤醒C线程;
- C线程尝试获取3个资源,现在只有2个资源不够用,继续阻塞;
- B线程释放1个资源,现在剩余3个资源,然后唤醒C线程;
- C线程尝试获取3个资源,资源获取成功,C线程被唤醒。
这点表达跟独占锁不同的地方是独占方式必须是资源释放掉(state=0)才返回true,但是共享模式下根据我们上面的例子可以看出没有这种要求。
5.1.1.1、doReleaseShared()
这个方法主要是唤醒后继线程。
private void doReleaseShared() {//自旋for (;;) {//储存head节点,后续检查Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue; // loop to recheck cases//唤醒后继线程unparkSuccessor(h);}else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue; // loop on failed CAS}//head节点发生变化就跳出循环if (h == head) // loop if head changedbreak;}}
5.2、共享模式释放资源小结
上面的我们已经把共享模式下释放资源的情况分析了。一句话总结:释放指定量的资源,如果成功释放且允许唤醒等待线程,它会唤醒等待队列里的其他线程来获取资源。
总结
以上我们分析了独占模式、共享模式下获取-释放资源(acquire-release、acquireShared-releaseShared)的源码,他们在获取共享资源的时候都是忽略中断的直到获取资源。其实AQS也支持响应中断的,acquireInterruptibly() 和acquireSharedInterruptibly(),有兴趣可以自行研究。