导言

AQS (AbstractQueuedSynchronizer) 是一个供给了锁和同步器根底功用的结构。

本文测验经过以 ReentrantLock 为比如,去介绍 AQS 这个结构是怎么从结构的角度上,去辅佐完成同步机制的。

从源码的角度上,侧重介绍 AQS 中的 acquire 模版办法里边涉及到的整个流程。

这儿并不会去过分去介绍 ReentrantLock 的特性,仅仅一个辅佐解说 AQS 的引子。

为什么我会挑选 acquire 这个办法捏?

    static final class NonfairSync extends Sync {
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }
    }
    // 公正锁
    static final class FairSync extends Sync {
        final void lock() {
            acquire(1);
        }
    }

咱们能够看到 ReentrantLock,无论是 公正锁还是非公正锁,里边都会出现这个 acquire 办法的调用。

由此可见,里边很有或许就包含着咱们想要知道的全部。

注意:本文是依据 JDK8 分析的,在高版本中源码的写法或许不一样。

acquire

public final void acquire(long arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

acquire 办法中,咱们能够看到 首要的中心办法有:

  • tryAcquire
  • addWaiter
  • acquireQueued

下面就带着大家一同分析这三个办法到底在整个流程是干什么的。

tryAcquire

    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

AQS 中,tryAcquire 办法是直接抛反常。

相信读者也能从许多结构源码中看到很多类似的,一个父类办法写的是直接一个抛反常。

这样的写法结构的作者通常是考虑:

  • 不需求一切的子类都完成。
  • 假如有的子类没有自己完成就调用了,那么我应该直接给他抛反常,完成 fail-fast 机制。

快去复习一下 fail-fastfail-save 两种机制!

addWaiter(Node.EXCLUSIVE)

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

测验增加 Node 节点到 tail 尾部

假如队列为空,才会去走 enq 办法。

源码注释中提到了 fast path 这个词,addwaiter 这儿考虑的状况是 尾节点 tail 不为空的状况。

  • 假如 tail 不为空,那么中心逻辑就只存在 addWaiter 这儿。
  • 假如 tail 为空,那么中心逻辑会去走 enqenq 里边评论了 空 和 非空 两种状况。

这儿仅仅一个 “偷鸡”fast path 的测验。

enq

    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                // 队尾为 null 则经过 CAS 把自己增加到队尾
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                // 发现不为空表明这儿已经出现并发竞赛,有一个线程成功把他加入到队尾了
                // 那么我这个线程就只能在他后边加入了
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

acquireQueue

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            // 自旋
            for (;;) {
                final Node p = node.predecessor();
                // 只要前驱节点是 head 才干再测验 tryAcquire 去获取锁
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 上面测验获取锁失败了走到这儿,决定当时线程是否该堵塞
                // p 和 前驱节点,也便是依据 前驱节点 判别 当时线程是否堵塞
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

CLH 锁机制

这儿咱们能够看到只要 前驱节点 是 head,才有资历进行 tryAcquire 去获取锁,并不是一切在同步队列的线程,都有机会去获取锁。AQS 所选用的这种机制,其实是 CLH 锁的一种变种。

CLH 锁是一种简略高效的 自旋锁,得名于其发明者 Craig, LandinHagersten

它依据 链表,其每个节点代表一个企图获取锁的线程。

CLH 锁中,线程不会直接对锁进行自旋等候,而是在其前驱节点的一个副本上自旋,这样减少了对共享变量的访问,然后降低了体系的总线和缓存压力。

要说 CLH 好,那么它好在哪里?它又处理了什么问题?

  • 在没有 CLH 的时分,10个线程过来,经过自旋去获取锁。

  • 有了 CLH,10个线程过来,只要1个线程会自旋去获取锁。

自旋的成本,就能够大大的降下来了。所以咱们会说 CLH 是一种 高效的自旋锁

那么就会有人会问:保护竞赛的线程,addWaiter 也是经过 CAS 将线程增加到队尾保护的呀,不也是自旋嘛?

好好好,当你注意到这点的时分,我只能说你很细,可是还不够!

需求注意的是在 addWaiter 自旋,它的竞赛目标是 前驱线程目标,由于我要把自己加到前驱目标的后边。

可是另外一个是详细的一个 共享资源目标,这两者的 竞赛压力是彻底不一样的

而且当我 CLH 线程排好队了之后,以后的竞赛压力就大大减少,性能就愈加高效,所以咱们说他是一个好东西。

CLH 是天然的公正锁,非公正锁怎么办?

在前面咱们已经知道了 CLH 是一种 公正锁 的机制。AQS 作为一种通用的同步结构,那是不是意味着一切的子类都是公正锁呢?那么咱们为什么说 ReentrantLock 有公正和非公正两种形式。

直接上源码!

ReentrantLock的 公正形式 是经过 里边的 sync 类型来完成的:

  • 公正锁FairSync
  • 非公正锁NonfairSync
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
    public void lock() {
        sync.lock();
    }

咱们再去看两者有啥区别:

    // 非公正锁
    static final class NonfairSync extends Sync {
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }
    }
    // 公正锁
    static final class FairSync extends Sync {
        final void lock() {
            acquire(1);
        }
    }

咱们能够很清楚地看到,非公正锁很 “鸡贼” 地在提前去经过 CAS 就进行获取锁了。

它先不走 acquire 那套模版办法,所以这儿就表现了 非公正锁 的非公正之处。

可是假如这次这个 非公正锁 没有成功获取到锁,那么它也是会被扔到同步队列里边去,让他 乖乖地排队去排队自旋

分布式锁羊群效应

CLH 其实跟分布式锁中处理 羊群效应 非常类似。

羊群效应:在分布式锁体系中,当多个恳求一同到达时,它们或许会构成一种“群集”,或在某个资源(例如数据库或服务)上排队。

ZooKeeper 举比如,当一个持有分布式锁的节点开释锁时,它会在 ZooKeeper 中更新对应的节点状况。这个状况更新会被其他正在等候锁的节点调查到,然后触发它们简直一同去测验获取锁。

Curator 是一个优秀的 ZooKeeper 客户端结构,它是经过 次序节点最小节点调查 的办法去处理这种羊群问题。

原理上跟咱们这儿的 CLH 思想是一致的:我只关心前驱节点,而不是详细的共享资源目标

由此咱们还能够联想到:pushpull 两种负载均衡模型他们各自的特色。

由于该内容并不是本文首要介绍目标,所以这儿就点到为止。

shouldParkAfterFailedAcquire


    /** 
     ** Node 节点状况
     **/
    static final class Node {
        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;
    }
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        // 前驱节点状况是 SIGNAL 的话,当时线程能够直接挂起
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        // 前驱节点状况 > 0,也便是 CANCELLED,会一直往前去找非 CANCELLED 的节点
        // 把自己的前驱指向它
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
             // 经过 CAS 办法将前驱节点修改为 SIGNAL,让他来通知我当时线程
             // 我当时线程睡着就能够了
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

依据前驱节点的状况,来判别我当时线程是否应该 park,也便是堵塞,前驱节点的取值便是要看 waitStatus

waitStatus 的取值有 4 种,咱们这边只针对以 ReentrantLock 为比如,由于其他的涉及到一些 读写锁 的,样例不够通用,欠好进行阐释。因此咱们这儿只针对 ReentrantLock 举比如,那么 waitStatus 就有:

  • SIGNAL:假如前驱节点是 SIGNAL,当时驱节点 获取到锁开释锁 时,会唤醒我当时节点,因此我当时节点直接挂起即可,让出 CPU 资源。

能够幻想这么一个场景,小明和你一同去问老师问题(锁获取),可是老师每次只能处理一个学生,那么小明跟你说,我先去问问题,等我问完(锁开释),就来告知你(唤醒你),你先睡一会吧(当时线程挂起),这样就会更好懂一点。

  • CANCELLED:假如是前驱节点是 CANCELLED,代表前驱节点是一个 由于中止 Interrupt 而放弃竞赛的线程,那么我这个节点的前驱节点要往前一直修改去 找到前面是有用的节点,也便是 非CANCELLED 的前驱节点。

  • 其他的:关于其他的状况值,会经过 CAS,将前驱节点修改为 SIGNAL,让他能够通知我当时这个节点。

parkAndCheckInterrupt

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

能够看到 AQS 这儿是经过 LockSupport 来堵塞当时线程的。

为什么这儿用的是 LockSupport.park 而不必 Object.wait

LockSupport.park 供给了一种更灵敏、简略且有用的办法来操控线程的堵塞和唤醒。 特别适合于构建杂乱的同步工具和用于高级并发场景。 相比之下,Object.waitnotify 尽管也是有用的同步机制,但在运用上更为杂乱和受限。

不需求持有锁

  • LockSupportLockSupport.park 不需求线程持有任何特定的锁。这意味着它能够在任何上下文中被运用,而无需忧虑死锁或许恪守特定的锁协议。
  • Object.wait:相比之下,Object.wait 必须在同步块(即持有目标锁的范围内)中调用,否则会抛出 IllegalMonitorStateException 反常。
    Object lock = new Object();
    synchronized (lock) {
        // 必须在同步块内调用 wait,确保持有 lock 的锁
        lock.wait();
    }

假如你没有拿到锁,然后你调用了 Object.wait 办法,那么就会抛出 IllegalMonitorStateException 反常。

LockSupport 无需忧虑这个,因此它减少了死锁的风险,简化了一些编码,我并不需求去忧虑我是否当时线程能否拿到锁。

所以说 LockSupport 很适合 同步工具和 一些并发场景。

permit 许可机制:答应先 unpark 然后再 park

  • LockSupportLockSupport 供给了一种“许可”机制,其间 unpark 操作能够预先产生,为后续的 park 操作“积累”一个许可。这种机制使得线程操控愈加精密。
  • Object.wait / notifyObject.wait 依赖于 notifynotifyAll 办法来唤醒等候的线程。假如 waitnotify 之后产生,线程仍然会堵塞,直到另一个 notifynotifyAll 被调用。
    Thread thread = Thread.currentThread();
    // 提前 unpark
    LockSupport.unpark(thread);
    // 后续的 park 会当即回来,由于已经有一个可用的许可
    LockSupport.park();
    System.out.println("Thread continued execution");

简化的运用

  • LockSupportLockSupport 的运用相对简略,不需求与特定的目标或锁关联。
  • Object.wait / notifyObject.waitnotify 必须在特定目标的上下文中运用,而且需求正确处理锁和反常,这使得它们的运用愈加杂乱。

总结

  • AQS 供给了一系列的模版办法 aquiretryaquireaquireQueue,来帮助咱们完成同步机制。
  • AQS 内保护的同步队列中的大量操作,都是经过 CAS 去完成的。
  • AQS 内的 CLH 锁机制供给了一种 高效的自旋锁 完成。

拥有 AQS,咱们就能够经过对 state 的语义不同,亦或许去重写某个办法,然后完成更为灵敏,愈加强大的锁:

  • ReentranLock:可重入锁。
  • ReentrantReadWriteLock:读写锁。
  • CountDownLatch:异步转同步,需求等候的。

下篇文章就讲 AQS 的完成类是怎么完成自己的特性!

源码级 | 浅谈 AQS 同步结构模版办法、CLH 锁机制

来都来了,点个赞再走吧彦祖,这对我来说非常重要!