0%

概述

首先要明白范式(NF)是什么意思。按照教材中的定义,范式是符合某一种级别的关系模式的集合,表示一个关系内部各属性之间的联系的合理化程度。实际上你可以把它粗略地理解为一张数据表的表结构所符合的某种设计标准的级别。就像家里装修买建材,最环保的是E0级,其次是E1级,还有E2级等等。数据库范式也分为1NF,2NF,3NF,BCNF,4NF,5NF。一般在我们设计关系型数据库的时候,最多考虑到BCNF就够。符合高一级范式的设计,必定符合低一级范式,例如符合2NF的关系模式,必定符合1NF。

同时应用数据库范式可以带来许多好处,但是最重要的好处归结为三点:

  1. 减少数据冗余(这是最主要的好处,其他好处都是由此而附带的)
  2. 消除异常(插入异常,更新异常,删除异常)
  3. 让数据组织的更加准确。
  4. 但剑是双刃的,应用数据库范式同样也会带来弊端,这会在文章后面说到。

第一范式

1NF的定义为:符合1NF的关系中的每个属性都不可再分

数据库表的每一列都是不可分割的基本数据项,同一列中不能有多个值,即实体中的某个属性不能有多个值或者不能有重复的属性。(保持数据的原子性)

表1所示的情况,就不符合1NF的要求。
表1

实际上,1NF是所有关系型数据库的最基本要求,你在关系型数据库管理系统(RDBMS),例如SQL Server,Oracle,MySQL中创建数据表的时候,如果数据表的设计不符合这个最基本的要求,那么操作一定是不能成功的。也就是说,只要在RDBMS中已经存在的数据表,一定是符合1NF的。如果我们要在RDBMS存储表中的数据,就得设计为表2的形式:
表2
但是仅仅符合1NF的设计,仍然会存在数据冗余过大,插入异常,删除异常,修改异常的问题,例如对于表3中的设计:
表三

  1. 数据冗余过大:每一名学生的学号、姓名、系名、系主任这些数据重复多次。每个系与对应的系主任的数据也重复多次
  2. 插入异常:假如学校新建了一个系,但是暂时还没有招收任何学生(比如3月份就新建了,但要等到8月份才招生),那么是无法将系名与系主任的数据单独地添加到数据表中去的 (注2)
    • 注1:码:关系中的某个属性或者某几个属性的组合,用于区分每个元组(可以把元组理解为一张表中的每条记录,也就是每一行)。
    • 注2:根据三种关系完整性约束中实体完整性的要求,关系中的码所包含的任意一个属性都不能为空,所有属性的组合也不能重复。为了满足此要求,图中的表,只能将学号与课名的组合作为码,否则就无法唯一地区分每一条记录。
  3. 删除异常:假如将某个系中所有学生相关的记录都删除,那么所有系与系主任的数据也就随之消失了(一个系所有学生都没有了,并不表示这个系就没有了)。
  4. 修改异常:假如李小明转系到法律系,那么为了保证数据库中数据的一致性,需要修改三条记录中系与系主任的数据。

正因为仅符合1NF的数据库设计存在着这样那样的问题,我们需要提高设计标准,去掉导致上述四种问题的因素,使其符合更高一级的范式(2NF),这就是所谓的规范化。

第二范式

在满足第一范式的基础上,实体的每个非主键属性完全函数依赖于主键属性(消除部分依赖)

接下来对这句话中涉及到的概念——函数依赖非主属性 进行解释。

函数依赖

若在一张表中,在属性(或属性组)X的值确定的情况下,必定能确定属性Y的值,那么就可以说Y函数依赖于X,写作 X → Y。也就是说,在数据表中,不存在任意两条记录,它们在X属性(或属性组)上的值相同,而在Y属性上的值不同。这也就是函数依赖名字的由来,类似于函数关系 y = f(x),在x的值确定的情况下,y的值一定是确定的。

例如,对于表3中的数据,找不到任何一条记录,它们的学号相同而对应的姓名不同。所以我们可以说姓名函数依赖于学号,写作学号 → 姓名。但是反过来,因为可能出现同名的学生,所以有可能不同的两条学生记录,它们在姓名上的值相同,但对应的学号不同,所以我们不能说学号函数依赖于姓名。表中其他的函数依赖关系还有如:

  • 系名 → 系主任
  • 学号 → 系主任
  • (学号,课名) → 分数

但是以下函数依赖关系则不成立

  • 学号->课名
  • 学号->分数
  • 课名->系主任

从函数依赖这个概念展开,还会有三个概念:

完全函数依赖

在一张表中,如果X->Y,且对于X的任何一个真子集(假如属性组X包含超过一个属性的话,如果是一个那么真子集就是自己),任意真子集用X1表示,X1->Y都不成立,那么我们称Y对于X完全函数依赖,记做:
upload successful

继续用上面表三来做例子:

  • 姓名完全依赖于学号
  • 分数完全依赖于(学号,课名),(一个学生可能有多门课,因此有多个分数,一个课程有多个学生,就有多个分数)

部分函数依赖:

假如Y函数依赖于X,但同时Y并不完全函数依赖于X。那么我们就称Y部分依赖于X,记作:

upload successful

上面定义比较绕,我们可以这样理解,当主键由两个或两个以上字段构成,而表中的某些信息通过主键的一个字段就能唯一确定,我们称这样的依赖关系为部分依赖

比如:

  • 姓名完全依赖于(学号,课名),但是学号本身就可以决定姓名。

传递函数依赖

假如Z函数依赖于Y,且Y函数依赖于X ,同时Y不包含于X,且X不函数依赖于Y这个前提,那么我们就称Z传递函数依赖于X ,简单的理解就是A依赖于B,B依赖于C,就可以说A依赖C,记作

upload successful

码、候选码和主码

:简单的说就是能唯一标识实体的属性。如果用前面函数依赖关系来解释,假设K是某表中的一个属性或属性组,若除K之外的所有属性都完全函数依赖于K,那么我们称K为候选码,简称为码。在实际中我们通常可以理解为:假如当 K 确定的情况下,该表除 K 之外的所有属性的值也就随之确定,那么K就是码。

一张表中可以有超过一个码。(实际应用中为了方便,通常选择其中的一个码作为主码)主码我们在建立数据库的时候,需要为每张表指定一个主码,主码也叫主键。所谓主码就是在实体集中区分不同实体的候选码。

例如:在学生实体中,“学号”是能唯一的区分学生实体的,同时又假设“姓名”、“年龄”的属性组合足以区分学生实体,那么{学号}和{姓名,年龄}都是候选码。而我们选择学号极少变化,比较稳定,因此我们选他做主码。

主属性与非主属性

主属性:包含在任一候选码中的属性称主属性。

非主属性 不包含在候选码中的属性称为非主属性。
非主属性是相对与主属性来定义的。
例如:在关系——学生(学号,姓名,年龄,性别,班级)中,

主码是“学号”,那么其他的“姓名”、“年龄”、“性别”、“班级”就都可以称为非主属性

判断表是否符合2NF

接着以上面的表3来举例,看看表3符合第二范式吗?判断的依据实际上就是看数据表中是否存在非主属性对于码的部分函数依赖。若存在,则数据表最高只符合1NF的要求,若不存在,则符合2NF的要求。判断的方法是:

第一步:找出数据表中所有的
第二步:根据第一步所得到的码,找出所有的主属性
第三步:数据表中,除去所有的主属性,剩下的就都是非主属性了。
第四步:查看是否存在非主属性对码的部分函数依赖

下面我们一步步的来看:

第一步:找出数据表中所有的码

  1. 查看所有每一单个属性,当它的值确定了,是否剩下的所有属性值都能确定。
  2. 查看所有包含有两个属性的属性组,当它的值确定了,是否剩下的所有属性值都能确定。
  3. ……
  4. 查看所有包含了六个属性,也就是所有属性的属性组,当它的值确定了,是否剩下的所有属性值都能确定。

看起来很麻烦是,但是这里有一个诀窍,就是假如A是码,这里的A可能是一个单独属性也可能是多个属性,那么所有包含了A的属性组,如(A,B)、(A,C)、(A,B,C)等等,都不是码了(因为码的要求里有一个“完全函数依赖要求)。

下图是表3中所有的函数依赖关系:

upload successful

这一步完成以后,可以得到,表3的码只有一个,就是(学号、课名)。

第二步:根据第一步所得到的码,找出所有的主属性

主属性有俩个:学号与课名

第三步:找出所有的非主属性

非主属性有四个:姓名系名系主任分数

第四步:是否存在非主属性对码的部分函数依赖

对于(学号,课名) → 姓名,有 学号 → 姓名,存在非主属性 姓名 对码(学号,课名)的部分函数依赖。
对于(学号,课名) → 系名,有 学号 → 系名,存在非主属性 系 对码(学号,课名)的部分函数依赖。
对于(学号,课名) → 系主任,有 学号 → 系主任,存在非主属性 对码(学号,课名)的部分函数依赖。

所以表3存在非主属性对于码的部分函数依赖,最高只符合1NF的要求,不符合2NF的要求。

为了让表3符合2NF的要求,我们必须消除这些部分函数依赖,只有一个办法,就是将大数据表拆分成两个或者更多个更小的数据表,在拆分的过程中,要达到更高一级范式的要求,这个过程叫做模式分解。模式分解的方法不是唯一的,以下是其中一种方法:
选课(学号,课名,分数)
学生(学号,姓名,系名,系主任)

我们先来判断以下,选课表与学生表,是否符合了2NF的要求?

对于选课表,其码是(学号,课名),主属性是学号课名,非主属性是分数学号确定,并不能唯一确定分数课名确定,也不能唯一确定分数,所以不存在非主属性分数对于码 (学号,课名)的部分函数依赖,所以此表符合2NF的要求。

对于学生表,其码是学号,主属性是学号,非主属性是姓名、系名系主任,因为码只有一个属性,所以不可能存在非主属性对于码 的部分函数依赖,所以此表符合2NF的要求。

下表表示了模式分解以后的新的函数依赖关系

upload successful

下图是相对应上面表3进行模式分解之后的数据表4图

表4

这里来看看相对于上面的第一范式出现的问题,使用上表进行同样的操作,是否还存在问题?

  1. 数据冗余减少:从上表可以明显的看出,学生的姓名,系主任和系名明显的减少。有改进
  2. 修改异常:如果李晓明转到法律系,这里只需要修改一条数据。有改进
  3. 删除异常:删除某个系中所有的学生记录,改系的信息任然全部丢失。无改进。
  4. 插入异常:插入一个尚无学生的新系的信息,因为学生的学号是主码。不能为空,所以操作不允许。无改进

从上面可以看出,仅仅符合2NF还是不够。在于仍然存在非主属性系主任对于码学号的传递函数依赖。为了能进一步解决这些问题,我们还需要将符合2NF要求的数据表改进为符合3NF的要求。

第三范式

在2NF的基础之上,消除了非主属性对于码的传递函数依赖

接下来看看我们上面分解出来的表4是否符合3NF的要求:

对于选课表,主码为(学号,课名),主属性为学号和课名,非主属性只有一个,为分数,不可能存在函数传递依赖,所以此表符合3NF。

对于学生表,主码为学号,主属性为学号,非主属性姓名、系名和系主任。因为学号->系名,同时系名->系主任,所以存在非主属性系主任对于主码学号的函数传递依赖,因此学生表不符合3NF。

为了让数据表设计达到3NF,我们必须进一步进行模式分解为以下形式:
选课(学号,课名,分数)
学生(学号,姓名,系名)
系(系名,系主任)

对于选课表,符合3NF的要求,之前已经分析过了。

对于学生表,码为学号,主属性为学号,非主属性为系名,不可能存在非主属性对于码的传递函数依赖,所以符合3NF的要求。

对于表,码为系名,主属性为系名,非主属性为系主任,不可能存在非主属性对于码的传递函数依赖(至少要有三个属性才可能存在传递函数依赖关系),所以符合3NF的要求。

新的依赖关系如下图:

upload successful

新的数据表如下图

upload successful

现在我们来看一下,进行同样的操作,是否还存在着之前的那些问题?

  1. 删除某个系中所有的学生记录
    该系的信息不会丢失。——有改进
  2. 插入一个尚无学生的新系的信息。
    因为系表与学生表目前是独立的两张表,所以不影响。——有改进
  3. 数据冗余更加少了。——有改进

由此可见,符合3NF要求的数据库设计,基本上解决了数据冗余过大,插入异常,修改异常,删除异常的问题。当然,在实际中,往往为了性能上或者应对扩展的需要,经常 做到2NF或者1NF,但是作为数据库设计人员,至少应该知道,3NF的要求是怎样的。

BCNF 范式

要了解 BCNF 范式,那么先看这样一个问题:

  1. 某公司有若干个仓库;
  2. 每个仓库只能有一名管理员,一名管理员只能在一个仓库中工作;
  3. 一个仓库中可以存放多种物品,一种物品也可以存放在不同的仓库中。每种物品在每个仓库中都有对应的数量。

那么关系模式仓库(仓库名,管理员,物品名,数量) 属于哪一级范式?

答:已知函数依赖集:仓库名 → 管理员,管理员 → 仓库名,(仓库名,物品名)→ 数量
码:(管理员,物品名),(仓库名,物品名)
主属性:仓库名、管理员、物品名
非主属性:数量
所以不存在非主属性对码的部分函数依赖和传递函数依赖,此关系模式属于3NF。

基于此关系模式的关系(具体的数据)可能如图所示:

upload successful

好,既然此关系模式已经属于了 3NF,那么这个关系模式是否存在问题呢?我们来看以下几种操作:

  1. 先新增加一个仓库,但尚未存放任何物品,是否可以为该仓库指派管理员?——不可以,因为物品名也是主属性,根据实体完整性的要求,主属性不能为空。
  2. 某仓库被清空后,需要删除所有与这个仓库相关的物品存放记录,会带来什么问题?——仓库本身与管理员的信息也被随之删除了。
  3. 如果某仓库更换了管理员,会带来什么问题?——这个仓库有几条物品存放记录,就要修改多少次管理员信息。

从这里我们可以得出结论,在某些特殊情况下,即使关系模式符合 3NF 的要求,仍然存在着插入异常,修改异常与删除异常的问题,仍然不是好的设计。

造成此问题的原因:存在着主属性对于码的部分函数依赖与传递函数依赖。(在此例中就是存在主属性【仓库名】对于码【(管理员,物品名)】的部分函数依赖。

解决办法就是要在 3NF 的基础上消除主属性对于码的部分与传递函数依赖。

仓库(仓库名,管理员)
库存(仓库名,物品名,数量)

这样,之前的插入异常,修改异常与删除异常的问题就被解决了。

总结

从上面可以看出,在设计表的时候,通过提高表的数据库范式,可以减少数据冗余,删除异常,插入异常和修改异常。

但是数据库范式越高,则表越多,表多会带来很多问题:

  1. 查询时要连接多个表,增加了查询的复杂度

  2. 查询时需要连接多个表,降低了数据库查询性能

而现在的情况,磁盘空间成本基本可以忽略不计,所以数据冗余所造成的问题也并不是应用数据库范式的理由。

因此,并不是应用的范式越高越好,要看实际情况而定。

参考

  1. 数据库三大范式【面试+工作】
  2. 如何解释关系数据库的第一第二第三范式?
  3. 数据库范式那些事

概述

开发多用户、数据库驱动的应用时,最大的一个难点是:一方面要最大成都地利用数据库的并发访问,另外一方面还要确保每个用户能以一致的方式读取和修改数据,为此就有了锁的机制,同时这也是数据库系统区别于文件系统的一个关键特性。InnoDB存储引擎较之mysql数据库的其它存储引擎在这方面技高一筹。而只有正确的了解这些锁的内部机制才能充分发挥InnoDB存储引擎在这方面的优势。本篇文章会详细介绍InnoDB存储引擎对表中数据的锁定,同时分析InnoDB存储引擎会以怎样的力度锁定数据。

阅读全文 »

概述

  1. 索引基础
  2. 索引主要类型
  3. 索引优化
  4. InnoDB和MyISAM数据分布对比

1. 索引基础

  • 索引,又叫key(键)

  • 在mysql中,存储引擎先在索引中找到检索的内容,然后根据索引结果找到对应的数据行

  • 索引可以包含一个或多个列的值,如果索引包含多个列,那么列的顺序十分重要,因为mysql只能高效的使用索引的最左前缀列最左前缀列就是KEY(id, name, sex)idid、name、sex里面是写在左边的,这就叫最左前缀

    阅读全文 »

在多个事务并发执行时,事务的隔离性不一定能保持。为保持事务的隔离性,系统必须对并发事务之间的相互协作加以控制;这种控制是通过一种称为并发控制来实现。下面将介绍多种机制,但是没有哪种机制明显是最好的。每种机制都有优势。在实践中,最常用的是俩阶段封锁和快照隔离。本篇文章的主要内容就是分别介绍下面几种并发控制机制。

  1. 基于封锁的协议
  2. 基于时间戳的协议
  3. 多版本机制
阅读全文 »

本文主要讲解事务四大特性ACID中的隔离性,这里简单介绍什么是隔离性,接着介绍隔离性级别以及每种隔离性会带来的相应的读的问题,最后简单介绍数据库是如何实现事务的隔离性。

  1. 什么是隔离性
  2. 隔离性级别
  3. 隔离性级别的实现方式简介

什么是隔离性

在数据库系统中会存在多个事务可能并发执行,系统保证,对于任何一对事务T1和T2,在T1看来,T2或者在T1开始之前就已经完成执行,或者在T1完成之后开始执行。因此,每个事务都感觉不到系统中有其他事务在并发的执行。这就是事务的隔离性

隔离性级别

  • 可串行化(serializable):通常保证可串行化调度。
  • 可重复读(repeatable read):只允许读取已提交数据,而且在一个事务俩次读取一个数据项期间,其他事务不得更新该数据。但该事务不要求与其他事务可串行化。
  • 已提交度(read committed):只允许读取已提交数据,但不要求可重复读。比如,在事务俩次读取一个数据项期间,另一个事务更新了该数据并提交。
  • 未提交读(read uncommitted):允许读取未提交数据。

以上所有的隔离级别都不允许脏写:如果一个数据项已经被另外一个尚未提交或终止的事务写入,则不允许对该数据项执行写操作。另外上面四中隔离性界别,从下到上,性能越来越低,并发度越来越差。

下面介绍每种隔离级别带来的问题,

脏读

发生在未提交读隔离级别

脏读又称无效数据的读出,是指在数据库访问中,事务T1将某一值修改,然后事务T2读取该值,此后T1因为某种原因撤销对该值的修改,这就导致了T2所读取到的数据是无效的。

脏读就是指当一个事务正在访问数据,并且对数据进行了修改,而这种修改还没有提交(commit)到数据库中,这时,另外一个事务也访问这个数据,然后使用了这个数据。因为这个数据是还没有提交的数据,那么另外一个事务读到的这个数据是脏数据,依据脏数据所做的操作可能是不正确的。

举例说明:

在下面的例子中,事务2修改了一行,但是没有提交,事务1读了这个没有提交的数据。现在如果事务2回滚了刚才的修改或者做了另外的修改的话,事务1中查到的数据就是不正确的了。

事务1 事务2
SELECT age FROM users WHERE id = 1;
/* will read 20 */
- UPDATE users SET age = 21 WHERE id = 1;
/* No commit here */
SELECT age FROM users WHERE id = 1;
/* will read 21 */
- ROLLBACK;
/* lock-based DIRTY READ */

在这个例子中,事务2回滚后就没有id是1,age是21的数据了。所以,事务一读到了一条脏数据。

不可重复读

发生在已提交读隔离级别
不可重复读,是指在数据库访问中,一个事务范围内两个相同的查询却返回了不同数据。这是由于查询时系统中其他事务修改的提交而引起的。比如事务T1读取某一数据,事务T2读取并修改了该数据,T1为了对读取值进行检验而再次读取该数据,便得到了不同的结果。

一种更易理解的说法是:在一个事务内,多次读同一个数据。在这个事务还没有结束时,另一个事务也访问该同一数据。那么,在第一个事务的两次读数据之间。由于第二个事务的修改,那么第一个事务读到的数据可能不一样,这样就发生了在一个事务内两次读到的数据是不一样的,因此称为不可重复读,即原始读取不可重复。

举例说明:

在基于锁的并发控制中“不可重复读(non-repeatable read)”现象发生在当执行SELECT 操作时没有获得读锁(read locks)或者SELECT操作执行完后马上释放了读锁; 多版本并发控制中当没有要求一个提交冲突的事务回滚也会发生“不可重复读(non-repeatable read)”现象。

事务一 事务二
SELECT * FROM users WHERE id = 1;
- UPDATE users SET age = 21 WHERE id = 1;
COMMIT;
SELECT * FROM users WHERE id = 1;
COMMIT; `

在这个例子中,事务2提交成功,因此他对id为1的行的修改就对其他事务可见了。但是事务1在此前已经从这行读到了另外一个age的值。

幻读

幻读是指当事务不是独立执行时发生的一种现象,例如第一个事务对一个表中的数据进行了修改,比如这种修改涉及到表中的全部数据行。同时,第二个事务也修改这个表中的数据,这种修改是向表中插入一行新数据。那么,以后就会发生操作第一个事务的用户发现表中还有没有修改的数据行,就好象发生了幻觉一样.一般解决幻读的方法是增加范围锁RangeS,锁定检锁范围为只读,这样就避免了幻读。  

幻读(phantom read)”是不可重复读(Non-repeatable reads)的一种特殊场景:当事务没有获取范围锁的情况下执行SELECT … WHERE操作可能会发生“幻影读(phantom read)”。

举例说明:

当事务1两次执行SELECT … WHERE检索一定范围内数据的操作中间,事务2在这个表中创建了(如INSERT)了一行新数据,这条新数据正好满足事务1的“WHERE”子句。

事务一 事务二
SELECT * FROM usersWHERE age BETWEEN 10 AND 30;
- INSERT INTO users VALUES ( 3, 'Bob', 27 );
COMMIT;
SELECT * FROM usersWHERE age BETWEEN 10 AND 30;

在这个例子中,事务一执行了两次相同的查询操作。但是两次操作中间事务二向数据库中增加了一条符合事务一的查询条件的数据,导致幻读。

隔离性级别的实现方式简介

在数据库中使用并发控制机制来保证事务的隔离性,使用并发控制的目的是为了提高事务的并发性,从而提数据库的整体性能。具体的实现有以下几种形式。

一个数据库可以封锁其访问的数据项,而不用封锁整个数据库。这种策略下,事务必须在足够长的时间内持有锁来保证可串行化。但是这一周期又要足够短致使不会过度影响性能。在封锁协议中,俩阶段封锁协议就是一种简单且广泛的确保可串行化的封锁协议:简单的说就是,俩阶段封锁要求一个事务封锁有俩个阶段,一个阶段只获得锁但不释放锁,第二个阶段只释放锁但不获得锁。

当我们有俩种锁,则封锁的结果可以将进一步得到改善:读锁和写锁,

读写锁的概念很平常,当你在读取数据的时候,应该先加读锁,读取完之后的某个时间再解开读锁,那么加了读锁的数据,应该需要有什么特性呢,应该只能读,不能写,因为加了读锁,说明有事务准备读取这个数据,如果被别的事务重写这个数据,那数据就不准确了。所以一个事务给这个数据加了读锁,别的事务也可以对这个数据加读锁,因为大家都是只读不写。

写锁则具有排他性(exclusive lock),当一个事务准备对一个数据进行写操作的时候,先要对数据加写锁,那么数据就是可变的,这时候,其他事务就无法对这个数据加读锁了,除非这个写锁释放。

时间戳

这种是为每一个事务分配一个时间戳,通常当他开始的时候。对于每个数据项,系统维护俩个时间戳。数据项的读时间戳记录读该数据项的事务的最大时间戳。数据项的写时间戳记录写入该数据项的时间戳。时间戳用来确保在访问冲突情况下,事务按照事务的时间戳的顺序来访问数据项。当不能访问时,事务将会终止,并且分配一个新的时间戳重新开始。

多版本和快照隔离

通过维护数据项的多个版本,一个事务允许读取一个旧版本的数据项,而不是被另一个未提交或者在串行化序列中应该排在后面的事务写入的新版本的数据项。有许多的版本控制并发控制技术,其中一种应用比较广泛的就是快照隔离。

在快照隔离中,我们可以想象每个事务开始时有其自身的数据库版本或者快照。他从这个私有的版本中读取数据,因此和其他事务所做的更新隔离开。如果事务更新数据库,更新只出现在其私有版本中,而不是实际的数据库版本中。当事务提交时,和更新有关的信息将被保存,使得更新被写入真正的数据库。当一个事务进入部分提交状态后,只有在没有其他并发事务修改了该事务想要更新的数据项的情况下,事务进入提交状态。而不能提交的事务则终止。

快照隔离可以保证读数据的尝试永远无需等待(不向封锁情况)。只读事务不会中止,只有修改数据的事务有微小的中止风险。由于每个事务读取他自己的数据版本或快照,因此读数据不会导致此后其他事务的更新尝试被迫等待(不想封锁情况)。因为大部分事务是只读的(并且大多数其他事务读数据情况多余更新),所以这是与锁相比往往带来性能改善的主要原因。

总结

本篇文章简单总结了事务不同隔离性级别会带来的问题,以及事务隔离性的实现方式。后面会在写几篇文章来说明事务隔离性实现的具体细节。

参考

  1. 数据库系统概念
  2. 数据库的读现象浅析

从字面意思理解就是信号量,本质上来说是用于线程之间访问共享资源,是一种同步原语,只是访问的资源可能有多个,其实现是通过AQS框架。在我们开发中,经常会碰见使用信号量的场景,比如出于系统性能的考虑需要限流,这时需要控制同时访问共享资源的最大线程数量,或者共享资源是稀缺资源,我们需要有一种办法能够协调各个线程,以保证合理的使用公共资源。
可以看下图来理解
upload successful
有四个线程来共同竞争资源,现在信号量是5,则表明共享资源的数量是5。如果每个线程申请一个资源,则可以同时满足5个线程申请资源,每个线程在使用完之后,需要释放资源。如果在线程在申请资源的时候,没有足够的资源来满足,则会阻塞线程。

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package JUC.tools;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**************************************
* Author : zhangke
* Date : 2018/4/20 20:00
* Desc : Semaphore 学习
***************************************/
public class SemaphoreTest1 {

public static void main(String[] args) {
Semaphore sem = new Semaphore(10);
ExecutorService threadPool = Executors.newFixedThreadPool(3);

//在线程池中执行任务
threadPool.execute(new MyThread(sem, 5));
threadPool.execute(new MyThread(sem, 4));
threadPool.execute(new MyThread(sem, 7));

//关闭池
threadPool.shutdown();
}


static class MyThread extends Thread {
private Semaphore sem; //信号量
private int count; //申请信号量的大小


public MyThread(Semaphore sem, int count) {
this.sem = sem;
this.count = count;
}


@Override
public void run() {

try {
//从信号量中获取count个许可
sem.acquire(count);
Thread.sleep(2000);
System.out.println(Thread.currentThread().getName()
+ " acquire count=" + count);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放给定数目的许可,将其返回到信号量。
sem.release(count);
System.out.println(Thread.currentThread().getName()
+ " release " + count + "");
}
}
}
}

上面演示了基本的信号量使用机制,当有线程尝试使用共享资源时,我们要求线程先获得许可(调用Semaphoreacquire方法),这样线程就拥有了权限,否则就需要等待。当使用完资源后,线程需要调用Semaphorerelease方法释放许可。

运行结果如下

1
2
3
4
5
6
pool-1-thread-2 acquire count=4
pool-1-thread-1 acquire count=5
pool-1-thread-1 release 5
pool-1-thread-2 release 4
pool-1-thread-3 acquire count=7
pool-1-thread-3 release 7

从结果可以看出,这有点类似于共享锁,锁的获取可以不用等待锁的释放。但必须满足下面的条件许可数 ≤ 0代表共享资源不可用。许可数 > 0,代表共享资源可用,且多个线程可以同时访问共享资源。

源码分析

类图如下:

upload successful

  1. Semaphore也包含sync对象,sync是Sync类型;而且,Sync是一个继承于AQS的抽象类。
  2. Sync包括两个子类:”公平信号量”FairSync 和 “非公平信号量”NonfairSync。sync是”FairSync的实例”,或者”NonfairSync的实例”;默认情况下,sync是NonfairSync(即,默认是非公平信号量)。

构造函数

1
2
3
4
5
6
7
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

从中,我们可以信号量分为公平信号量(FairSync)和非公平信号量(NonfairSync)。Semaphore(int permits)函数会默认创建非公平信号量。permits表示许可数,可以理解为资源可以被共享的数量。

公平信号量的获取

获取信号量的源码如下:

1
2
3
4
5
6
7
8
9
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

public void acquire(int permits) throws InterruptedException {
if (permits < 0)
throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}

从上面可以看出,内部是同过Sync对象的acquireSharedInterruptibly方法来获取,源码如下

1
2
3
4
5
6
7
8
9
10
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// 如果线程是中断状态,则抛出异常。
if (Thread.interrupted())
throw new InterruptedException();
// 否则,尝试获取“共享锁”;获取成功则直接返回,
// 获取失败,则通过doAcquireSharedInterruptibly()获取。
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}

tryAcquireShared对应公平锁的源码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
protected int tryAcquireShared(int acquires) {
for (;;) {
// 判断当前线程是不是CLH队列中的第一个线程线程,
// 若是的话,则返回-1。
if (hasQueuedPredecessors())
return -1;
// 设置可以获得的信号量的许可数
int available = getState();
// 设置获得acquires个信号量许可之后,剩余的信号量许可数
int remaining = available - acquires;
// 如果剩余的信号量许可数>=0,则设置可以获得的信号量许可数为remaining。
// 设置成功则返回remaining
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

tryAcquireShared()的作用是尝试获取acquires个信号量许可数。对于Semaphore而言,state表示的是当前可获得的信号量许可数。

下面看看AQS中doAcquireSharedInterruptibly的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private void doAcquireSharedInterruptibly(long arg)
throws InterruptedException {
// 创建当前线程的Node节点,且Node中记录的锁是共享锁类型;
// 并将该节点添加到CLH队列末尾。
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
// 获取上一个节点。
// 如果上一节点是CLH队列的表头,则”尝试获取共享锁“。
final Node p = node.predecessor();
if (p == head) {
long r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 当前线程一直等待,直到获取到共享锁。
// 如果线程在等待过程中被中断过,则再次中断该线程(还原之前的中断状态)。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

doAcquireSharedInterruptibly()会使当前线程一直等待,直到当前线程获取到共享锁(或被中断)才返回。主要流程如下:

  1. addWaiter(Node.SHARED)的作用是,创建当前线程的Node节点,且Node中记录的锁的类型是共享锁(Node.SHARED);并将该节点添加到CLH队列末尾。
  2. node.predecessor()的作用是,获取上一个节点。如果上一节点是CLH队列的表头,则尝试获取共享锁。
  3. shouldParkAfterFailedAcquire()的作用和它的名称一样,如果在尝试获取锁失败之后,线程应该等待,则返回true;否则,返回false。当shouldParkAfterFailedAcquire()返回ture时,则调用parkAndCheckInterrupt(),当前线程会进入等待状态,直到获取到共享锁才继续运行。如果检测到时中断导致的返回,则抛出异常。

上面的函数在前面几篇文章中都已经介绍过,这里就不在重复讲,如果不理解可以看这几篇文章JUC 锁介绍

公平信号量的释放

1
2
3
4
5
6
7
8
public void release() {
sync.releaseShared(1);
}
public void release(int permits) {
if (permits < 0)
throw new IllegalArgumentException();
sync.releaseShared(permits);
}

信号量的释放是通过releases()释放函数,实际上调用的AQS中的releaseShared()

1
2
3
4
5
6
7
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

releaseShared()的目的是让当前线程释放它所持有的共享锁。它首先会通过tryReleaseShared()去尝试释放共享锁。尝试成功,则直接返回;尝试失败,则通过doReleaseShared()去释放共享锁。
Semaphore重写了tryReleaseShared(),它的源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
protected final boolean tryReleaseShared(int releases) {
for (;;) {
// 获取“可以获得的信号量的许可数”
int current = getState();
// 获取“释放releases个信号量许可之后,剩余的信号量许可数”
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
// 设置“可以获得的信号量的许可数”为next。
if (compareAndSetState(current, next))
return true;
}
}

如果tryReleaseShared()尝试释放共享锁失败,则会调用doReleaseShared()去释放共享锁。doReleaseShared()的源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private void doReleaseShared() {
for (;;) {
// 获取CLH队列的头节点
Node h = head;
// 如果头节点不为null,并且头节点不等于tail节点。
if (h != null && h != tail) {
// 获取头节点对应的线程的状态
int ws = h.waitStatus;
// 如果头节点对应的线程是SIGNAL状态,则意味着“头节点的下一个节点所对应的线程”需要被unpark唤醒。
if (ws == Node.SIGNAL) {
// 设置“头节点对应的线程状态”为空状态。失败的话,则继续循环。
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
// 唤醒“头节点的下一个节点所对应的线程”。
unparkSuccessor(h);
}
// 如果头节点对应的线程是空状态,则设置“节点对应的线程所拥有的共享锁”为其它线程获取锁的空状态。
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 如果头节点发生变化,则继续循环。否则,退出循环。
if (h == head) // loop if head changed
break;
}
}

doReleaseShared()会释放共享锁。它会从前往后的遍历CLH队列,依次唤醒然后执行队列中每个节点对应的线程;最终的目的是让这些线程释放它们所持有的信号量。

非公平信号量获取和释放

Semaphore中的非公平信号量是NonFairSync。在Semaphore中,非公平信号量许可的释放(release)与公平信号量许可的释放(release)是一样的。
不同的是它们获取信号量许可的机制不同,下面是非公平信号量获取信号量许可的代码。

非公平信号量的tryAcquireShared()实现如下:

1
2
3
4

protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}

nonfairTryAcquireShared()的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
// 设置“可以获得的信号量的许可数”
int available = getState();
// 设置“获得acquires个信号量许可之后,剩余的信号量许可数”
int remaining = available - acquires;
// 如果“剩余的信号量许可数>=0”,则设置“可以获得的信号量许可数”为remaining。
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

非公平信号量的tryAcquireShared()调用AQS中的nonfairTryAcquireShared()。而在nonfairTryAcquireShared()的for循环中,它都会直接判断当前剩余的信号量许可数是否足够;足够的话,则直接设置可以获得的信号量许可数,进而再获取信号量。
而公平信号量的tryAcquireShared()中,在获取信号量之前会通过if (hasQueuedPredecessors())来判断当前线程是不是在CLH队列的头部,是的话,则返回-1。

总结

Semaphore其实就是实现了AQS共享功能的同步器,对于Semaphore来说,资源就是许可证的数量:

  • 剩余许可证数(State值) - 尝试获取的许可数(acquire方法入参) ≥ 0:资源可用
  • 剩余许可证数(State值) - 尝试获取的许可数(acquire方法入参) < 0:资源不可用
    这里共享的含义是多个线程可以同时获取资源,当计算出的剩余资源不足时,线程就会阻塞。
    注意:Semaphore不是锁,只能限制同时访问资源的线程数,至于对数据一致性的控制,Semaphore是不关心的。当前,如果是只有一个许可的Semaphore,可以当作锁使用。

字面意思回环栅栏,通过它可以实现让一组线程等待至某个状态之后再全部同时执行。叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。我们暂且把这个状态就叫做barrier,当调用await()方法之后,线程就处于barrier了。

可以看下面这个图来理解下:
一共4个线程A、B、C、D,它们到达栅栏的顺序可能各不相同。当A、B、C到达栅栏后,由于没有满足总数4的要求,所以会一直等待,当线程D到达后,栅栏才会放行。

upload successful

阅读全文 »

正如每个Java文档所描述的那样,CountDownLatch是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程的操作执行完后再执行。在Java并发中,countdownlatch的概念是一个常见的面试题,所以一定要确保你很好的理解了它。

CountDownLatch是什么

CountDownLatch是在java1.5被引入的,跟它一起被引入的并发工具类还有CyclicBarrier和Semaphore,它们都存在于java.util.concurrent包下,后面会讲解另外俩个。CountDownLatch这个类能够使一个线程等待其他线程完成各自的工作后再执行。例如,应用程序的主线程希望在负责启动框架服务的线程已经启动所有的框架服务之后再执行。

CountDownLatch是通过一个计数器来实现的,计数器的初始值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就会减1。当计数器值到达0时,它表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务。

upload successful

如上图:TA主线程会一直等待,等待T1、T2和T3将计数器减为0,才继续执行。

阅读全文 »

ReentrantReadWriteLock 简介

在前面我们已经分析过JUC中的独占锁:ReentrantLock。本篇文章将对JUC的读写锁ReentrantReadWriteLock进行介绍。

类图如如下:

类图

从上图可以看出ReentrantReadWriteLock实现了ReadWriteLock接口,而这个接口从名字就可以看出是读写锁。它维护了一对相关连的锁:读锁和写锁。作用如下

  • 读锁:用于只读操作,不会修改共享数据。是共享锁,能够同时被多个线程锁获取。
  • 写锁:用于写入操作,是独占锁,只能被一个线程锁获取。

而这个接口提供了俩个抽象函数,获取读锁的readLock()函数和获取写锁的writeLock()函数。

ReentrantReadWriteLock中包含:Sync对象,读锁ReadLock和写锁WriteLock。

读锁ReadLock和写锁WriteLock都实现了Lock接口。读锁ReadLock和写锁WriteLock中也都分别包含了相同的Sync对象,里面所有的功能实现也都是靠这个对象。它们的Sync对象和ReentrantReadWriteLock的Sync对象是一样,就是通过sync,读锁和写锁实现了对同一个对象的访问。

和ReentrantLock一样,Sync也是一个继承于AQS的抽象类。Sync也包括公平锁FairSync和非公平锁NonfairSync。在创建读写锁时可以选择其中俩个其中一个,默认是NonfairSync。

公平读写锁源码分析

这里我们先对公平锁方式实现的读写锁进行源码分析,首先把后面要用到的属性在这里写出来,方便后面源码的理解:

1
2
3
4
5
6
7
8
9

// 内部使用的读锁
private final ReentrantReadWriteLock.ReadLock readerLock;
// 内部使用的写锁
private final ReentrantReadWriteLock.WriteLock writerLock;

// 读锁和写锁共同使用的锁类型,可以是公平锁和非公平锁
final Sync sync;

这里先看看构造函数和如何获取读锁和写锁

1
2
3
4
5
6
7
8
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}

public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }

从上面可以看出,在创建ReentrantReadWriteLock对象时就会根据是否选择公平锁来创建一个sync锁对象。然后分别创建响应的读锁和写锁。后面获取和使用的读写锁都是在构造函数中创建出来的。

下面开始首先对读锁的获取和释放进行分析。

读锁的获取(公平锁篇)

读锁也就是共享锁,获取锁的源码如下:

1
2
3
4
5
6
7
8
9
10
11
// ReadLock 类中
public void lock() {
// 获取共享锁
sync.acquireShared(1);
}

//AQS 类中
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}

从上面可以看出,这里调用的是AQS类中的acquireShared来获取锁。参数和ReentrantLock一样,表示获取锁的数量,1表示当前获取一把共享锁。锁的状态也会加1.

acquireShared()首先会通过tryAcquireShared()来尝试获取锁。尝试成功的话,直接返回。尝试失败的话,则通过doAcquireShared()来获取锁。doAcquireShared()会获取到锁才返回。

tryAcquireShared

尝试获取共享锁,此函数定义在Sync类中,源码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
protected final int tryAcquireShared(int unused) {

Thread current = Thread.currentThread();
// 获取锁的状态
int c = getState();
// 如果锁被独占锁获取并且获取独占锁的线程不是当前线程,
// 直接返回-1 达标获取锁失败
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// 获取读锁的共享计数
int r = sharedCount(c);
// 判断不需要阻塞,并且已经获取读锁的数量小于MAX_COUNT
// 则通过CAS函数更新读锁的状态,将读锁的共享计数加1
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
// 第一次获取读锁
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;

} else if (firstReader == current) {
// 如果当前获取锁的线程是第一个获取读锁的线程
firstReaderHoldCount++;
} else {
// HoldCounter是用来统计该线程获取“读取锁”的次数。
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
// 将该线程获取“读取锁”的次数+1。
rh.count++;
}
return 1;
}
// 如果获取读锁失败,则通过下面函数来进行获取读锁
return fullTryAcquireShared(current);
}

上面流程比较清晰,但是有很多地点可能看不明白,先跳过,看完后面所有的分析,你就会明白。先总结上面的流程。

  1. 判断当前锁是否是独占锁,如果是并且独占锁的线程和当前获取锁的线程不相同,则直接返回-1,获取读锁失败。
  2. 如果当前线程不应该被阻塞,并且已获取读锁的数量小于最大值,则尝试使用CAS更改读锁的状态值。如果操作成功,进行下一步,操作失败进入最后一步。
  3. 这一步主要设置每一个线程获取读锁的数量,主要分为三类来讨论:
    1. 如果是第一个线程来获取读锁,则设置firstReader为当前线程和当前线程拥有的读锁数量为1.
    2. 如果不是,则判断当前线程和firstReader线程是否一样,如果一样,则当前线程获取读锁的数量加1
    3. 以上都不是,则通过HoldCounter来对当前线程获取读锁的数量加1,而HoldCounter是一个ThreadLocal对象。保证每个线程都有一个不一样的HoldCounter变量。下面会详细解释
  4. 如果上面没有成功获取到读锁,但也没有返回。则通过fullTryAcquireShared来获取锁

下面对上面每一步使用到的函数进行详细的解释。

计算读锁和写锁已被获取的数量

1
2
3
4
5
6
7
8
9
static final int SHARED_SHIFT   = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

// 返回共享锁的数量
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
// 返回独占说的数量
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

从上面可以看出,读锁使用state的高16位来表示数量,而写锁则使用低16位来表示数量。然后通过后面的俩个函数来分别计算对应的数量。

readerShouldBlock

判断当前获取读锁的线程是否应该阻塞,源码在FairSync中,源码如下

1
2
3
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}

代码比较简单,就是判断当前线程是否是队列中的第一个节点,如果是,则不需要阻塞,不是则需要阻塞。具体的和前面ReentrantLock中的一样,这里具体分析。

HoldCounter

计算每个线程获取读锁的数量,这里HoldCounter是ThreaLocal类型的变量,如果不了解这个对象,可以看这篇文章深入分析ThreadLocal,在分析这个之前,首先看一些定义在Sync类中的属性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 计数器对象,用于记录每个线程保持读锁的数量
// 这个对象被记录在ThreadLocal中,缓存在cachedHoldCounter
static final class HoldCounter {
int count = 0;
final long tid = getThreadId(Thread.currentThread());
}

// 自定的ThreadLocal对象,设置初始化方法
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {

public HoldCounter initialValue() {
return new HoldCounter();
}
}
// 记录当前线程获取读锁的数量,在构造器中初始化,当保持的读锁数量为空的时候删除
private transient ThreadLocalHoldCounter readHolds;

// 用于记录上一个线程成功获取读锁的数量
private transient HoldCounter cachedHoldCounter;

// 下面来个一个是记录第一个获取读锁的线程和获取读锁的数量
private transient Thread firstReader = null;
private transient int firstReaderHoldCount;

有了上面的预备知识,下面可以解释tryAcquireShared中的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 第一次获取读锁
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;

} else if (firstReader == current) {
// 如果当前获取锁的线程是第一个获取读锁的线程
firstReaderHoldCount++;
} else {
// HoldCounter是用来统计该线程获取“读取锁”的次数。
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
// 将该线程获取“读取锁”的次数+1。
rh.count++;
}
  1. 首先判断是否是第一个线程获取读锁,如果是,则设置firstReader和firstReaderHoldCount值,可以加快后续此线程的获取读锁和记录读锁的数量。
  2. 判断线程是否是firstReader,如果是直接使用firstReaderHoldCount进行累加,可以加快获取的速度。
  3. 前面俩个都不是,则获取cachedHoldCounter,判断这个变量中保存的线程id是否和当前线程对应的id相同,如果是,则判断当前读锁的数量是否为0,如果为0,则调用 readHolds.set(rh)初始化这个对象然后在原有的读锁数量上加1。
  4. 不是则通过readHold获取当前线程对应的HoldCounter,并缓存在cachedHoldCounter中,加速下一次的操作,接着读锁数量加1。

fullTryAcquireShared

这个是tryAcquireShared的最后一步,也就是前面没有获取到共享锁,才会走到这一步,源码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
final int fullTryAcquireShared(Thread current) {
// 下面这部分代码和tryAcquireShared有一部分是重复的。
// 但是tryAcquireShared只是先尝试获取,但是如果出现竞争则获取
// 不到共享锁,即前面那部分加快锁的获取。下面这部分通过
// 循环尝试,保证如果可以获取读锁,则一定获取到
HoldCounter rh = null;
for (;;) {
// 获取锁的状态
int c = getState();
// 如果是独占锁,并且获取锁的线程不是current线程;则返回-1。
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;

// 如果需要阻塞等待。
// 当需要阻塞等待的线程是第1个获取锁的线程的话,则继续往下执行。
// 当需要阻塞等待的线程获取锁的次数为0时,则返回-1。
} else if (readerShouldBlock()) {
// 确保不是有一次获取读锁
if (firstReader == current) {
//忽略
} else {
// 获取当前线程获取读锁的数量,如果为0,则调用
// ThreadLocal.remove删除这个ThreadLocal
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
// 如果当前线程获取读锁的计数=0,则返回-1。
// 表示还没有获取过读锁,不在这里进行获取,
// 则需要阻塞获取读锁的进程
if (rh.count == 0)
return -1;
}
}
// 则获取读取锁的共享统计数;
// 如果共享统计数超过MAX_COUNT,则抛出异常。
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 将线程的读取锁次数加1
// 这一步和上面一样,就不具体解释
// 放在这里主要是因为CAS失败,如果失败,进入下一次循环
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}

fullTryAcquireShared()会根据是否需要阻塞等待,读取锁的共享计数是否超过限制进行处理。如果不需要阻塞等待,并且锁的共享计数没有超过限制,则通过CAS尝试获取锁,并返回1。

至此tryAcquireShared已经解析完成,这里做一个总结:tryAcquireShared将代码分成俩个大部分

  1. 首先通过尝试获取锁,如果获取成功直接返回。这是为了加快获取锁。
  2. 如果没有获取成功,说明CAS失败则进入fullTryAcquireShared函数进行获取,这里会循环获取,直到CAS交换成功。

当然我只是说了一个精简的过程。具体的可以看上面。其他异常情况我也没有总结。

doAcquireShared

源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private void doAcquireShared(int arg) {
// 创建当前线程对应的节点,并将该线程添加到CLH队列中。
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取前继节点
final Node p = node.predecessor();
// 如果当前节点是头结点,尝试获取锁
if (p == head) {
int r = tryAcquireShared(arg);
// 获取成功设置节点为可传播状态,
// 然后释放后继获取读锁的节点
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 如果当前线程不是CLH队列的表头,
// 则通过shouldParkAfterFailedAcquire()判断是否需要等待,
// 需要的话,则通过parkAndCheckInterrupt()进行阻塞等待。
// 若阻塞等待过程中,线程被中断过,则设置interrupted为true。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 上面出现异常,则取消当前节点
if (failed)
cancelAcquire(node);
}
}

doAcquireShared()的作用是获取共享锁,流程如下

  1. 创建线程对应的CLH队列的节点,然后将该节点添加到CLH队列中。CLH队列是管理获取锁的等待线程的队列。
  2. 获取前继节点,判断当前节点是否是表头,如果当前线程是CLH队列的表头,则尝试获取共享锁;如果获取成功,则释放后继等待获取获取共享锁的线程。然后判断是否中断过,如果产生过中断,则调动中断函数产生一次中断。
  3. 上一步没有成功获取锁,需要通过shouldParkAfterFailedAcquire()判断是否阻塞等待,需要阻塞,则通过parkAndCheckInterrupt()进行阻塞等待。

doAcquireShared()会通过for循环,不断的进行上面的操作;目的就是获取共享锁。需要注意的是:doAcquireShared()在每一次尝试获取锁时,是通过tryAcquireShared()来执行的!

其实和前面获取独占锁的流程差不多,只不过这里会有一个释放后继获取共享锁的节点。这一步放到下面讲解共享锁的释放中来说。

读锁的释放(公平锁)

释放锁是调用下面的函数,源码如下:

1
2
3
4
5
6
7
8
9
10
11
public void unlock() {
sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

上面的过程比较简单,先通过tryReleaseShared释放共享锁,尝试失败则直接返回;如果释放成功,则通过doReleaseShared()去释放共享锁并唤醒后继节点。

tryReleaseShared

tryReleaseShared()定义在ReentrantReadWriteLock中,源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
protected final boolean tryReleaseShared(int unused) {

// 获取当前线程,即释放共享锁的线程。
Thread current = Thread.currentThread();

// 如果想要释放锁的线程(current)是第1个获取锁(firstReader)的线程,
// 并且第1个获取锁的线程获取锁的次数=1,则设置firstReader为null;
// 否则,将第1个获取锁的线程的获取次数-1。
if (firstReader == current) {
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
// 获取rh对象,并更新当前线程获取锁的信息。
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != current.getId())
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
// 获取锁的状态
int c = getState();
// 将锁的获取次数-1。
int nextc = c - SHARED_UNIT;
// 通过CAS更新锁的状态。通过判断锁的状态是否为0来判断锁是否可以释放。
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}

主要流程如下:

  1. 判断是否是当前线程,如果是,则将当前线程持有的读锁数量减1,
  2. 如果不是,则判断是否是缓存的线程,如果是,将读锁数量减1
  3. 以上都不是,则获取ThreadLocal,并将数量减1
  4. 循环CAS将state值减1,如果变成0,则说明释放锁成功

下面来看doReleaseShared

doReleaseShared

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private void doReleaseShared() {
for (;;) {
// 获取CLH队列的头节点
Node h = head;

// 如果头节点不为null,并且头节点不等于tail节点。
if (h != null && h != tail) {
// 获取头节点对应的线程的状态
int ws = h.waitStatus;

// 如果头节点对应的线程是SIGNAL状态,
// 则意味着头节点的下一个节点所对应的线程需要被unpark唤醒。
if (ws == Node.SIGNAL) {
// 设置头节点对应的线程状态为空状态。失败的话,则继续循环。
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
// 唤醒头节点的下一个节点所对应的线程。
unparkSuccessor(h);
}
// 如果头节点对应的线程的状态值是0,
// 则设置头结点的状态为PROPAGATE状态,等待有线程来获取锁才会结束循环。
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 如果头节点没有改变,则继续循环。否则,退出循环。
if (h == head) // loop if head changed
break;
}
}

doReleaseShared()会释放共享锁:流程如下:

  1. 判断队列是否为空,如果为空则结束循环,不进行唤醒。
  2. 如果不为空,则判断头结点是否为SIGNAL状态吗,如果是,则设置状态为0,然后唤醒后继获取锁的节点可以是独占或者共享锁。如果唤醒成功,头结点会改变,这一在最后一步就会推出这个循环
  3. 如果头结点状态为0,则设置状态为PROPAGATE,然后继续循环。
  4. 如果头结点发生改变,则继续循环。

主要流程如上,但是为什么要一直循环这是我不明白的地点。

公平共享锁和非公平共享锁

和互斥锁ReentrantLock一样,ReadLock也分为公平锁和非公平锁。

公平锁和非公平锁的区别,体现在判断是否需要阻塞的函数readerShouldBlock()的不同。
公平锁的readerShouldBlock()的源码如下:

1
2
3
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}

在公平共享锁中,如果在当前线程的前面有其他线程在等待获取共享锁,则返回true;否则,返回false。
非公平锁的readerShouldBlock()的源码如下:

1
2
3
4
5
6
7
8
9
10
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}

在非公平共享锁中,它会无视当前线程的前面是否有其他线程在等待获取共享锁。只要该非公平共享锁对应的线程不为null,则返回true。也就是当前锁的类型是共享锁,并且还没有释放。

写锁

写锁的获取和ReentrantLock中独占锁的获取是一样的,这里就不在单独说明。

使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package JUC.locks;

import lombok.Data;
import lombok.Getter;
import lombok.Setter;

import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**************************************
* Author : zhangke
* Date : 2018/4/18 22:13
* Desc : 读写锁
***************************************/
public class ReadWriteLockTest {

public static void main(String[] args) {
// 创建账户
MyCount myCount = new MyCount("4238920615242830", 10000);

// 创建用户,并指定账户
User user = new User("Tommy", myCount);

// 分别启动3个“读取账户金钱”的线程 和 3个“设置账户金钱”的线程
for (int i = 0; i < 3; i++) {
user.getCash();
user.setCash((i + 1) * 1000);
}
}


static class User {
private String name; //用户名

private MyCount myCount; //所要操作的账户

private ReadWriteLock myLock; //执行操作所需的锁对象


public User(String name, MyCount myCount) {
this.name = name;
this.myCount = myCount;
this.myLock = new ReentrantReadWriteLock();

}


public void getCash() {
new Thread(() -> {
try {
myLock.readLock().lock();
System.out.println(Thread.currentThread().getName() + " getCash start");
myCount.getCash();
Thread.sleep(1);
System.out.println(Thread.currentThread().getName() + " getCash end");
} catch (InterruptedException e) {

} finally {
myLock.readLock().unlock();
}
}).start();
}


public void setCash(final int cash) {
new Thread(() -> {
try {
Thread.sleep(100);
myLock.writeLock().lock();
System.out.println(Thread.currentThread().getName()
+ " setCash start");
myCount.setCash(cash);
System.out.println(Thread.currentThread().getName()
+ " setCash end");
} catch (InterruptedException e) {

} finally {
myLock.writeLock().unlock();
}
}).start();
}
}

static class MyCount {

@Getter
@Setter
private String id; //账户id

private int cash; //现金


public MyCount(String id, int cash) {
this.id = id;
this.cash = cash;
}


public int getCash() {
System.out.println(Thread.currentThread().getName() + " getCash" +
" cash= " + cash);
return cash;
}


public void setCash(int cash) {
System.out.println(Thread.currentThread().getName() + " setCash" +
" cash= " + cash);
this.cash = cash;
}
}
}

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Thread-0 getCash start
Thread-0 getCash cash= 10000
Thread-0 getCash end
Thread-2 getCash start
Thread-2 getCash cash= 10000
Thread-4 getCash start
Thread-4 getCash cash= 10000
Thread-2 getCash end
Thread-4 getCash end
Thread-5 setCash start
Thread-5 setCash cash= 3000
Thread-5 setCash end
Thread-3 setCash start
Thread-3 setCash cash= 2000
Thread-3 setCash end
Thread-1 setCash start
Thread-1 setCash cash= 1000
Thread-1 setCash end

从上面可以观察到读锁是可以共享,也就是读锁的打印的语句不一定是start-end连着的。但是写锁一定是。

总结

获取锁的流程:

  1. 判断当前锁是否是独占锁,如果是并且独占锁的线程和当前获取锁的线程不相同,则直接返回-1,获取读锁失败。
  2. 如果当前线程不应该被阻塞,并且已获取读锁的数量小于最大值,则尝试使用CAS更改读锁的状态值。如果操作成功,将state加1,表示有多少个线程获取过锁,进行下一步,操作失败进入第四步。
  3. 这一步主要设置每一个线程获取读锁的数量,主要分为三类来讨论:
    1. 如果是第一个线程来获取读锁,则设置firstReader为当前线程和当前线程拥有的读锁数量为1.
    2. 如果不是,则判断当前线程和firstReader线程是否一样,如果一样,则当前线程获取读锁的数量加1
    3. 以上都不是,则通过HoldCounter来对当前线程获取读锁的数量加1,而HoldCounter是一个ThreadLocal对象。保证每个线程都有一个不一样的HoldCounter变量。下面会详细解释
  4. 如果上面没有成功获取到读锁,但也没有返回。则通过fullTryAcquireShared来获取锁,就是将第三步包装成一个循环来进行获取。
  5. 如果上面都没有获取到,则通过doAcquireShared进行获取,如果获取失败则阻塞当前线程等待唤醒

释放锁的流程

  1. 判断是否是当前线程,如果是,则将当前线程持有的读锁数量减1,
  2. 如果不是,则判断是否是缓存的线程,如果是,将读锁数量减1
  3. 以上都不是,则获取ThreadLocal,并将数量减1
  4. 循环CAS将state值减1,如果变成0,则说明释放锁成功
  5. 如果释放锁成功,则走下面的流程,如果失败,直接返回false
  6. 判断队列是否为空,如果为空则继续循环。
  7. 如果不为空,则判断头结点是否为SIGNAL状态吗,如果是,则设置状态为0,然后唤醒后继获取锁的节点可以是独占或者共享锁。如果唤醒成功,头结点会改变,这一在最后一步就会推出这个循环
  8. 如果头结点状态为0,则设置状态为PROPAGATE,然后继续循环。
  9. 如果头结点发生改变,则继续循环。

参考

  1. Java多线程系列–“JUC锁”08之 共享锁和ReentrantReadWriteLock

概述

  1. 用法简介
  2. 源码分析
  3. 底层实现原理

1. 用法简介

LockSupport是用来创建锁和其他同步器的基本线程阻塞原语。LockSupport提供park()和unpark()方法实现阻塞线程和解除线程阻塞。每个使用LockSupport的线程都与一个许可(permit)关联,permit相当于开关,默认是0,调用一次unpark就加1变成1,调用一次park会消费permit, 也就是将1变成0,同时park立即返回。再次调用park会变成block(因为permit为0,会阻塞在这里,直到permit变为1), 这时调用unpark会把permit置为1。每个线程都有一个相关的permit, permit最多只有一个,重复调用unpark也不会积累。

park()和unpark()不会有Thread.suspend和Thread.resume所可能引发的死锁问题。这个死锁问题的产生是由于Thread.resume在Thread.suspend之前调用,使得线程忽略了解除阻塞的信号,而使得线程一直被阻塞。而LockSupport由于许可的存在,调用park的线程和另一个试图将其unpark的线程之间的竞争将保持活性。不会因为前后调用的顺序而产生死锁

如果调用线程被中断,则park方法会返回。同时park也拥有可以设置超时时间的版本。

阅读全文 »