欢迎关注专栏【JAVA并发】
前语
咱们应该都用过synchronized
关键字加锁,用来确保某个时间只答应一个线程运转。那么假如控制某个时间答应指定数量的线程履行,有什么好的办法呢? 答案便是JUC供给的信号量Semaphore
。
介绍和运用
-
Semaphore
(信号量)能够用来约束能一起拜访同享资源的线程上限,它内部维护了一个答应的变量,也便是线程答应的数量 -
Semaphore
的答应数量假如小于0个,就会堵塞获取,直到有线程开释答应 -
Semaphore
是一个非重入锁
API介绍
- 结构办法
-
public Semaphore(int permits)
:permits
表明答应线程的数量 -
public Semaphore(int permits, boolean fair)
:fair
表明公正性,假如设为true
,表明是公正,那么等候最久的线程先履行
- 常用API
-
public void acquire()
:表明一个线程获取1个答应,那么线程答应数量相应削减一个 -
public void release()
:表明开释1个答应,那么线程答应数量相应会增加
- 其他API
-
void acquire(int permits)
:表明一个线程获取n个答应,这个数量由参数permits
决议 -
void release(int permits)
:表明一个线程开释n个答应,这个数量由参数permits
决议 -
int availablePermits()
:回来当时信号量线程答应数量 -
int getQueueLength()
: 回来等候获取答应的线程数的预估值
根本运用
public static void main(String[] args) {
// 1. 创建 semaphore 目标
Semaphore semaphore = new Semaphore(2);
// 2. 10个线程一起运转
for (int i = 0; i < 8; i++) {
new Thread(() -> {
// 3. 获取答应
try {
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
log.debug("running...");
sleep(1);
log.debug("end...");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 4. 开释答应
semaphore.release();
}
}).start();
}
}
运转结果:
原理介绍
上面是Semaphore
的类结构图,其间FairSync
和NonfairSync
是它的内部类,他们一起继承了AQS类,AQS的同享形式供给了Semaphore
的加锁、解锁。
假如对AQS不了解的请移步浅显易懂了解Java并发AQS的同享锁形式
为了更好的搞懂原理,咱们经过一个例子来协助咱们了解。
假定Semaphore
的 permits
为 3,这时 5 个线程来获取资源,其间Thread-1
,Thread-2
,Thread-4
CAS 竞赛成功,permits
变为 0,而 Thread-0
和 Thread-3
竞赛失利。
获取答应acquire()
-
acquire()
主办法会调用sync.acquireSharedInterruptibly(1)
办法 -
acquireSharedInterruptibly()
办法会先调用tryAcquireShared()
办法回来答应的数量,假如小于0个,调用doAcquireSharedInterruptibly()
办法进入堵塞
// acquire() -> sync.acquireSharedInterruptibly(1),可中断
public final void acquireSharedInterruptibly(int arg) {
if (Thread.interrupted())
throw new InterruptedException();
// 测验获取通行证,获取成功回来 >= 0的值
if (tryAcquireShared(arg) < 0)
// 获取答应证失利,进入堵塞
doAcquireSharedInterruptibly(arg);
}
-
tryAcquireShared()
办法在终会调用到Sync#nonfairTryAcquireShared()
办法 -
nonfairTryAcquireShared()
办法中会减去获取的答应数量,回来剩下的答应数量
// tryAcquireShared() -> nonfairTryAcquireShared()
// 非公正,公正锁会在循环内 hasQueuedPredecessors()办法判别堵塞行列是否有临头节点(第二个节点)
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
// 获取 state ,state 这里【表明通行证】
int available = getState();
// 核算当时线程获取通行证完成之后,通行证还剩下数量
int remaining = available - acquires;
// 假如答应已经用完, 回来负数, 表明获取失利,
if (remaining < 0 ||
// 答应证足够分配的,假如 cas 重试成功, 回来正数, 表明获取成功
compareAndSetState(available, remaining))
return remaining;
}
}
- 假如剩下的答应数量<0, 会调用
doAcquireSharedInterruptibly()
办法将当时线程加入到堵塞行列中堵塞 - 办法中调用
parkAndCheckInterrupt()
堵塞当时线程
private void doAcquireSharedInterruptibly(int arg) {
// 将调用 Semaphore.aquire 办法的线程,包装成 node 加入到 AQS 的堵塞行列中
final Node node = addWaiter(Node.SHARED);
// 获取标记
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
// 前驱节点是头节点能够再次获取答应
if (p == head) {
// 再次测验获取答应,【回来剩下的答应证数量】
int r = tryAcquireShared(arg);
if (r >= 0) {
// 成功后本线程出队(AQS), 所在 Node设置为 head
// r 表明【可用资源数】, 为 0 则不会继续传播
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 不成功, 设置上一个节点 waitStatus = Node.SIGNAL, 下轮进入 park 堵塞
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
// 被打断后进入该逻辑
if (failed)
cancelAcquire(node);
}
}
终究的AQS状态如下图所示:
-
Thread-1
、Thread-2
、Thread-4
正常运转 -
AQS的
state
也便是等于0 -
Thread-0
、Thread-3
再堵塞行列中
开释答应release()
现在Thread-4
运转结束,要开释答应,Thread-0
、Thread-3
又是怎么康复履行的呢?
- 调用
release()
办法开释答应,终究调用Sync#releaseShared()
办法 - 假如办法
tryReleaseShared(arg)
测验开释答应成功,那么调用doReleaseShared();
进行唤醒
// release() -> releaseShared()
public final boolean releaseShared(int arg) {
// 测验开释锁
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
-
tryReleaseShared()
办法主要是测验开释答应 - 获取当时答应数量 + 开释的数量,然后经过cas设置回去
protected final boolean tryReleaseShared(int releases) {
for (;;) {
// 获取当时锁资源的可用答应证数量
int current = getState();
int next = current + releases;
// 索引越界判别
if (next < current)
throw new Error("Maximum permit count exceeded");
// 开释锁
if (compareAndSetState(current, next))
return true;
}
}
- 调用
doReleaseShared()
办法唤醒行列中的线程 - 其间
unparkSuccessor()
办法是唤醒的核心操作
// 唤醒
private void doReleaseShared() {
// 假如 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark
// 假如 head.waitStatus == 0 ==> Node.PROPAGATE
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
// 防止 unparkSuccessor 被屡次履行
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
// 唤醒后继节点
unparkSuccessor(h);
}
// 假如已经是 0 了,改为 -3,用来解决传播性
else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)
break;
}
}
终究AQS状态如下图所示:
- 答应state变回1
- 然后
Thread-0
开始竞赛,假如竞赛成功,如下图所示:
- 因为Thread-0竞赛成功,再次获取到答应,答应数量减1,终究又变回0
- 然后等候行列中剩下
Thread-3
总结
Semaphore
信号量类根据AQS的同享锁完成,有公正锁和非公正锁两个版别,它用来约束能一起拜访同享资源的线程上限,典型的使用场景是能够用来维护有限的公共资源,比方数据库衔接等。
假如本文对你有协助的话,请留下一个赞吧
本文正在参与「金石计划 . 分割6万现金大奖」