开篇语
怎么确保程序有序的运转,锁是永久都绕不开的一条绊脚石。即能够让程序有条有理的运转,也能够让程序步履蹒跚。而今天,咱们就来剥开 Java 中最常用的锁 ReentrantLock,研讨它,参悟它。
文章结构
- 介绍 ReentrantLock 的简略运用,并快速学习 ReentrantLock 怎么运用 AQS 完结同步器
- 具体分析 CLH
- AQS 怎么改造 CLH
- 介绍 CAS
- AQS 内部数据结构
- AQS 源码解析
- AQS 细节深入讨论
ReentrantLock 运用与解析
简略运用
import java.util.concurrent.locks.ReentrantLock;
public class ReentrantLockTest {
public static void main(String[] args) {
ReentrantLock reentrantLock = new ReentrantLock();
// 加锁
reentrantLock.lock();
// 同步代码块
System.out.println("获取锁成功");
// 开释锁
reentrantLock.unlock();
}
}
ReentrantLock 的运用十分简略,只需求创立一个 ReentrantLock 目标,获取锁调用 lock()
办法,开释锁调用 unlock()
办法。
与 Synchronized 在运用上的不同点:
- Synchronized 自动开释锁,ReentrantLock 手动开释锁
- Synchronized 能够运用于办法和代码块,ReentrantLock 只能运用于代码块
解析
ReentrantLock 并不直接承继 AbstractQueuedSynchronizer 类,而是界说了一个子类 Sync 去承继 AbstractQueuedSynchronizer,让子类去完结同步器。
ReentrantLock 完结了 Lock 接口,该接口界说了一个锁需求完结的办法。ReentrantLock 完结 Lock 接口中的办法都是经过 Sync 类中的办法来完结。
ReentrantLock 内部运用了一个笼统内部类 Sync 承继了 AbstractQueuedSynchronizer 类。NonfairSync 类承继 Sync,用于供给非公正锁的功用;FairSync 类承继 Sync,用于供给公正锁的功用。
public class ReentrantLock implements Lock, Serializable {
// 同步器
private final Sync sync;
// 内部笼统类
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
@ReservedStackAccess
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
@ReservedStackAccess
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
}
/**
* 完结非公正锁的类
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
/**
* 完结公正锁的类
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
@ReservedStackAccess
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
ReentrantLock 完结了 lock()
办法,露出给外部获取锁;完结了 unlock()
办法,露出给外部开释锁。
public void lock() {
sync.acquire(1);
}
public void unlock() {
sync.release(1);
}
咱们能够看到这两个办法实际上都是调用了 Sync 类的办法,因而 ReentrantLock 自身并没有完结同步器的功用,而是交给内部类 Sync 类完结。ReentrantLock 经过调用 Sync 供给的同步办法完结了同步器的功用。
CAS
CAS 是 compareAndSwap 的缩写,中文译为:比较和交流。是现代 CPU 广泛支撑的一种对内存中的同享数据进行操作的一种特殊指令,这个指令会对内存中的同享数据做原子读写操作。其效果是让CPU 比较内存中某个值是否和预期的值相同,假设相同则将这个值更新为新值,不相同则不做更新,以此达到同步的效果。
CAS 在 Java 中的运用十分多, java.util.concurrent.atomic 这个包下的类都是运用 CAS 来完结的。
示例
内存地址 | 0x01 | 0x02 | 0x03 |
---|---|---|---|
值 | 1 | 2 | 3 |
咱们假设在内存 0x01 处寄存 1,0x02 处寄存 2,0x03 处寄存 3。现在咱们有一个线程 A,它要运用 CAS 操作修正 0x01 处的值为 4。此刻,它只需求这样操作即可。
给 CAS 函数供给三个参数:内存地址,内存地址中寄存的原始值,内存地址需求寄存的新值。函数原型 cas(*address, oldValue, newValue)
。
更新成功
线程 A 履行 cas(0x01, 1, 4)
,系统会先取出内存地址为 0x01 的值,取出值为1,与传入的旧值 1 相同,此刻就会将 4 写入到内存地址 0x01 处,内存变为:
内存地址 | 0x01 | 0x02 | 0x03 |
---|---|---|---|
值 | 4 | 2 | 3 |
更新失利
假设在线程 A 履行cas(0x01, 1, 4)
时,存在别的一个线程 B,它先于线程 A 修正了 0x01 处的值,它将值改为 5。此刻内存为
内存地址 | 0x01 | 0x02 | 0x03 |
---|---|---|---|
值 | 5 | 2 | 3 |
然后线程 A 持续履行,它取出 0x01 处的值为 5,传入的旧值为 1,1 != 5,此次 cas 操作失利,不会将新值写入到 0x01 中。
ABA 问题
咱们现在来看别的一种特殊状况。
线程 A 履行cas(0x01, 1, 4)
时,线程 A 被挂起。别的一个线程 B,它修正 0x01 处的值,它将值改为 5;此刻又有一个线程 C,它履行 cas(0x01, 5, 1)
,将 0x01 处的值又修正为 1。
内存值变化进程如图所示:
此刻,线程 A 持续履行,发现内存值与传入的旧值相同,因而将值替换为 4。
这便是闻名的 ABA 问题。当一个线程修正了值后,又有另一个线程将值修正回来,这样关于其他线程来说,这个值就相当于没有修正过一样。假设要处理这个问题,咱们能够给值加一个版本号 version。这个 version 代表修正的次数,每一次修正都会将版本号加1,用于奉告其他线程,这个值现已修正过了。此刻,cas 要成功更新,则需求值和版本号都相等的状况下,才干更新。
小结
CAS 由 CPU 供给的指令完结,运用简略,无需频频切换上下文,提高功率。
可是有以下几个缺点:
- 只能确保一个同享变量的读写。
- ABA问题。
- 假设自旋 CAS 长期地不成功,则会给 CPU 带来十分大的开支。
CLH
自旋锁
在介绍 CLH 之前,咱们先来了解一个常常呈现在咱们视野中的锁——自旋锁。望文生义,它在获取不到锁的时分,会循环的测验获取锁,直到获取到锁停止。
import java.util.concurrent.atomic.AtomicReference;
public class SpinLock {
private AtomicReference<Thread> reference = new AtomicReference<>();
public void lock(){
Thread thread = Thread.currentThread();
// 运用 CAS 设置线程,只需 CAS 成功才会获取锁,不然循环履行
while (!reference.compareAndSet(null, thread)){
}
}
public void unlock(){
Thread thread = Thread.currentThread();
// 只需获取到锁的线程才干解锁
reference.compareAndSet(thread, null);
}
}
上面是运用 Java 完结的一个简易自旋锁,它只运用了最简略的 CAS 操作即可完结。当 reference 引证的值为 null 时,表明还没有任何一个线程获取锁,此刻经过 compareAndSet()
办法设置当时线程,假设成功,则表明获取锁成功,退出循环;不然,持续循环,直到 compareAndSet()
办法回来 true停止。
咱们能够看到自旋锁完结简略,而且不会产生内核态和用户态的切换。可是,它却有几个缺点
- 锁饥饿。当锁竞赛剧烈的状况下,会产生一个线程从始至终
compareAndSet()
都无法设置成功,导致一向获取不到锁。 - 功用低。当锁竞赛剧烈的状况下,由于只需一个线程能够获取到锁,因而其他线程都将处于空转的状况。当持有锁时刻越长,空转时刻就越长。而且,由于他们都只拜访同一个同享变量,当同享变量改动时,将导致一切线程的高速缓存都失效。
所以,自旋锁适用于锁竞赛不剧烈,锁持有时刻短的场景。
关于高速缓冲,需求了解 CPU 的三级缓存,这儿暂不涉及。
思考题
怎么改造上面的自旋锁代码,使其支撑可重入?
CLH 锁
CLH 是对简略自旋锁的改进。它本质上也是一个自旋锁,可是它处理了锁饥饿和功用低的问题,供给了一个可扩展,功用高,而且公正的锁服务。
CLH 针对锁饥饿,它运用了根据行列的自旋锁,每个获取锁的线程都需求进入到行列中等候,而且自旋拜访前一个线程是否现已开释了锁,假设未开释,则持续自旋。假设现已开释,那么则能够获取锁。经过行列的先进先服务准则,确保了每一个线程都能公正的获取到锁。
CLH 针对功用低,不再是一切线程都只拜访同一个同享变量,而是在每一个线程的内部都会保护一个变量,这个变量用于表明该线程是否现已获取锁和开释锁。每一个线程都只需求自旋前一个线程的变量即可。因而,当变量的值产生变化时,只会影响后继线程,而不会影响行列中的一切线程,缩小了影响规模。
数据结构
CLH 运用隐式链表行列作为其数据结构。隐式链表的意思便是没有前驱指针和后继指针指向前一个节点和后一个节点,取而代之的是,直接在节点中记载前一个节点。
一切恳求获取锁的线程,都会在进入到行列中,自旋拜访前一个节点的状况变量,只需前一个节点开释锁时,当时节点才干获取锁。
CLH 自身包括一个空节点,每个 CLH 节点包括两个特点:恳求锁的线程和持有锁的状况。空节点的特点线程值为 null,锁状况为开释锁。空节点运用 tail 指针引证。当有一个新的线程恳求锁时,将会包装为一个新的 CLH 节点。而且运用 getAndSet 操作,回来当时 tail 指向的节点,并将新节点设置为 tail,此刻新的 CLH 节点将会成为队尾节点。入队成功后,线程会轮询上一个节点的状况变量,当上一个节点开释锁后,它将得到这个锁。
- 初始化一个 CLH,此刻内部只需一个空节点,线程为 null,锁状况为 false。(进程一)
- 当线程 thread1 恳求锁,包装为 CLH 节点,运用原子的 getAndSet 操作将该节点设置为 tail,并发现前一个节点的锁状况为 false,因而获取锁成功,进入临界区履行代码。(进程二)
- 当 thread1 持有锁期间,thread2 恳求锁,发现前一个节点的锁状况为 true,获取锁失利。自旋 thread1 节点的状况,等候其开释锁。(进程三)
- thread1 开释锁,thread2 自旋发现前一个节点现已开释锁,获取锁成功。(进程四)
- thread2 开释锁。(进程五)
假设后续有新的线程恳求锁,会从 thread2 节点后面开端重复上面的进程。
Java 完结 CLH
import java.util.concurrent.atomic.AtomicReference;
public class CLHLock {
// 队尾节点
private final AtomicReference<Node> tail;
// 前一个节点
private ThreadLocal<Node> preNode;
// 当时节点
private ThreadLocal<Node> currNode;
public CLHLock() {
// 默认为 false 的节点
this.tail = new AtomicReference<>(new Node());
this.currNode = ThreadLocal.withInitial(Node::new);
this.preNode = new ThreadLocal<>();
}
// 节点
private static class Node {
private volatile boolean locked;
}
// 加锁办法
public void lock(){
Node node = currNode.get();
node.locked = true;
Node pre = this.tail.getAndSet(node);
preNode.set(pre);
while (pre.locked){
}
}
// 解锁办法
public void unlock(){
Node node = currNode.get();
node.locked = false;
currNode.set(new Node());
}
}
代码来历:zhuanlan.zhihu.com/p/161629590… 45 行做修正。
为什么需求运用 volatile 修饰 locked 字段?不运用不行吗?
由于在 Java 中,每个线程都有独立的作业内存,和一块一切线程同享的主内存。拜访数据时,线程首要会从同享内存中将变量值复制到作业内存中,然后在作业内存中进行读写。由于作业内存是独立,当线程对变量进行修正时,其他线程是无法感知的,只需写回主内存时(事件告诉),其他线程才会知道该变量值被修正了,然后会将作业内存中该变量的值抛弃,从头从主内存中读取。
关于缓存失效,能够参考 MESI,供给一个模拟 MESI 的网站
运用 volatile 关键字,咱们能够使得线程对变量的修正关于其他线程是可见的。而这个可见的意思便是写回主内存中。而 volatile 的之所以能够完结这个效果得益于 Java 中的 happen-before 准则。
happen-before 准则中有一条关于 volatile 变量规则 :关于 volatile 变量的写操作,先行产生于后续对这个 volatile 变量的读操作。这儿的先行产生是时刻意义上的先后。
所以,当一个线程解锁时,设置 node.locked = false
。当 volatile 变量 locked 写操作完结后,后续线程读取 locked 变量能够立刻获取到最新值。
除了处理内存可见性之外,volatile 关键字还处理了别的一个问题:指令重排序。
大多数现代微处理器都会选用将指令乱序履行的办法,在条件答应的状况下,直接运转当时有才能立即履行的后续指令,避开获取下一条指令所需数据时形成的等候。经过乱序履行的技能,处理器能够大大提高履行功率。
来历 Java内存拜访重排序的研讨
换言之处理器只需求确保单线程内的终究成果是正确即可,因而为了避免指令和数据获取的时延,在不影响终究成果的状况下,会不按照代码顺序履行。
在 lock()
办法中,node.locked = true; Node pre = this.tail.getAndSet(node);
这两行代码是有或许被重排序的。即或许先履行 Node pre = this.tail.getAndSet(node);
,然后再履行node.locked = true;
。
这样就会呈现别的一个状况,当履行完 Node pre = this.tail.getAndSet(node);
时,线程被挂起,此刻 node 中 locked 的值为 false,而且现已被放入了队尾中。此刻其他线程来恳求锁,发现 tail 指向的 node 中的 locked 为 false,则直接获取锁成功,履行临界区代码。
这儿形成了两个问题:
- 被挂起的线程永久都没有办法持续获取到锁,由于它现已被认为开释了锁。
- 后续履行都会由于这个呈现计算过错。
volatile 会阻挠这两行句子进行交流,因而避免了呈现上述状况。
volatile 能够完结内存可见性和阻挠指令重排序首要是依托内存屏障来完结的。
tail 为什么运用 AtomicReference
运用 AtomicReference 的目的是确保 tail 的操作是原子性的。getAndSet 的操作其实是分为多个进程的:
- 读取 tail 值
- 读取 node 值
- 设置 tail
- 回来原 tail 值
由于多线程的特性,这儿的每一个进程都或许被暂停。假设线程 A 履行进程一后,CPU 时刻片用完,让出 CPU。接着线程 B 获得履行权,它履行完四个进程。此刻 tail 值现已被更改为线程 B,可是线程 A 获取到的仍然是本来的那个 tail。
因而,假设不能确保 tail 的原子性,那么就会呈现过错。
currNode.set(new Node()); 这行代码的效果是什么?
这一行代码是为了避免 Node 被复用,导致死锁。
线程 A 开释锁,履行完 node.locked = false
后。后继线程 B 即可开端获取锁,但假设此刻后继线程 B 没有 CPU 的时刻片,而线程 A 再次恳求锁,而且获取锁成功,此刻,tail 会指向线程 A。
此刻,就会变成线程 A 的 pre 指向线程 B,线程 B 的 pre 指向线程 A。双方都在等候对方开释锁,因而形成死锁。
开释锁时,替换掉原 node 目标,再次获取锁时,将会运用新的 node 目标当入队尾,而不会复用本来的 node 目标。可是原 node 目标仍然被后继节点引证着。实际上在后继节点开释锁时,能够断开对前一节点的引证,协助 JVM 更好的 GC。
小结
CLH 功用优异,获取锁和开释锁的开支都相对较小。由于运用了链表作为底层数据结构,确保了线程获取锁的公正,避免了锁饥饿的产生。完结难度较小,扩展性却很好。
可是 CLH 也有两个缺点:
- 运用了自旋操作,假设持有锁时刻过长,形成 CPU 资源的糟蹋。
- 单一的 CLH 过于简略,无法完结杂乱功用。
AQS 改造 CLH
AQS 对 CLH 的改造首要在这三方面:
- 在节点中增加了前驱链接和后继链接
- 运用堵塞操作替代自旋操作
- 扩展锁状况字段,使其能够完结更多功用
咱们先来看 AQS 中的 Node 节点长什么样?
static final class Node {
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
}
在节点中增加了前驱链接和后继链接
咱们能够看到 Node 节点中包括了 prev 和 next 变量,这两个变量说明 AQS 内部是运用双向链表。
prev 指针的效果首要是处理撤销获取锁的节点,当线程被中止或许超时,那么就会被撤销获取锁。而 prev 指针便是处理这些需求撤销获取锁的线程。
next 指针的效果是用来唤醒后继线程。由于 AQS 中运用堵塞来挂起线程的,而不是自旋,因而被挂起的线程并不知道前一个节点是否现已开释了锁。所以当一个节点开释锁时,需求自动唤醒后继线程,使其能够获取到锁。
运用堵塞操作替代自旋操作
在 AQS 中,怎么线程获取不到锁,将不会像原始 CLH 那样,自旋前一个节点的锁状况来得知是否能够获取锁。而是直接将线程堵塞,当时一个节点开释锁时,会自动唤醒后一个线程。线程被唤醒后,将会持续测验获取锁。
扩展锁状况字段,使其能够完结更多功用
在 CLH 中,锁状况运用布尔变量来表明,它所表明的的状况只需两种,true 为等候获取锁/现已获取锁;false 为开释锁。在 AQS 中,状况变量是一个 int 值。
它有五种状况:
- CANCELLED(1):表明该节点现已撤销获取锁
- SIGNAL(-1):表明该节点在开释锁之后,需求唤醒后继节点
- CONDITION(-2):表明该节点在条件行列中
- PROPAGATE(-3):在同享锁中才会运用,目前先不解释,首要处理 Semaphore 的 bug。想要了解的能够检查此链接
- NONE(0):默许值,节点被创立时便是这个值
AQS 源码解析
数据结构
public abstract class AbstractQueuedSynchronizer{
private Node head;
private Node tail;
private int state;
}
AQS 内部的字段极端简略,它的组成便是一个双向链表和一个同步状况。
源码解析
lock
public void lock() {
sync.acquire(1);
}
lock 办法是 ReentrantLock 中露出给运用者获取锁的办法。咱们能够看到它是托付给 sync 目标调用 acquire
办法来获取锁。acquire
办法是 AQS 中供给的一个获取锁的模版办法。
acquire
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){
selfInterrupt();
}
}
-
履行
tryAcquire
测验获取锁。- 假设成功,则表明获取锁成功。
- 假设失利,则履行 acquireQueued,将线程参加到行列中。
这段代码便是非公正锁的表现,当一个线程去恳求锁时,它首要会测验去获取锁,而不是直接参加到行列中。非公正就表现在行列中还存在等候获取锁的线程,可是一个新的线程去恳求锁时,也或许会成功。这就会导致一个后恳求锁的线程,或许会比先恳求锁的线程优先获取到锁。
- T1 时刻,thread 1 持有锁。thread2,thread3,thread4 顺次恳求锁,但都失利并进入到行列中等候; 此刻 thread2,thread3,thread4 都是被挂起的状况,需求其他线程去唤醒他们。
- T2 时刻,thread 1 开释锁。此刻 thread1 会去唤醒行列中的榜首个有效线程。在唤醒线程之前,thread5 开端恳求锁。
- T3 时刻,thread5 获取锁成功。一起 thread2 被唤醒,thread2 持续恳求锁失利,从头被挂起。
tryAcquire
tryAcquire 是 AQS 中界说的一个笼统办法,需求子类对其重写。这个办法的效果是在独占锁模式下,测验获取锁。这个办法应该一直由建议恳求的线程来履行。假设这个办法回来 false,而且线程未在行列中排队,则能够将线程参加行列中,等候其他线程将其唤醒。假设回来 true,则表明恳求锁成功。
在 ReentrantLock 中,是经过一个内部笼统类 Sync 承继 AbstractQueuedSynchronizer,一个 NonfairSync 承继 Sync 完结非公正锁。一个 FairSync 承继 Sync 完结公正锁。
因而重写 tryAcquire 办法的是 NonfairSync 类和 FairSync 类。
经过咱们简化后,tryAcquire 的代码
// 非公正锁
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
// 重写 tryAcquire
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// 公正锁
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
履行进程:
-
获取 state 值
-
判别 state 值是否为 0,假设等于 0,则表明还没有线程获得锁。不等于 0,则表明锁已被其他线程获得
- 假设 state 等于 0,则经过 CAS 值设置 state。假设设置成功,则表明该线程获取锁,履行临界区代码
- 假设 state 不等于 0(为1),则判别当时线程是否为现已获取锁的线程。假设是,则表明该线程获取锁,履行临界区代码。(这也表明 ReentrantLock 是可重入锁)。重入时,state 会被设置为重入的数量。
这个办法的首要效果便是应该怎么获取锁。最首要的一段代码便是:compareAndSetState(0, acquires)
。修正 state 字段的值为 acquires
。因而,咱们能够得出一个结论,获取锁的关键就在于能不能成功设置 state。假设能够,那么就表明获取锁成功。
ReentrantLock 是一个可重入的锁,那么则需求记载下重入的数量,以便于能够正确的开释锁。在 ReentrantLock 中,重入数据的记载也是经过 state 来完结,当线程每重入一次时,就会将当时 state 加 1,用于记载重入的数量。
对比公正锁和非公正锁,咱们能够发现他们之间恳求锁的代码中只需一处是不同的。公正锁会先判别行列中是否存在有其他线程等候获取锁,假设有,则直接将当时线程参加到行列中。
addWaiter
private Node addWaiter(Node mode) {
Node node = new Node(mode);
for (;;) {
Node oldTail = tail;
if (oldTail != null) {
node.setPrevRelaxed(oldTail);
if (compareAndSetTail(oldTail, node)) {
oldTail.next = node;
return node;
}
} else {
initializeSyncQueue();
}
}
}
当线程恳求锁失利时,就会调用该办法。首要效果是向同步行列中参加一个等候节点。
-
创立一个新的节点
-
在 for 循环中履行以下进程
-
判别 tail 是否存在
- 存在,说明现已初始化过了。直接经过 CAS 操作,将当时节点设置为 tail
- 不存在,则初始化链表。初始化链表的操作便是,创立一个空节点,让 head 和 tail 指针都指向这个空节点。然后从头进入下一轮 for 循环,将之前创立的节点参加到行列之中。
-
经过这个代码,咱们能够知道两个细节:
- AQS 中的同步行列是推迟创立的,只需锁榜首次存在竞赛时才会创立。
- AQS 中的同步行列运用的是有头双向链表。
acquireQueued
final boolean acquireQueued(final Node node, int arg) {
// 1. 界说中止变量。这个变量首要是为了从头设置线程的中止标记。
boolean interrupted = false;
try {
// 进入 for 循环。为了线程被唤醒之后,能够经过该循环,从头履行获取锁的流程。
for (;;) {
// 获取刚刚参加节点的前驱节点
final Node p = node.predecessor();
// 假设前驱节点是头节点,那么说明当时节点是行列中榜首个获取锁的线程。
// 所以,让该线程调用 tryAcquire() 再次测验获取锁。
if (p == head && tryAcquire(arg)) {
// 获取锁成功,设置该节点为头节点。
setHead(node);
p.next = null; // help GC
// 回来中止状况
return interrupted;
}
// 获取锁失利之后,判别是否需求堵塞
if (shouldParkAfterFailedAcquire(p, node))
// 假设需求,那么则堵塞线程。当线程被唤醒后,会设置中止状况
interrupted |= parkAndCheckInterrupt();
}
} catch (Throwable t) {
// 撤销恳求
cancelAcquire(node);
if (interrupted)
// 建议中止
selfInterrupt();
throw t;
}
}
当线程经过 addWaiter()
参加到等候行列中后,就会履行该办法。首要效果是为现已在行列中的线程恳求锁,这个办法是整个独占锁模式中的核心代码。咱们一句一句来分析。
一开端界说了 interrupted 变量,这个变量的效果是为了从头设置线程的中止标记。为什么需求从头设置中止呢?先留个悬念。
接下来会进入一个 for 循环中,跳出循环的条件便是成功获取锁。进入 for 循环后,首要获取到当时节点的前驱节点,假设前驱节点是头节点,那么说明当时节点是行列中榜首个等候恳求锁的线程。线程开释锁时,它作为榜首候选人去测验获取锁。获取锁成功后,则将该节点设置为头节点。
假设获取锁失利,那么则会履行 shouldParkAfterFailedAcquire 判别线程是否需求堵塞。假设回来 true,表明需求进行堵塞,因而会调用 parkAndCheckInterrupt 办法进行堵塞。线程被堵塞后,会停留在 LockSupport.park(this);
这一句中。直到以下三种状况才会被唤醒:
- 其他线程调用了
unpark
办法,将该线程唤醒 - 其他线程中止 (interrupts ) 了该线程
- 调用呈现了过错
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
}
return false;
}
该办法首要判别前驱节点的状况是否为 SIGNAL,假设为 SIGNAL,表明现已奉告前驱节点在开释锁时,需求来告诉我,因而能够直接进行堵塞线程操作。
假设大于 0,其实便是状况为 CANCELLED,那么则持续往前寻觅到一个状况为非 CANCELLED 的节点,而且从头设置 pred 指针,进入下一轮循环。
假设等于 0,那么表明还未奉告前驱节点在开释锁时,需求来告诉我,因而要设置前驱节点的状况为 SIGNAL,表明它开释锁时需求来告诉我,进入下一轮循环。
这儿会有一个疑惑点:为什么设置了前驱节点的状况为 SIGNAL 后,需求再次循环去测验获取锁,而不是直接进行堵塞?这儿需求结合开释锁的代码来一起看。
release
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
履行 tryRelease
,该办法与 tryAcquire
办法一样,都是需求子类重写的办法。该办法首要效果是设置 state 的值减 1。
假设履行成功,则表明开释锁成功,那么就需求唤醒后继线程去获取锁。但咱们知道,AQS 中运用的是 CLH 行列,选用先进先服务的准则,所以会先唤醒榜首个线程,也便是唤醒 head 的后继节点。由于 head 是一个空节点,不属于正在等候获取锁的节点。
获取到 head 节点,假设 head 为 null,表明行列中没有需求唤醒的节点,直接回来。假设 head 的状况为 0,表明没有后继线程需求唤醒。由于只需状况为 SIGNAL 时,才会去唤醒后继线程。
当符合上述两个条件时,才会调用 unparkSuccessor
办法,开端唤醒后继线程。
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
// 设置头节点的状况为 0
node.compareAndSetWaitStatus(ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
// 假设后继节点为撤销状况,从后往前找一个有效的节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node p = tail; p != node && p != null; p = p.prev)
if (p.waitStatus <= 0)
s = p;
}
if (s != null)
// 唤醒线程
LockSupport.unpark(s.thread);
}
node 节点其实便是 head 节点,首要判别假设 head 节点小于 0,也便是处于 SIGNAL 状况,则将状况修正为 0。为什么要做这一步操作呢?
为了确保线程唤醒与堵塞的正确性。由于 LockSupport.unpark
办法,在线程运转时调用,那么在下一次调用 LockSupport.park
办法时,线程是不会被堵塞的。
还有便是我现已唤醒了后继线程,现已完结了我的任务,那么状况自然便是默许状况。假设后继线程还需求告诉,那么需求从头设置我的状况。
- 在 T1 时刻,thread2 持有锁。thread1 进入榜首次循环,获取锁失利,开端履行
shouldParkAfterFailedAcquire
。可是由于时刻片运用完,CPU 将 thread1 挂起。此刻 head 的 waitStatus 仍然为 0。 - thread2 获取时刻片,开端开释锁,判别条件
h != null && h.waitStatus != 0
,发现 head 的状况为 0,因而直接回来,不会唤醒后继线程。 - thread1 获取到时刻片,持续履行
shouldParkAfterFailedAcquire
,修正 head 节点的状况设置为 SIGNAL,然后直接堵塞。
为什么设置了前驱节点的状况为 SIGNAL 后,需求再次循环去测验获取锁,而不是直接进行堵塞?这儿就能够给出解答:假设设置了 SIGNAL 后,直接堵塞的话,thread 1 会无法获取锁。
日子中的比如
咱们能够幻想一个治病的比如来理解 AQS 中的行为。
假设我到了诊所门口,能够先去看看是否有人在就诊,假设没有则能够直接进入诊所就诊。就诊便是等于获取锁成功。
假设咱们到了诊所门口,发现现已有其他人在就诊,咱们能够让那个人就诊完结出来后奉告我一声能够进入就诊,而且在诊所门口排队。
假设前一个人现已就诊完结,可是还在告诉我去就诊的路上。此刻有另一个人现已到了诊所门口,发现诊所里边没有人,他直接进入诊所中就诊。我收到告诉之后,赶来门口检查,发现里边现已有人在就诊了,因而只能回去本来的方位持续排队。并让那个人就诊完结出来后奉告我一声能够进入就诊。
深入讨论
interrupted 的效果是什么
在 Java 中,选用的是协作式中止。当中止一个线程时,它只是在 JVM 中设置了线程的中止标志为 true,并不会影响当时正在运转的线程。可是假设当时线程是处于 BLOCK 和 WAITED 的状况下,则会被唤醒。
在 AQS 中,假设线程在堵塞时,被中止唤醒了,首要会履行 Thread.
interrupted
()
办法,这个办法的效果是:回来线程的中止状况,而且设置中止状况为 false,即会铲除中止状况。
为什么 AQS 自己铲除中止状况,然后又自己从头设置回去呢,一开端不铲除不就能够了吗?这是为了不影响获取锁的进程。
当线程被中止唤醒后,会持续测验获取锁,此刻假设获取锁失利了,应该需求持续调用 LockSupport.
park
, 将线程堵塞。可是由于中止状况的存在,此刻线程是无法被堵塞的。由于即便堵塞之后,只需检测到中止状况,又会被唤醒。
因而在线程被唤醒后,他需求先铲除中止标记位,直到线程顺利获取锁之后,再从头设置中止状况,让业务自行去处理中止。
呼应中止
上述的操作是经过疏忽中止的办法来获取锁。AQS 中,还有别的一种呼应中止 acquireInterruptibly
****的办法来获取锁。
它与疏忽中止的不同之处在于,线程被唤醒后,假设发现了中止,则直接抛出 InterruptedException
反常,而且将该线程踢出行列中。
什么时分堵塞线程会被唤醒
- 其他线程调用了
unpark
办法,将该线程唤醒 - 其他线程中止 (interrupts) 了线程
- 调用呈现了过错
经过 LockSupport.
park
**办法堵塞的线程,并不知道它是经过以上哪种办法被唤醒的。
为什么需求循环两次才堵塞线程
检查 release
办法的解析
开释锁时,为什么需求从后往前寻觅后继节点
这儿只需求结合 addWaiter 办法就能够理解了。
private Node addWaiter(Node mode) {
Node node = new Node(mode);
for (;;) {
Node oldTail = tail;
if (oldTail != null) {
node.setPrevRelaxed(oldTail);
if (compareAndSetTail(oldTail, node)) {
oldTail.next = node;
return node;
}
} else {
initializeSyncQueue();
}
}
}
在替换 tail 节点之前,会先设置 node 的前驱节点为当时的 tail 节点。也便是说此刻,经过 node 的 pred 指针是能够找到 oldTail 的,可是 oldTail 的 next 指针找不到 node,然后经过 CAS 操作将 tail 指针指向 node。
假设咱们在开释锁时,从前往后找的话,由于 oldTail 的 next 指针还未有设置值为 node,因而咱们有或许找到 oldTail 节点的时分就认为现已遍历完悉数节点了,但其实还疏忽了一个新参加的节点。
假设咱们从后往前找,在经过 CAS 操作将 tail 指针指向 node 后,咱们就能够直接拿到最新的 tail 值,而且新 tail 的前驱指针现已在前面设置完结,所以,能够查询到悉数节点。
小结:从后往前寻觅后继节点是为了在新节点参加到行列中时,能够扫描到它。原因是 compareAndSetTail(oldTail, node)
和 oldTail.next = node;
这两条句子不是原子操作。
怎么理解 AQS 中的公正锁和非公正锁
在 AQS 中,运用了双向链表作为同步行列,进入行列中的线程选用先进先服务的办法。因而能够得出结论,只需进入行列中的线程获取锁便是公正的。可是在非公正锁的模式下,在进入行列之前,会先测验去获取锁,假设获取锁成功,那么就能够不进入行列去等候,而这便是非公正锁的本质。
而公正锁则是,只需获取锁,那么就必须先进入行列。而行列能够确保先进先服务,能够确保肯定的公正。
什么时分会撤销锁恳求
- 获取锁的进程中产生反常,假设呼应中止的办法下,那么中止反常也会导致撤销获取锁
- 运用有恳求锁时刻的办法,假设在指定时刻内没有成功获取锁,会被撤销获取锁