导读

咱们日常开发中,经常会碰到并发的场景,在Java 中言语系统里,咱们会想到 ReentrantLock、CountDownLatch、Semaphore 等东西,但你是否清楚它们内部的完成原理?这些东西都很类似,底层都是根据AbstractQueuedSynchronizer(AQS)来完成的。今日咱们就来一起学习 AQS 内部原理。俗话说知己知彼攻无不克,假如咱们了解了其中的原理,用该类东西开发就能够做到事半功倍。

文|夏福利 网易智企资深开发工程师

一、AQS 履行框架

下图是 AQS 大体履行框架:

AQS 源码流程分析

AQS 源码流程分析

经过上面这张图能够了解到 AQS 大约的履行进程。ReentrantLock、CountDownLatch、Semaphore 都是在这个流程上封装的。

拿 ReentrantLock 来说,ReentrantLock 是独占锁, 能够是公平锁,也能够对错公平锁, 经过这张图能够很好理解了。

ReentrantLock 是独占锁体现在同一时间只需一个线程能够测验获取资源成功, 其他获取失利的都会参加堵塞行列的队尾进行排队。

公平锁便是线程严厉按照堵塞行列的排列次序获取资源,先到先得,不得插队。如下图所示:

AQS 源码流程分析

AQS 源码流程分析

AQS 源码流程分析

而非公平锁就或许存在插队的或许。 例如,假如上面头节点被唤醒,正准备测验获取资源,这时来了一个线程也测验获取资源,有或许新来的线程获取资源成功,而头结点获取资源失利。这就对错公平锁。

在 ReentrantLock 源码中能够看到非公平锁会测验获取资源时,不会考虑堵塞行列是否为空, 假如能够获取资源成功则直接占用了资源。获取失利才会参加堵塞行列。代码如下:

final boolean nonfairTryAcquire(int acquires) {    final Thread current = Thread.currentThread();    int c = getState();    if (c == 0) {        if (compareAndSetState(0, acquires)) {            setExclusiveOwnerThread(current);            return true;        }    }    else if (current == getExclusiveOwnerThread()) {        int nextc = c + acquires;        if (nextc < 0) // overflow            throw new Error("Maximum lock count exceeded");        setState(nextc);        return true;    }    return false;}

而关于公平锁测验获取资源时,会判断堵塞行列是否为空(和非公平锁要害差别所在),如下:

static final class FairSync extends Sync {    private static final long serialVersionUID = -3000897897090466540L;
    final void lock() {        acquire(1);    }
    /**     * Fair version of tryAcquire.  Don't grant access unless     * recursive call or no waiters or is first.     */    @ReservedStackAccess    protected final boolean tryAcquire(int acquires) {        final Thread current = Thread.currentThread();        int c = getState();        if (c == 0) {            if (!hasQueuedPredecessors() &&                compareAndSetState(0, acquires)) {                setExclusiveOwnerThread(current);                return true;            }        }        else if (current == getExclusiveOwnerThread()) {            int nextc = c + acquires;            if (nextc < 0)                throw new Error("Maximum lock count exceeded");            setState(nextc);            return true;        }        return false;    }}

AQS 源码流程分析

二、AQS 完成原理详解

AQS 从姓名上就知道是抽象类,经过模板办法界说了上面那张图的流程。关于“资源占用”和“资源开释”的界说,则是交给详细的子类去界说去完成。

关于同享锁,子类需求完成如下两个办法:

protected int tryAcquireShared(int arg)protected boolean tryReleaseShared(int arg)
  • tryAcquireShared:同享办法。arg 为获取锁的次数, 测验获取资源;回来值为:

    负数表明失利;

    0表明成功,但没有剩下可用资源;

    正数表明成功,且有剩下资源;

  • tryReleaseShared:同享办法。arg 为开释锁的次数, 测验开释资源,假如开释后答应唤醒后续等候结点回来 True,否则回来 False;

而关于独占锁,子类需求完成三个办法:

protected boolean tryAcquire(int arg)protected boolean tryRelease(int arg)protected boolean isHeldExclusively()
  • isHeldExclusively: 该线程是否正在独占资源。只需用到 Condition 才需求去完成它;
  • tryAcquire: 独占办法。arg 为获取锁的次数,测验获取资源,成功则回来 True,失利则回来 False;
  • tryRelease: 独占办法。arg 为开释锁的次数,测验开释资源,成功则回来 True,失利则回来 False;

获取资源

首要,咱们看一下独占锁获取资源的进程。

在 AQS 中,独占锁的获取资源的中心代码如下:

public final void acquire(int arg) {    // 当 tryAcquire 回来 true 就阐明获取到锁了,直接完毕。    // 反之,回来 false 的话,就需求履行后边的办法。    if (!tryAcquire(arg) &&        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))        selfInterrupt();}

假如子类的 tryAcquire 回来 true, 则表明获取锁成功,直接完毕。

只需子类的 tryAcquire 办法回来 false,那么就阐明获取锁失利,就需求将自己参加行列。

private Node addWaiter(Node mode) {    // 创立一个独占类型的节点    Node node = new Node(Thread.currentThread(), mode);    // Try the fast path of enq; backup to full enq on failure    Node pred = tail;    // 假如 tail 节点不是 null,就将新节点的 pred 节点设置为 tail 节点。    // 并且将新节点设置成 tail 节点。    if (pred != null) {        node.prev = pred;        if (compareAndSetTail(pred, node)) {            pred.next = node;            return node;        }    }    // 假如 tail 节点是  null,或许 CAS 设置 tail 失利。    // 在 enq 办法中处理    enq(node);    return node;}

假如 tail 节点为 null, 或许 CAS 设置 tail 失利,则经过自旋的办法参加尾结点。

private Node enq(final Node node) {    for (;;) {        Node t = tail;        // 假如 tail 是 null,就创立一个虚拟节点,一起指向 headtail,称为 初始化if (t == null) { // Must initialize            if (compareAndSetHead(new Node()))                tail = head;        } else {// 假如不是 null            // 和 上个办法逻辑一样,将新节点追加到 tail 节点后边,并更新行列的 tail 为新节点。            // 只不过这儿是死循环的,失利了还能够再来 。            node.prev = t;            if (compareAndSetTail(t, node)) {                t.next = node;                return t;            }        }    }}

enq 办法的逻辑是什么呢?当 tail 是 null(没有初始化行列),就需求初始化行列了。CAS 设置 tail 失利,也会走这儿,需求在 enq 办法中循环设置 tail。直到成功。

上面的进程用一张图表明如下:

AQS 源码流程分析

将自己参加到堵塞行列后(留意 addWaiter 办法回来的是当时节点),履行 acquireQueued() 办法,将当时节点对应的线程挂起,源码如下:

// 这儿回来的节点是新创立的节点,arg 是请求的数量final boolean acquireQueued(final Node node, int arg) {    boolean failed = true;    try {        boolean interrupted = false;        for (;;) {            // 找上一个节点            final Node p = node.predecessor();            // 假如上一个节点是 head ,就测验获取锁            // 假如 获取成功,就将当时节点设置为 head,留意 head 节点是永久不会唤醒的。            if (p == head && tryAcquire(arg)) {                setHead(node);                p.next = null; // help GC                failed = false;                return interrupted;            }            // 在获取锁失利后,就需求堵塞了。            // shouldParkAfterFailedAcquire ---> 检查上一个节点的状况,假如是 SIGNAL 就堵塞,否则就改成 SIGNAL。            if (shouldParkAfterFailedAcquire(p, node) &&                parkAndCheckInterrupt())                interrupted = true;        }    } finally {        if (failed)            cancelAcquire(node);    }}

这个办法有两个逻辑:

  • 怎么将自己挂起?
  • 被唤醒之后做什么?

先回答第二个问题:被唤醒之后做什么?

测验拿锁,成功之后,将自己设置为 head,断开和 next 的连接。

再看第一个问题:怎么将自己挂起?

详细逻辑在 shouldParkAfterFailedAcquire 办法中:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {    int ws = pred.waitStatus;    //  假如他的上一个节点的 ws 是 SIGNAL,他就需求堵塞。    if (ws == Node.SIGNAL)        // 堵塞        return true;    // 上一任被撤销。越过上一任并重试。    if (ws > 0) {        do {            // 将上一任的上一任 赋值给 当时的上一任            node.prev = pred = pred.prev;        } while (pred.waitStatus > 0);        // 将上一任的上一任的 next 赋值为 当时节点        pred.next = node;    } else {         // 假如没有撤销 || 0 || CONDITION || PROPAGATE,那么就将上一任的 ws 设置成 SIGNAL.        // 为什么有必要是 SIGNAL 呢?        // 答:期望自己的上一个节点在开释锁的时分,告知自己(让自己获取锁)        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);    }    // 重来    return false;}

该办法的首要逻辑便是将前置节点的状况修改成 SIGNAL,告知他:你开释锁的时分记住唤醒我。其中假如前置节点被撤销了,就越过他。那么在前置节点开释锁的时分,必定会唤醒这个节点。

上面是独占锁获取资源进程,同享锁获取资源的进程类似,会有略微的不同,中心代码如下:

private void doAcquireShared(int arg) {        // 将自己参加堵塞行列        final Node node = addWaiter(Node.SHARED);        boolean failed = true;        try {            boolean interrupted = false;            for (;;) {                // 将自己挂起前,测验再次获取锁,假如获取成功,                则将自己设置为头结点,并告知唤醒下一节点                final Node p = node.predecessor();                if (p == head) {                    int r = tryAcquireShared(arg);                    if (r >= 0) {                        // 这儿是和独占锁有差异的当地。这儿不但会将自己设置为头结点,                        而且会唤醒下一个节点,经过这种办法将一切等候同享锁的节点唤醒                        setHeadAndPropagate(node, r);                        p.next = null; // help GC                        if (interrupted)                            selfInterrupt();                        failed = false;                        return;                    }                }                                // 获取锁失利,则挂起。同独占锁逻辑                if (shouldParkAfterFailedAcquire(p, node) &&                    parkAndCheckInterrupt())                    interrupted = true;            }        } finally {            if (failed)                cancelAcquire(node);        }    }

这个办法同样是包括两个逻辑:

  • 怎么将自己挂起?
  • 被唤醒之后做什么?

怎么将自己挂起和独占锁没有差异。唤醒之后做什么是和独占锁差异的要害: 假如当时节点唤醒后获取到了锁后,会唤醒下一个节点。下一个节点唤醒后会继续唤醒下下一个节点,然后将一切等候同享锁的线程唤醒。中心代码如下:

private void setHeadAndPropagate(Node node, int propagate) {        Node h = head; // Record old head for check below        // 将自己设置为头结点        setHead(node);        if (propagate > 0 || h == null || h.waitStatus < 0 ||            (h = head) == null || h.waitStatus < 0) {            Node s = node.next;            if (s == null || s.isShared())                // 唤醒下一个节点                doReleaseShared();        }    }

开释资源

上面讲了获取资源的逻辑,那怎么开释资源呢?

同样,还是先看一下独占锁的开释逻辑:

public final boolean release(int arg) {    if (tryRelease(arg)) {        Node h = head;        // 一切的节点在将自己挂起之前,都会将前置节点设置成 SIGNAL,期望前置节点开释的时分,唤醒自己。        // 假如前置节点是 0 ,阐明前置节点现已开释过了。不能重复开释了,后边将会看到开释后会将 ws 修改成0.        if (h != null && h.waitStatus != 0)            unparkSuccessor(h);        return true;    }    return false;}

从这个办法的判断就能够看出,head 有必要不等于 0。为什么呢?上面获取资源进程中提到:当一个节点测验挂起自己之前,都会将前置节点设置成 SIGNAL -1,就算是第一个参加行列的节点,在获取锁失利后,也会将初始化节点设置的 ws 设置成 SIGNAL。

而这个判断也是避免多线程重复开释,那么在开释锁之后,必定会将 ws 状况设置成 0。避免重复操作。代码如下:

private void unparkSuccessor(Node node) {    int ws = node.waitStatus;    if (ws < 0)        // 将 head 节点的 ws 改成 0,铲除信号。表明,他现已开释过了。不能重复开释。        compareAndSetWaitStatus(node, ws, 0);
    Node s = node.next;    // 假如 nextnull,或许 next 被撤销了。就从 tail 开端向上找节点。    if (s == null || s.waitStatus > 0) {        s = null;        // 从尾部开端,向前寻觅最靠前的那个未被撤销的节点        for (Node t = tail; t != null && t != node; t = t.prev)            if (t.waitStatus <= 0)                s = t;    }    // 唤醒这个节点。    if (s != null)        LockSupport.unpark(s.thread);}

唤醒之后的逻辑是什么样子的还记住吗?在上面解说资源获取时有讲到:

线程唤醒后测验拿锁,拿锁成功则设置自己为 head,断开上一任 head 和自己的连接。

final boolean acquireQueued(final Node node, long arg) {    boolean failed = true;    try {        boolean interrupted = false;        //唤醒之后再次进行for循环,测验获取锁,获取成功则将自己设置为头结点        for (;;) {            final Node p = node.predecessor();            if (p == head && tryAcquire(arg)) {                setHead(node);                p.next = null; // help GC                failed = false;                return interrupted;            }            if (shouldParkAfterFailedAcquire(p, node) &&                parkAndCheckInterrupt())                interrupted = true;        }    }

再来看一下同享锁的开释逻辑,代码如下:

public final boolean releaseShared(int arg) {        if (tryReleaseShared(arg)) {            doReleaseShared();            return true;        }        return false;    }

doReleaseShared() 代码如下:

private void doReleaseShared() {        for (;;) {            Node h = head;            if (h != null && h != tail) {                int ws = h.waitStatus;                if (ws == Node.SIGNAL) {                // 将 head 节点的 ws 改成 0,铲除信号。表明,他现已开释过了。不能重复开释。                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))                        continue;            // loop to recheck cases                                            // 唤醒下一个节点                    unparkSuccessor(h);                }                else if (ws == 0 &&                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))                    continue;                // loop on failed CAS            }            if (h == head)                   // loop if head changed                break;        }    }

同样,唤醒后做什么呢?

线程唤醒后测验拿锁,拿锁成功则设置自己为 head,断开上一任 head 和自己的连接, 并唤醒下一个节点,代码如下:

private void doAcquireShared(long arg) {    final Node node = addWaiter(Node.SHARED);    boolean failed = true;    try {        boolean interrupted = false;        //唤醒后再次for循环,测验获取锁,获取锁成功,则设置自己为头结点,        //并唤醒下一个结点,然后一向传播下去,将一切等候同享锁的线程唤醒        for (;;) {            final Node p = node.predecessor();            if (p == head) {                long r = tryAcquireShared(arg);                if (r >= 0) {                    setHeadAndPropagate(node, r);                    p.next = null; // help GC                    if (interrupted)                        selfInterrupt();                    failed = false;                    return;                }            }            if (shouldParkAfterFailedAcquire(p, node) &&                parkAndCheckInterrupt())                interrupted = true;        }    }}

为了证明同享锁唤醒时是一个接一个被唤醒,咱们用一个 demo 来验证下,示例代码如下:

CountDownLatch countDownLatch = new CountDownLatch(1);
        Thread t1 = new Thread(() -> {            try {                TimeUnit.SECONDS.sleep(1);                countDownLatch.await();                System.out.println("线程1被唤醒了");            } catch (InterruptedException e) {                e.printStackTrace();            }        });
        Thread t2 = new Thread(() -> {            try {                TimeUnit.SECONDS.sleep(2);                countDownLatch.await();                System.out.println("线程2被唤醒了");            } catch (InterruptedException e) {                e.printStackTrace();            }        });
        Thread t3 = new Thread(() -> {            try {                TimeUnit.SECONDS.sleep(3);                countDownLatch.await();                System.out.println("线程3被唤醒了");            } catch (InterruptedException e) {                e.printStackTrace();            }        });
        t1.start();        t2.start();        t3.start();
        TimeUnit.SECONDS.sleep(4);        countDownLatch.countDown();    }

上面示例代码中,线程 1、2、3 按次序参加堵塞行列,当主线程调用 countDown() 时,此时会唤醒线程 1,线程 1 唤醒后会唤醒线程 2,线程 2 会唤醒线程 3。从运转结果能够看出确实如此:

线程1被唤醒了线程2被唤醒了线程3被唤醒了

然后证明了上面的揣度。

AQS 源码流程分析

三、总结

独占锁和同享锁在获取资源失利时,都会将自己参加堵塞行列的尾部,并将前一节点的 ws 设置为 SINGAL,告知他:开释锁的时分记住唤醒我。

独占锁和同享锁的不同之处在于:节点被唤醒后,独占锁线程不会唤醒下一个节点(要唤醒有必要自动开释锁,比方使用 ReentrantLock 最终要调用 release() 办法自动开释锁)。

而关于同享锁来说,只需一个节点被唤醒了,那就会继续唤醒下一个节点,下一个节点又会去唤醒下下一个节点,然后将一切等候同享锁的线程唤醒。

作者介绍

夏福利,网易智企资深开发工程师,首要负责网易七鱼在线智能客服的研制