欢迎关注专栏【JAVA并发】

前语

咱们应该都用过synchronized 关键字加锁,用来确保某个时间只答应一个线程运转。那么假如控制某个时间答应指定数量的线程履行,有什么好的办法呢? 答案便是JUC供给的信号量Semaphore

介绍和运用

  • Semaphore(信号量)能够用来约束能一起拜访同享资源的线程上限,它内部维护了一个答应的变量,也便是线程答应的数量
  • Semaphore的答应数量假如小于0个,就会堵塞获取,直到有线程开释答应
  • Semaphore是一个非重入锁

API介绍

  1. 结构办法
  • public Semaphore(int permits)permits 表明答应线程的数量
  • public Semaphore(int permits, boolean fair)fair 表明公正性,假如设为 true,表明是公正,那么等候最久的线程先履行
  1. 常用API
  • public void acquire():表明一个线程获取1个答应,那么线程答应数量相应削减一个
  • public void release():表明开释1个答应,那么线程答应数量相应会增加
  1. 其他API
  • void acquire(int permits):表明一个线程获取n个答应,这个数量由参数permits决议
  • void release(int permits):表明一个线程开释n个答应,这个数量由参数permits决议
  • int availablePermits():回来当时信号量线程答应数量
  • int getQueueLength(): 回来等候获取答应的线程数的预估值

根本运用

public static void main(String[] args) {
        // 1. 创建 semaphore 目标
        Semaphore semaphore = new Semaphore(2);
        // 2. 10个线程一起运转
        for (int i = 0; i < 8; i++) {
            new Thread(() -> {
                // 3. 获取答应
                try {
                    semaphore.acquire();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                try {
                    log.debug("running...");
                    sleep(1);
                    log.debug("end...");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    // 4. 开释答应
                    semaphore.release();
                }
            }).start();
        }
    }

运转结果:

【JUC】信号量Semaphore详解

原理介绍

【JUC】信号量Semaphore详解

上面是Semaphore的类结构图,其间FairSyncNonfairSync是它的内部类,他们一起继承了AQS类,AQS的同享形式供给了Semaphore的加锁、解锁。

假如对AQS不了解的请移步浅显易懂了解Java并发AQS的同享锁形式

为了更好的搞懂原理,咱们经过一个例子来协助咱们了解。

假定Semaphorepermits为 3,这时 5 个线程来获取资源,其间Thread-1Thread-2Thread-4CAS 竞赛成功,permits 变为 0,而 Thread-0 Thread-3 竞赛失利。

【JUC】信号量Semaphore详解

获取答应acquire()

  • acquire()主办法会调用 sync.acquireSharedInterruptibly(1)办法
  • acquireSharedInterruptibly()办法会先调用tryAcquireShared()办法回来答应的数量,假如小于0个,调用doAcquireSharedInterruptibly()办法进入堵塞
// acquire() -> sync.acquireSharedInterruptibly(1),可中断
public final void acquireSharedInterruptibly(int arg) {
    if (Thread.interrupted())
        throw new InterruptedException();
    // 测验获取通行证,获取成功回来 >= 0的值
    if (tryAcquireShared(arg) < 0)
        // 获取答应证失利,进入堵塞
        doAcquireSharedInterruptibly(arg);
}
  • tryAcquireShared()办法在终会调用到Sync#nonfairTryAcquireShared()办法
  • nonfairTryAcquireShared()办法中会减去获取的答应数量,回来剩下的答应数量
// tryAcquireShared() -> nonfairTryAcquireShared()
// 非公正,公正锁会在循环内 hasQueuedPredecessors()办法判别堵塞行列是否有临头节点(第二个节点)
final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        // 获取 state ,state 这里【表明通行证】
        int available = getState();
        // 核算当时线程获取通行证完成之后,通行证还剩下数量
        int remaining = available - acquires;
        // 假如答应已经用完, 回来负数, 表明获取失利,
        if (remaining < 0 ||
            // 答应证足够分配的,假如 cas 重试成功, 回来正数, 表明获取成功
            compareAndSetState(available, remaining))
            return remaining;
    }
}
  • 假如剩下的答应数量<0, 会调用doAcquireSharedInterruptibly()办法将当时线程加入到堵塞行列中堵塞
  • 办法中调用parkAndCheckInterrupt()堵塞当时线程
private void doAcquireSharedInterruptibly(int arg) {
    // 将调用 Semaphore.aquire 办法的线程,包装成 node 加入到 AQS 的堵塞行列中
    final Node node = addWaiter(Node.SHARED);
    // 获取标记
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            // 前驱节点是头节点能够再次获取答应
            if (p == head) {
                // 再次测验获取答应,【回来剩下的答应证数量】
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    // 成功后本线程出队(AQS), 所在 Node设置为 head
                    // r 表明【可用资源数】, 为 0 则不会继续传播
                    setHeadAndPropagate(node, r); 
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // 不成功, 设置上一个节点 waitStatus = Node.SIGNAL, 下轮进入 park 堵塞
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        // 被打断后进入该逻辑
        if (failed)
            cancelAcquire(node);
    }
}

终究的AQS状态如下图所示:

  • Thread-1Thread-2Thread-4正常运转

  • AQS的state也便是等于0

  • Thread-0Thread-3再堵塞行列中

【JUC】信号量Semaphore详解

开释答应release()

现在Thread-4运转结束,要开释答应,Thread-0Thread-3又是怎么康复履行的呢?

  • 调用release()办法开释答应,终究调用 Sync#releaseShared()办法
  • 假如办法tryReleaseShared(arg)测验开释答应成功,那么调用doReleaseShared();进行唤醒
// release() -> releaseShared()
public final boolean releaseShared(int arg) {
    // 测验开释锁
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }    
    return false;
}
  • tryReleaseShared()办法主要是测验开释答应
  • 获取当时答应数量 + 开释的数量,然后经过cas设置回去
protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        // 获取当时锁资源的可用答应证数量
        int current = getState();
        int next = current + releases;
        // 索引越界判别
        if (next < current)            
            throw new Error("Maximum permit count exceeded");        
        // 开释锁
        if (compareAndSetState(current, next))            
            return true;    
    }
}
  • 调用doReleaseShared()办法唤醒行列中的线程
  • 其间unparkSuccessor()办法是唤醒的核心操作
// 唤醒
private void doReleaseShared() {
    // 假如 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark	
    // 假如 head.waitStatus == 0 ==> Node.PROPAGATE    
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                // 防止 unparkSuccessor 被屡次履行
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;
                // 唤醒后继节点
                unparkSuccessor(h);
            }
            // 假如已经是 0 了,改为 -3,用来解决传播性
            else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;
        }
        if (h == head)
            break;
    }
}

终究AQS状态如下图所示:

【JUC】信号量Semaphore详解

  • 答应state变回1
  • 然后Thread-0开始竞赛,假如竞赛成功,如下图所示:

【JUC】信号量Semaphore详解

  • 因为Thread-0竞赛成功,再次获取到答应,答应数量减1,终究又变回0
  • 然后等候行列中剩下Thread-3

总结

Semaphore信号量类根据AQS的同享锁完成,有公正锁和非公正锁两个版别,它用来约束能一起拜访同享资源的线程上限,典型的使用场景是能够用来维护有限的公共资源,比方数据库衔接等。

假如本文对你有协助的话,请留下一个赞吧

本文正在参与「金石计划 . 分割6万现金大奖」