前文已经对AbstractQueuedSynchronizer做了详细的介绍,本篇文章主要从实现的角度来看。
AQS是一个用来构建锁和同步器的框架,使用AQS能简单且高效地构造出应用广泛的大量的同步器,比如我们提到的ReentrantLock,Semaphore,其他的诸如ReentrantReadWriteLock,SynchronousQueue,FutureTask等等皆是基于AQS的。当然,我们自己也能利用AQS非常轻松容易地构造出符合我们自己需求的同步器。
AQS核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中。
核心是对下面三个进行实现
- 同步状态的原子性管理;
- 线程的阻塞与解除阻塞;
- 队列的管理;
AQS 核心思想
同步状态的原子性管理
AQS使用一个int成员变量来表示同步状态,使用CAS对该同步状态进行原子操作实现对其值的修改。
1 | private volatile int state;//共享变量,使用volatile修饰保证线程可见性 |
状态信息通过procted类型的getState,setState,compareAndSetState进行操作
1 |
|
线程的阻塞与解除阻塞
通过LockSupport来实现,主要是当所已经被其他线程获取时,阻塞当前线程直到被唤醒。具体可以参考阻塞原语LockSupport
队列的管理
使用CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS是将每条请求共享资源的线程封装成一个CLH锁队列的一个结点(Node)来实现锁的分配。其中Sync queue,即同步队列,是双向链表,包括head结点和tail结点,head结点主要用作后续的调度。而Condition queue不是必须的,其是一个单向链表,只有当使用Condition时,才会存在此单向链表。并且可能会有多个Condition queue。
AQS对资源的共享方式
AQS定义两种资源共享方式 :
- Exclusive(独占):只有一个线程能执行,如ReentrantLock。又可分为公平锁和非公平锁:
- 公平锁:按照线程在队列中的排队顺序,先到者先拿到锁
- 非公平锁:当线程要获取锁时,无视队列顺序直接去抢锁,谁抢到就是谁的
- Share(共享):多个线程可同时执行,如Semaphore/CountDownLatch。Semaphore、CountDownLatCh、 CyclicBarrier、ReadWriteLock 我们都会在后面讲到。
AQS底层使用了模板方法模式
同步器的设计是基于模板方法模式的,如果需要自定义同步器一般的方式是这样(模板方法模式很经典的一个应用): 使用者继承AbstractQueuedSynchronizer并重写指定的方法。(这些重写方法很简单,无非是对于共享资源state的获取和释放)将AQS组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。
这和我们以往通过实现接口的方式有很大区别,模板模式具体可以参考设计模式行为型 - 模板方法(Template Method)详解
AQS使用了模板方法模式,自定义同步器时需要重写下面几个AQS提供的模板方法:
1 | sHeldExclusively()//该线程是否正在独占资源。只有用到condition才需要去实现它。 |
默认情况下,每个方法都抛出 UnsupportedOperationException。 这些方法的实现必须是内部线程安全的,并且通常应该简短而不是阻塞。AQS类中的其他方法都是final ,所以无法被其他类使用,只有这几个方法可以被其他类使用。
以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。
源码分析
类的继承关系
AbstractQueuedSynchronizer继承自AbstractOwnableSynchronizer抽象类,并且实现了Serializable接口,可以进行序列化。
1 | public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable |
其中AbstractOwnableSynchronizer抽象类的源码如下:
1 |
|
AbstractOwnableSynchronizer抽象类中,可以设置独占资源线程和获取独占资源线程。分别为setExclusiveOwnerThread与getExclusiveOwnerThread方法,这两个方法会被子类调用。
AbstractQueuedSynchronizer类有两个内部类,分别为Node类与ConditionObject类,本篇文章不对ConditionObject做具体介绍,后面会详细说明,主要讲解独占锁的获取。
内部类Node
Mode主要是用于队列的构建,源码如下
1 | static final class Node { |
每个线程被阻塞的线程都会被封装成一个Node结点,放入队列。每个节点包含了一个Thread类型的引用,并且每个节点都存在一个状态,具体状态如下。
- CANCELLED,值为1,表示当前的线程被取消。
- SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,需要进行unpark操作。
- CONDITION,值为-2,表示当前节点在等待condition,也就是在condition queue中。
- PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行。
- 值为0,表示当前节点在sync queue中,等待着获取锁。
类的属性
属性中包含了头节点head,尾结点tail,状态state、自旋时间spinForTimeoutThreshold,还有AbstractQueuedSynchronizer抽象的属性在内存中的偏移地址,通过该偏移地址,可以获取和设置该属性的值,同时还包括一个静态初始化块,用于加载内存偏移地址。
1 | public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer |
类的构造方法
此类构造方法为从抽象构造方法,供子类调用。
1 | protected AbstractQueuedSynchronizer() { } |
核心方法acquire
该方法以独占模式获取(资源),忽略中断,即线程在aquire过程中,中断此线程是无效的。源码如下:
1 | public final void acquire(int arg) { |
由上述源码可以知道,当一个线程调用acquire时,调用方法流程如下
- 首先调用tryAcquire方法,调用此方法的线程会试图在独占模式下获取对象状态。此方法应该查询是否允许它在独占模式下获取对象状态,如果允许,则获取它。在AbstractQueuedSynchronizer源码中没有实现此方法,即需要子类去重写此方法完成自己的逻辑。
- 若tryAcquire失败,则调用addWaiter方法,addWaiter方法完成的功能是将调用此方法的线程封装成为一个结点并放入Sync queue。
- 调用acquireQueued方法,此方法完成的功能是Sync queue中的结点不断尝试获取资源,若成功,则返回true,否则,返回false。
- selfInterrupt是当acquireQueued由于响应中断信号退出时执行,如果是正常退出,则会返回false
由于tryAcquire默认是空,所以此时,不进行分析,之后会结合一个例子进行分析。
首先分析addWaiter方法
addWaiter方法使用快速添加的方式往sync queue尾部添加结点,首先使用compareAndSetTail进行尝试设置,如果成功,说明没有竞争,则添加成功,添加不成功则说明有竞争,使用enq进行添加,如果sync queue队列还没有初始化,也会使用enq插入队列中
1 | // 添加等待者 |
enq方法源码如下
enq方法会使用无限循环来确保节点的成功插入。
1 | private Node enq(final Node node) { |
acquireQueued
现在,分析acquireQueue方法。其源码如下
1 | // sync队列中的结点在独占且忽略中断的模式下获取(资源) |
首先获取当前节点的前驱节点,如果前驱节点是头节点并且能够获取(资源),代表该当前节点能够占有锁,设置头节点为当前节点,返回。否则,调用shouldParkAfterFailedAcquire和parkAndCheckInterrupt方法,首先,我们看shouldParkAfterFailedAcquire方法,代码如下
1 | // 当获取(资源)失败后,检查并且更新结点状态 |
主要步骤如下:
- 只有当该节点的前驱结点的状态为SIGNAL时,才可以对该结点所封装的线程进行park操作。
- 一直循环查找前驱节点不为取消状态的情形,并将此节点的后继节点设置为当前节点
- 如果前驱节点状态小于等于0,增说明前驱节点是有效状态,设置其状态为acquireQueued
- 返回false,表明当前节点不能暂停,从而继续调用acquireQueued,来进入下一次循环
再看parkAndCheckInterrupt方法,源码如下
1 |
|
parkAndCheckInterrupt方法里的逻辑是首先执行park操作,即阻塞当前线程,此时线程已经不能执行,知道被唤醒,才能执行return防范,唤醒后,返回该线程是否已经被中断,同时清楚中断。
再看final块中的cancelAcquire方法,其源码如下
1 | // 取消继续获取(资源) |
该方法调用的时机是,当前阻塞的线程出现任何异常的情况下调用,正常的获取不会调用此方法。
该方法完成的功能就是取消当前线程对资源的获取,即设置该结点的状态为CANCELLED,然后设置当前节点状态有效的前继节点,同时找到当前节点的后继节点,设置前继节点的next节点指向找到的后继节点,或者唤醒下一个节点,接着我们再看unparkSuccessor方法,源码如下
1 |
|
该方法的作用就是为了释放node节点的后继结点。 对于cancelAcquire与unparkSuccessor方法,如下示意图可以清晰的表示:
其中node为参数,在执行完cancelAcquire方法后的效果就是unpark了s结点所包含的t4线程。 现在,再来看acquireQueued方法的整个的逻辑。逻辑如下:
- 判断结点的前驱是否为head并且是否成功获取(资源)。
- 若步骤1均满足,则设置结点为head,之后会判断是否finally模块,然后返回。
- 若步骤2不满足,则判断是否需要park当前线程,是否需要park当前线程的逻辑是判断结点的前驱结点的状态是否为SIGNAL,若是,则park当前结点,否则,不进行park操作。
- 若park了当前线程,之后某个线程对本线程unpark后,并且本线程也获得机会运行。那么,将会继续进行步骤①的判断。
类的核心方法 - release方法
以独占模式释放对象,其源码如下:
1 | public final boolean release(int arg) { |
其中,tryRelease的默认实现是抛出异常,需要具体的子类实现,如果tryRelease成功,那么如果头节点不为空并且头节点的状态不为0,则释放头节点的后继结点,unparkSuccessor方法已经分析过,不再累赘。 对于其他方法我们也可以分析,与前面分析的方法大同小异,所以,不再累赘。
总结
获取锁的流程可以总结为如下步骤
- 使用尝试获取锁,如果获取成功则返回,失败进入第二步
- 创建阻塞节点,首先尝试插入节点,如果插入成功进入第4步
- 插入失败,使用for循环尝试插入节点,直到插入成功
- 无限循环,获取锁,首先判断前继节点是否是头结点,如果是则尝试获取锁,获取成功直接返回,获取失败进入第5步
- 判断当前节点是否可以阻塞,依据是前继节点是否有效,如果是有效状态则暂停,无效状态,则需要查找到有效节点,并设置当前节点的前继节点为查找到的有效节点,然后返会false,或者前继节点是刚插入的,则设置其状态为SINGAL,告诉其后继节点需要被唤醒
- 上一步如果返回false,则说明节点的前继节点有变化,需要从新走一遍4到5,如果返回true,则阻塞当前线程
- 如果上面的步骤执行失败,需要设置当前节点为取消状态,并唤醒后继节点,来获取锁,这个过程只走一次,如果失败,说明已有一个线程在进入队列,由其来校正所有节点的状态与连接
释放锁的流程
如果释放锁成功,则判断头结点是否存在并且状态不是0,及后续可能有节点,唤醒后继节点
参考
1.获取锁的流程图