Java 中的 synchronized 关键字在 JVM 层面来确保线程安全,而在 JUC 包下也有能够确保线程安全的类。
在JDK 1.6之前还没有倾向锁和轻量级锁等优化的时分,使用 synchronized 确保线程安满是一个十分重的操作,原因有以下两点(详细能够看《深化了解 Java 线程和锁》):
-
第一点:由于 synchronized 的原子性是经过操作系统的Mutex Lock互斥量完成的,所以每次请求互斥量都需求从用户态转换到内核态。
-
第二点:JVM 线程是和内核线程 1:1 完成的,所以对线程的堵塞和唤醒也是需求从用户态转换到内核态。
为了削减加锁形成对线程频繁堵塞唤醒带来的开销,Java并发编程之父 Doug Lea 供给了一系列的 Lock 经过 CAS + 少数自旋的办法的去提高锁的功能。Lock 接口下有多重种完成,咱们来经过加锁解锁 API 的角度来要点剖析下可重入锁 ReentrantLock。
一. AQS
ReentrantLock 对线程的堵塞和唤醒都是经过 AbstractQueuedSynchronizer 来完成的。
AbstractQueuedSynchronizer 简称 AQS,又叫做 行列同步器,一般以模板类的办法存在于同步工具类的内部类中。AQS 里面维护了一个双向链表来堵塞和唤醒线程,一个 volatile 润饰的 state 变量维护锁的状况:
public abstract class AbstractQueuedSynchronizer {
// 头节点,懒加载
private transient volatile Node head;
// 尾节点,懒加载
private transient volatile Node tail;
// 锁状况和锁重入的数量,0 代表无线程占用
private volatile int state;
}
双向链表的节点的结构:
static final class Node {
// 节点状况
volatile int waitStatus;
// 前继节点
volatile Node prev;
// 后继节点
volatile Node next;
// 堵塞的线程
volatile Thread thread;
// 下个等候节点,单链表,效果相似于Object.wait()的等候行列
Node nextWaiter;
// Used by addWaiter
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
// Used by Condition
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}
waitStatus 状况分为以下几种:
-
**CANCELLED(1):**撤销状况,标志该线程不再参与锁的争抢
-
**SIGNAL(-1):**符号后继节点的线程需求被堵塞
-
**CONDITION(-2):**同步堵塞状况,相似调用了Object.wait()
-
**PROPAGATE(-3):**共享模式,同步状况可传播状况
-
**0:**初始状况
二. ReentrantLock
ReentrantLock 有非公正锁和公正锁两种,能够经过构造办法的参数进行初始化,默许对错公正锁:
// 默许初始化非公正锁
public ReentrantLock() {
sync = new NonfairSync();
}
// 能够经过参数指定公正锁
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
非公正锁望文生义,就是获取锁不是公正的,咱们来看下非公正锁的加锁逻辑:
lock() 加锁
lock() 加锁办法:
// lock()办法源码
final void lock() {
// 先经过CAS的操作将state值改为1,等候值为0;
if (compareAndSetState(0, 1))
// 假如CAS成功,将独占线程设置为当时线程
setExclusiveOwnerThread(Thread.currentThread());
else
// 失利代表有竞赛
acquire(1);
}
非公正锁的加锁很粗犷,上来就抢锁;竞赛成功将独占线程设置为当时线程,这个独占线程的效果有两个:一个是重入锁的判别,另一个使是为了解锁时进行判别避免非占用锁的线程进行误解锁操作;竞赛失利就进行进入 acquire() 办法。
点进 acquire() 办法:
// acquire()源码
public final void acquire(int arg) {
// 再次测验获取锁,假如失利将当时线程参加等候行列进行堵塞
// acquireQueued()也会有测验获取锁的逻辑
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
acquire() 里经过 tryAcquire(arg) 办法再次测验获取一次锁,假如线程没有抓住这次机会,就会被加到行列中。
再看下 tryAcquire(arg) 办法:
// 非公正锁的完成
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
// 非公正锁测验获取锁
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 获取 state
int c = getState();
// 假如 state 为 0 代表没有线程占用,能够获取锁
if (c == 0) {
// 经过CAS的操作修正state的值(确保原子性)
if (compareAndSetState(0, acquires)) {
// 假如成功将独占线程变量设置为当时线程并回来
setExclusiveOwnerThread(current);
return true;
}
}
// 假如独占线程是当时线程,证明当时线程现已获取到锁,所以进行重入
else if (current == getExclusiveOwnerThread()) {
// 重入次数
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 不需求CAS操作,由于进入到这儿的线程现已获取到了锁天然没有竞赛,直接设置值即可
setState(nextc);
return true;
}
// 抢占失利或许非重入的回来失利
return false;
}
tryAcquire(arg) 里会先去判别 state 的值:
-
假如 state 值为 0 代表没有线程占用,能够进行一次 CAS 操作去修正 state 的值,回来 CAS 成果;
-
假如 state 不为 0 进行重入锁的判别,判别独占线程是否是当时线程,假如是进行锁的重入,这儿不需求对 state 进行 CAS 操作,由于进入到这儿的线程现已获取到了锁天然没有竞赛,直接设置 state 值即可。
在 acquire() 办法内假如 tryAcquire() 测验获取锁的操作失利后会进行线程堵塞的操作,咱们接着来看下 addWaiter() 办法:
// addWaiter源码
private Node addWaiter(Node mode) {
// 将当时线程存储到 Node 节点内
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
// 假如尾节点不为空
if (pred != null) {
// 将当时线程节点的pre指向尾节点:tail <- node
node.prev = pred;
// CAS 操作将尾节点改为当时线程节点node
if (compareAndSetTail(pred, node)) {
// 假如修正成功
// 将之前 pre(之前的尾节点)的 next 指向 node(现在的尾节点)
pred.next = node;
// 完成等候行列的添加,直接回来
return node;
}
}
// 尾节点为空或 CAS 失利,会走到这儿
enq(node);
return node;
}
// 自旋直到将node参加到等候行列里
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 走到这儿而且尾节点为空代表第一次进行竞赛抢占锁
if (t == null) { // Must initialize
// CAS 设置等候行列的head节点
if (compareAndSetHead(new Node()))
// CAS 操作成功设置尾节
tail = head;
// 进入下一轮循环
} else {
// 尾节点不为空就能将node的prev指向尾节点
node.prev = t;
// CAS 修正尾节点为 node(这块是循环的出口,只有参加等候行列成功才会跳出循环)
if (compareAndSetTail(t, node)) {
// CAS 成功就next连上node并回来
t.next = node;
return t;
}
// CAS 失利代表有竞赛,进入下一轮循环
}
}
}
addWaiter() 办法首要做的事情是将当时线程加到等候行列的尾部。
参加到等候行列后就需求对线程进行堵塞操作,acquireQueued() 办法:
// acquireQueued() 源码
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 自旋
for (;;) {
// 获取node节点的前继节点
final Node p = node.predecessor();
// 假如前继节点是头节点,就测验获取锁(优化)
if (p == head && tryAcquire(arg)) {
// 获取成功将当时节点设置为头节点
setHead(node);
// 断开之前的头节点
p.next = null; // help GC
failed = false;
// 回来代表获取锁成功,回来履行事务代码
return interrupted;
}
// 假如前继节点不是头节点或获取锁失利会走到这儿
// 查看当时节点是否应该堵塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
acquireQueued() 是线程被唤醒争抢锁和堵塞的中心办法,先判别前继节点是否是 head 头节点:
-
假如是,测验获取锁,失利就测验堵塞线程;
-
假如不是,直接测验堵塞线程。
shouldParkAfterFailedAcquire() 办法判别当时线程节点是否应该堵塞:
// 判别当时d节点是否应该堵塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 假如前继节点的waitStatus为SIGNA就回来true进行堵塞操作
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
// 大于 0 代表撤销争抢锁,从后往前遍历,直到 waitStatus 不大于零
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 初始化状况将前继节点的 waitStatus 值改为 SIGNAL 状况
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
// 回来false后会再次进入下一轮循环
return false;
}
直到判别能够进行堵塞后进行线程堵塞:
private final boolean parkAndCheckInterrupt() {
// 线程堵塞办法,调用本地办法堵塞当时线程
LockSupport.park(this);
return Thread.interrupted();
}
线程堵塞在LockSupport.park()办法内,保存线程上下文到寄存器内,被唤醒后会继续履行代码,走到 acquireQueued() 办法的循环内测验获取锁。
lock() 办法的加锁办法和 synchronized 逻辑根本相同,都是维护一个行列来堵塞没抢到锁的线程;可是有些场景下需求没抢到锁快速回来失利而不是堵塞,synchronized 正常是无法做到的,Lock 接口中供给了tryLock() / tryLock(long timeout, TimeUnit unit) 的办法,假如获取不到锁会直接回来失利或自旋一段时刻后回来失利。
tryLock() 加锁
tryLock() 办法不区分公正锁和非公正锁,本身就是一种非公正的表现,直接去测验获取锁,获取成功回来 true,获取失利回来 false,底层就是用的上文说的 nonfairTryAcquire() 办法。
tryLock(long timeout, TimeUnit unit) 加锁
而 tryLock(long timeout, TimeUnit unit) 的逻辑会有些不相同,会等到设置的超时时刻到后才回来:
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 先经过tryAcquire测验获取一次锁,假如失利调用 doAcquireNanos(arg, nanosTimeout)
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
// 自旋超时时刻阈值
static final long spinForTimeoutThreshold = 1000L;
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
// 查看设置的超时时刻
if (nanosTimeout <= 0L)
return false;
// 计算办法的回来的时刻戳 deadline
final long deadline = System.nanoTime() + nanosTimeout;
// 先将当时线程加到等候行列尾部
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
// 获取当时线程节点的前继节点
final Node p = node.predecessor();
// 假如前面继节点是 head 就去测验获取锁
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
// 获取剩下超时时刻 nanosTimeout
nanosTimeout = deadline - System.nanoTime();
// nanosTimeout 小于 0 代表超时时刻到,回来 false
if (nanosTimeout <= 0L)
return false;
// 查看是否应该堵塞线程
// 假如应该堵塞,判别 nanosTimeout 是否大于 spinForTimeoutThreshold(1000纳妙)
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
// 将线程堵塞 nanosTimeout 的时刻,等时刻到后线程自动唤醒,进行下一次循环判别时刻后回来false
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
这儿用到了一个优化,假如 nanosTimeout <= 1000 纳妙,就自旋判别前继节点且测验获取锁而不是堵塞线程,削减了频繁堵塞和唤醒线程的开销。
unlock() 解锁
再看下 unlock() 解锁的流程:
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
// 先测验解锁,假如成功再唤醒head的后继节点
if (tryRelease(arg)) {
Node h = head;
// head 不等于 null 而且不等于 0 就唤醒后继节点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease() 测验解锁的逻辑:
protected final boolean tryRelease(int releases) {
// 计算解锁后的state值
int c = getState() - releases;
// 判别解锁的线程是否占用锁
if (Thread.currentThread() != getExclusiveOwnerThread())
// 假如不占用抛出反常,避免未持有锁的线程误解锁
throw new IllegalMonitorStateException();
boolean free = false;
// 假如等于零代表解锁成功
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
// 不等于零的状况代表重入锁没解锁完
setState(c);
return free;
}
解锁的时分需求判别当时线程是否是独占线程,避免未持有锁的线程误解锁。
当解锁成功后去唤醒 head 的后继节点:
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
// 假如后继节点为空或许状况为已撤销
if (s == null || s.waitStatus > 0) {
s = null;
// 从尾节点tail从后往前遍历,找到最靠前的不为null且状况不为撤销的
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 堵塞线程
LockSupport.unpark(s.thread);
}
一般状况都是唤醒 head 的后继节点,可是假如后继节点为空或许状况为已撤销,就需求从后往前遍历,这也是为什么 AQS 的等候行列要设计成双向链表的原因之一。
lockInterruptibly() 可中止加锁
ReentrantLock 供给了 lockInterruptibly() 办法来供给获取锁并堵塞时,能够响应中止的功用:
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 先测验获取锁
if (!tryAcquire(arg))
// 获取锁失利进入 doAcquireInterruptibly
doAcquireInterruptibly(arg);
}
doAcquireInterruptibly(arg) 的逻辑:
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 要点就在这儿,当调用线程的interrupt()中止办法时,堵塞的线程会被唤醒,并抛出反常
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
调用 lockInterruptibly() 进行加锁的线程,在没抢到锁堵塞的时分假如当调用线程的 interrupt() 中止办法时,堵塞的线程会被唤醒,并抛出反常。
公正锁
ReentrantLock 的公正锁是遵从一个先来后到的准则:公正锁加锁的时分会将线程加到等候行列的尾部,按照行列的次序分配锁资源。应用于一些要求次序履行或同步履行时刻长(同步过程中耗时较长,公正锁会产生无用自旋)的场景。由于需求确保次序性,少去许多自旋的加锁测验,在同步时刻短和并发高的场景下功能天然会比非公正锁差些。
ReentrantLock 的加锁解锁逻辑整体大概如下:
三. ReentrantLock 怎么确保线程安全?
ReentrantLock 为什么能够确保线程安全三要素?
之前在《深化了解Java内存模型》剖析了,想要确保线程安全,就必须确保原子性、有序性、可见性。原子性和有序性肯定能够确保,由于整个同步过程是原子的,同步过程中只有一个线程能履行能够确保有序性,那可见性呢?
Lock lock = new ReentrantLock();
int i = 0;
// 伪代码...
try {
lock.lock();
i ++;
} finally {
lock.unlock();
}
synchronized 关键字是JVM底层支撑的解锁的时分会将同步代码块中的数据刷回主内存,那在上述代码中,还需求对 i 加上volatile关键字才干确保可见性终究确保线程安全?
其实是不需求的,AQS 中的 state 是 volatile 润饰的,加锁和解锁的过程都会操作 state,而之前在《深化浅出volatile关键字》中咱们了解到,对 volatile 润饰的关键字进行操作时会将本地缓存中的数据都改写回主内存,一切中心操作的数据也都会改写回主内存,可见性天然也能够确保。
四. 比较和总结
比较于 synchronized,ReentrantLock 供给了更多功用的API,比方快速失利加锁:tryLock(),超时加锁:tryLock(long timeout, TimeUnit unit),可中止加锁:lockInterruptibly(),公正锁等;可是由于 synchronized 后期参加了许多优化(想知道Java的亲儿子 synchronized 怎么被优化能够看《深化了解Java 线程和锁》),二者的功能相差无几,ReentrantLock 除了需求显示的加锁和解锁,且需求在finally里解锁(发生反常不会自动解锁)还会有更多的功用。
后边会剖析设计愈加奇妙的读写锁 ReentrantReadWriteLock。
假如觉得文章不错能够点个赞和重视!
公众号:阿东编程之路
你好,我是阿东,目前从事后端研发作业。技能更新太快需求一直增加自己的储藏,干脆就将学到的东西记录下来一起共享给朋友们。未来我会在此公众号共享一些技能以及学习笔记之类的。妥妥的都是干货,跟我们一起生长。