我正在参加「启航计划」
一 同步锁
在并发编程中,咱们常常用到的是synchronized
和ReentrantLock
。其间,synchronized
是jvm
内置锁,而ReentrantLock
坐落java.util.concurrent
包下(以下简称JUC
),ReentrantLock
是依据AbstractQueuedSynchronizer
(以下简称AQS
)同步器框架完成的,本文主要来介绍AQS
的内部完成及在JUC
中依据AQS
完成的相关类。
二 AQS内部完成
AQS
是一个抽象类,内部保护一个state
变量(代表同享资源)、一个FIFO
等候行列(用来获取同享资源、线程排队办理等)。AQS
界说了两种拜访同享资源的办法:Exclusive
(独占办法,每次只要一个线程拜访资源,如:ReentrantLock
)、Share
(同享办法,多个线程能够一起拜访资源,如:Semaphore
、CountDownLatch
等)。不管是独占办法仍是同享办法,其具体完成只需求完成同享资源state
的获取和开释即可,等候行列的相关操作(如获取锁失利入队、唤醒出队等),AQS
现已完成好了。
注:尽管AQS
提供了独占和同享两种办法拜访同享资源,但是两者并不一定是互斥的,还可所以独占和同享共存的办法拜访同享资源,如ReentrantReadWriteLock
(后边单独分析)。
1、等候行列
Node
是AQS
中的一个静态内部类,上面提到AQS中保护了一个FIFO的双端等候行列,当获取锁失利时,AQS会将当时线程及等候状况等信息结构成一个节点Node,加入到等候行列的队尾,而且当时线程变成堵塞状况,等候唤醒。
//等候行列的Head节点
private transient volatile Node head;
//等候行列的Tail节点
private transient volatile Node tail;
static final class Node {
//同享锁对应节点
static final Node SHARED = new Node();
//独占锁对应节点
static final Node EXCLUSIVE = null;
//等候状况
volatile int waitStatus;
Node nextWaiter;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
//前驱节点
volatile Node prev;
//后继节点
volatile Node next;
//当时线程
volatile Thread thread;
final boolean isShared() {
return nextWaiter == SHARED;
}
//找到当时节点的前驱节点
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() {}
//在addWaiter()时调用
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
//在Condition中调用
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}
- waitStatus:当时线程在等候行列中的状况
waitStatus | 值 | 功用 |
---|---|---|
SIGNAL | -1 | 当时节点的后继节点被堵塞(经过park )时,设置其前驱节点的状况为SIGNAL 。标记为SIGNAL 节点的线程开释锁时就会通知后继节点,使得后继节点所在的线程被唤醒。这个状况一般是后继节点来设置前驱节点的。 |
CANCELLED | 1 | 因为超时(timeout )或中止(interrupt ),行列中此节点被撤销。节点进入此状况后不会再改变。 |
CONDITION | -2 | 此节点当时坐落条件行列(condition queue )中。当其他线程对这个Condition 调用signal 办法后,它会被转移同步行列(sync queue )中。 |
PROPAGATE | -3 | 已发布的节点应该传播到其他节点。这在doReleaseShared 中设置(仅针对head 节点),以确保传播持续进行,即使其他操作现已介入。 |
0 | 0 | 以上的都不是,默认初始无锁状况 |
-
prev :等候行列(
Sync Queue
)中前驱节点 -
next :等候行列(
Sync Queue
)中后继节点 - thread : 当时线程
-
nextWaiter:条件行列(
Condition Queue
)的后继节点
注:等候行列(Sync Queue)在独占形式、同享形式中都会运用到,条件行列(Condition Queue)只会在独占锁中运用。
如:
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
condition.await();
condition.signal();
经过ReentrantLock
能够结构Condition
,调用condition的await/signal
办法时会把当时线程在Condition Queue
中履行入队/出队操作。
当调用condition.await()
时,Node
节点就会从Sync Queue
中进入到Condition Queue
中,且对应的pre、next
都会被置为null
,waitStatus
变为CONDITION
,并经过nextWaiter
构成链表;
而当调用condition.signal()
时,Node
又会从Condition Queue
进入到Sync Queue
中,pre、next
从头设置,waitStatus
依据当时状况进行设置,nextWaiter
会被置为null
。
注:当调用多个lock.newCondition()
时,就会发生多个Condition
行列,即一个ReentrantLock
能够对应多个Condition Queue
。
2、状况指示器state
state
表示当时锁的状况,state=0
是初始状况,即无锁状况; 当state>0
表示现已有线程取得了锁,当同一个线程多次取得同步锁时(可重入性),state
会递加,而在开释锁的时候,state
会进行递减直到state=0
时,其他线程才有资历获取锁。独占形式下只要一个线程能够获取同步锁,而同享形式下能够有多个线程能够获取同步锁。
private volatile int state;
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
//经过CAS办法更新state值
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
继承AQS
的同步器主要完成:
独占锁需求完成:
-
isHeldExclusively()
:是否是独占资源,当用到condition
时需求完成它。 -
tryAcquire(int arg)
:独占锁办法,测验获取资源,成功回来true
,不然回来false
。 -
tryRelease(int arg)
:独占锁办法,开释资源,假如开释操作使得所有获取同步器时被堵塞的线程康复履行,那么回来的是true
,不然回来false
。
同享锁需求完成:
-
tryAcquireShared(int arg)
:同享办法获取资源,负数表示获取操作失利;0代表成功,但无剩下资源;正数代表成功,且有剩下资源。 -
tryReleaseShared(int arg)
:同享办法测验开释资源,假如开释后答应唤醒后续节点回来true
,不然回来false
。
咱们来看下上述办法在AQS
中的完成:
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
无论是独占锁仍是同享锁,获取锁和开释锁在AQS
中都是没有完成的,需求在自界说同步器中完成
独占锁相关:
-
acquire(int arg)
: 独占形式下测验获取同享资源,疏忽中止操作。获取成功回来true
,不然线程进入等候行列中,等候被唤醒后持续测验获取同享资源直到成功停止。本办法能够在完成Lock.lock()
中运用。acquire()
在AQS
中的源码:
//tryAcquire测验获取资源(在子类中完成),如成功直接回来,不然经过addWaiter及acquireQueued将该线程加入到等候行列的队尾,标记为独占形式,假如该线程在等候过程中中止过,acquireQueued办法会回来true,不然回来false。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//假如tail为空(即行列为空),或经过CAS设置node到队尾失利,持续经过enq()循环设置node到队尾,直至成功停止
enq(node);
return node;
}
//将node设置到行列尾部
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
/**
*1、当时node节点的前驱节点是SIGNAL状况,回来true,意味着当时线程会被挂起,堵塞等候。
*2、假如前驱节点是CANCEL状况(waitStatus>0),越过此节点并删除,持续往上找,直到找到不是CANCEL状况的节点作为其前驱节点停止;假如前驱节点是0或许PROPAGATE状况,则将前驱节点设置为SIGNAL状况,则在下一次履行循环时shouldParkAfterFailedAcquire能够直接回来true,挂起线程。
**/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
-
acquireInterruptibly(int arg)
: 同acquire
办法,区别在于假如收到中止通知会直接中止。本办法能够在Lock.lockInterruptibly()
中运用。
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
三 JUC中依据AQS的相关完成
依据AQS
构建的同步器类中,最基本的操作便是各种形式的获取操作和开释操作。获取操作是一种依赖状况的操作,通常会堵塞;开释操作时被堵塞的线程会从头开端履行。前面也提到了,AQS
经过办理state
变量来进行相关操作,跟state
相关的办法有getState
、setState
、compareAndSetState
,而且state
在不同完成AQS的子类中代表的意思也各不相同。如:state 在 ReentrantLock中代表的是锁所有者线程重复获取该锁的次数;Semaphore 中 state代表剩下的答应数量;FutureTask中state用来表示使命的状况(未开端、正在运转、已完成、已撤销)。
-
ReentrantLock
:只支持独占办法获取锁,所以内部只完成了tryAcquire
、tryRelease
、isHeldExclusively
。ReentrantLock
运用AQS
对多个条件变量、多个等候线程集内置支持。Lock.newCondition
回来一个新的ConditionObject
实例。 -
CountDownLatch
:用于保存当时答应的数量。获取操作意味着“等候并直到闭锁到达结束状况” -
Semaphore
:Semaphore
翻译为信号量,是一种同享锁,能够一起答应一个或多个线程一起同享资源。 -
CyclicBarrier
:CyclicBarrier
意为循环栅门,能够完成一组线程等候至某个状况之后再全部持续履行。当所有线程履行结束后,CyclicBarrier
还能够持续被重用。 -
ReentrantReadWriteLock
:ReentrantReadWriteLock
是独占锁(写锁)、同享锁(读锁)能够一起存在的一种读写锁,在读操作远大于写操作的场景中,能完成更好的并发性。 -
ThreadPoolExecutor
:线程池,内部完成运用的ReentrantLock
。别的Android
中AsyncTask
内部完成便是ThreadPoolExecutor
。
四 总结
获取锁的流程:
-
AQS
的模板办法acquire
经过调用子类自界说完成的tryAcquire
获取锁; - 假如获取锁失利,经过
addWaiter
办法将线程结构成Node
节点插入到同步行列队尾; - 在
acquirQueued
办法中以自旋的办法测验获取锁,假如失利则判别是否需求将当时线程堵塞,假如需求堵塞则终究履行LockSupport(Unsafe)
中的native API
来完成线程堵塞。
开释锁的流程:
- 首先获取当时节点(实际上传入的是
head
节点)的状况,假如head
节点的下一个节点是null
,或许下一个节点的状况为CANCEL
(不用再唤醒了),则直接从等候行列的尾部开端遍历,一向遍历到最前面的waitStatus
小于等于 0 的节点(大于0的状况是CANCEL
状况)。 - 假如终究遍历到的节点不为
null
,再调用LockSupport.unpark
办法,调用底层办法唤醒线程。
五 参考
【1】并发Lock之AQS(AbstractQueuedSynchronizer)详解
【2】逐行分析AQS源码(1)——独占锁的获取