AQS抽象同步器的核心原理
前面介绍的在争用激烈的场景下,使用基于CAS自旋实现的轻量级锁有两个大的问题:
(1)CAS恶性空自旋会浪费大量的CPU资源。
(2)在SMP架构的CPU上会导致“总线风暴”。
解决CAS恶性空自旋的有效方式之一是以空间换时间,较为常见的方案有两种:分散操作热点和使用队列削峰。JUC并发包使用的是队列削峰的方案解决CAS的性能问题,并提供了一个基于双向队列的削峰基类——抽象基础类AbstractQueuedSynchronizer(抽象同步器类,简称为AQS)。
锁与队列的关系
无论是单体服务应用内部的锁,还是分布式环境下多体服务应用所使用的分布式锁,为了减少由于无效争夺导致的资源浪费和性能恶化,一般都基于队列进行排队与削峰。
CLH锁的内部队列
- CLH自旋锁使用的CLH(Craig,Landin,andHagersten Lock Queue)是一个单向队列,也是一个FIFO队列。在独占锁中,竞争资源在一个时间点只能被一个线程锁访问,队列头部的
节点表示占有锁的节点,新加入的抢锁线程则需要等待,会插入队列的尾部。
- CLH自旋锁使用的CLH(Craig,Landin,andHagersten Lock Queue)是一个单向队列,也是一个FIFO队列。在独占锁中,竞争资源在一个时间点只能被一个线程锁访问,队列头部的
分布式锁的内部队列
- 在分布式锁的实现中,比较常见的是基于队列的方式进行不同节点中“等锁线程”的统一调度和管理。
AQS的内部队列
- AQS是JUC提供的一个用于构建锁和同步容器的基础类。JUC包内许多类都是基于AQS构建的,例如ReentrantLock、Semaphore、CountDownLatch、ReentrantReadWriteLock、FutureTask等。AQS解决了在实现同步容器时设计的大量细节问题。AQS是CLH队列的一个变种,主要原理和CLH队列差不多,这也是前面对CLH队列进行长篇大论介绍的原因。
- AQS队列内部维护的是一个FIFO的双向链表,这种结构的特点是每个数据结构都有两个指针,分别指向直接的前驱节点和直接的后继节点。所以双向链表可以从任意一个节点开始很方便地访问前驱节点和后继节点。每个节点其实是由线程封装的,当线程争抢锁失败后会封装成节点加入AQS队列中;当获取锁的线程释放锁以后,会从队列中唤醒一个阻塞的节点(线程)。
AQS的核心成员
AQS出于“分离变与不变”的原则,基于模板模式实现。AQS为锁获取、锁释放的排队和出队过程提供了一系列的模板方法。由于JUC的显式锁种类丰富,因此AQS将不同锁的具体操作抽取为钩子方法,供各种锁的子类(或者其内部类)去实现。
状态标志位
AQS中维持了一个单一的volatile
修饰的状态信息state,AQS使用int类型的state标示锁的状态,可以理解为锁的同步状态。
1 | //同步状态,使用 volatile保证线程可见 |
state因为使用volatile保证了操作的可见性,所以任何线程通过getState()
获得状态都可以得到最新值。AQS提供了getState()
、setState()
来获取和设置同步状态,具体如下:
1 | // 获取同步的状态 |
由于setState()无法保证原子性,因此AQS给我们提供了compareAndSetState()
方法利用底层UnSafe的CAS机制来实现原子性。 compareAndSetState()
方法实际上调用的是unsafe成员的compareAndSwapInt()
方法。
以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程执行该锁的lock()操作时,会调用tryAcquire()独占该锁并将state加1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(释放锁)为止,其他线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么
次,这样才能保证state能回到零态。
AbstractQueuedSynchronizer
继承了AbstractOwnableSynchronizer
,这个基类只有一个变量叫exclusiveOwnerThread
,表示当前占用该锁的线程,并且提供了相应
的get()和set()方法,具体如下:
1 | public abstract class AbstractOwnableSynchronizer implements java.io.Serializable { |
队列节点Node类
AQS是一个虚拟队列,不存在队列实例,仅存在节点之间的前后关系。节点类型通过内部类Node定义,其核心的成员如下:
1 | static final class Node { |
Node 节点 waitStatus 状态含义
Node 节点状态 | 值 | 含义 |
---|---|---|
CANCELLED |
1 | 表示线程已经取消获取锁。线程在等待获取资源时被中断、等待资源超时会更新为该状态。 |
SIGNAL |
-1 | 表示后继节点需要当前节点唤醒。在当前线程节点释放锁之后,需要对后继节点进行唤醒。 |
CONDITION |
-2 | 表示节点在等待 Condition。当其他线程调用了 Condition 的 signal() 方法后,节点会从等待队列转移到同步队列中等待获取资源。 |
PROPAGATE |
-3 | 用于共享模式,在共享模式下,前继节点不仅会唤醒后继节点,同时也可能会唤醒后继节点的后继节点。 |
0 | 加入队列的新节点的初始状态。 |
如果 waitStatus > 0
,表明节点的状态已经取消等待获取资源。
如果 waitStatus < 0
,表明节点的处于有效的等待状态。
因此在 AQS 的源码中,经常使用 > 0
、 < 0
来对 waitStatus
进行判断。
FIFO双向同步队列
AQS的内部队列是CLH队列的变种,每当线程通过AQS获取锁失败时,线程将被封装成一个Node节点,通过CAS原子操作插入队列尾部。
当有线程释放锁时,AQS会尝试让队头的后继节点占用锁。AQS通过内置的FIFO双向队列来完成线程的排队工作,内部通过节点head和tail记录队首和队尾元素,元素的节点类型为Node类型,具
体如下:
1 | /*首节点的引用*/ |
AQS的首节点和尾节点都是懒加载的。懒加载的意思是在需要的时候才真正创建。只有在线程竞争失败的情况下,有新线程加入同步队列时,AQS才创建一个head节点。head节点只能被setHead()方法修改,并且节点的waitStatus不能为CANCELLED。尾节点只在有新线程阻塞时才被创建。
AQS 资源共享方式
AQS 定义两种资源共享方式:Exclusive
(独占,只有一个线程能执行,如ReentrantLock
)和Share
(共享,多个线程可同时执行,如Semaphore
/CountDownLatch
)。
一般来说,自定义同步器的共享方式要么是独占,要么是共享,他们也只需实现tryAcquire-tryRelease
、tryAcquireShared-tryReleaseShared
中的一种即可。但 AQS 也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock
。
AQS中的钩子方法
什么是钩子方法呢? 钩子方法是一种被声明在抽象类中的方法,一般使用 protected
关键字修饰,它可以是空方法(由子类实现),也可以是默认实现的方法。模板设计模式通过钩子方法控制固定步骤的实现。
1 | //独占方式。尝试获取资源,成功则返回true,失败则返回false。 |
以上钩子方法的默认实现会抛出UnsupportedOperationException异常。除了这些钩子方法外,AQS类中的其他方法都是final类型的方法,所以无法被其他类继承,只有这几个方法可以被其他类继承。
自定义同步器
通过AQS实现一把简单的独占锁,如果需要自定义同步器一般的方式是这样(模板方法模式很经典的一个应用):
- 自定义的同步器继承
AbstractQueuedSynchronizer
。 - 重写 AQS 暴露的模板方法。
简单的独占锁的实现
使用SimpleMockLock实现基于AQS的、简单的非公平独占锁,代码如下:
1 | public abstract class SimpleMockLock implements Lock { |
1 | private static int count = 0; |
AQS锁抢占的原理
AbstractQueuedSynchronizer的实现非常精巧,令人叹为观止,不入细节难以完全领会其精髓。下面基于SimpleMockLock公平独占锁的抢占过程详细说明AQS锁抢占的原理。
1 | // 显式锁的抢占方法 |
AQS模板方法:acquire(arg)
AQS 中以独占模式获取资源的入口方法是 acquire()
,如下:
1 | public final void acquire(int arg) { |
在 acquire()
中,线程会先尝试获取共享资源;如果获取失败,会将线程封装为 Node 节点加入到 AQS 的等待队列中;加入队列之后,会让等待队列中的线程尝试获取资源,并且会对线程进行阻塞操作。分别对应以下三个方法:
tryAcquire()
:尝试获取锁(模板方法),AQS
不提供具体实现,由子类实现。addWaiter()
:如果获取锁失败,会将当前线程封装为 Node 节点加入到 AQS 的 CLH 变体队列中等待获取锁。acquireQueued()
:对线程进行阻塞、唤醒,并调用tryAcquire()
方法让队列中的线程尝试获取锁。
钩子实现:tryAcquire(arg)
SimpleMockLock的tryAcquire()流程是:CAS操作state字段,将其值从0改为1,若成功,则表示锁未被占用,可成功占用,并且返回
true;若失败,则获取锁失败,返回false。
1 | // AQS |
SimpleMockLock的实现非常简单,是不可以重入的,仅仅为了学习AQS而编写。如果是可以重入的锁,在重复抢锁时会累计state字段值,表示重入锁的次数,具体可参考ReentrantLock源码。
1 | // ReentrantLock |
- 通过
CAS
更新state
变量。state == 0
表示资源没有被占用。state > 0
表示资源被占用,此时state
表示重入次数。 - 通过
setExclusiveOwnerThread()
设置持有资源的线程。
如果线程更新 state
变量成功,就表明获取到了资源, 因此将持有资源的线程设置为当前线程即可。
直接入队:addWaiter
在acquire模板方法中,如果钩子方法tryAcquire尝试获取同步状态失败的话,就构造同步节点(独占式节点模式为Node.EXCLUSIVE),通过addWaiter(Node node,int args)
方法将该节
点加入同步队列的队尾。
1 | // AQS |
在 addWaiter()
方法中,需要执行 Node 节点 入队 的操作。由于是在多线程环境下,因此需要通过 CAS
操作保证并发安全。通过 CAS
操作去更新 tail
指针指向新入队的 Node 节点,CAS
可以保证只有一个线程会成功修改 tail
指针,以此来保证 Node 节点入队时的并发安全。
自旋入队:enq
执行 addWaiter()
时,如果发现 pred == null
,即 tail
指针为 null,则证明队列没有初始化,需要调用 enq()
方法初始化队列,并将 Node
节点加入到初始化后的队列中,代码如下:
1 | // AQS |
自旋抢占:acquireQueued()
在 acquire()
方法中,通过 addWaiter()
方法将 Node
节点加入队列之后,就会调用 acquireQueued()
方法。在节点入队之后,启动自旋抢锁的流程。acquireQueued()
方法的主要逻辑:当前Node节点线程在死循环中不断获取同步状态,并且不断在前驱节点上自旋,只有当前驱节点是头节点时才能尝试获取锁,原因是:
- 头节点是成功获取同步状态(锁)的节点,而头节点的线程释放了同步状态以后,将会唤醒其后继节点,后继节点的线程被唤醒后要检查自己的前驱节点是否为头节点。
- 维护同步队列的FIFO原则,节点进入同步队列之后,就进入了自旋的过程,每个节点都在不断地执行for死循环。
1 | // AQS:令队列中的节点尝试获取锁,并且对线程进行阻塞。 |
为了不浪费资源,acquireQueued()
自旋过程中会阻塞线程,等待被前驱节点唤醒后才启动循环。如果成功就返回,否则执行shouldParkAfterFailedAcquire()
、parkAndCheckInterrupt()
来达到阻塞的效果。
调用acquireQueued()
方法的线程一定是node所绑定的线程(由它的thread属性所引用),该线程也是最开始调用lock()方法抢锁的那个线程,在acquireQueued()
的死循环中,该线程可能重复进行阻塞和被唤醒。
在 acquireQueued()
方法中,主要做两件事情:
尝试获取资源: 在
acquireQueued()
方法中,尝试获取资源总共有 2 个步骤:p == head
:表明当前节点的前继节点为head
节点。此时当前节点为 AQS 队列中的第一个等待节点。tryAcquire(arg) == true
:表明当前线程尝试获取资源成功。- 在成功获取资源之后,就需要将当前线程的节点 从等待队列中移除 。移除操作为:将当前等待的线程节点设置为
head
节点(head
节点是虚拟节点,并不参与排队获取资源)。
阻塞当前线程
在
AQS
中,当前节点的唤醒需要依赖于上一个节点。如果上一个节点取消获取锁,它的状态就会变为CANCELLED
,CANCELLED
状态的节点没有获取到锁,也就无法执行解锁操作对当前节点进行唤醒。因此在阻塞当前线程之前,需要跳过CANCELLED
状态的节点。
挂起预判:shouldParkAfterFailedAcquire()
acquireQueued()
自旋在阻塞自己的线程之前会进行挂起预判。shouldParkAfterFailedAcquire()
方法的主要功能是:将当前节点的有效前驱节点(是指有效节点不是CANCELLED类型的节点)找到,并且将有效前驱节点的状态设置为SIGNAL,之后返回true代表当前线程可以马上被阻塞了。具体可以分为三种情况:
- 如果发现前继节点的状态是
-1 SIGNAL
,则可以阻塞当前线程。- 说明前驱的等待标志已设好,返回true表示设置完毕。
- 如果发现前继节点的状态是
1 CANCELLED
,则需要跳过1 CANCELLED
状态的节点- 说明前驱节点本身不再等待了,需要跨越这些节点,然后找到一个有效节点,再把当前节点和这个有效节点的唤醒关系建立好:调整前驱节点的next指针为自己。
- 如果发现前继节点的状态不是
SIGNAL
和CANCELLED
,表明前继节点的状态处于正常等待资源的状态,因此将前继节点的状态设置为SIGNAL
,表明该前继节点需要对后续节点进行唤醒。- 如果是其他情况:
-3(PROPAGATE,共享锁等待)
、-2(CONDITION,条件等待)
、0(初始状态)
,那么通过CAS尝试设置前驱节点为SIGNAL,表示只要前驱节点释放锁,当前节点就可以抢占锁了。
- 如果是其他情况:
1 | // AQS:判断当前线程节点是否可以阻塞。 |
在独占锁的场景中,shouldParkAfterFailedAcquire()
方法是在acquireQueued()
方法的死循环中被调用的,由于此方法返回false时acquireQueued()
不会阻塞当前线程,只有此方法返回true时当前线程才阻塞,因此在一般情况下,此方法至少需要执行两次,当前线程才会被阻塞。
线程挂起:parkAndCheckInterrupt()
当判断当前线程可以阻塞之后,通过调用 parkAndCheckInterrupt()
方法来阻塞当前线程。内部使用了 LockSupport
来实现阻塞。LockSupoprt
底层是基于 Unsafe
类来阻塞线程,代码如下:
1 | // AQS |
当线程被唤醒之后,需要执行
Thread.interrupted()
来返回线程的中断状态,这是为什么呢?
这个和线程的中断协作机制有关系,线程被唤醒之后,并不确定是被中断唤醒,还是被 LockSupport.unpark()
唤醒,因此需要通过线程的中断状态来判断。
AbstractQueuedSynchronizer会把所有的等待线程构成一个阻塞等待队列,当一个线程执行完lock.unlock()
时,会激活其后继节点,通过调用LockSupport.unpark(postThread)
完成后继线程的唤醒。
selfInterrupt()
在 acquire()
方法中,当 if
语句的条件返回 true
后,就会调用 selfInterrupt()
,该方法会中断当前线程,为什么需要中断当前线程呢?
1 | static void selfInterrupt() { |
- 当
if
判断为true
时,需要tryAcquire()
返回false
,并且acquireQueued()
返回true
。 - 其中
acquireQueued()
方法返回的是线程被唤醒之后的 中断状态 ,通过执行Thread.interrupted()
来返回。该方法在返回中断状态的同时,会清除线程的中断状态。 - 因此如果
if
判断为true
,表明线程的中断状态为true
,但是调用Thread.interrupted()
之后,线程的中断状态被清除为false
,因此需要重新执行selfInterrupt()
来重新设置线程的中断状态。
AQS锁释放的原理
下面基于SimpleMockLock公平独占锁的释放流程详细说明AQS锁释放的原理。
1 | // 显式锁的释放方法 |
AQS模板方法:release()
这段代码逻辑比较简单,如果同步状态的钩子方法执行成功(tryRelease返回true),就会执行if块中的代码,当head指向的头节点不为null,并且该节点的状态值不为0时才会执行unparkSuccessor()
方法。钩子方法tryRelease()尝试释放当前线程持有的资源,由子类提供具体的实现。
1 | // AQS |
钩子实现:tryRelease()
tryRelease()方法是需要子类提供实现的一个钩子方法,需要子类根据具体业务进行具体的实现。SimpleMockLock的钩子实现如下:核心逻辑是设置同步状态state的值为0,方便后继节点执行抢
占。
1 | // 钩子方法 |
ReentrantLock在 tryRelease()
方法中,会先计算释放锁之后的 state
值,判断 state
值是否为 0。
如果
state == 0
,表明该线程没有重入次数了,更新free = true
,并修改持有资源的线程为 null,表明该线程完全释放这把锁。如果
state != 0
,表明该线程还存在重入次数,因此不更新free
值,free
值为false
表明该线程没有完全释放这把锁。之后更新state
值,并返回free
值,free
值表明线程是否完全释放锁。
1 | // ReentrantLock |
唤醒后继:unparkSuccessor()
如果 tryRelease()
返回 true
,表明线程已经没有重入次数了,锁已经被完全释放,因此需要唤醒后继节点。在唤醒后继节点之前,需要判断是否可以唤醒后继节点,判断条件为: h != null && h.waitStatus != 0
。这里解释一下为什么要这样判断:
h == null
:表明head
节点还没有被初始化,也就是 AQS 中的队列没有被初始化,因此无法唤醒队列中的线程节点。h != null && h.waitStatus == 0
:表明头节点刚刚初始化完毕(节点的初始化状态为 0),后继节点线程还没有成功入队,因此不需要对后续节点进行唤醒。(当后继节点入队之后,会将前继节点的状态修改为SIGNAL
,表明需要对后继节点进行唤醒)h != null && h.waitStatus != 0
:其中waitStatus
有可能大于 0,也有可能小于 0。其中> 0
表明节点已经取消等待获取资源,< 0
表明节点处于正常等待状态。
接下来进入 unparkSuccessor()
方法查看如何唤醒后继节点:
1 | // AQS:这里的入参 node 为队列的头节点(虚拟头节点) |
在
unparkSuccessor()
中,如果头节点的状态< 0
(在正常情况下,只要有后继节点,头节点的状态应该为SIGNAL
,即 -1),表示需要对后继节点进行唤醒,因此这里提前清除头节点的状态标识,将状态修改为 0,表示已经执行了对后续节点唤醒的操作。如果
s == null
或者s.waitStatus > 0
,表明后继节点异常,此时不能唤醒异常节点,而是要找到正常状态的节点进行唤醒。因此需要从
tail
指针向前遍历,来找到第一个状态正常(waitStatus <= 0
)的节点进行唤醒。
unparkSuccessor()
唤醒后继节点的线程后,后继节点的线程重新执行方法acquireQueued()
中的自旋抢占逻辑。
tail
指针向前遍历原因
为什么要从
tail
指针向前遍历,而不是从head
指针向后遍历,寻找正常状态的节点呢?
遍历的方向和 节点的入队操作 有关。入队方法如下:
1 | // AQS:节点入队方法 |
在 addWaiter()
方法中,node
节点入队需要修改 node.prev
和 pred.next
两个指针,但是这两个操作并不是 原子操作 ,先修改了 node.prev
指针,之后才修改 pred.next
指针。在极端情况下,可能会出现 head
节点的下一个节点状态为 CANCELLED
,此时新入队的节点仅更新了 node.prev
指针,还未更新 pred.next
指针,如下图:
这样如果从 head
指针向后遍历,无法找到新入队的节点,因此需要从 tail
指针向前遍历找到新入队的节点。
图解 AQS 工作原理
这里基于 ReentrantLock
来画图进行讲解。
设总共有 3 个线程尝试获取锁,线程分别为
T1
、T2
和T3
。此时,假设线程
T1
先获取到锁,线程T2
排队等待获取锁。在线程T2
进入队列之前,需要对 AQS 内部队列进行初始化。head
节点在初始化后状态为0
。AQS 内部初始化后的队列如下图:
此时,线程
T2
尝试获取锁。由于线程T1
持有锁,因此线程T2
会进入队列中等待获取锁。同时会将前继节点(head
节点)的状态由0
更新为SIGNAL
,表示需要对head
节点的后继节点进行唤醒。此时,AQS 内部队列如下图所示:此时,线程
T3
尝试获取锁。由于线程T1
持有锁,因此线程T3
会进入队列中等待获取锁。同时会将前继节点(线程T2
节点)的状态由0
更新为SIGNAL
,表示线程T2
节点需要对后继节点进行唤醒。此时,AQS 内部队列如下图所示:此时,假设线程
T1
释放锁,会唤醒后继节点T2
。线程T2
被唤醒后获取到锁,并且会从等待队列中退出。- 这里线程
T2
节点退出等待队列并不是直接从队列移除,而是令线程T2
节点成为新的head
节点,以此来退出资源获取的等待。此时 AQS 内部队列如下所示:
- 这里线程
此时,假设线程
T2
释放锁,会唤醒后继节点T3
。线程T3
获取到锁之后,同样也退出等待队列,即将线程T3
节点变为head
节点来退出资源获取的等待。此时 AQS 内部队列如下所示:
ReentrantLock的抢锁流程
非公平抢占的钩子方法:tryAcquire(arg)
1 | static final class NonfairSync extends Sync { |
非公平同步器ReentrantLock.NonfairSync
的核心思想是当前线程尝试获取锁的时候,如果发现锁的状态位是0,就直接尝试将锁拿过来,然后执行setExclusiveOwnerThread()
,根本不管同步队列中的排队节点。
公平抢占的钩子方法:tryAcquire(arg)
1 | static final class FairSync extends Sync { |
公平抢占的钩子方法中,首先判断是否有后继节点,如果有后继节点,并且当前线程不是锁的占有线程,钩子方法就返回false,模板方法会进入排队的执行流程,可见公平锁是真正公平的。
是否有后继节点的判断
head节点是获取到锁的节点,但是任意时刻head节点可能占用着锁,也可能释放了锁,如果释放了锁,那么此时state=0,未被阻塞的head.next
节点对应的线程在任意时刻都是在自旋地尝试获取锁。FairSync进行是否有后继节点的判断代码如下:
1 | public final boolean hasQueuedPredecessors() { |
hasQueuedPredecessors的执行场景大致如下:
- 当
h!=t
不成立的时候,说明h头节点、t尾节点要么是同一个节点,要么都是null,此时hasQueuedPredecessors()
返回false,表示没有后继节点。 - 当
h!=t
成立的时候,进一步检查head.next
是否为null,如果为null,就返回true。什么情况下h!=t
,同时h.next==null
呢?- 有其他线程第一次正在入队时可能会出现。其他线程执行AQS的
enq()
方法,compareAndSetHead(node)
完成,还没执行tail=head
语句时,此时t=null、head=new Node()、head.next=null
。
- 有其他线程第一次正在入队时可能会出现。其他线程执行AQS的
- 如果
h!=t
成立,head.next != null
,判断head.next
是不是当前线程,如果是就返回false,否则返回true。
AQS条件队列
Condition是JUC用来替代传统Object的wait()/notify()
线程间通信与协作机制的新组件,相比调用Object的wait()/notify()
,调用Condition的await()/signal()
这种方式实现线程间协作更加高效。
- Condition的await()方法会将线程包装为等待节点,加入等待队列中,并将AQS同步队列中的节点移除,接着不断检查
isOnSyncQueue(Node node)
,如果在等待队列中,就一直等着,如果signal将它移到AQS队列中,则退出循环。 - Condition的signal()方法则是先检查当前线程是否获取了锁,接着将等待队列中的节点通过Node的操作直接加入AQS队列。线程并不会立即获取到资源,从while循环退出后,会通过acquireQueued方法加入获取同步状态的竞争中。
AQS,Lock,Condition,ConditionObject
之间的关系:ConditionObject是AQS的内部类,实现了Condition接口,Lock中提供newCondition()方法,委托给内部AQS的实现Sync来创建ConditionObject对象,享受AQS对Condition的支持。
1 | // ReentrantLock#newCondition |
Condition基本原理
Condition与Object的wait()/notify()
作用是相似的,都是使得一个线程等待某个条件,只有当该条件具备signal()
或者signalAll()
方法被调用时等待线程才会被唤醒,从而重新争夺锁。不同的是,Object的wait()/notify()由JVM底层实现,而Condition接口与实现类完全使用Java代码实现。当需要进行线程间的通信时,建议结合使用ReetrantLock与Condition,通过Condition的await()和signal()方法进行线程间的阻塞与唤醒。
ConditionObject类是实现条件队列的关键,每个ConditionObject对象都维护一个单独的条件等待队列。每个ConditionObject对应一个条件队列,它记录该队列的头节点和尾节点。
1 | // AQS中Node类与Condition相关的字段: |
条件队列为单向列表,只有指向下一个节点的引用;没有被唤醒的节点全部存储在条件队列上。下图描述的是一个长度为 5 的条件队列,即有5个线程执行了await()
方法;与阻塞队列不同,条件队列没有常驻内存的“head结点”,且一个处于正常状态节点的waitStatus
为 -2 condition
。当有新节点加入时,将会追加至队列尾部
在一个显式锁上,我们可以创建多个等待任务队列,这点和内置锁不同,Java内置锁上只有唯一的一个等待队列。比如,我们可以调用newCondition()
创建两个等待队列,具体如下:
1 | private Lock lock = new ReentrantLock(); |
Condition条件队列是单向的,而AQS同步队列是双向的,AQS节点会有前驱指针。一个AQS实例可以有多个条件队列,是聚合关系;但是一个AQS实例只有一个同步队列,是逻辑上的组合关系。
await()等待方法原理
当线程调用await()方法时,说明当前线程的节点为当前AQS队列的头节点,正好处于占有锁的状态,await()方法需要把该线程从AQS队列挪到Condition等待队列里
在await()
方法将当前线程挪动到Condition等待队列后,还会唤醒AQS同步队列中head节点的下一个节点。await()方法的核心代码如下:
1 | public final void await() throws InterruptedException { |
addConditionWaiter
创建一个新节点并放入Condition队列尾部的工作由addConditionWaiter()
方法完成,该方法具体如下:
- 首先判断条件队列的尾节点是否被取消了,这里用
last.ws != CONDITION
来判断,如果是的话,就需要从头到尾遍历,消除被不是condition的节点。 - 接着将当前线程包装为Node,指定ws为CONDITION。
1 | private Node addConditionWaiter() { |
unlinkCancelledWaiters
unlinkCancelledWaiters用于清除队列中已经取消等待的节点。
1 | private void unlinkCancelledWaiters() { |
fullyRelease
将节点加入等待队列中后,就需要完全释放线程拥有的独占锁了,完全释放针对重入锁的情况。
我们看到这个方法返回了一个savedState变量,简单的理解就是保存状态。我们知道重入锁的state由重入的次数,如果一个state为N,我们可以认为它持有N把锁。
await()
方法必须将state置0,也就是完全释放锁,后面的线程才能获取到这把锁,置0之后,我们需要用个变量标记一下,也就是这里的savedState。这样它被重新唤醒的时候,我们就知道,他需要获取savedState把锁。
1 | final int fullyRelease(Node node) { |
等待进入阻塞队列
完全释放锁之后,将会来到这几步,如果这个节点的线程不在同步队列中,说明该线程还不具备竞争锁的资格,将被一直挂起,这里的同步队列指的是AQS的阻塞队列。
1 | int interruptMode = 0; |
isOnSyncQueue
判断节点是不是已经到阻塞队列中了,如果是的话,就直接返回true
1 | final boolean isOnSyncQueue(Node node) { |
findNodeFromTail
从阻塞队列的尾部向前遍历,如果找到这个node,表示它已经在了,那就返回true。
1 | private boolean findNodeFromTail(Node node) { |
signal()唤醒方法原理
线程在某个ConditionObject对象上调用signal()
方法后,等待队列中的firstWaiter
会被加入同步队列中,等待节点被唤醒
1 | public final void signal() { |
doSignal
doSignal()
方法会从头到尾遍历条件队列,找到需要移到同步队列的节点。
- 这里的while循环表示,如果first没有转移成功,就接着判断first后面的节点是不是需要转移。
1 | private void doSignal(Node first) { |
transferForSignal
transferForSignal
该方法将节点从条件队列转移到阻塞队列。
1 | final boolean transferForSignal(Node node) { |
整体流程如下:
- 通过enq()方法自旋(该方法已经介绍过)将条件队列中的头节点放入AQS同步队列尾部,并获取它在AQS队列中的前驱节点。
- 如果前驱节点的状态是取消状态,或者设置前驱节点为Signal状态失败,就唤醒当前节点的线程;否则节点在同步队列的尾部,参与排队。
- 同步队列中的线程被唤醒后,表示重新获取了显式锁,然后继续执行condition.await()语句后面的临界区代码。
唤醒线程
LockSupport.park(this)
挂起的线程是什么时候唤醒的:
- signal方法将节点转移到同步队列中,且获取到了锁或者对前驱节点的cas操作失败,调用了
LockSupport.unpark(node.thread);
方法。 - 在park的时候,另外一个线程对挂起的线程进行了中断。
一旦signal之后,节点被成功转移到同步队列后,这时下面这个循环就会退出了,继续回到这里:
1 | int interruptMode = 0; |
interruptMode
1 | /** await 返回的时候,需要重新设置中断状态 */ |
checkInterruptWhileWaiting
该方法用于判断该线程是否在挂起期间发生了中断。
1 | private int checkInterruptWhileWaiting(Node node) { |
transferAfterCancelledWait
该方法判断何时中断,是否在signal之前。
1 | final boolean transferAfterCancelledWait(Node node) { |
处理中断状态
1 | // 第一部分 |
第一部分
signal唤醒的线程并不会立即获取到资源,从while循环退出后,会通过acquireQueued方法加入获取同步状态的竞争中。
1 | final boolean acquireQueued(final Node node, int arg) { |
acquireQueued方法返回时,表示已经获取到了锁,且返回的是interrupted值,如果返回true,表示已经被中断。
接着判断interruptMode != THROW_IE
表示是在signal之后发生的中断,需要重新中断当前线程,将interruptMode设置为REINTERRUPT。
第二部分
1 | // 第二部分 |
前面说了,signal会将节点移到同步队列中,最后一步需要和条件队列断开关系,也就是:node.nextWaiter = null
,但这是想象中比较正常的情况,如果在signal之前被中断,节点也会被加入同步队列中,这时其实是没有调用这个断开关系的。
因此这边做一点处理,unlinkCancelledWaiters()
逻辑上面也说过了,可以回过头去看看,主要是清除队列中已经取消等待的节点。
第三部分
最后一个部分,就是对两种interruptMode的情况进行处理,看看代码就知道了:
1 | private void reportInterruptAfterWait(int interruptMode) |
AQS的实际应用
AQS建立在CAS原子操作和volatile可见性变量的基础之上,为上层的显式锁、同步工具类、阻塞队列、线程池、并发容器、Future异步工具提供线程之间同步的基础设施。所以,AQS在JUC框架中的使用是非常广泛的。