0%

java线程系列 JUC锁 02 AbstractQueuedSynchronizer具体实现

前文已经对AbstractQueuedSynchronizer做了详细的介绍,本篇文章主要从实现的角度来看。

AQS是一个用来构建锁和同步器的框架,使用AQS能简单且高效地构造出应用广泛的大量的同步器,比如我们提到的ReentrantLock,Semaphore,其他的诸如ReentrantReadWriteLock,SynchronousQueue,FutureTask等等皆是基于AQS的。当然,我们自己也能利用AQS非常轻松容易地构造出符合我们自己需求的同步器。

AQS核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中。

核心是对下面三个进行实现

  • 同步状态的原子性管理;
  • 线程的阻塞与解除阻塞;
  • 队列的管理;

AQS 核心思想

同步状态的原子性管理
AQS使用一个int成员变量来表示同步状态,使用CAS对该同步状态进行原子操作实现对其值的修改。

1
2
private volatile int state;//共享变量,使用volatile修饰保证线程可见性

状态信息通过procted类型的getState,setState,compareAndSetState进行操作

1
2
3
4
5
6
7
8
9
10
11
12
13

//返回同步状态的当前值
protected final int getState() {
return state;
}
// 设置同步状态的值
protected final void setState(int newState) {
state = newState;
}
//原子地(CAS操作)将同步状态值设置为给定值update如果当前同步状态的值等于expect(期望值)
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

线程的阻塞与解除阻塞
通过LockSupport来实现,主要是当所已经被其他线程获取时,阻塞当前线程直到被唤醒。具体可以参考阻塞原语LockSupport

队列的管理
使用CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS是将每条请求共享资源的线程封装成一个CLH锁队列的一个结点(Node)来实现锁的分配。其中Sync queue,即同步队列,是双向链表,包括head结点和tail结点,head结点主要用作后续的调度。而Condition queue不是必须的,其是一个单向链表,只有当使用Condition时,才会存在此单向链表。并且可能会有多个Condition queue。

队列管理

AQS对资源的共享方式

AQS定义两种资源共享方式 :

  • Exclusive(独占):只有一个线程能执行,如ReentrantLock。又可分为公平锁和非公平锁:
    • 公平锁:按照线程在队列中的排队顺序,先到者先拿到锁
    • 非公平锁:当线程要获取锁时,无视队列顺序直接去抢锁,谁抢到就是谁的
  • Share(共享):多个线程可同时执行,如Semaphore/CountDownLatch。Semaphore、CountDownLatCh、 CyclicBarrier、ReadWriteLock 我们都会在后面讲到。

AQS底层使用了模板方法模式

同步器的设计是基于模板方法模式的,如果需要自定义同步器一般的方式是这样(模板方法模式很经典的一个应用): 使用者继承AbstractQueuedSynchronizer并重写指定的方法。(这些重写方法很简单,无非是对于共享资源state的获取和释放)将AQS组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。

这和我们以往通过实现接口的方式有很大区别,模板模式具体可以参考设计模式行为型 - 模板方法(Template Method)详解

AQS使用了模板方法模式,自定义同步器时需要重写下面几个AQS提供的模板方法:

1
2
3
4
5
 sHeldExclusively()//该线程是否正在独占资源。只有用到condition才需要去实现它。
tryAcquire(int)//独占方式。尝试获取资源,成功则返回true,失败则返回false。
tryRelease(int)//独占方式。尝试释放资源,成功则返回true,失败则返回false。
tryAcquireShared(int)//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int)//共享方式。尝试释放资源,成功则返回true,失败则返回false。

默认情况下,每个方法都抛出 UnsupportedOperationException。 这些方法的实现必须是内部线程安全的,并且通常应该简短而不是阻塞。AQS类中的其他方法都是final ,所以无法被其他类使用,只有这几个方法可以被其他类使用。

以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。

源码分析

类的继承关系

AbstractQueuedSynchronizer继承自AbstractOwnableSynchronizer抽象类,并且实现了Serializable接口,可以进行序列化。

1
2
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable

其中AbstractOwnableSynchronizer抽象类的源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {

// 版本序列号
private static final long serialVersionUID = 3737899427754241961L;
// 构造方法
protected AbstractOwnableSynchronizer() { }
// 独占模式下的线程
private transient Thread exclusiveOwnerThread;

// 设置独占线程
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}

// 获取独占线程
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
}

AbstractOwnableSynchronizer抽象类中,可以设置独占资源线程和获取独占资源线程。分别为setExclusiveOwnerThread与getExclusiveOwnerThread方法,这两个方法会被子类调用。

AbstractQueuedSynchronizer类有两个内部类,分别为Node类与ConditionObject类,本篇文章不对ConditionObject做具体介绍,后面会详细说明,主要讲解独占锁的获取。

内部类Node

Mode主要是用于队列的构建,源码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
static final class Node {
// 模式,分为共享与独占
// 共享模式
static final Node SHARED = new Node();
// 独占模式
static final Node EXCLUSIVE = null;
// 结点状态
// CANCELLED,值为1,表示当前的线程被取消
// SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark
// CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中
// PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行
// 值为0,表示当前节点在sync队列中,等待着获取锁
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;

// 结点状态
volatile int waitStatus;
// 前驱结点
volatile Node prev;
// 后继结点
volatile Node next;
// 结点所对应的线程
volatile Thread thread;
// 下一个等待者
Node nextWaiter;

// 结点是否在共享模式下等待
final boolean isShared() {
return nextWaiter == SHARED;
}

// 获取前驱结点,若前驱结点为空,抛出异常
final Node predecessor() throws NullPointerException {
// 保存前驱结点
Node p = prev;
if (p == null) // 前驱结点为空,抛出异常
throw new NullPointerException();
else // 前驱结点不为空,返回
return p;
}

// 无参构造方法
// Used to establish initial head or SHARED marker
Node() {
}

// 构造方法
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}

// 构造方法
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}

每个线程被阻塞的线程都会被封装成一个Node结点,放入队列。每个节点包含了一个Thread类型的引用,并且每个节点都存在一个状态,具体状态如下。

  • CANCELLED,值为1,表示当前的线程被取消。
  • SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,需要进行unpark操作。
  • CONDITION,值为-2,表示当前节点在等待condition,也就是在condition queue中。
  • PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行。
  • 值为0,表示当前节点在sync queue中,等待着获取锁。

类的属性

属性中包含了头节点head,尾结点tail,状态state、自旋时间spinForTimeoutThreshold,还有AbstractQueuedSynchronizer抽象的属性在内存中的偏移地址,通过该偏移地址,可以获取和设置该属性的值,同时还包括一个静态初始化块,用于加载内存偏移地址。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
implements java.io.Serializable {
// 版本号
private static final long serialVersionUID = 7373984972572414691L;
// 头节点
private transient volatile Node head;
// 尾结点
private transient volatile Node tail;
// 状态
private volatile int state;
// 自旋时间
static final long spinForTimeoutThreshold = 1000L;

// Unsafe类实例
private static final Unsafe unsafe = Unsafe.getUnsafe();
// state内存偏移地址
private static final long stateOffset;
// head内存偏移地址
private static final long headOffset;
// state内存偏移地址
private static final long tailOffset;
// tail内存偏移地址
private static final long waitStatusOffset;
// next内存偏移地址
private static final long nextOffset;
// 静态初始化块
static {
try {
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));

} catch (Exception ex) { throw new Error(ex); }
}
}

类的构造方法

此类构造方法为从抽象构造方法,供子类调用。

1
protected AbstractQueuedSynchronizer() { } 

核心方法acquire

该方法以独占模式获取(资源),忽略中断,即线程在aquire过程中,中断此线程是无效的。源码如下:

1
2
3
4
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

由上述源码可以知道,当一个线程调用acquire时,调用方法流程如下

流程

  • 首先调用tryAcquire方法,调用此方法的线程会试图在独占模式下获取对象状态。此方法应该查询是否允许它在独占模式下获取对象状态,如果允许,则获取它。在AbstractQueuedSynchronizer源码中没有实现此方法,即需要子类去重写此方法完成自己的逻辑。
  • 若tryAcquire失败,则调用addWaiter方法,addWaiter方法完成的功能是将调用此方法的线程封装成为一个结点并放入Sync queue。
  • 调用acquireQueued方法,此方法完成的功能是Sync queue中的结点不断尝试获取资源,若成功,则返回true,否则,返回false。
  • selfInterrupt是当acquireQueued由于响应中断信号退出时执行,如果是正常退出,则会返回false

由于tryAcquire默认是空,所以此时,不进行分析,之后会结合一个例子进行分析。

首先分析addWaiter方法
addWaiter方法使用快速添加的方式往sync queue尾部添加结点,首先使用compareAndSetTail进行尝试设置,如果成功,说明没有竞争,则添加成功,添加不成功则说明有竞争,使用enq进行添加,如果sync queue队列还没有初始化,也会使用enq插入队列中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
 // 添加等待者
private Node addWaiter(Node mode) {
// 新生成一个结点,默认为独占模式
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 保存尾结点
Node pred = tail;
if (pred != null) { // 尾结点不为空,即已经被初始化
// 将node结点的prev域连接到尾结点
node.prev = pred;
if (compareAndSetTail(pred, node)) { // 比较pred是否为尾结点,是则将尾结点设置为node
// 设置尾结点的next域为node
pred.next = node;
return node; // 返回新生成的结点
}
}
enq(node); // 尾结点为空(即还没有被初始化过),或者是compareAndSetTail操作失败,则入队列
return node;
}

enq方法源码如下
enq方法会使用无限循环来确保节点的成功插入。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private Node enq(final Node node) {
for (;;) { // 无限循环,确保结点能够成功入队列
// 保存尾结点
Node t = tail;
if (t == null) { // 尾结点为空,即还没被初始化
if (compareAndSetHead(new Node())) // 头节点为空,并设置头节点为新生成的结点
tail = head; // 头节点与尾结点都指向同一个新生结点
} else { // 尾结点不为空,即已经被初始化过
// 将node结点的prev域连接到尾结点
node.prev = t;
if (compareAndSetTail(t, node)) { // 比较结点t是否为尾结点,若是则将尾结点设置为node
// 设置尾结点的next域为node
t.next = node;
return t; // 返回尾结点
}
}
}
}

acquireQueued
现在,分析acquireQueue方法。其源码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// sync队列中的结点在独占且忽略中断的模式下获取(资源)
final boolean acquireQueued(final Node node, int arg) {
// 标志
boolean failed = true;
try {
// 中断标志
boolean interrupted = false;
for (;;) { // 无限循环
// 获取node节点的前驱结点
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) { // 前驱为头节点并且成功获得锁
setHead(node); // 设置头节点
p.next = null; // help GC
failed = false; // 设置标志
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

首先获取当前节点的前驱节点,如果前驱节点是头节点并且能够获取(资源),代表该当前节点能够占有锁,设置头节点为当前节点,返回。否则,调用shouldParkAfterFailedAcquire和parkAndCheckInterrupt方法,首先,我们看shouldParkAfterFailedAcquire方法,代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 当获取(资源)失败后,检查并且更新结点状态
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取前驱结点的状态
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) // 状态为SIGNAL,为-1
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
// 可以进行park操作
return true;
if (ws > 0) { // 表示状态为CANCELLED,为1
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0); // 找到pred结点前面最近的一个状态不为CANCELLED的结点
// 赋值pred结点的next域
pred.next = node;
} else {
// 为PROPAGATE -3 或者是0 表示无状态,(为CONDITION -2时,表示此节点在condition queue中)
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
// 比较并设置前驱结点的状态为SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
// 不能进行park操作
return false;
}

主要步骤如下:

  1. 只有当该节点的前驱结点的状态为SIGNAL时,才可以对该结点所封装的线程进行park操作。
  2. 一直循环查找前驱节点不为取消状态的情形,并将此节点的后继节点设置为当前节点
  3. 如果前驱节点状态小于等于0,增说明前驱节点是有效状态,设置其状态为acquireQueued
  4. 返回false,表明当前节点不能暂停,从而继续调用acquireQueued,来进入下一次循环

再看parkAndCheckInterrupt方法,源码如下

1
2
3
4
5
6
7

// 进行park操作并且返回该线程是否被中断
private final boolean parkAndCheckInterrupt() {
// 在许可可用之前禁用当前线程,并且设置了blocker
LockSupport.park(this);
return Thread.interrupted(); // 当前线程是否已被中断,并清除中断标记位
}

parkAndCheckInterrupt方法里的逻辑是首先执行park操作,即阻塞当前线程,此时线程已经不能执行,知道被唤醒,才能执行return防范,唤醒后,返回该线程是否已经被中断,同时清楚中断。

再看final块中的cancelAcquire方法,其源码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// 取消继续获取(资源)
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
// node为空,返回
if (node == null)
return;
// 设置node结点的thread为空
node.thread = null;

// Skip cancelled predecessors
// 保存node的前驱结点
Node pred = node.prev;
while (pred.waitStatus > 0) // 找到node前驱结点中第一个状态小于0的结点,即不为CANCELLED状态的结点
node.prev = pred = pred.prev;

// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
// 获取pred结点的下一个结点
Node predNext = pred.next;

// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
// 设置node结点的状态为CANCELLED
node.waitStatus = Node.CANCELLED;

// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) { // node结点为尾结点,则设置尾结点为pred结点
// 比较并设置pred结点的next节点为null
compareAndSetNext(pred, predNext, null);
} else {
// node结点不为尾结点,或者比较设置不成功
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
// (pred结点不为头节点,并且pred结点的状态为SIGNAL)或者
// pred结点状态小于等于0,并且比较并设置等待状态为SIGNAL成功,并且pred结点所封装的线程不为空
// 保存结点的后继
Node next = node.next;
if (next != null && next.waitStatus <= 0) // 后继不为空并且后继的状态小于等于0
compareAndSetNext(pred, predNext, next); // 比较并设置pred.next = next;
} else {
unparkSuccessor(node); // 释放node的前一个结点
}

node.next = node; // help GC
}
}

该方法调用的时机是,当前阻塞的线程出现任何异常的情况下调用,正常的获取不会调用此方法。

该方法完成的功能就是取消当前线程对资源的获取,即设置该结点的状态为CANCELLED,然后设置当前节点状态有效的前继节点,同时找到当前节点的后继节点,设置前继节点的next节点指向找到的后继节点,或者唤醒下一个节点,接着我们再看unparkSuccessor方法,源码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

// 释放后继结点
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
// 获取node结点的等待状态
int ws = node.waitStatus;
if (ws < 0) // 状态值小于0,为SIGNAL -1 或 CONDITION -2 或 PROPAGATE -3
// 比较并且设置结点等待状态,设置为0
compareAndSetWaitStatus(node, ws, 0);

/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
// 获取node节点的下一个结点
Node s = node.next;
if (s == null || s.waitStatus > 0) { // 下一个结点为空或者下一个节点的等待状态大于0,即为CANCELLED
// s赋值为空
s = null;
// 从尾结点开始从后往前开始遍历
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0) // 找到等待状态小于等于0的结点,找到最前的状态小于等于0的结点
// 保存结点
s = t;
}
if (s != null) // 该结点不为为空,释放许可
LockSupport.unpark(s.thread);
}

该方法的作用就是为了释放node节点的后继结点。 对于cancelAcquire与unparkSuccessor方法,如下示意图可以清晰的表示:

acquire

其中node为参数,在执行完cancelAcquire方法后的效果就是unpark了s结点所包含的t4线程。 现在,再来看acquireQueued方法的整个的逻辑。逻辑如下:

  • 判断结点的前驱是否为head并且是否成功获取(资源)。
  • 若步骤1均满足,则设置结点为head,之后会判断是否finally模块,然后返回。
  • 若步骤2不满足,则判断是否需要park当前线程,是否需要park当前线程的逻辑是判断结点的前驱结点的状态是否为SIGNAL,若是,则park当前结点,否则,不进行park操作。
  • 若park了当前线程,之后某个线程对本线程unpark后,并且本线程也获得机会运行。那么,将会继续进行步骤①的判断。

类的核心方法 - release方法

以独占模式释放对象,其源码如下:

1
2
3
4
5
6
7
8
9
10
public final boolean release(int arg) {
if (tryRelease(arg)) { // 释放成功
// 保存头节点
Node h = head;
if (h != null && h.waitStatus != 0) // 头节点不为空并且头节点状态不为0
unparkSuccessor(h); //释放头节点的后继结点
return true;
}
return false;
}

其中,tryRelease的默认实现是抛出异常,需要具体的子类实现,如果tryRelease成功,那么如果头节点不为空并且头节点的状态不为0,则释放头节点的后继结点,unparkSuccessor方法已经分析过,不再累赘。 对于其他方法我们也可以分析,与前面分析的方法大同小异,所以,不再累赘。

总结

获取锁的流程可以总结为如下步骤

  1. 使用尝试获取锁,如果获取成功则返回,失败进入第二步
  2. 创建阻塞节点,首先尝试插入节点,如果插入成功进入第4步
  3. 插入失败,使用for循环尝试插入节点,直到插入成功
  4. 无限循环,获取锁,首先判断前继节点是否是头结点,如果是则尝试获取锁,获取成功直接返回,获取失败进入第5步
  5. 判断当前节点是否可以阻塞,依据是前继节点是否有效,如果是有效状态则暂停,无效状态,则需要查找到有效节点,并设置当前节点的前继节点为查找到的有效节点,然后返会false,或者前继节点是刚插入的,则设置其状态为SINGAL,告诉其后继节点需要被唤醒
  6. 上一步如果返回false,则说明节点的前继节点有变化,需要从新走一遍4到5,如果返回true,则阻塞当前线程
  7. 如果上面的步骤执行失败,需要设置当前节点为取消状态,并唤醒后继节点,来获取锁,这个过程只走一次,如果失败,说明已有一个线程在进入队列,由其来校正所有节点的状态与连接

释放锁的流程

如果释放锁成功,则判断头结点是否存在并且状态不是0,及后续可能有节点,唤醒后继节点

参考

1.获取锁的流程图