AQS(AbstractQueuedSynchronizer), 可以说的夸张点,并发包中的几乎所有类都是基于AQS的。
一起揭开AQS的面纱
1. 介绍
为依赖 FIFO阻塞队列 的阻塞锁和相关同步器(semaphores, events等)的实现提供一个框架。 为那些依赖于原子state的同步器提供基础(CyclicBarrier、CountDownLatch等). 支持独占模式和共享模式, 不同的模式需要实现不同的方法.
引用这位大佬的图 http://www.cnblogs.com/waterystone/p/4920797.html 这个图是AQS整体结构,从图中可以看到,AQS维护着一个阻塞队列(当线程获取资源失败时,就会进入该队列,等待,直到被唤醒), state是一个共享的资源。
2. 源码剖析
我们先看看AQS的类图,
内部类: Node,阻塞队列维护的元素;ConditionObject, 支持独占模式下的子类用作Condition实现, 后面会讲到。先看看Node的结构。
1 |
|
tip: waitStatus > 0, 即 CANCELLED, 此时的结点不正常。
AQS使用了模板模式, 自主选择重新定义以下方法
- tryAcquire - 独占模式
- tryRelease - 独占模式
- tryAcquireShared - 共享模式
- tryReleaseShared - 共享模式
- isHeldExclusively
调用这些方法,都会引发UnsupportedOperationException,后面的文章将通过其子类,来讲解它们的实现。
有了这些知识后,我们从下面这几个关键的共有方法入手去讲解AQS
- acquire(int arg) - 独占模式
- release(int arg) - 独占模式
- acquireShared(int arg) - 共享模式
- releaseShared(int arg) - 共享模式
2.1 acquire
独占模式下的获取资源,忽略中断。调用tryAcquire至少一次,若成功就返回。否则,将线程入队,并可能反复阻塞和接触阻塞,并调用tryAcquire直至成功。此方法可以用于实现 Lock.lock
1 |
|
tryAcquire在后面文章结合子类进行分析。 代码(2.1.1)中,此时调用了tryAcquire,获取资源失败,返回false,继续执行后续方法。
addWaiter – Queuing utilities
使用当前线程和给定mode, 新建一个Node,并且将新建Node入队
1 |
|
enq – Queuing utilities
一直循环直到node入队成功
1 |
|
经过了上面的操作,目前的线程已经加入了队尾,此时做的事情就是阻塞自己,等待资源释放并且获取,然后执行自己的操作
acquireQueued
以独占且不可中断的模式,获取已经在阻塞队列中的线程。若在等待时被中断,返回true
1 |
|
shouldParkAfterFailedAcquire – Utilities for various versions of acquire
对获取资源失败的node,检测并获取结点。返回true,如果线程需要阻塞
1 |
|
cancelAcquire – Utilities for various versions of acquire
取消正在进行的获取资源的尝试
1 |
|
unparkSuccessor
后驱结点存在一个在正在等待的结点,则唤醒它
1 |
|
2.1.1 acquire流程图
根据上面的分析,整一个流程图
2.1.2 小结
根据acquire流程图,一句话小结其流程,尝试获取资源,失败则将新建node(当前线程及独占模式)入队,检测自己是否是老二,是老二就再一次尝试获取资源,成功就返回中断标志,不是老二就设置为SIGNAL,park自己,然后安心等待被唤醒。
2.2 release
独占模式下的释放资源。解除阻塞一个或多个线程,当tryRelease返回true时。此方法可以用于实现 Lock.unlock
1 |
|
2.2.1 release流程图
因为release(int arg)的主要流程是在tryRelease和unparkSuccessor中,但是tryRelease又是在子类中实现,所以该流程图也可以看作unparkSuccessor的流程图
2.2.2 小结
根据release流程图, 一句话小结其流程, 释放资源,唤醒后驱没有被取消的结点。
下面讲讲AQS的另一种模式,共享模式
2.3 acquireShared
共享模式下获取资源,忽略中断。至少调用tryAcquireShared一次,成功就返回。否则,线程将入队,可能会重复的阻塞和解除阻塞,直到调用tryAcquireShared成功。成功获取到资源,将会唤醒后驱结点,若资源满足。
1 |
|
对tryAcquireShared返回的参数,进行简单的介绍
- 返回负数表示失败;
- 返回零,随后的线程都不能获取到资源
- 返回正数,随后的线程可以获取到资源
此时tryAcquireShared的返回值是小于零,表示获取资源失败,进行下一步处理
doAcquireShared
获取资源在不可中断的模式下
1 |
|
setHeadAndPropagate
设置队列的头结点,达到条件就唤醒后面的结点.
1 |
|
代码(2.3.3)中, 为什么不只用propagate来判断是否唤醒后驱结点 [问题二]
doReleaseShared
共享模式下主要的释放资源的逻辑,唤醒后驱结点,确保线程不被挂起
1 |
|
2.3.1 acquireShared的流程图
2.3.2 小结
一句话小结acquireShared的流程,尝试获取资源,若获取到资源,资源还有剩余就去继续唤醒后驱结点,若尝试获取资源失败,就park自己,等待被唤醒。 跟acquire相比,最大的区别就是,获取到资源acquireShared,还会去尝试唤醒其后驱结点
2.4 releaseShared
Releases in shared mode. Implemented by unblocking one or more threads if tryReleaseShared returns true.
1 |
|
2.5 问题解答
[问题一] 为什么在唤醒后驱结点时,node的后驱结点为空,需要重新从后往前找
1 |
|
仔细观察代码(2.1.2) 和(2.1.3), 此时添加结点相当于有三步,都不是原子性的,当执行到第二步时,就要唤醒后驱结点了,此时新增的结点只设置了前驱结点,队列设置了尾结点,但是没有设置后驱结点,如果从前往后查找的话,可能会丢失符合要求的结点。
[问题二],代码(2.3.3)中, 为什么不只用propagate来判断是否唤醒后驱结点。 请看这位大佬的博客 讲的非常详细 大致意思就是, 我们假设有A、B、C、D四个线程,前两个释放资源的线程,后两个是争抢资源的线程,此时只有A或B释放了资源,C、D才可以被唤醒,假设我们不看PROPAGTE 时刻一: A线程释放资源,执行代码 (2.3.4),head的waitStatus从SIGNAL(-1)变为了0 时刻二: C线程获取到资源,执行到代码(2.3.1), tryAcquireShared返回0 时刻三: B线程线程释放资源,执行代码 (2.3.4),因为此时未改变头结点,head的waitStatus为0,不能unparkSuccessor 时刻四: 此时C执行到代码(2.3.3),propagate(tryAcquireShared返回值)为0,C也不会去唤醒后驱结点,D线程就永远GG了
引用doReleaseShared注释中的一句话
status is set to PROPAGATE to ensure that upon release, propagation continues.
2.6 Condition
使用synchronized时,线程间通信使用wait, notify and notifyAll;而使用AQS实现的lock,线程间的通信就使用Condition中的await、signal…。Condition与Lock结合使用,同一个lock对应多个Condition。
1 |
|
在AQS中,已经对Condition的方法进行了实现,子类想使用的话,只需要调用ConditionObject就行了
1 |
|
本来想跟着源码走,简简单单介绍一下Condition,但是源码有几处细节,让我头秃,在网上搜索别人的博客,这篇博客解开了我的疑惑,对Condition介绍的非常详细,写的非常的完美~
根据大佬的博客,那我们下面简单讲解Condition的两个常用方法
- await
- signal
2.6.1 await & signal
导致目前线程阻塞直到被唤醒或中断;调用await后,会将当前的线程封装成node,加入到条件队列中
1 |
|
为了讲清楚代码(2.6.1)之后的逻辑,我们先看看signal的源码
将condition queue中等待最长的结点转移到sync queue中去,去争抢资源
1 |
|
此时,执行signal的主要逻辑
1 |
|
将condition queue中的一个结点转移到sync queue中去
1 |
|
这里signal的逻辑就讲完了,总结一下:
- 在condition queue中找出等待时间最长且未被取消的结点, 转移到sync queue的队尾去,同时要在condition queue中删除该结点
- 若在sync queue中的该结点的前驱结点被取消了或设置SIGNAL状态失败,要直接唤醒它,叫它去竞争锁。
signalAll的主要逻辑和signal是一样的,差别就是signalAll会把所有在condition queue中的结点转移到sync queue中去,并清空所有在condition queue中的结点,下面只贴一下signalAll的主要代码,
1 |
|
我们再次回到await中去
1 |
|
从代码(2.6.1),继续讲解 isOnSyncQueue
一开始在条件队列中,现在在sync queue中等待重新获取资源,如果有这种的node就返回true
1 |
|
findNodeFromTail
从尾部找寻结点
1 |
|
node.waitStatus == Node.CONDITION, 表示当前结点肯定在condition queue中。
为何是上面的那些条件? 我们上面看了转移到sync queue是用的enq方法
1 |
|
结合代码(2.6.5),思考一下就知道isOnSyncQueue中条件设置的道理了,但是为何需要findNodeFromTail啦? 这是需要补充一个知识点了,在多个线程执行enq时,只有一个线程会设置为tail成功,其余的都只是设置prev,就可能会出现下面图片中的情况,‘多个尾巴’。一直不断自旋,最后会变成一个正常的链表。 此时线程的状态是,调用await后,将结点添加到条件队列中,且释放了自己持有的所有资源,并将自己park,此时等待被signal或者被中断。
1 |
|
执行到代码(2.6.2), 我们直到可能是被signal或被中断了。现在要解决的是,
- 是否被中断?
- 何时被中断?
- 中断如何处理?
我们带着这三个问题,继续出发~
补充一个小知识点,AQS定义了三种情况中断的值
- THROW_IE, signal前被中断,要抛出InterruptedException
- REINTERRUPT, signal后被中断
- 0, 未被中断
关于REINTERRUPT这个中断,可以理解成,吃饭,吃完了但是还有一个菜没有上,问服务员,“如果没有炒,就不要了”,但是服务员告诉,菜已经下锅,所以这时候的中断就是REINTERRUPT,中断的太晚了。 – 例子来自上面的那篇博客
我们继续看向代码(2.6.2) checkInterruptWhileWaiting
1 |
|
若被中断Thread.interrupted肯定为true
transferAfterCancelledWait
1 |
|
上面的代码已经注明了,各种情况的发生时机,此时我们来到了await的第二部分~
1 |
|
reportInterruptAfterWait
1 |
|
2.6.2 await与signal的流程图
2.6.3 小结
先讲讲await的流程,看起流程图有点吓人,其实很多步骤是对不同时机的中断操作的记录 当await被执行,下面简单总结下await的流程
- 将当前线程与CONDITION状态封装成node,加入到condition queue的末尾
- 释放线程之前获取的所有资源
- 若不在sync queue中,阻塞自己,等待被signal或被中断
- 获取中断操作的时机,并记录表示何时中断的值(interruptMode)
- 不管是怎么被唤醒的,都要去竞争资源,直到获得资源为止
- 最后对不同的中断值,作出不同的操作
signal的流程就相对于简单一点
- 获取condition queue的头结点
- 检验是否被取消,若被取消,就获取头结点的后驱结点,以此类推;
- 将结点从condition queue中转移到sync queue中,而且会从condition queue中删除该节点
- 若结点插入sync queue,得到的前驱结点,被取消了,或者CAS前驱结点状态为SIGNAL
失败,将直接unpark当前线程
3. 总结
Doug Lea,太秀了。AQS中有很多细枝末节的东西,只有自己去读了源码,理解为何这样做,你才会明白才会真正读懂AQS。 关于学习和写AQS文章时,看了一些博客,为我解答了自己的疑惑,慢慢加油,我也要向这些大佬看齐~
4. 参考
- 《Java并发编程之美》 - 这本书可以作为学习并发的入门书
- Java并发之AQS详解 - 引用了他的图片
- AbstractQueuedSynchronizer源码解读 - 为我解开了一些获取和释放资源的疑惑
- 逐行分析AQS源码(4)——Condition接口实现 - 为我解开了一些Condition的疑惑
- AQS论文 Doug Lea
# 5. 面试中问题
~~这是我的一个想法,若我博客中写过的知识,在面试中有问到过,我会记录下来,没有就是目前还没遇到过