0%

概述

开发多用户、数据库驱动的应用时,最大的一个难点是:一方面要最大成都地利用数据库的并发访问,另外一方面还要确保每个用户能以一致的方式读取和修改数据,为此就有了锁的机制,同时这也是数据库系统区别于文件系统的一个关键特性。InnoDB存储引擎较之mysql数据库的其它存储引擎在这方面技高一筹。而只有正确的了解这些锁的内部机制才能充分发挥InnoDB存储引擎在这方面的优势。本篇文章会详细介绍InnoDB存储引擎对表中数据的锁定,同时分析InnoDB存储引擎会以怎样的力度锁定数据。

阅读全文 »

概述

  1. 索引基础
  2. 索引主要类型
  3. 索引优化
  4. InnoDB和MyISAM数据分布对比

1. 索引基础

  • 索引,又叫key(键)

  • 在mysql中,存储引擎先在索引中找到检索的内容,然后根据索引结果找到对应的数据行

  • 索引可以包含一个或多个列的值,如果索引包含多个列,那么列的顺序十分重要,因为mysql只能高效的使用索引的最左前缀列最左前缀列就是KEY(id, name, sex)idid、name、sex里面是写在左边的,这就叫最左前缀

    阅读全文 »

在多个事务并发执行时,事务的隔离性不一定能保持。为保持事务的隔离性,系统必须对并发事务之间的相互协作加以控制;这种控制是通过一种称为并发控制来实现。下面将介绍多种机制,但是没有哪种机制明显是最好的。每种机制都有优势。在实践中,最常用的是俩阶段封锁和快照隔离。本篇文章的主要内容就是分别介绍下面几种并发控制机制。

  1. 基于封锁的协议
  2. 基于时间戳的协议
  3. 多版本机制
阅读全文 »

本文主要讲解事务四大特性ACID中的隔离性,这里简单介绍什么是隔离性,接着介绍隔离性级别以及每种隔离性会带来的相应的读的问题,最后简单介绍数据库是如何实现事务的隔离性。

  1. 什么是隔离性
  2. 隔离性级别
  3. 隔离性级别的实现方式简介

什么是隔离性

在数据库系统中会存在多个事务可能并发执行,系统保证,对于任何一对事务T1和T2,在T1看来,T2或者在T1开始之前就已经完成执行,或者在T1完成之后开始执行。因此,每个事务都感觉不到系统中有其他事务在并发的执行。这就是事务的隔离性

隔离性级别

  • 可串行化(serializable):通常保证可串行化调度。
  • 可重复读(repeatable read):只允许读取已提交数据,而且在一个事务俩次读取一个数据项期间,其他事务不得更新该数据。但该事务不要求与其他事务可串行化。
  • 已提交度(read committed):只允许读取已提交数据,但不要求可重复读。比如,在事务俩次读取一个数据项期间,另一个事务更新了该数据并提交。
  • 未提交读(read uncommitted):允许读取未提交数据。

以上所有的隔离级别都不允许脏写:如果一个数据项已经被另外一个尚未提交或终止的事务写入,则不允许对该数据项执行写操作。另外上面四中隔离性界别,从下到上,性能越来越低,并发度越来越差。

下面介绍每种隔离级别带来的问题,

脏读

发生在未提交读隔离级别

脏读又称无效数据的读出,是指在数据库访问中,事务T1将某一值修改,然后事务T2读取该值,此后T1因为某种原因撤销对该值的修改,这就导致了T2所读取到的数据是无效的。

脏读就是指当一个事务正在访问数据,并且对数据进行了修改,而这种修改还没有提交(commit)到数据库中,这时,另外一个事务也访问这个数据,然后使用了这个数据。因为这个数据是还没有提交的数据,那么另外一个事务读到的这个数据是脏数据,依据脏数据所做的操作可能是不正确的。

举例说明:

在下面的例子中,事务2修改了一行,但是没有提交,事务1读了这个没有提交的数据。现在如果事务2回滚了刚才的修改或者做了另外的修改的话,事务1中查到的数据就是不正确的了。

事务1 事务2
SELECT age FROM users WHERE id = 1;
/* will read 20 */
- UPDATE users SET age = 21 WHERE id = 1;
/* No commit here */
SELECT age FROM users WHERE id = 1;
/* will read 21 */
- ROLLBACK;
/* lock-based DIRTY READ */

在这个例子中,事务2回滚后就没有id是1,age是21的数据了。所以,事务一读到了一条脏数据。

不可重复读

发生在已提交读隔离级别
不可重复读,是指在数据库访问中,一个事务范围内两个相同的查询却返回了不同数据。这是由于查询时系统中其他事务修改的提交而引起的。比如事务T1读取某一数据,事务T2读取并修改了该数据,T1为了对读取值进行检验而再次读取该数据,便得到了不同的结果。

一种更易理解的说法是:在一个事务内,多次读同一个数据。在这个事务还没有结束时,另一个事务也访问该同一数据。那么,在第一个事务的两次读数据之间。由于第二个事务的修改,那么第一个事务读到的数据可能不一样,这样就发生了在一个事务内两次读到的数据是不一样的,因此称为不可重复读,即原始读取不可重复。

举例说明:

在基于锁的并发控制中“不可重复读(non-repeatable read)”现象发生在当执行SELECT 操作时没有获得读锁(read locks)或者SELECT操作执行完后马上释放了读锁; 多版本并发控制中当没有要求一个提交冲突的事务回滚也会发生“不可重复读(non-repeatable read)”现象。

事务一 事务二
SELECT * FROM users WHERE id = 1;
- UPDATE users SET age = 21 WHERE id = 1;
COMMIT;
SELECT * FROM users WHERE id = 1;
COMMIT; `

在这个例子中,事务2提交成功,因此他对id为1的行的修改就对其他事务可见了。但是事务1在此前已经从这行读到了另外一个age的值。

幻读

幻读是指当事务不是独立执行时发生的一种现象,例如第一个事务对一个表中的数据进行了修改,比如这种修改涉及到表中的全部数据行。同时,第二个事务也修改这个表中的数据,这种修改是向表中插入一行新数据。那么,以后就会发生操作第一个事务的用户发现表中还有没有修改的数据行,就好象发生了幻觉一样.一般解决幻读的方法是增加范围锁RangeS,锁定检锁范围为只读,这样就避免了幻读。  

幻读(phantom read)”是不可重复读(Non-repeatable reads)的一种特殊场景:当事务没有获取范围锁的情况下执行SELECT … WHERE操作可能会发生“幻影读(phantom read)”。

举例说明:

当事务1两次执行SELECT … WHERE检索一定范围内数据的操作中间,事务2在这个表中创建了(如INSERT)了一行新数据,这条新数据正好满足事务1的“WHERE”子句。

事务一 事务二
SELECT * FROM usersWHERE age BETWEEN 10 AND 30;
- INSERT INTO users VALUES ( 3, 'Bob', 27 );
COMMIT;
SELECT * FROM usersWHERE age BETWEEN 10 AND 30;

在这个例子中,事务一执行了两次相同的查询操作。但是两次操作中间事务二向数据库中增加了一条符合事务一的查询条件的数据,导致幻读。

隔离性级别的实现方式简介

在数据库中使用并发控制机制来保证事务的隔离性,使用并发控制的目的是为了提高事务的并发性,从而提数据库的整体性能。具体的实现有以下几种形式。

一个数据库可以封锁其访问的数据项,而不用封锁整个数据库。这种策略下,事务必须在足够长的时间内持有锁来保证可串行化。但是这一周期又要足够短致使不会过度影响性能。在封锁协议中,俩阶段封锁协议就是一种简单且广泛的确保可串行化的封锁协议:简单的说就是,俩阶段封锁要求一个事务封锁有俩个阶段,一个阶段只获得锁但不释放锁,第二个阶段只释放锁但不获得锁。

当我们有俩种锁,则封锁的结果可以将进一步得到改善:读锁和写锁,

读写锁的概念很平常,当你在读取数据的时候,应该先加读锁,读取完之后的某个时间再解开读锁,那么加了读锁的数据,应该需要有什么特性呢,应该只能读,不能写,因为加了读锁,说明有事务准备读取这个数据,如果被别的事务重写这个数据,那数据就不准确了。所以一个事务给这个数据加了读锁,别的事务也可以对这个数据加读锁,因为大家都是只读不写。

写锁则具有排他性(exclusive lock),当一个事务准备对一个数据进行写操作的时候,先要对数据加写锁,那么数据就是可变的,这时候,其他事务就无法对这个数据加读锁了,除非这个写锁释放。

时间戳

这种是为每一个事务分配一个时间戳,通常当他开始的时候。对于每个数据项,系统维护俩个时间戳。数据项的读时间戳记录读该数据项的事务的最大时间戳。数据项的写时间戳记录写入该数据项的时间戳。时间戳用来确保在访问冲突情况下,事务按照事务的时间戳的顺序来访问数据项。当不能访问时,事务将会终止,并且分配一个新的时间戳重新开始。

多版本和快照隔离

通过维护数据项的多个版本,一个事务允许读取一个旧版本的数据项,而不是被另一个未提交或者在串行化序列中应该排在后面的事务写入的新版本的数据项。有许多的版本控制并发控制技术,其中一种应用比较广泛的就是快照隔离。

在快照隔离中,我们可以想象每个事务开始时有其自身的数据库版本或者快照。他从这个私有的版本中读取数据,因此和其他事务所做的更新隔离开。如果事务更新数据库,更新只出现在其私有版本中,而不是实际的数据库版本中。当事务提交时,和更新有关的信息将被保存,使得更新被写入真正的数据库。当一个事务进入部分提交状态后,只有在没有其他并发事务修改了该事务想要更新的数据项的情况下,事务进入提交状态。而不能提交的事务则终止。

快照隔离可以保证读数据的尝试永远无需等待(不向封锁情况)。只读事务不会中止,只有修改数据的事务有微小的中止风险。由于每个事务读取他自己的数据版本或快照,因此读数据不会导致此后其他事务的更新尝试被迫等待(不想封锁情况)。因为大部分事务是只读的(并且大多数其他事务读数据情况多余更新),所以这是与锁相比往往带来性能改善的主要原因。

总结

本篇文章简单总结了事务不同隔离性级别会带来的问题,以及事务隔离性的实现方式。后面会在写几篇文章来说明事务隔离性实现的具体细节。

参考

  1. 数据库系统概念
  2. 数据库的读现象浅析

从字面意思理解就是信号量,本质上来说是用于线程之间访问共享资源,是一种同步原语,只是访问的资源可能有多个,其实现是通过AQS框架。在我们开发中,经常会碰见使用信号量的场景,比如出于系统性能的考虑需要限流,这时需要控制同时访问共享资源的最大线程数量,或者共享资源是稀缺资源,我们需要有一种办法能够协调各个线程,以保证合理的使用公共资源。
可以看下图来理解
upload successful
有四个线程来共同竞争资源,现在信号量是5,则表明共享资源的数量是5。如果每个线程申请一个资源,则可以同时满足5个线程申请资源,每个线程在使用完之后,需要释放资源。如果在线程在申请资源的时候,没有足够的资源来满足,则会阻塞线程。

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package JUC.tools;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**************************************
* Author : zhangke
* Date : 2018/4/20 20:00
* Desc : Semaphore 学习
***************************************/
public class SemaphoreTest1 {

public static void main(String[] args) {
Semaphore sem = new Semaphore(10);
ExecutorService threadPool = Executors.newFixedThreadPool(3);

//在线程池中执行任务
threadPool.execute(new MyThread(sem, 5));
threadPool.execute(new MyThread(sem, 4));
threadPool.execute(new MyThread(sem, 7));

//关闭池
threadPool.shutdown();
}


static class MyThread extends Thread {
private Semaphore sem; //信号量
private int count; //申请信号量的大小


public MyThread(Semaphore sem, int count) {
this.sem = sem;
this.count = count;
}


@Override
public void run() {

try {
//从信号量中获取count个许可
sem.acquire(count);
Thread.sleep(2000);
System.out.println(Thread.currentThread().getName()
+ " acquire count=" + count);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放给定数目的许可,将其返回到信号量。
sem.release(count);
System.out.println(Thread.currentThread().getName()
+ " release " + count + "");
}
}
}
}

上面演示了基本的信号量使用机制,当有线程尝试使用共享资源时,我们要求线程先获得许可(调用Semaphoreacquire方法),这样线程就拥有了权限,否则就需要等待。当使用完资源后,线程需要调用Semaphorerelease方法释放许可。

运行结果如下

1
2
3
4
5
6
pool-1-thread-2 acquire count=4
pool-1-thread-1 acquire count=5
pool-1-thread-1 release 5
pool-1-thread-2 release 4
pool-1-thread-3 acquire count=7
pool-1-thread-3 release 7

从结果可以看出,这有点类似于共享锁,锁的获取可以不用等待锁的释放。但必须满足下面的条件许可数 ≤ 0代表共享资源不可用。许可数 > 0,代表共享资源可用,且多个线程可以同时访问共享资源。

源码分析

类图如下:

upload successful

  1. Semaphore也包含sync对象,sync是Sync类型;而且,Sync是一个继承于AQS的抽象类。
  2. Sync包括两个子类:”公平信号量”FairSync 和 “非公平信号量”NonfairSync。sync是”FairSync的实例”,或者”NonfairSync的实例”;默认情况下,sync是NonfairSync(即,默认是非公平信号量)。

构造函数

1
2
3
4
5
6
7
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

从中,我们可以信号量分为公平信号量(FairSync)和非公平信号量(NonfairSync)。Semaphore(int permits)函数会默认创建非公平信号量。permits表示许可数,可以理解为资源可以被共享的数量。

公平信号量的获取

获取信号量的源码如下:

1
2
3
4
5
6
7
8
9
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

public void acquire(int permits) throws InterruptedException {
if (permits < 0)
throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}

从上面可以看出,内部是同过Sync对象的acquireSharedInterruptibly方法来获取,源码如下

1
2
3
4
5
6
7
8
9
10
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// 如果线程是中断状态,则抛出异常。
if (Thread.interrupted())
throw new InterruptedException();
// 否则,尝试获取“共享锁”;获取成功则直接返回,
// 获取失败,则通过doAcquireSharedInterruptibly()获取。
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}

tryAcquireShared对应公平锁的源码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
protected int tryAcquireShared(int acquires) {
for (;;) {
// 判断当前线程是不是CLH队列中的第一个线程线程,
// 若是的话,则返回-1。
if (hasQueuedPredecessors())
return -1;
// 设置可以获得的信号量的许可数
int available = getState();
// 设置获得acquires个信号量许可之后,剩余的信号量许可数
int remaining = available - acquires;
// 如果剩余的信号量许可数>=0,则设置可以获得的信号量许可数为remaining。
// 设置成功则返回remaining
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

tryAcquireShared()的作用是尝试获取acquires个信号量许可数。对于Semaphore而言,state表示的是当前可获得的信号量许可数。

下面看看AQS中doAcquireSharedInterruptibly的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private void doAcquireSharedInterruptibly(long arg)
throws InterruptedException {
// 创建当前线程的Node节点,且Node中记录的锁是共享锁类型;
// 并将该节点添加到CLH队列末尾。
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
// 获取上一个节点。
// 如果上一节点是CLH队列的表头,则”尝试获取共享锁“。
final Node p = node.predecessor();
if (p == head) {
long r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 当前线程一直等待,直到获取到共享锁。
// 如果线程在等待过程中被中断过,则再次中断该线程(还原之前的中断状态)。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

doAcquireSharedInterruptibly()会使当前线程一直等待,直到当前线程获取到共享锁(或被中断)才返回。主要流程如下:

  1. addWaiter(Node.SHARED)的作用是,创建当前线程的Node节点,且Node中记录的锁的类型是共享锁(Node.SHARED);并将该节点添加到CLH队列末尾。
  2. node.predecessor()的作用是,获取上一个节点。如果上一节点是CLH队列的表头,则尝试获取共享锁。
  3. shouldParkAfterFailedAcquire()的作用和它的名称一样,如果在尝试获取锁失败之后,线程应该等待,则返回true;否则,返回false。当shouldParkAfterFailedAcquire()返回ture时,则调用parkAndCheckInterrupt(),当前线程会进入等待状态,直到获取到共享锁才继续运行。如果检测到时中断导致的返回,则抛出异常。

上面的函数在前面几篇文章中都已经介绍过,这里就不在重复讲,如果不理解可以看这几篇文章JUC 锁介绍

公平信号量的释放

1
2
3
4
5
6
7
8
public void release() {
sync.releaseShared(1);
}
public void release(int permits) {
if (permits < 0)
throw new IllegalArgumentException();
sync.releaseShared(permits);
}

信号量的释放是通过releases()释放函数,实际上调用的AQS中的releaseShared()

1
2
3
4
5
6
7
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

releaseShared()的目的是让当前线程释放它所持有的共享锁。它首先会通过tryReleaseShared()去尝试释放共享锁。尝试成功,则直接返回;尝试失败,则通过doReleaseShared()去释放共享锁。
Semaphore重写了tryReleaseShared(),它的源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
protected final boolean tryReleaseShared(int releases) {
for (;;) {
// 获取“可以获得的信号量的许可数”
int current = getState();
// 获取“释放releases个信号量许可之后,剩余的信号量许可数”
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
// 设置“可以获得的信号量的许可数”为next。
if (compareAndSetState(current, next))
return true;
}
}

如果tryReleaseShared()尝试释放共享锁失败,则会调用doReleaseShared()去释放共享锁。doReleaseShared()的源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private void doReleaseShared() {
for (;;) {
// 获取CLH队列的头节点
Node h = head;
// 如果头节点不为null,并且头节点不等于tail节点。
if (h != null && h != tail) {
// 获取头节点对应的线程的状态
int ws = h.waitStatus;
// 如果头节点对应的线程是SIGNAL状态,则意味着“头节点的下一个节点所对应的线程”需要被unpark唤醒。
if (ws == Node.SIGNAL) {
// 设置“头节点对应的线程状态”为空状态。失败的话,则继续循环。
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
// 唤醒“头节点的下一个节点所对应的线程”。
unparkSuccessor(h);
}
// 如果头节点对应的线程是空状态,则设置“节点对应的线程所拥有的共享锁”为其它线程获取锁的空状态。
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 如果头节点发生变化,则继续循环。否则,退出循环。
if (h == head) // loop if head changed
break;
}
}

doReleaseShared()会释放共享锁。它会从前往后的遍历CLH队列,依次唤醒然后执行队列中每个节点对应的线程;最终的目的是让这些线程释放它们所持有的信号量。

非公平信号量获取和释放

Semaphore中的非公平信号量是NonFairSync。在Semaphore中,非公平信号量许可的释放(release)与公平信号量许可的释放(release)是一样的。
不同的是它们获取信号量许可的机制不同,下面是非公平信号量获取信号量许可的代码。

非公平信号量的tryAcquireShared()实现如下:

1
2
3
4

protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}

nonfairTryAcquireShared()的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
// 设置“可以获得的信号量的许可数”
int available = getState();
// 设置“获得acquires个信号量许可之后,剩余的信号量许可数”
int remaining = available - acquires;
// 如果“剩余的信号量许可数>=0”,则设置“可以获得的信号量许可数”为remaining。
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

非公平信号量的tryAcquireShared()调用AQS中的nonfairTryAcquireShared()。而在nonfairTryAcquireShared()的for循环中,它都会直接判断当前剩余的信号量许可数是否足够;足够的话,则直接设置可以获得的信号量许可数,进而再获取信号量。
而公平信号量的tryAcquireShared()中,在获取信号量之前会通过if (hasQueuedPredecessors())来判断当前线程是不是在CLH队列的头部,是的话,则返回-1。

总结

Semaphore其实就是实现了AQS共享功能的同步器,对于Semaphore来说,资源就是许可证的数量:

  • 剩余许可证数(State值) - 尝试获取的许可数(acquire方法入参) ≥ 0:资源可用
  • 剩余许可证数(State值) - 尝试获取的许可数(acquire方法入参) < 0:资源不可用
    这里共享的含义是多个线程可以同时获取资源,当计算出的剩余资源不足时,线程就会阻塞。
    注意:Semaphore不是锁,只能限制同时访问资源的线程数,至于对数据一致性的控制,Semaphore是不关心的。当前,如果是只有一个许可的Semaphore,可以当作锁使用。

字面意思回环栅栏,通过它可以实现让一组线程等待至某个状态之后再全部同时执行。叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。我们暂且把这个状态就叫做barrier,当调用await()方法之后,线程就处于barrier了。

可以看下面这个图来理解下:
一共4个线程A、B、C、D,它们到达栅栏的顺序可能各不相同。当A、B、C到达栅栏后,由于没有满足总数4的要求,所以会一直等待,当线程D到达后,栅栏才会放行。

upload successful

阅读全文 »

正如每个Java文档所描述的那样,CountDownLatch是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程的操作执行完后再执行。在Java并发中,countdownlatch的概念是一个常见的面试题,所以一定要确保你很好的理解了它。

CountDownLatch是什么

CountDownLatch是在java1.5被引入的,跟它一起被引入的并发工具类还有CyclicBarrier和Semaphore,它们都存在于java.util.concurrent包下,后面会讲解另外俩个。CountDownLatch这个类能够使一个线程等待其他线程完成各自的工作后再执行。例如,应用程序的主线程希望在负责启动框架服务的线程已经启动所有的框架服务之后再执行。

CountDownLatch是通过一个计数器来实现的,计数器的初始值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就会减1。当计数器值到达0时,它表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务。

upload successful

如上图:TA主线程会一直等待,等待T1、T2和T3将计数器减为0,才继续执行。

阅读全文 »

ReentrantReadWriteLock 简介

在前面我们已经分析过JUC中的独占锁:ReentrantLock。本篇文章将对JUC的读写锁ReentrantReadWriteLock进行介绍。

类图如如下:

类图

从上图可以看出ReentrantReadWriteLock实现了ReadWriteLock接口,而这个接口从名字就可以看出是读写锁。它维护了一对相关连的锁:读锁和写锁。作用如下

  • 读锁:用于只读操作,不会修改共享数据。是共享锁,能够同时被多个线程锁获取。
  • 写锁:用于写入操作,是独占锁,只能被一个线程锁获取。

而这个接口提供了俩个抽象函数,获取读锁的readLock()函数和获取写锁的writeLock()函数。

ReentrantReadWriteLock中包含:Sync对象,读锁ReadLock和写锁WriteLock。

读锁ReadLock和写锁WriteLock都实现了Lock接口。读锁ReadLock和写锁WriteLock中也都分别包含了相同的Sync对象,里面所有的功能实现也都是靠这个对象。它们的Sync对象和ReentrantReadWriteLock的Sync对象是一样,就是通过sync,读锁和写锁实现了对同一个对象的访问。

和ReentrantLock一样,Sync也是一个继承于AQS的抽象类。Sync也包括公平锁FairSync和非公平锁NonfairSync。在创建读写锁时可以选择其中俩个其中一个,默认是NonfairSync。

公平读写锁源码分析

这里我们先对公平锁方式实现的读写锁进行源码分析,首先把后面要用到的属性在这里写出来,方便后面源码的理解:

1
2
3
4
5
6
7
8
9

// 内部使用的读锁
private final ReentrantReadWriteLock.ReadLock readerLock;
// 内部使用的写锁
private final ReentrantReadWriteLock.WriteLock writerLock;

// 读锁和写锁共同使用的锁类型,可以是公平锁和非公平锁
final Sync sync;

这里先看看构造函数和如何获取读锁和写锁

1
2
3
4
5
6
7
8
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}

public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }

从上面可以看出,在创建ReentrantReadWriteLock对象时就会根据是否选择公平锁来创建一个sync锁对象。然后分别创建响应的读锁和写锁。后面获取和使用的读写锁都是在构造函数中创建出来的。

下面开始首先对读锁的获取和释放进行分析。

读锁的获取(公平锁篇)

读锁也就是共享锁,获取锁的源码如下:

1
2
3
4
5
6
7
8
9
10
11
// ReadLock 类中
public void lock() {
// 获取共享锁
sync.acquireShared(1);
}

//AQS 类中
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}

从上面可以看出,这里调用的是AQS类中的acquireShared来获取锁。参数和ReentrantLock一样,表示获取锁的数量,1表示当前获取一把共享锁。锁的状态也会加1.

acquireShared()首先会通过tryAcquireShared()来尝试获取锁。尝试成功的话,直接返回。尝试失败的话,则通过doAcquireShared()来获取锁。doAcquireShared()会获取到锁才返回。

tryAcquireShared

尝试获取共享锁,此函数定义在Sync类中,源码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
protected final int tryAcquireShared(int unused) {

Thread current = Thread.currentThread();
// 获取锁的状态
int c = getState();
// 如果锁被独占锁获取并且获取独占锁的线程不是当前线程,
// 直接返回-1 达标获取锁失败
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// 获取读锁的共享计数
int r = sharedCount(c);
// 判断不需要阻塞,并且已经获取读锁的数量小于MAX_COUNT
// 则通过CAS函数更新读锁的状态,将读锁的共享计数加1
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
// 第一次获取读锁
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;

} else if (firstReader == current) {
// 如果当前获取锁的线程是第一个获取读锁的线程
firstReaderHoldCount++;
} else {
// HoldCounter是用来统计该线程获取“读取锁”的次数。
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
// 将该线程获取“读取锁”的次数+1。
rh.count++;
}
return 1;
}
// 如果获取读锁失败,则通过下面函数来进行获取读锁
return fullTryAcquireShared(current);
}

上面流程比较清晰,但是有很多地点可能看不明白,先跳过,看完后面所有的分析,你就会明白。先总结上面的流程。

  1. 判断当前锁是否是独占锁,如果是并且独占锁的线程和当前获取锁的线程不相同,则直接返回-1,获取读锁失败。
  2. 如果当前线程不应该被阻塞,并且已获取读锁的数量小于最大值,则尝试使用CAS更改读锁的状态值。如果操作成功,进行下一步,操作失败进入最后一步。
  3. 这一步主要设置每一个线程获取读锁的数量,主要分为三类来讨论:
    1. 如果是第一个线程来获取读锁,则设置firstReader为当前线程和当前线程拥有的读锁数量为1.
    2. 如果不是,则判断当前线程和firstReader线程是否一样,如果一样,则当前线程获取读锁的数量加1
    3. 以上都不是,则通过HoldCounter来对当前线程获取读锁的数量加1,而HoldCounter是一个ThreadLocal对象。保证每个线程都有一个不一样的HoldCounter变量。下面会详细解释
  4. 如果上面没有成功获取到读锁,但也没有返回。则通过fullTryAcquireShared来获取锁

下面对上面每一步使用到的函数进行详细的解释。

计算读锁和写锁已被获取的数量

1
2
3
4
5
6
7
8
9
static final int SHARED_SHIFT   = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

// 返回共享锁的数量
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
// 返回独占说的数量
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

从上面可以看出,读锁使用state的高16位来表示数量,而写锁则使用低16位来表示数量。然后通过后面的俩个函数来分别计算对应的数量。

readerShouldBlock

判断当前获取读锁的线程是否应该阻塞,源码在FairSync中,源码如下

1
2
3
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}

代码比较简单,就是判断当前线程是否是队列中的第一个节点,如果是,则不需要阻塞,不是则需要阻塞。具体的和前面ReentrantLock中的一样,这里具体分析。

HoldCounter

计算每个线程获取读锁的数量,这里HoldCounter是ThreaLocal类型的变量,如果不了解这个对象,可以看这篇文章深入分析ThreadLocal,在分析这个之前,首先看一些定义在Sync类中的属性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 计数器对象,用于记录每个线程保持读锁的数量
// 这个对象被记录在ThreadLocal中,缓存在cachedHoldCounter
static final class HoldCounter {
int count = 0;
final long tid = getThreadId(Thread.currentThread());
}

// 自定的ThreadLocal对象,设置初始化方法
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {

public HoldCounter initialValue() {
return new HoldCounter();
}
}
// 记录当前线程获取读锁的数量,在构造器中初始化,当保持的读锁数量为空的时候删除
private transient ThreadLocalHoldCounter readHolds;

// 用于记录上一个线程成功获取读锁的数量
private transient HoldCounter cachedHoldCounter;

// 下面来个一个是记录第一个获取读锁的线程和获取读锁的数量
private transient Thread firstReader = null;
private transient int firstReaderHoldCount;

有了上面的预备知识,下面可以解释tryAcquireShared中的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 第一次获取读锁
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;

} else if (firstReader == current) {
// 如果当前获取锁的线程是第一个获取读锁的线程
firstReaderHoldCount++;
} else {
// HoldCounter是用来统计该线程获取“读取锁”的次数。
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
// 将该线程获取“读取锁”的次数+1。
rh.count++;
}
  1. 首先判断是否是第一个线程获取读锁,如果是,则设置firstReader和firstReaderHoldCount值,可以加快后续此线程的获取读锁和记录读锁的数量。
  2. 判断线程是否是firstReader,如果是直接使用firstReaderHoldCount进行累加,可以加快获取的速度。
  3. 前面俩个都不是,则获取cachedHoldCounter,判断这个变量中保存的线程id是否和当前线程对应的id相同,如果是,则判断当前读锁的数量是否为0,如果为0,则调用 readHolds.set(rh)初始化这个对象然后在原有的读锁数量上加1。
  4. 不是则通过readHold获取当前线程对应的HoldCounter,并缓存在cachedHoldCounter中,加速下一次的操作,接着读锁数量加1。

fullTryAcquireShared

这个是tryAcquireShared的最后一步,也就是前面没有获取到共享锁,才会走到这一步,源码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
final int fullTryAcquireShared(Thread current) {
// 下面这部分代码和tryAcquireShared有一部分是重复的。
// 但是tryAcquireShared只是先尝试获取,但是如果出现竞争则获取
// 不到共享锁,即前面那部分加快锁的获取。下面这部分通过
// 循环尝试,保证如果可以获取读锁,则一定获取到
HoldCounter rh = null;
for (;;) {
// 获取锁的状态
int c = getState();
// 如果是独占锁,并且获取锁的线程不是current线程;则返回-1。
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;

// 如果需要阻塞等待。
// 当需要阻塞等待的线程是第1个获取锁的线程的话,则继续往下执行。
// 当需要阻塞等待的线程获取锁的次数为0时,则返回-1。
} else if (readerShouldBlock()) {
// 确保不是有一次获取读锁
if (firstReader == current) {
//忽略
} else {
// 获取当前线程获取读锁的数量,如果为0,则调用
// ThreadLocal.remove删除这个ThreadLocal
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
// 如果当前线程获取读锁的计数=0,则返回-1。
// 表示还没有获取过读锁,不在这里进行获取,
// 则需要阻塞获取读锁的进程
if (rh.count == 0)
return -1;
}
}
// 则获取读取锁的共享统计数;
// 如果共享统计数超过MAX_COUNT,则抛出异常。
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 将线程的读取锁次数加1
// 这一步和上面一样,就不具体解释
// 放在这里主要是因为CAS失败,如果失败,进入下一次循环
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}

fullTryAcquireShared()会根据是否需要阻塞等待,读取锁的共享计数是否超过限制进行处理。如果不需要阻塞等待,并且锁的共享计数没有超过限制,则通过CAS尝试获取锁,并返回1。

至此tryAcquireShared已经解析完成,这里做一个总结:tryAcquireShared将代码分成俩个大部分

  1. 首先通过尝试获取锁,如果获取成功直接返回。这是为了加快获取锁。
  2. 如果没有获取成功,说明CAS失败则进入fullTryAcquireShared函数进行获取,这里会循环获取,直到CAS交换成功。

当然我只是说了一个精简的过程。具体的可以看上面。其他异常情况我也没有总结。

doAcquireShared

源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private void doAcquireShared(int arg) {
// 创建当前线程对应的节点,并将该线程添加到CLH队列中。
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取前继节点
final Node p = node.predecessor();
// 如果当前节点是头结点,尝试获取锁
if (p == head) {
int r = tryAcquireShared(arg);
// 获取成功设置节点为可传播状态,
// 然后释放后继获取读锁的节点
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 如果当前线程不是CLH队列的表头,
// 则通过shouldParkAfterFailedAcquire()判断是否需要等待,
// 需要的话,则通过parkAndCheckInterrupt()进行阻塞等待。
// 若阻塞等待过程中,线程被中断过,则设置interrupted为true。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 上面出现异常,则取消当前节点
if (failed)
cancelAcquire(node);
}
}

doAcquireShared()的作用是获取共享锁,流程如下

  1. 创建线程对应的CLH队列的节点,然后将该节点添加到CLH队列中。CLH队列是管理获取锁的等待线程的队列。
  2. 获取前继节点,判断当前节点是否是表头,如果当前线程是CLH队列的表头,则尝试获取共享锁;如果获取成功,则释放后继等待获取获取共享锁的线程。然后判断是否中断过,如果产生过中断,则调动中断函数产生一次中断。
  3. 上一步没有成功获取锁,需要通过shouldParkAfterFailedAcquire()判断是否阻塞等待,需要阻塞,则通过parkAndCheckInterrupt()进行阻塞等待。

doAcquireShared()会通过for循环,不断的进行上面的操作;目的就是获取共享锁。需要注意的是:doAcquireShared()在每一次尝试获取锁时,是通过tryAcquireShared()来执行的!

其实和前面获取独占锁的流程差不多,只不过这里会有一个释放后继获取共享锁的节点。这一步放到下面讲解共享锁的释放中来说。

读锁的释放(公平锁)

释放锁是调用下面的函数,源码如下:

1
2
3
4
5
6
7
8
9
10
11
public void unlock() {
sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

上面的过程比较简单,先通过tryReleaseShared释放共享锁,尝试失败则直接返回;如果释放成功,则通过doReleaseShared()去释放共享锁并唤醒后继节点。

tryReleaseShared

tryReleaseShared()定义在ReentrantReadWriteLock中,源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
protected final boolean tryReleaseShared(int unused) {

// 获取当前线程,即释放共享锁的线程。
Thread current = Thread.currentThread();

// 如果想要释放锁的线程(current)是第1个获取锁(firstReader)的线程,
// 并且第1个获取锁的线程获取锁的次数=1,则设置firstReader为null;
// 否则,将第1个获取锁的线程的获取次数-1。
if (firstReader == current) {
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
// 获取rh对象,并更新当前线程获取锁的信息。
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != current.getId())
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
// 获取锁的状态
int c = getState();
// 将锁的获取次数-1。
int nextc = c - SHARED_UNIT;
// 通过CAS更新锁的状态。通过判断锁的状态是否为0来判断锁是否可以释放。
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}

主要流程如下:

  1. 判断是否是当前线程,如果是,则将当前线程持有的读锁数量减1,
  2. 如果不是,则判断是否是缓存的线程,如果是,将读锁数量减1
  3. 以上都不是,则获取ThreadLocal,并将数量减1
  4. 循环CAS将state值减1,如果变成0,则说明释放锁成功

下面来看doReleaseShared

doReleaseShared

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private void doReleaseShared() {
for (;;) {
// 获取CLH队列的头节点
Node h = head;

// 如果头节点不为null,并且头节点不等于tail节点。
if (h != null && h != tail) {
// 获取头节点对应的线程的状态
int ws = h.waitStatus;

// 如果头节点对应的线程是SIGNAL状态,
// 则意味着头节点的下一个节点所对应的线程需要被unpark唤醒。
if (ws == Node.SIGNAL) {
// 设置头节点对应的线程状态为空状态。失败的话,则继续循环。
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
// 唤醒头节点的下一个节点所对应的线程。
unparkSuccessor(h);
}
// 如果头节点对应的线程的状态值是0,
// 则设置头结点的状态为PROPAGATE状态,等待有线程来获取锁才会结束循环。
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 如果头节点没有改变,则继续循环。否则,退出循环。
if (h == head) // loop if head changed
break;
}
}

doReleaseShared()会释放共享锁:流程如下:

  1. 判断队列是否为空,如果为空则结束循环,不进行唤醒。
  2. 如果不为空,则判断头结点是否为SIGNAL状态吗,如果是,则设置状态为0,然后唤醒后继获取锁的节点可以是独占或者共享锁。如果唤醒成功,头结点会改变,这一在最后一步就会推出这个循环
  3. 如果头结点状态为0,则设置状态为PROPAGATE,然后继续循环。
  4. 如果头结点发生改变,则继续循环。

主要流程如上,但是为什么要一直循环这是我不明白的地点。

公平共享锁和非公平共享锁

和互斥锁ReentrantLock一样,ReadLock也分为公平锁和非公平锁。

公平锁和非公平锁的区别,体现在判断是否需要阻塞的函数readerShouldBlock()的不同。
公平锁的readerShouldBlock()的源码如下:

1
2
3
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}

在公平共享锁中,如果在当前线程的前面有其他线程在等待获取共享锁,则返回true;否则,返回false。
非公平锁的readerShouldBlock()的源码如下:

1
2
3
4
5
6
7
8
9
10
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}

在非公平共享锁中,它会无视当前线程的前面是否有其他线程在等待获取共享锁。只要该非公平共享锁对应的线程不为null,则返回true。也就是当前锁的类型是共享锁,并且还没有释放。

写锁

写锁的获取和ReentrantLock中独占锁的获取是一样的,这里就不在单独说明。

使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package JUC.locks;

import lombok.Data;
import lombok.Getter;
import lombok.Setter;

import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**************************************
* Author : zhangke
* Date : 2018/4/18 22:13
* Desc : 读写锁
***************************************/
public class ReadWriteLockTest {

public static void main(String[] args) {
// 创建账户
MyCount myCount = new MyCount("4238920615242830", 10000);

// 创建用户,并指定账户
User user = new User("Tommy", myCount);

// 分别启动3个“读取账户金钱”的线程 和 3个“设置账户金钱”的线程
for (int i = 0; i < 3; i++) {
user.getCash();
user.setCash((i + 1) * 1000);
}
}


static class User {
private String name; //用户名

private MyCount myCount; //所要操作的账户

private ReadWriteLock myLock; //执行操作所需的锁对象


public User(String name, MyCount myCount) {
this.name = name;
this.myCount = myCount;
this.myLock = new ReentrantReadWriteLock();

}


public void getCash() {
new Thread(() -> {
try {
myLock.readLock().lock();
System.out.println(Thread.currentThread().getName() + " getCash start");
myCount.getCash();
Thread.sleep(1);
System.out.println(Thread.currentThread().getName() + " getCash end");
} catch (InterruptedException e) {

} finally {
myLock.readLock().unlock();
}
}).start();
}


public void setCash(final int cash) {
new Thread(() -> {
try {
Thread.sleep(100);
myLock.writeLock().lock();
System.out.println(Thread.currentThread().getName()
+ " setCash start");
myCount.setCash(cash);
System.out.println(Thread.currentThread().getName()
+ " setCash end");
} catch (InterruptedException e) {

} finally {
myLock.writeLock().unlock();
}
}).start();
}
}

static class MyCount {

@Getter
@Setter
private String id; //账户id

private int cash; //现金


public MyCount(String id, int cash) {
this.id = id;
this.cash = cash;
}


public int getCash() {
System.out.println(Thread.currentThread().getName() + " getCash" +
" cash= " + cash);
return cash;
}


public void setCash(int cash) {
System.out.println(Thread.currentThread().getName() + " setCash" +
" cash= " + cash);
this.cash = cash;
}
}
}

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Thread-0 getCash start
Thread-0 getCash cash= 10000
Thread-0 getCash end
Thread-2 getCash start
Thread-2 getCash cash= 10000
Thread-4 getCash start
Thread-4 getCash cash= 10000
Thread-2 getCash end
Thread-4 getCash end
Thread-5 setCash start
Thread-5 setCash cash= 3000
Thread-5 setCash end
Thread-3 setCash start
Thread-3 setCash cash= 2000
Thread-3 setCash end
Thread-1 setCash start
Thread-1 setCash cash= 1000
Thread-1 setCash end

从上面可以观察到读锁是可以共享,也就是读锁的打印的语句不一定是start-end连着的。但是写锁一定是。

总结

获取锁的流程:

  1. 判断当前锁是否是独占锁,如果是并且独占锁的线程和当前获取锁的线程不相同,则直接返回-1,获取读锁失败。
  2. 如果当前线程不应该被阻塞,并且已获取读锁的数量小于最大值,则尝试使用CAS更改读锁的状态值。如果操作成功,将state加1,表示有多少个线程获取过锁,进行下一步,操作失败进入第四步。
  3. 这一步主要设置每一个线程获取读锁的数量,主要分为三类来讨论:
    1. 如果是第一个线程来获取读锁,则设置firstReader为当前线程和当前线程拥有的读锁数量为1.
    2. 如果不是,则判断当前线程和firstReader线程是否一样,如果一样,则当前线程获取读锁的数量加1
    3. 以上都不是,则通过HoldCounter来对当前线程获取读锁的数量加1,而HoldCounter是一个ThreadLocal对象。保证每个线程都有一个不一样的HoldCounter变量。下面会详细解释
  4. 如果上面没有成功获取到读锁,但也没有返回。则通过fullTryAcquireShared来获取锁,就是将第三步包装成一个循环来进行获取。
  5. 如果上面都没有获取到,则通过doAcquireShared进行获取,如果获取失败则阻塞当前线程等待唤醒

释放锁的流程

  1. 判断是否是当前线程,如果是,则将当前线程持有的读锁数量减1,
  2. 如果不是,则判断是否是缓存的线程,如果是,将读锁数量减1
  3. 以上都不是,则获取ThreadLocal,并将数量减1
  4. 循环CAS将state值减1,如果变成0,则说明释放锁成功
  5. 如果释放锁成功,则走下面的流程,如果失败,直接返回false
  6. 判断队列是否为空,如果为空则继续循环。
  7. 如果不为空,则判断头结点是否为SIGNAL状态吗,如果是,则设置状态为0,然后唤醒后继获取锁的节点可以是独占或者共享锁。如果唤醒成功,头结点会改变,这一在最后一步就会推出这个循环
  8. 如果头结点状态为0,则设置状态为PROPAGATE,然后继续循环。
  9. 如果头结点发生改变,则继续循环。

参考

  1. Java多线程系列–“JUC锁”08之 共享锁和ReentrantReadWriteLock

概述

  1. 用法简介
  2. 源码分析
  3. 底层实现原理

1. 用法简介

LockSupport是用来创建锁和其他同步器的基本线程阻塞原语。LockSupport提供park()和unpark()方法实现阻塞线程和解除线程阻塞。每个使用LockSupport的线程都与一个许可(permit)关联,permit相当于开关,默认是0,调用一次unpark就加1变成1,调用一次park会消费permit, 也就是将1变成0,同时park立即返回。再次调用park会变成block(因为permit为0,会阻塞在这里,直到permit变为1), 这时调用unpark会把permit置为1。每个线程都有一个相关的permit, permit最多只有一个,重复调用unpark也不会积累。

park()和unpark()不会有Thread.suspend和Thread.resume所可能引发的死锁问题。这个死锁问题的产生是由于Thread.resume在Thread.suspend之前调用,使得线程忽略了解除阻塞的信号,而使得线程一直被阻塞。而LockSupport由于许可的存在,调用park的线程和另一个试图将其unpark的线程之间的竞争将保持活性。不会因为前后调用的顺序而产生死锁

如果调用线程被中断,则park方法会返回。同时park也拥有可以设置超时时间的版本。

阅读全文 »

简单介绍

Condition的作用是对锁进行更精确的控制。Condition中的await()方法相当于Object的wait方法,Condition中的signal()方法相当于Object的notify()方法,Condition中的signalAll()相当于Object的notifyAll()方法。不同的是,Object中的wait()、notify()、notifyAll()方法是和同步锁(synchronized关键字)捆绑使用的;而Condition是需要与斥锁/共享锁捆绑使用的。互斥锁前面已经说过一个ReentrantLock,后还会说道ReentrantReadWriteLock共享锁。

阅读全文 »