持续创作,加速生长!这是我参加「日新计划 10 月更文应战」的第17天,点击查看活动概况
概述
这篇文章浅显易懂了解Java并发AQS的独占锁形式讲解了AQS的独占锁完成原理,那么本篇文章在论述AQS另外一个重要形式,同享锁形式,那什么是同享锁呢?
同享锁能够由多个线程一同获取, 比较典型的便是读锁,读操作并不会产生副作用,所以能够答应多个线程一同对数据进行读操作而不会有线程安全问题,jdk中的许多并发东西比方ReadWriteLock和CountdownLatch便是依靠AQS的同享锁完成的。
本文重点讲解下AQS是怎么完成同享锁的。
自定义同享锁例子
首要我们经过AQS完成一个非常最最最轻量简略的同享锁例子,协助我们对同享锁有一个全体的感知。
@Slf4j
public class ShareLock {
/**
* 同享锁协助类
*/
private static class ShareSync extends AbstractQueuedSynchronizer {
private int lockCount;
/**
* 创立同享锁协助类,最多有count把同享锁,超过了则堵塞
*
* @param count 同享锁数量
*/
public ShareSync(int count) {
this.lockCount = count;
}
/**
* 测验获取同享锁
*
* @param arg 每次获取锁的数量
* @return 回来正数,表明后续其他线程获取同享锁可能成功; 回来0,表明后续其他线程无法获取同享锁;回来负数,表明当时线程获取同享锁失利
*/
@Override
protected int tryAcquireShared(int arg) {
// 自旋
for (;;) {
int c = getState();
// 假如持有锁的数量大于指定数量,回来-1,线程进入堵塞
if(c >= lockCount) {
return -1;
}
int nextc = c + 1;
// cas设置成功,回来1,获取到同享锁
if (compareAndSetState(c, nextc)) {
return 1;
}
}
}
/**
* 测验开释同享锁
*
* @param arg 开释锁的数量
* @return 假如开释后答应唤醒后续等候结点回来true,不然回来false
*/
@Override
protected boolean tryReleaseShared(int arg) {
// 自旋操作
for (; ; ) {
int c = getState();
// 假如没有锁了
if (c == 0) {
return false;
}
// 不然锁量-1
int nextc = c - 1;
// cas修正状况
if (compareAndSetState(c, nextc)) {
return true;
}
}
}
}
private final ShareSync sync;
public ShareLock(int count) {
this.sync = new ShareSync(count);
}
/**
* 加同享锁
*/
public void lockShare() {
sync.acquireShared(1);
}
/**
* 开释同享锁
*/
public void releaseShare() {
sync.releaseShared(1);
}
}
- 创立内部类同享协助锁
ShareSync
类,承继自AbstractQueuedSynchronizer
类,完成了同享锁相关的办法tryAcquireShared()
和tryReleaseShared()
。 - 创立
ShareLock
,供给了lockShare()
加锁和releaseShare()
两个API。
验证:
public static void main(String[] args) throws InterruptedException {
ShareLock shareLock = new ShareLock(3);
for (int i = 0; i < 5; i++) {
new Thread(() -> {
shareLock.lockShare();
try {
log.info("lock success");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
shareLock.releaseShare();
log.info("release success");
}
}, "thread-" + i).start();
}
Thread.sleep(10000);
}
- 总共创立最多共同有3个线程同享的同享锁。
- 创立5个线程去竞争同享锁。
运转成果:
- 运转成果显示每次最多只需3个
lock success
,阐明一同只需3个线程同享。 - 只需在开释同享锁今后,其他线程才干获取锁。
下面对它的完成原理一探终究。
核心原理机制
同享形式也是由AQS供给的,首要我们关注下AQS的数据结构。
AQS内部保护了一个volatile int state(代表同享资源)和一个FIFO线程等候行列(多线程争用资源被堵塞时会进入此行列)。
AQS作为一个抽象办法,供给了加锁、和开释锁的框架,这儿选用的模板方形式,在上面中说到的tryAcquireShared
、tryReleaseShared
便是和同享形式相关的模板办法。
办法名 | 描绘 |
---|---|
protected int tryAcquireShared(int arg) | 同享办法。arg为获取锁的次数,测验获取资源。负数表明失利;0表明成功,但没有剩下可用资源;正数表明成功,且有剩下资源。 |
protected boolean tryReleaseShared(int arg) | 同享办法。arg为开释锁的次数,测验开释资源,假如开释后答应唤醒后续等候结点回来True,不然回来False。 |
同享形式的进口办法如下:
办法名 | 描绘 |
---|---|
void acquireShared(int arg) | 同享形式获取锁,不呼应中止。 |
void acquireSharedInterruptibly(int arg) | 同享形式获取锁,呼应中止。 |
tryAcquireSharedNanos(int arg, long nanosTimeout) | 测验在同享形式下获取锁,假如中止则中止,假如超过给定超时则失利。 |
boolean releaseShared(int arg) | 同享形式下开释锁。 |
源码解析
上图是AQS的类结构图,其中标红部分是组成AQS的重要成员变量。
成员变量
- state同享变量
AQS中里一个很重要的字段state,表明同步状况,是由volatile润饰的,用于展示当时临界资源的获锁情况。经过getState(),setState(),compareAndSetState()三个办法进行保护。
关于state的几个关键:
- 运用volatile润饰,确保多线程间的可见性。
- getState()、setState()、compareAndSetState()运用final润饰,约束子类不能对其重写。
- compareAndSetState()选用达观锁思想的CAS算法,确保原子性操作。
- CLH行列(FIFO行列)
AQS里另一个重要的概念便是CLH行列,它是一个双向链表行列,其内部由head和tail别离记录头结点和尾结点,行列的元素类型是Node。
private transient volatile Node head;
private transient volatile Node tail;
Node的结构如下:
static final class Node {
//同享形式下的等候标记
static final Node SHARED = new Node();
//独占形式下的等候标记
static final Node EXCLUSIVE = null;
//表明当时结点已撤销调度。当timeout或被中止(呼应中止的情况下),会触发改变为此状况,进入该状况后的结点将不会再改变。
static final int CANCELLED = 1;
//表明后继结点在等候当时结点唤醒。后继结点入队时,会将前继结点的状况更新为SIGNAL。
static final int SIGNAL = -1;
//表明结点等候在Condition上,当其他线程调用了Condition的signal()办法后,CONDITION状况的结点将从等候行列转移到同步行列中,等候获取同步锁。
static final int CONDITION = -2;
//同享形式下,前继结点不只会唤醒其后继结点,一同也可能会唤醒后继的后继结点。
static final int PROPAGATE = -3;
//状况,包含上面的四种状况值,初始值为0,一般是节点的初始状况
volatile int waitStatus;
//上一个节点的引证
volatile Node prev;
//下一个节点的引证
volatile Node next;
//保存在当时节点的线程引证
volatile Thread thread;
//condition行列的后续节点
Node nextWaiter;
}
留意,waitSstatus负值表明结点处于有效等候状况,而正值表明结点已被撤销。所以源码中许多地方用>0、<0来判别结点的状况是否正常。
- exclusiveOwnerThread
AQS经过承继AbstractOwnableSynchronizer类,拥有的属性。表明独占形式下同步器持有的线程。
同享锁获取acquireShared(int)
acquireShared(int)是同享锁形式下线程获取同享资源的进口办法,它会获取指定量的资源,获取成功则直接回来,获取失利则进入等候行列,直到获取到资源停止,整个过程无法呼应中止。
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
办法的全体流程如下:
- tryAcquireShared()测验获取资源,需要自定义同步器去完成,回来负值代表获取失利;0代表获取成功,但没有剩下资源;正数表明获取成功,还有剩下资源,其他线程还能够去获取。
- 假如失利则经过doAcquireShared()进入等候行列,直到获取到资源停止才回来。
doAcquireShared(int)
此办法用于将当时线程参加等候行列尾部休息,直到其他线程开释资源唤醒自己,自己成功拿到相应量的资源后才回来。
private void doAcquireShared(int arg) {
//封装线程为同享Node 参加行列尾部
final Node node = addWaiter(Node.SHARED);
//是否成功标志
boolean failed = true;
try {
//等候过程中是否被中止过的标志
boolean interrupted = false;
// 自旋操作
for (;;) {
// 获取前驱节点
final Node p = node.predecessor();
//假如到head的下一个,因为head是拿到资源的线程,此时node被唤醒,很可能是head用完资源来唤醒自己的
if (p == head) {
//测验获取资源
int r = tryAcquireShared(arg);
//成功
if (r >= 0) {
//将head指向自己,还有剩下资源能够再唤醒之后的线程
setHeadAndPropagate(node, r);
p.next = null; // help GC
//假如等候过程中被打断过,此时将中止补上。
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
//判别状况,寻找安全点,进入waiting状况,等着被unpark()或interrupt()
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
doAcquireShared
办法的完成和获取独占锁中的acquireQueued
办法很类似,但是主要有一点不同,那便是线程在被唤醒后,若成功获取到了同享锁,还需要判别同享锁是否还能被其他线程获取,若能够,则持续向后唤醒它的下一个节点对应的线程。
setHeadAndPropagate(Node, int)
该办法主要将当时节点设置为头节点,一同判别条件是否契合(比方还有剩下资源),还会去唤醒后继结点,毕竟是同享形式。
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
//head指向自己
setHead(node);
//假如还有剩下量,持续唤醒下一个邻居线程
if (propagate > 0 || h == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
// 唤醒操作
doReleaseShared();
}
}
同享开释releaseShared(int)
releaseShared(int)
是同享形式下线程开释同享资源的进口,它会开释指定量的资源,假如成功开释且答应唤醒等候线程,它会唤醒等候行列里的其他线程来获取资源。
public final boolean releaseShared(int arg) {
//测验开释资源
if (tryReleaseShared(arg)) {
//唤醒后继结点
doReleaseShared();
return true;
}
return false;
}
办法的全体流程如下:
- tryReleaseShared测验开释锁,这由自定义同步器去完成, 回来true表明开释成功。
- doReleaseShared唤醒后续行列中等候的节点,
doReleaseShared()
此办法主要用于唤醒行列中等候的同享节点。
private void doReleaseShared() {
// 自旋操作
for (;;) {
// 获取头节点
Node h = head;
if (h != null && h != tail) {
// 获取节点的等候状况
int ws = h.waitStatus;
// 假如节点等候状况是-1, -1表明有职责唤醒后续节点的状况
if (ws == Node.SIGNAL) {
// cas修正当时节点的等候状况为0
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
//唤醒后续节点
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)// head发生改变
break;
}
}
- 逻辑是一个死循环,每次循环中重新读取一次head,然后保存在局部变量h中,再合作
if(h == head) break;
,这样,循环检测到head没有改变时就会退出循环。留意,head改变必定是因为:acquire thread被唤醒,之后它成功获取锁,然后setHead设置了新head。而且留意,只需经过if(h == head) break;
即head不变才干退出循环,不然会履行屡次循环。 -
if (h != null && h != tail)
判别行列是否至少有两个node,假如行列从来没有初始化过(head为null),或许head便是tail,那么中间逻辑直接不走,直接判别head是否改变了。 - 假如行列中有两个或以上个node,那么查看局部变量h的状况:
-
- 假如状况为SIGNAL,阐明h的后继是需要被告诉的。经过对CAS操作成果取反,将
compareAndSetWaitStatus(h, Node.SIGNAL, 0)
和unparkSuccessor(h)
绑定在了一同。阐明了只需head成功得从SIGNAL修正为0,那么head的后继的代表线程必定会被唤醒了。 - 假如状况为0,阐明h的后继所代表的线程现已被唤醒或行将被唤醒,而且这个中间状况行将消失,要么因为acquire thread获取锁失利再次设置head为SIGNAL并再次堵塞,要么因为acquire thread获取锁成功而将自己(head后继)设置为新head而且只需head后继不是队尾,那么新head必定为SIGNAL。所以设置这种中间状况的head的status为PROPAGATE,让其status又变成负数,这样可能被被唤醒线程检测到。
- 假如状况为SIGNAL,阐明h的后继是需要被告诉的。经过对CAS操作成果取反,将
- 假如状况为PROPAGATE,直接判别head是否改变。
- 两个continue确保了进入那两个分支后,只需当CAS操作成功后,才可能去履行if(h == head) break;,才可能退出循环。
- if(h == head) break;确保了,只需在某个循环的过程中有线程刚获取了锁且设置了新head,就会再次循环。目的当然是为了再次履行unparkSuccessor(h),即唤醒行列中第一个等候的线程。
总结
本文主要讲解了AQS的同享形式,经过一个自定义简略的demo协助我们浅显易懂的了解,一同深入分析了源码完成,希望对我们有协助。
参考
developer.aliyun.com/article/779…
tech.meituan.com/2019/12/05/…
www.cnblogs.com/tuyang1129/…
www.cnblogs.com/waterystone…
www.cnblogs.com/moxiaotao/p…
blog.csdn.net/anlian523/a…