前面介绍的在争用激烈的场景下,使用基于CAS自旋实现的轻量级锁有两个大的问题:
(1)CAS恶性空自旋会浪费大量的CPU资源。
(2)在SMP架构的CPU上会导致“总线风暴”。

解决CAS恶性空自旋的有效方式之一是以空间换时间,较为常见的方案有两种:分散操作热点和使用队列削峰。JUC并发包使用的是队列削峰的方案解决CAS的性能问题,并提供了一个基于双向队列的削峰基类——抽象基础类AbstractQueuedSynchronizer(抽象同步器类,简称为AQS)。

锁与队列的关系

无论是单体服务应用内部的锁,还是分布式环境下多体服务应用所使用的分布式锁,为了减少由于无效争夺导致的资源浪费和性能恶化,一般都基于队列进行排队与削峰。

  • CLH锁的内部队列

    • CLH自旋锁使用的CLH(Craig,Landin,andHagersten Lock Queue)是一个单向队列,也是一个FIFO队列。在独占锁中,竞争资源在一个时间点只能被一个线程锁访问,队列头部的
      节点表示占有锁的节点,新加入的抢锁线程则需要等待,会插入队列的尾部。

    CLH 锁的队列结构

  • 分布式锁的内部队列

    • 在分布式锁的实现中,比较常见的是基于队列的方式进行不同节点中“等锁线程”的统一调度和管理。
  • AQS的内部队列

    • AQS是JUC提供的一个用于构建锁和同步容器的基础类。JUC包内许多类都是基于AQS构建的,例如ReentrantLock、Semaphore、CountDownLatch、ReentrantReadWriteLock、FutureTask等。AQS解决了在实现同步容器时设计的大量细节问题。AQS是CLH队列的一个变种,主要原理和CLH队列差不多,这也是前面对CLH队列进行长篇大论介绍的原因。
    • AQS队列内部维护的是一个FIFO的双向链表,这种结构的特点是每个数据结构都有两个指针,分别指向直接的前驱节点和直接的后继节点。所以双向链表可以从任意一个节点开始很方便地访问前驱节点和后继节点。每个节点其实是由线程封装的,当线程争抢锁失败后会封装成节点加入AQS队列中;当获取锁的线程释放锁以后,会从队列中唤醒一个阻塞的节点(线程)。

    CLH 变体队列结构

AQS的核心成员

AQS出于“分离变与不变”的原则,基于模板模式实现。AQS为锁获取、锁释放的排队和出队过程提供了一系列的模板方法。由于JUC的显式锁种类丰富,因此AQS将不同锁的具体操作抽取为钩子方法,供各种锁的子类(或者其内部类)去实现。

状态标志位

AQS中维持了一个单一的volatile修饰的状态信息state,AQS使用int类型的state标示锁的状态,可以理解为锁的同步状态。

1
2
//同步状态,使用 volatile保证线程可见
private volatile int state;

state因为使用volatile保证了操作的可见性,所以任何线程通过getState()获得状态都可以得到最新值。AQS提供了getState()setState()来获取和设置同步状态,具体如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
// 获取同步的状态
protected final int getState() {
return state;
}
// 设置同步的状态
protected final void setState(int newState) {
state = newState;
}

//原子地(CAS操作)将同步状态值设置为给定值update如果当前同步状态的值等于expect(期望值)
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

由于setState()无法保证原子性,因此AQS给我们提供了compareAndSetState()方法利用底层UnSafe的CAS机制来实现原子性。 compareAndSetState()方法实际上调用的是unsafe成员的compareAndSwapInt()方法。

以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程执行该锁的lock()操作时,会调用tryAcquire()独占该锁并将state加1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(释放锁)为止,其他线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么
次,这样才能保证state能回到零态。

AbstractQueuedSynchronizer继承了AbstractOwnableSynchronizer,这个基类只有一个变量叫exclusiveOwnerThread,表示当前占用该锁的线程,并且提供了相应
的get()和set()方法,具体如下:

1
2
3
4
5
6
7
 public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {

//表示当前占用该锁的线程
private transient Thread exclusiveOwnerThread;
// 省略get/set方法
}
}

队列节点Node类

AQS是一个虚拟队列,不存在队列实例,仅存在节点之间的前后关系。节点类型通过内部类Node定义,其核心的成员如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
static final class Node {
// 标记一个结点(对应的线程)在共享模式下等待
static final Node SHARED = new Node();
// 标记一个结点(对应的线程)在独占模式下等待
static final Node EXCLUSIVE = null;

// waitStatus的值,表示该结点(对应的线程)已被取消
static final int CANCELLED = 1;
// waitStatus的值,表示后继结点(对应的线程)需要被唤醒
static final int SIGNAL = -1;
// waitStatus的值,表示该结点(对应的线程)在等待某一条件
static final int CONDITION = -2;
/*waitStatus的值,表示有资源可用,新head结点需要继续唤醒后继结点(共享模式下,多线程并发释放资源,而head唤醒其后继结点后,需要把多出来的资源留给后面的结点;设置新的head结点时,会继续唤醒其后继结点)*/
static final int PROPAGATE = -3;

// 等待状态,取值范围,-3,-2,-1,0,1
volatile int waitStatus;
volatile Node prev; // 前驱结点
volatile Node next; // 后继结点
volatile Thread thread; // 结点对应的线程
Node nextWaiter; // 等待队列里下一个等待条件的结点


// 判断共享模式的方法
final boolean isShared() {
return nextWaiter == SHARED;
}

Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}

// 其它方法忽略,可以参考具体的源码
}

// AQS里面的addWaiter私有方法
private Node addWaiter(Node mode) {
// 使用了Node的这个构造函数
Node node = new Node(Thread.currentThread(), mode);
// 其它代码省略
}

Node 节点 waitStatus 状态含义

Node 节点状态 含义
CANCELLED 1 表示线程已经取消获取锁。线程在等待获取资源时被中断、等待资源超时会更新为该状态。
SIGNAL -1 表示后继节点需要当前节点唤醒。在当前线程节点释放锁之后,需要对后继节点进行唤醒。
CONDITION -2 表示节点在等待 Condition。当其他线程调用了 Condition 的 signal() 方法后,节点会从等待队列转移到同步队列中等待获取资源。
PROPAGATE -3 用于共享模式,在共享模式下,前继节点不仅会唤醒后继节点,同时也可能会唤醒后继节点的后继节点。
0 加入队列的新节点的初始状态。

如果 waitStatus > 0 ,表明节点的状态已经取消等待获取资源。

如果 waitStatus < 0 ,表明节点的处于有效的等待状态。

因此在 AQS 的源码中,经常使用 > 0< 0 来对 waitStatus 进行判断。

FIFO双向同步队列

AQS的内部队列是CLH队列的变种,每当线程通过AQS获取锁失败时,线程将被封装成一个Node节点,通过CAS原子操作插入队列尾部。
当有线程释放锁时,AQS会尝试让队头的后继节点占用锁。AQS通过内置的FIFO双向队列来完成线程的排队工作,内部通过节点head和tail记录队首和队尾元素,元素的节点类型为Node类型,具
体如下:

1
2
3
4
/*首节点的引用*/
private transient volatile Node head;
/*尾节点的引用*/
private transient volatile Node tail;

AQS的首节点和尾节点都是懒加载的。懒加载的意思是在需要的时候才真正创建。只有在线程竞争失败的情况下,有新线程加入同步队列时,AQS才创建一个head节点。head节点只能被setHead()方法修改,并且节点的waitStatus不能为CANCELLED。尾节点只在有新线程阻塞时才被创建。

AQS 资源共享方式

AQS 定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。

一般来说,自定义同步器的共享方式要么是独占,要么是共享,他们也只需实现tryAcquire-tryReleasetryAcquireShared-tryReleaseShared中的一种即可。但 AQS 也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock

AQS中的钩子方法

什么是钩子方法呢? 钩子方法是一种被声明在抽象类中的方法,一般使用 protected 关键字修饰,它可以是空方法(由子类实现),也可以是默认实现的方法。模板设计模式通过钩子方法控制固定步骤的实现。

1
2
3
4
5
6
7
8
9
10
//独占方式。尝试获取资源,成功则返回true,失败则返回false。
protected boolean tryAcquire(int)
//独占方式。尝试释放资源,成功则返回true,失败则返回false。
protected boolean tryRelease(int)
//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
protected int tryAcquireShared(int)
//共享方式。尝试释放资源,成功则返回true,失败则返回false。
protected boolean tryReleaseShared(int)
//该线程是否正在独占资源。只有用到condition才需要去实现它。
protected boolean isHeldExclusively()

以上钩子方法的默认实现会抛出UnsupportedOperationException异常。除了这些钩子方法外,AQS类中的其他方法都是final类型的方法,所以无法被其他类继承,只有这几个方法可以被其他类继承。

自定义同步器

通过AQS实现一把简单的独占锁,如果需要自定义同步器一般的方式是这样(模板方法模式很经典的一个应用):

  1. 自定义的同步器继承 AbstractQueuedSynchronizer
  2. 重写 AQS 暴露的模板方法。

简单的独占锁的实现

使用SimpleMockLock实现基于AQS的、简单的非公平独占锁,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public abstract class SimpleMockLock implements Lock {
// 同步器实例
private final Sync sync = new Sync();

// 自定义的内部类:同步器
// 直接使用 AbstractQueuedSynchronizer.state 值表示锁的状态

// AbstractQueuedSynchronizer.state=1 表示锁没有被占用
// AbstractQueuedSynchronizer.state=0 表示锁没已经被占用
private static class Sync extends AbstractQueuedSynchronizer {
// 钩子方法
protected boolean tryAcquire(int arg) {
// CAS更新状态值为1
if (compareAndSetState(0, 1)) {

setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

// 钩子方法
protected boolean tryRelease(int arg) {
// 如果当前线程不是占用锁的线程
if (Thread.currentThread() != getExclusiveOwnerThread()) {
// 抛出非法状态的异常
throw new IllegalMonitorStateException();
}
// 如果锁的状态为没有占用
if (getState() == 0) {
// 抛出非法状态的异常
throw new IllegalMonitorStateException();
}
// 接下来不需要使用CAS操作,因为下面的操作不存在并发场景
setExclusiveOwnerThread(null);
// 设置状态
setState(0);
return true;
}
}

// 显式锁的抢占方法
@Override
public void lock() {
// 委托给同步器的acquire()抢占方法
sync.acquire(1);
}

// 显式锁的释放方法
@Override
public void unlock() {
// 委托给同步器的release()释放方法
sync.release(1);
}
// 省略其他未实现的方法
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
private static int count = 0;

public static void main(String[] args) throws InterruptedException {
int sum = 0;
// 每个线程的执行轮数
final int TURNS = 1000;
// 线程数
final int THREADS = 10;

// 线程池,用于多线程模拟测试
ExecutorService pool = Executors.newFixedThreadPool(THREADS);

// 自定义的独占锁
Lock lock = new SimpleMockLock();

// 倒数闩
CountDownLatch countDownLatch = new CountDownLatch(THREADS);
long start = System.currentTimeMillis();

// 10个线程并发执行
for (int i = 0; i < THREADS; i++) {
pool.submit(() -> {
// 调用自定义的独占锁的加锁方法
lock.lock();
try {
for (int j = 0; j < TURNS; j++) {
// 执行一次累加 1000 次
count++;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 调用自定义的独占锁的释放方法
lock.unlock();
}
// 线程执行完成,倒数闩减少一次
countDownLatch.countDown();
});
}
countDownLatch.await();
// 省略等待并发执行完成、结果输出的代码
System.out.println("count=" + count);
}

AQS锁抢占的原理

AbstractQueuedSynchronizer的实现非常精巧,令人叹为观止,不入细节难以完全领会其精髓。下面基于SimpleMockLock公平独占锁的抢占过程详细说明AQS锁抢占的原理。

1
2
3
4
5
6
// 显式锁的抢占方法
@Override
public void lock() {
// 委托给同步器的acquire()抢占方法
sync.acquire(1);
}

AQS模板方法:acquire(arg)

AQS 中以独占模式获取资源的入口方法是 acquire() ,如下:

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

acquire() 中,线程会先尝试获取共享资源;如果获取失败,会将线程封装为 Node 节点加入到 AQS 的等待队列中;加入队列之后,会让等待队列中的线程尝试获取资源,并且会对线程进行阻塞操作。分别对应以下三个方法:

  • tryAcquire() :尝试获取锁(模板方法),AQS 不提供具体实现,由子类实现。
  • addWaiter() :如果获取锁失败,会将当前线程封装为 Node 节点加入到 AQS 的 CLH 变体队列中等待获取锁。
  • acquireQueued() :对线程进行阻塞、唤醒,并调用 tryAcquire() 方法让队列中的线程尝试获取锁。

钩子实现:tryAcquire(arg)

SimpleMockLock的tryAcquire()流程是:CAS操作state字段,将其值从0改为1,若成功,则表示锁未被占用,可成功占用,并且返回
true;若失败,则获取锁失败,返回false。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// AQS
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}

// 钩子方法
protected boolean tryAcquire(int arg) {
// CAS更新状态值为1
if (compareAndSetState(0, 1)) {

setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

SimpleMockLock的实现非常简单,是不可以重入的,仅仅为了学习AQS而编写。如果是可以重入的锁,在重复抢锁时会累计state字段值,表示重入锁的次数,具体可参考ReentrantLock源码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// ReentrantLock
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 1、获取 AQS 中的 state 状态
int c = getState();
// 2、如果 state 为 0,证明锁没有被其他线程占用
if (c == 0) {
// 2.1、通过 CAS 对 state 进行更新
if (compareAndSetState(0, acquires)) {
// 2.2、如果 CAS 更新成功,就将锁的持有者设置为当前线程
setExclusiveOwnerThread(current);
return true;
}
}
// 3、如果当前线程和锁的持有线程相同,说明发生了「锁的重入」
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 3.1、将锁的重入次数加 1
setState(nextc);
return true;
}
// 4、如果锁被其他线程占用,就返回 false,表示获取锁失败
return false;
}
  • 通过 CAS 更新 state 变量。state == 0 表示资源没有被占用。state > 0 表示资源被占用,此时 state 表示重入次数。
  • 通过 setExclusiveOwnerThread() 设置持有资源的线程。

如果线程更新 state 变量成功,就表明获取到了资源, 因此将持有资源的线程设置为当前线程即可。

直接入队:addWaiter

在acquire模板方法中,如果钩子方法tryAcquire尝试获取同步状态失败的话,就构造同步节点(独占式节点模式为Node.EXCLUSIVE),通过addWaiter(Node node,int args)方法将该节
点加入同步队列的队尾。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// AQS
private Node addWaiter(Node mode) {
// 1、将当前线程封装为 Node 节点。
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
// 2、如果 pred != null,则证明 tail 节点已经被初始化,直接将 Node 节点加入队列即可。
if (pred != null) {
node.prev = pred;
// 2.1、通过 CAS 控制并发安全。
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 3、初始化队列,并将新创建的 Node 节点加入队列。
enq(node);
return node;
}

addWaiter() 方法中,需要执行 Node 节点 入队 的操作。由于是在多线程环境下,因此需要通过 CAS 操作保证并发安全。通过 CAS 操作去更新 tail 指针指向新入队的 Node 节点,CAS 可以保证只有一个线程会成功修改 tail 指针,以此来保证 Node 节点入队时的并发安全。

自旋入队:enq

执行 addWaiter() 时,如果发现 pred == null ,即 tail 指针为 null,则证明队列没有初始化,需要调用 enq() 方法初始化队列,并将 Node 节点加入到初始化后的队列中,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// AQS
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
// 1、通过 CAS 操作保证队列初始化的并发安全,初始化尾节点和头节点为新节点
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 2、与 addWaiter() 方法中节点入队的操作相同,队列不为空,将新节点插入队列尾部
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

/**
* CAS 操作head指针,仅仅被enq()调用
*/
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}

/**
* CAS 操作head指针,仅仅被enq()调用
*/
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}

自旋抢占:acquireQueued()

acquire() 方法中,通过 addWaiter() 方法将 Node 节点加入队列之后,就会调用 acquireQueued() 方法。在节点入队之后,启动自旋抢锁的流程。acquireQueued()方法的主要逻辑:当前Node节点线程在死循环中不断获取同步状态,并且不断在前驱节点上自旋,只有当前驱节点是头节点时才能尝试获取锁,原因是:

  1. 头节点是成功获取同步状态(锁)的节点,而头节点的线程释放了同步状态以后,将会唤醒其后继节点,后继节点的线程被唤醒后要检查自己的前驱节点是否为头节点。
  2. 维护同步队列的FIFO原则,节点进入同步队列之后,就进入了自旋的过程,每个节点都在不断地执行for死循环。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// AQS:令队列中的节点尝试获取锁,并且对线程进行阻塞。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 自旋检查当前节点的前驱节点是否为头节点,才能获取锁
for (;;) {
// 1、尝试获取锁。
// 获取节点的前驱节点
final Node p = node.predecessor();
// 节点中的线程循环地检查自己的前驱节点是否为 head节点
// 前驱节点是head时,进一步调用子类的tryAcquire(…)实现
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 2、判断线程是否可以阻塞,如果可以,则阻塞当前线程。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 3、如果获取锁失败,就会取消获取锁,将节点状态更新为 CANCELLED。
if (failed)
//取消请求,将当前节点从队列中移除
cancelAcquire(node);
}
}

为了不浪费资源,acquireQueued()自旋过程中会阻塞线程,等待被前驱节点唤醒后才启动循环。如果成功就返回,否则执行shouldParkAfterFailedAcquire()parkAndCheckInterrupt()来达到阻塞的效果。

调用acquireQueued()方法的线程一定是node所绑定的线程(由它的thread属性所引用),该线程也是最开始调用lock()方法抢锁的那个线程,在acquireQueued()的死循环中,该线程可能重复进行阻塞和被唤醒。

acquireQueued() 方法中,主要做两件事情:

  • 尝试获取资源: 在 acquireQueued() 方法中,尝试获取资源总共有 2 个步骤:

    1. p == head :表明当前节点的前继节点为 head 节点。此时当前节点为 AQS 队列中的第一个等待节点。
    2. tryAcquire(arg) == true :表明当前线程尝试获取资源成功。
    3. 在成功获取资源之后,就需要将当前线程的节点 从等待队列中移除 。移除操作为:将当前等待的线程节点设置为 head 节点(head 节点是虚拟节点,并不参与排队获取资源)。
  • 阻塞当前线程

    AQS 中,当前节点的唤醒需要依赖于上一个节点。如果上一个节点取消获取锁,它的状态就会变为 CANCELLEDCANCELLED 状态的节点没有获取到锁,也就无法执行解锁操作对当前节点进行唤醒。因此在阻塞当前线程之前,需要跳过 CANCELLED 状态的节点。

挂起预判:shouldParkAfterFailedAcquire()

acquireQueued()自旋在阻塞自己的线程之前会进行挂起预判。shouldParkAfterFailedAcquire()方法的主要功能是:将当前节点的有效前驱节点(是指有效节点不是CANCELLED类型的节点)找到,并且将有效前驱节点的状态设置为SIGNAL,之后返回true代表当前线程可以马上被阻塞了。具体可以分为三种情况:

  • 如果发现前继节点的状态是-1 SIGNAL ,则可以阻塞当前线程。
    • 说明前驱的等待标志已设好,返回true表示设置完毕。
  • 如果发现前继节点的状态是1 CANCELLED ,则需要跳过 1 CANCELLED 状态的节点
    • 说明前驱节点本身不再等待了,需要跨越这些节点,然后找到一个有效节点,再把当前节点和这个有效节点的唤醒关系建立好:调整前驱节点的next指针为自己。
  • 如果发现前继节点的状态不是 SIGNALCANCELLED ,表明前继节点的状态处于正常等待资源的状态,因此将前继节点的状态设置为 SIGNAL ,表明该前继节点需要对后续节点进行唤醒。
    • 如果是其他情况:-3(PROPAGATE,共享锁等待)-2(CONDITION,条件等待)0(初始状态),那么通过CAS尝试设置前驱节点为SIGNAL,表示只要前驱节点释放锁,当前节点就可以抢占锁了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// AQS:判断当前线程节点是否可以阻塞。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 1、前继节点状态正常,直接返回 true 即可。
if (ws == Node.SIGNAL)
return true;
// 2、ws > 0 表示前继节点的状态异常,即为 CANCELLED 状态,需要跳过异常状态的节点。
if (ws > 0) {
do {
// 将pred记录前驱的前驱
// 调整当前节点的prev指针,保持为前驱的前驱
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
// 调整前驱节点的next指针
pred.next = node;
} else {
// 3、如果前继节点的状态不是 SIGNAL,也不是 CANCELLED,就将状态设置为 SIGNAL。
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
// 设置前驱状态之后,此方法返回值还是为false,表示线程不可用,被阻塞
}
return false;
}

在独占锁的场景中,shouldParkAfterFailedAcquire()方法是在acquireQueued()方法的死循环中被调用的,由于此方法返回false时acquireQueued()不会阻塞当前线程,只有此方法返回true时当前线程才阻塞,因此在一般情况下,此方法至少需要执行两次,当前线程才会被阻塞。

线程挂起:parkAndCheckInterrupt()

当判断当前线程可以阻塞之后,通过调用 parkAndCheckInterrupt() 方法来阻塞当前线程。内部使用了 LockSupport 来实现阻塞。LockSupoprt 底层是基于 Unsafe 类来阻塞线程,代码如下:

1
2
3
4
5
6
7
// AQS
private final boolean parkAndCheckInterrupt() {
// 1、线程阻塞到这里 调用park()使线程进入waiting状态
LockSupport.park(this);
// 2、线程被唤醒之后,返回线程中断状态
return Thread.interrupted();
}

当线程被唤醒之后,需要执行 Thread.interrupted() 来返回线程的中断状态,这是为什么呢?

这个和线程的中断协作机制有关系,线程被唤醒之后,并不确定是被中断唤醒,还是被 LockSupport.unpark() 唤醒,因此需要通过线程的中断状态来判断。

AbstractQueuedSynchronizer会把所有的等待线程构成一个阻塞等待队列,当一个线程执行完lock.unlock()时,会激活其后继节点,通过调用LockSupport.unpark(postThread)完成后继线程的唤醒。

selfInterrupt()

acquire() 方法中,当 if 语句的条件返回 true 后,就会调用 selfInterrupt() ,该方法会中断当前线程,为什么需要中断当前线程呢?

1
2
3
4
static void selfInterrupt() {
// 重新设置线程的中断状态。
Thread.currentThread().interrupt();
}
  • if 判断为 true 时,需要 tryAcquire() 返回 false ,并且 acquireQueued() 返回 true
  • 其中 acquireQueued() 方法返回的是线程被唤醒之后的 中断状态 ,通过执行 Thread.interrupted() 来返回。该方法在返回中断状态的同时,会清除线程的中断状态。
  • 因此如果 if 判断为 true ,表明线程的中断状态为 true ,但是调用 Thread.interrupted() 之后,线程的中断状态被清除为 false ,因此需要重新执行 selfInterrupt() 来重新设置线程的中断状态。

AQS锁释放的原理

下面基于SimpleMockLock公平独占锁的释放流程详细说明AQS锁释放的原理。

1
2
3
4
5
6
// 显式锁的释放方法
@Override
public void unlock() {
// 委托给同步器的release()释放方法
sync.release(1);
}

AQS模板方法:release()

这段代码逻辑比较简单,如果同步状态的钩子方法执行成功(tryRelease返回true),就会执行if块中的代码,当head指向的头节点不为null,并且该节点的状态值不为0时才会执行unparkSuccessor()方法。钩子方法tryRelease()尝试释放当前线程持有的资源,由子类提供具体的实现。

1
2
3
4
5
6
7
8
9
10
11
12
// AQS
public final boolean release(int arg) {
// 1、尝试释放锁
if (tryRelease(arg)) {
Node h = head;
// 2、唤醒后继节点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

钩子实现:tryRelease()

tryRelease()方法是需要子类提供实现的一个钩子方法,需要子类根据具体业务进行具体的实现。SimpleMockLock的钩子实现如下:核心逻辑是设置同步状态state的值为0,方便后继节点执行抢
占。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 钩子方法
protected boolean tryRelease(int arg) {
// 如果当前线程不是占用锁的线程
if (Thread.currentThread() != getExclusiveOwnerThread()) {
// 抛出非法状态的异常
throw new IllegalMonitorStateException();
}
// 如果锁的状态为没有占用
if (getState() == 0) {
// 抛出非法状态的异常
throw new IllegalMonitorStateException();
}
// 接下来不需要使用CAS操作,因为下面的操作不存在并发场景
setExclusiveOwnerThread(null);
// 设置状态
setState(0);
return true;
}

ReentrantLock在 tryRelease() 方法中,会先计算释放锁之后的 state 值,判断 state 值是否为 0。

  • 如果 state == 0 ,表明该线程没有重入次数了,更新 free = true ,并修改持有资源的线程为 null,表明该线程完全释放这把锁。

  • 如果 state != 0 ,表明该线程还存在重入次数,因此不更新 free 值,free 值为 false 表明该线程没有完全释放这把锁。之后更新 state 值,并返回 free 值,free 值表明线程是否完全释放锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// ReentrantLock
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
// 1、判断持有锁的线程是否为当前线程
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 2、如果 state 为 0,则表明当前线程已经没有重入次数。因此将 free 更新为 true,表明该线程会释放锁。
if (c == 0) {
free = true;
// 3、更新持有资源的线程为 null
setExclusiveOwnerThread(null);
}
// 4、更新 state 值
setState(c);
return free;
}

唤醒后继:unparkSuccessor()

如果 tryRelease() 返回 true ,表明线程已经没有重入次数了,锁已经被完全释放,因此需要唤醒后继节点。在唤醒后继节点之前,需要判断是否可以唤醒后继节点,判断条件为: h != null && h.waitStatus != 0 。这里解释一下为什么要这样判断:

  • h == null :表明 head 节点还没有被初始化,也就是 AQS 中的队列没有被初始化,因此无法唤醒队列中的线程节点。
  • h != null && h.waitStatus == 0 :表明头节点刚刚初始化完毕(节点的初始化状态为 0),后继节点线程还没有成功入队,因此不需要对后续节点进行唤醒。(当后继节点入队之后,会将前继节点的状态修改为 SIGNAL ,表明需要对后继节点进行唤醒)
  • h != null && h.waitStatus != 0 :其中 waitStatus 有可能大于 0,也有可能小于 0。其中 > 0 表明节点已经取消等待获取资源,< 0 表明节点处于正常等待状态。

接下来进入 unparkSuccessor() 方法查看如何唤醒后继节点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// AQS:这里的入参 node 为队列的头节点(虚拟头节点)
private void unparkSuccessor(Node node) {
// // 获得节点状态,释放锁的节点,也就是头节点
//CANCELLED(1)、SIGNAL(-1)、CONDITION (-2)、PROPAGATE(-3)
int ws = node.waitStatus;
// 1、将头节点的状态进行清除,为后续的唤醒做准备。
if (ws < 0)
// 若头节点状态小于0,则将其置为0,表示初始状态
compareAndSetWaitStatus(node, ws, 0);

Node s = node.next;
// 2、如果后继节点异常,则需要从 tail 向前遍历,找到正常状态的节点进行唤醒。
if (s == null || s.waitStatus > 0) {
// // 如果新节点已经被取消CANCELLED(1)
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
// 从队列尾部开始,往前去找最前面的一个 waitStatus 小于等于的节点
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 3、唤醒后继节点
LockSupport.unpark(s.thread);
}
  • unparkSuccessor() 中,如果头节点的状态 < 0 (在正常情况下,只要有后继节点,头节点的状态应该为 SIGNAL ,即 -1),表示需要对后继节点进行唤醒,因此这里提前清除头节点的状态标识,将状态修改为 0,表示已经执行了对后续节点唤醒的操作。

  • 如果 s == null 或者 s.waitStatus > 0 ,表明后继节点异常,此时不能唤醒异常节点,而是要找到正常状态的节点进行唤醒。

  • 因此需要从 tail 指针向前遍历,来找到第一个状态正常(waitStatus <= 0)的节点进行唤醒。

unparkSuccessor()唤醒后继节点的线程后,后继节点的线程重新执行方法acquireQueued()中的自旋抢占逻辑。

tail 指针向前遍历原因

为什么要从 tail 指针向前遍历,而不是从 head 指针向后遍历,寻找正常状态的节点呢?

遍历的方向和 节点的入队操作 有关。入队方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// AQS:节点入队方法
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
// 1、先修改 prev 指针。
node.prev = pred;
if (compareAndSetTail(pred, node)) {
// 2、再修改 next 指针。
pred.next = node;
return node;
}
}
enq(node);
return node;
}

addWaiter() 方法中,node 节点入队需要修改 node.prevpred.next 两个指针,但是这两个操作并不是 原子操作 ,先修改了 node.prev 指针,之后才修改 pred.next 指针。在极端情况下,可能会出现 head 节点的下一个节点状态为 CANCELLED ,此时新入队的节点仅更新了 node.prev 指针,还未更新 pred.next 指针,如下图:

这样如果从 head 指针向后遍历,无法找到新入队的节点,因此需要从 tail 指针向前遍历找到新入队的节点。

图解 AQS 工作原理

这里基于 ReentrantLock 来画图进行讲解。

  1. 设总共有 3 个线程尝试获取锁,线程分别为 T1T2T3

    • 此时,假设线程 T1 先获取到锁,线程 T2 排队等待获取锁。在线程 T2 进入队列之前,需要对 AQS 内部队列进行初始化。head 节点在初始化后状态为 0 。AQS 内部初始化后的队列如下图:

  2. 此时,线程 T2 尝试获取锁。由于线程 T1 持有锁,因此线程 T2 会进入队列中等待获取锁。同时会将前继节点( head 节点)的状态由 0 更新为 SIGNAL ,表示需要对 head 节点的后继节点进行唤醒。此时,AQS 内部队列如下图所示:

  3. 此时,线程 T3 尝试获取锁。由于线程 T1 持有锁,因此线程 T3 会进入队列中等待获取锁。同时会将前继节点(线程 T2 节点)的状态由 0 更新为 SIGNAL ,表示线程 T2 节点需要对后继节点进行唤醒。此时,AQS 内部队列如下图所示:

  4. 此时,假设线程 T1 释放锁,会唤醒后继节点 T2 。线程 T2 被唤醒后获取到锁,并且会从等待队列中退出。

    • 这里线程 T2 节点退出等待队列并不是直接从队列移除,而是令线程 T2 节点成为新的 head 节点,以此来退出资源获取的等待。此时 AQS 内部队列如下所示:

  5. 此时,假设线程 T2 释放锁,会唤醒后继节点 T3 。线程 T3 获取到锁之后,同样也退出等待队列,即将线程 T3 节点变为 head 节点来退出资源获取的等待。此时 AQS 内部队列如下所示:

ReentrantLock的抢锁流程

非公平抢占的钩子方法:tryAcquire(arg)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
static final class NonfairSync extends Sync {
//非公平锁抢占的钩子方法
protected final boolean tryAcquire(int acquires)
{
return nonfairTryAcquire(acquires);
}
// 省略其他
}

abstract static class Sync extends AbstractQueuedSynchronizer {

final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 先直接获得锁的状态
int c = getState();
if (c == 0) {
// 如果内部队列首节点的线程执行完了,它会将锁的state设置为0
// 当前抢锁线程的下一步就是直接进行抢占,不管不顾
// 发现state是空的,就直接拿来加锁使用,根本不考虑后面继承者的存在
if (compareAndSetState(0, acquires)) {
// 1. 利用CAS自旋方式判断当前state确实为0,然后设置成acquire(1)
// 这是原子性的操作,可以保证线程安全
setExclusiveOwnerThread(current);
// 设置当前执行的线程,直接返回true
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
// 2.当前的线程和执行中的线程是同一个,也就意味着可重入操作 int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count
exceeded");
setState(nextc);
// 表示当前锁被1个线程重复获取了nextc次
return true;
}
// 否则就返回false,表示没有成功获取当前锁,进入排队过程
return false;
}

// 省略其他
}

非公平同步器ReentrantLock.NonfairSync的核心思想是当前线程尝试获取锁的时候,如果发现锁的状态位是0,就直接尝试将锁拿过来,然后执行setExclusiveOwnerThread(),根本不管同步队列中的排队节点。

公平抢占的钩子方法:tryAcquire(arg)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
static final class FairSync extends Sync {
// 公平抢占的钩子方法
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState(); // 锁状态
if (c == 0) {
if (!hasQueuedPredecessors() && // 有后继节点就返回,足够讲义气
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

公平抢占的钩子方法中,首先判断是否有后继节点,如果有后继节点,并且当前线程不是锁的占有线程,钩子方法就返回false,模板方法会进入排队的执行流程,可见公平锁是真正公平的。

是否有后继节点的判断

head节点是获取到锁的节点,但是任意时刻head节点可能占用着锁,也可能释放了锁,如果释放了锁,那么此时state=0,未被阻塞的head.next节点对应的线程在任意时刻都是在自旋地尝试获取锁。FairSync进行是否有后继节点的判断代码如下:

1
2
3
4
5
6
7
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}

hasQueuedPredecessors的执行场景大致如下:

  1. h!=t不成立的时候,说明h头节点、t尾节点要么是同一个节点,要么都是null,此时hasQueuedPredecessors()返回false,表示没有后继节点。
  2. h!=t成立的时候,进一步检查head.next是否为null,如果为null,就返回true。什么情况下h!=t,同时h.next==null呢?
    • 有其他线程第一次正在入队时可能会出现。其他线程执行AQS的enq()方法,compareAndSetHead(node)完成,还没执行tail=head语句时,此时t=null、head=new Node()、head.next=null
  3. 如果h!=t成立,head.next != null,判断head.next是不是当前线程,如果是就返回false,否则返回true。

AQS条件队列

Condition是JUC用来替代传统Object的wait()/notify()线程间通信与协作机制的新组件,相比调用Object的wait()/notify(),调用Condition的await()/signal()这种方式实现线程间协作更加高效。

  • Condition的await()方法会将线程包装为等待节点,加入等待队列中,并将AQS同步队列中的节点移除,接着不断检查isOnSyncQueue(Node node),如果在等待队列中,就一直等着,如果signal将它移到AQS队列中,则退出循环。
  • Condition的signal()方法则是先检查当前线程是否获取了锁,接着将等待队列中的节点通过Node的操作直接加入AQS队列。线程并不会立即获取到资源,从while循环退出后,会通过acquireQueued方法加入获取同步状态的竞争中。

AQS,Lock,Condition,ConditionObject之间的关系:ConditionObject是AQS的内部类,实现了Condition接口,Lock中提供newCondition()方法,委托给内部AQS的实现Sync来创建ConditionObject对象,享受AQS对Condition的支持。

1
2
3
4
5
6
7
8
9
// ReentrantLock#newCondition
public Condition newCondition() {
return sync.newCondition();
}
// Sync#newCondition
final ConditionObject newCondition() {
// 返回Contition的实现,定义在AQS中
return new ConditionObject();
}

Condition基本原理

Condition与Object的wait()/notify()作用是相似的,都是使得一个线程等待某个条件,只有当该条件具备signal()或者signalAll()方法被调用时等待线程才会被唤醒,从而重新争夺锁。不同的是,Object的wait()/notify()由JVM底层实现,而Condition接口与实现类完全使用Java代码实现。当需要进行线程间的通信时,建议结合使用ReetrantLock与Condition,通过Condition的await()和signal()方法进行线程间的阻塞与唤醒。

ConditionObject类是实现条件队列的关键,每个ConditionObject对象都维护一个单独的条件等待队列。每个ConditionObject对应一个条件队列,它记录该队列的头节点和尾节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// AQS中Node类与Condition相关的字段:
static final class Node {
// 记录当前线程的等待状态,
volatile int waitStatus;

// 前驱节点
volatile Node prev;

// 后继节点
volatile Node next;

// node存储的线程
volatile Thread thread;

// 当前节点在Condition中等待队列上的下一个节点
Node nextWaiter;
//...
}

public class ConditionObject implements Condition,java.io.Serializable {
//记录该队列的头节点
private transient Node firstWaiter;
//记录该队列的尾节点
private transient Node lastWaiter;
}

条件队列为单向列表,只有指向下一个节点的引用;没有被唤醒的节点全部存储在条件队列上。下图描述的是一个长度为 5 的条件队列,即有5个线程执行了await()方法;与阻塞队列不同,条件队列没有常驻内存的“head结点”,且一个处于正常状态节点的waitStatus -2 condition 。当有新节点加入时,将会追加至队列尾部

条件队列数据结构

在一个显式锁上,我们可以创建多个等待任务队列,这点和内置锁不同,Java内置锁上只有唯一的一个等待队列。比如,我们可以调用newCondition()创建两个等待队列,具体如下:

1
2
3
4
5
private Lock lock = new ReentrantLock();
//创建第一个等待队列
private Condition firstCond = lock.newCondition();
//创建第二个等待队列
private Condition secondCond = lock.newCondition();

Condition条件队列是单向的,而AQS同步队列是双向的,AQS节点会有前驱指针。一个AQS实例可以有多个条件队列,是聚合关系;但是一个AQS实例只有一个同步队列,是逻辑上的组合关系。

await()等待方法原理

当线程调用await()方法时,说明当前线程的节点为当前AQS队列的头节点,正好处于占有锁的状态,await()方法需要把该线程从AQS队列挪到Condition等待队列里

await()方法将当前线程挪动到Condition等待队列后,还会唤醒AQS同步队列中head节点的下一个节点。await()方法的核心代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public final void await() throws InterruptedException {
// 这个方法是响应中断的
if (Thread.interrupted())
throw new InterruptedException();
// 添加到条件队列中
Node node = addConditionWaiter();
// 释放同步资源,也就是释放锁
int savedState = fullyRelease(node);
int interruptMode = 0;
// 如果这个节点的线程不在同步队列中,说明该线程还不具备竞争锁的资格
while (!isOnSyncQueue(node)) {
// 挂起线程
LockSupport.park(this);
// 如果线程中断,退出
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 上面的循环退出有两种情况:
// 1. isOnSyncQueue(node) 为true,即当前的node已经转移到阻塞队列了
// 2. checkInterruptWhileWaiting != 0, 表示线程中断

// 退出循环,被唤醒之后,进入阻塞队列,等待获取锁 acquireQueued
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

addConditionWaiter

创建一个新节点并放入Condition队列尾部的工作由addConditionWaiter()方法完成,该方法具体如下:

  • 首先判断条件队列的尾节点是否被取消了,这里用last.ws != CONDITION来判断,如果是的话,就需要从头到尾遍历,消除被不是condition的节点。
  • 接着将当前线程包装为Node,指定ws为CONDITION。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private Node addConditionWaiter() {
Node t = lastWaiter;
// 如果lastWaiter被取消了,将其清除
if (t != null && t.waitStatus != Node.CONDITION) {
// 遍历整个条件队列,将已取消的所有节点清除出列
unlinkCancelledWaiters();
// t重新赋值一下,因为last可能改变了
t = lastWaiter;
}
//注意这里,node在初始化的时候,会指定ws为CONDITION
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// t == null 表示队列此时为空,初始化firstWaiter
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;// 入队尾
lastWaiter = node;// 将尾指针指向新建的node
return node;
}

unlinkCancelledWaiters

unlinkCancelledWaiters用于清除队列中已经取消等待的节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
// trail这里表示取消节点的前驱节点
Node trail = null;
// t会从头到尾遍历这个单链表
while (t != null) {
// next用于保存下一个
Node next = t.nextWaiter;
// 如果发现当前这个节点 不是 condition了, 那么考虑移除它
// 下面是单链表的移除节点操作 简单来说就是 trail.next = t.next
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
// 说明first就是不是condition了
if (trail == null)
firstWaiter = next;
else
//trail.next = t.next
trail.nextWaiter = next;
// trail后面没东西,自然trail就是lastWaiter了
if (next == null)
lastWaiter = trail;
}
// 当前节点是一直跟到不是condition节点的上一个
else
trail = t;
// 向后遍历 t = t.next
t = next;
}
}

fullyRelease

将节点加入等待队列中后,就需要完全释放线程拥有的独占锁了,完全释放针对重入锁的情况。

我们看到这个方法返回了一个savedState变量,简单的理解就是保存状态。我们知道重入锁的state由重入的次数,如果一个state为N,我们可以认为它持有N把锁。

await()方法必须将state置0,也就是完全释放锁,后面的线程才能获取到这把锁,置0之后,我们需要用个变量标记一下,也就是这里的savedState。这样它被重新唤醒的时候,我们就知道,他需要获取savedState把锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
final int fullyRelease(Node node) {
boolean failed = true;
try {
// 获取当前的state值,重入次数
int savedState = getState();
// 释放N = savedState资源
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
// 如果获取失败,将会将节点设置为取消状态,并抛出异常
if (failed)
node.waitStatus = Node.CANCELLED;
}
}

等待进入阻塞队列

完全释放锁之后,将会来到这几步,如果这个节点的线程不在同步队列中,说明该线程还不具备竞争锁的资格,将被一直挂起,这里的同步队列指的是AQS的阻塞队列。

1
2
3
4
5
6
7
8
9
int interruptMode = 0;
// 如果这个节点的线程不在同步队列中,说明该线程还不具备竞争锁的资格,会一直挂起
while (!isOnSyncQueue(node)) {
// 挂起线程
LockSupport.park(this);
// 如果线程中断,退出
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}

isOnSyncQueue

判断节点是不是已经到阻塞队列中了,如果是的话,就直接返回true

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
final boolean isOnSyncQueue(Node node) {
// 1. 节点的等待状态还是condition表示还在等待队列中
// 2. node.prev == null 表示还没移到阻塞队列中[prev和next都是阻塞队列中用的]

if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;

// 如果node已经有了后继节点,表示已经在阻塞队列中了
if (node.next != null) // If has successor, it must be on queue
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
// 来到这里的情况:ws != condition && node.prev != null && node.next == null

// 想想:为什么node.prev != null不能作为判断不在阻塞队列的依据呢?
// CAS首先设置node.prev 指向tail,这个时候node.prev 是不为null的,但CAS可能会失败
return findNodeFromTail(node);
}

findNodeFromTail

从阻塞队列的尾部向前遍历,如果找到这个node,表示它已经在了,那就返回true。

1
2
3
4
5
6
7
8
9
10
11
12
13
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
// 已经有了
if (t == node)
return true;
// 尾都没有,找啥呢,返回false
if (t == null)
return false;
// 一直往前找
t = t.prev;
}
}

signal()唤醒方法原理

线程在某个ConditionObject对象上调用signal()方法后,等待队列中的firstWaiter会被加入同步队列中,等待节点被唤醒

1
2
3
4
5
6
7
8
public final void signal() {
// 一样的,必须占有当前这个锁才能用signal方法
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}

doSignal

doSignal()方法会从头到尾遍历条件队列,找到需要移到同步队列的节点。

  • 这里的while循环表示,如果first没有转移成功,就接着判断first后面的节点是不是需要转移。
1
2
3
4
5
6
7
8
9
10
11
12
private void doSignal(Node first) {
do {
// firstWaiter 指向first的下一个
if ( (firstWaiter = first.nextWaiter) == null)
// 如果first是最后一个且要被移除了,就将last置null
lastWaiter = null;
// first断绝与条件队列的连接
first.nextWaiter = null;
// fisrt转移失败,就看看后面是不是需要的
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}

transferForSignal

transferForSignal该方法将节点从条件队列转移到阻塞队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
final boolean transferForSignal(Node node) {
/*
* CAS操作尝试将Condition的节点的ws改为0
* 如果失败,意味着:节点的ws已经不是CONDITION,说明节点已经被取消了
* 如果成功,则该节点的状态ws被改为0了
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;

/*
* 通过enq方法将node自旋的方式加入同步队列队尾
* 这里放回的p是node在同步队列的前驱节点
*/
Node p = enq(node);
int ws = p.waitStatus;
// ws大于0 的情况只有 cancenlled,表示node的前驱节点取消了争取锁,那直接唤醒node线程
// ws <= 0 会使用cas操作将前驱节点的ws置为signal,如果cas失败也会唤醒node
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
// 自旋的方式入队
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
// 返回的是node的前驱节点
return t;
}
}
}
}

整体流程如下:

  1. 通过enq()方法自旋(该方法已经介绍过)将条件队列中的头节点放入AQS同步队列尾部,并获取它在AQS队列中的前驱节点。
  2. 如果前驱节点的状态是取消状态,或者设置前驱节点为Signal状态失败,就唤醒当前节点的线程;否则节点在同步队列的尾部,参与排队。
  3. 同步队列中的线程被唤醒后,表示重新获取了显式锁,然后继续执行condition.await()语句后面的临界区代码。

唤醒线程

LockSupport.park(this)挂起的线程是什么时候唤醒的:

  1. signal方法将节点转移到同步队列中,且获取到了锁或者对前驱节点的cas操作失败,调用了LockSupport.unpark(node.thread);方法。
  2. 在park的时候,另外一个线程对挂起的线程进行了中断。

一旦signal之后,节点被成功转移到同步队列后,这时下面这个循环就会退出了,继续回到这里:

1
2
3
4
5
6
7
8
9
int interruptMode = 0;
// 如果这个节点的线程不在同步队列中,说明该线程还不具备竞争锁的资格,会一直挂起
while (!isOnSyncQueue(node)) {
// 挂起线程
LockSupport.park(this);
// 如果线程中断,退出
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}

interruptMode

1
2
3
4
5
/** await 返回的时候,需要重新设置中断状态 */
private static final int REINTERRUPT = 1;
/** await 返回的时候,需要抛出 InterruptedException 异常 */
private static final int THROW_IE = -1;
/** interruptMode取0的时候表示在await()期间,没有发生中断 */

checkInterruptWhileWaiting

该方法用于判断该线程是否在挂起期间发生了中断。

1
2
3
4
5
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?// 如果处于中断状态,返回true,且将重置中断状态
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :// 如果中断了,判断何时中断
0; // 没有中断, 返回0
}

transferAfterCancelledWait

该方法判断何时中断,是否在signal之前。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
final boolean transferAfterCancelledWait(Node node) {
// 尝试使用CAS操作将node 的ws设置为0
// 如果成功,说明在signal方法之前中断就已经发生:
// 原因在于:signal如果在此之前发生,必然已经cas操作将ws设置为0了,这里不可能设置成功
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
// 就算中断了,也将节点入队
enq(node);
return true;
}
/*
* If we lost out to a signal(), then we can't proceed
* until it finishes its enq(). Cancelling during an
* incomplete transfer is both rare and transient, so just
* spin.
* 这里就是signal之后发生的中断
* 但是signal可能还在进行转移中,这边自旋等一下它完成
*/
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}

处理中断状态

1
2
3
4
5
6
7
8
9
// 第一部分
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 第二部分
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters(); // 清除取消的节点
// 第三部分
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
第一部分

signal唤醒的线程并不会立即获取到资源,从while循环退出后,会通过acquireQueued方法加入获取同步状态的竞争中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted; //
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

acquireQueued方法返回时,表示已经获取到了锁,且返回的是interrupted值,如果返回true,表示已经被中断。

接着判断interruptMode != THROW_IE表示是在signal之后发生的中断,需要重新中断当前线程,将interruptMode设置为REINTERRUPT。

第二部分
1
2
3
// 第二部分
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters(); // 清除取消的节点

前面说了,signal会将节点移到同步队列中,最后一步需要和条件队列断开关系,也就是:node.nextWaiter = null,但这是想象中比较正常的情况,如果在signal之前被中断,节点也会被加入同步队列中,这时其实是没有调用这个断开关系的。

因此这边做一点处理,unlinkCancelledWaiters()逻辑上面也说过了,可以回过头去看看,主要是清除队列中已经取消等待的节点。

第三部分

最后一个部分,就是对两种interruptMode的情况进行处理,看看代码就知道了:

1
2
3
4
5
6
7
8
9
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
// signal 之前的中断, 需要抛出异常
if (interruptMode == THROW_IE)
throw new InterruptedException();
// signal 之后发生的中断, 需要重新中断
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}

AQS的实际应用

AQS建立在CAS原子操作和volatile可见性变量的基础之上,为上层的显式锁、同步工具类、阻塞队列、线程池、并发容器、Future异步工具提供线程之间同步的基础设施。所以,AQS在JUC框架中的使用是非常广泛的。