• 作者简介:大家好,我是爱敲代码的小黄,独角兽企业的Java开发工程师,CSDN博客专家,阿里云专家博主
  • 系列专栏:Java设计模式、数据结构和算法、Kafka从入门到成神、Kafka从成神到升仙、Spring从成神到升仙系列
  • 假如感觉博主的文章还不错的话,请三连支撑一下博主哦
  • 博主正在尽力完成2023计划中:以梦为马,扬帆起航,2023追梦人
  • 联系办法:hls1793929520,加我进群,大家一同学习,一同前进,一同对抗互联网隆冬

ConditionObject

一、导言

并发编程在互联网技能运用如此广泛,简直一切的后端技能面试官都要在并发编程的运用和原理方面临小伙伴们进行 360 的刁难。

作为一个在互联网公司面一次拿一次 Offer 的面霸,打败了许多竞争对手,每次都只能看到许多落寞的身影失望的脱离,略感内疚(请答应我运用一下夸大的修辞手法)。

所以在一个寂寞难耐的夜晚,暖男我痛定思痛,决议开端写 《吊打面试官》 系列,希望能协助各位读者以后边试势如破竹,对面试官进行 360 的反击,吊打问你的面试官,让一同面试的同僚张口结舌,张狂收割大厂 Offer

虽然现在是互联网隆冬,但天地不决,你我皆是黑马

二、运用

咱们上篇文章剖析了 ReentrantLocklockunLock 办法,详细可见:ReentrantLock

咱们知道,关于 synchronized 来说,拥有 waitnotify 办法,可暂停和唤醒线程,详细可见:synchronized

作为 synchronized 的竞争对手,AQS 必定也提供了此功用,咱们一同来看看 AQS 中的运用

这儿吐槽一句:这个唤醒的流程,AQSsynchronized 有点神似

public class ConditionObjectTest {
    public static void main(String[] args) throws Exception{
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        new Thread(() -> {
            lock.lock();
            System.out.println("子线程获取锁资源并await挂起线程");
            try {
                Thread.sleep(5000);
                condition.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("子线程挂起后被唤醒!持有锁资源");
        }).start();
        Thread.sleep(100);
        // =================main======================
        lock.lock();
        System.out.println("主线程等候5s拿到锁资源,子线程履行了await办法");
        condition.signal();
        System.out.println("主线程唤醒了await挂起的子线程");
        lock.unlock();
    }
}

咱们运行上述代码,能够发现履行过程如下:

子线程获取锁资源并await挂起线程
主线程等候5s拿到锁资源,子线程履行了await办法
主线程唤醒了await挂起的子线程
子线程挂起后被唤醒!持有锁资源

咱们简略的说一下过程,详细的咱们后边源码会讲到:

  • 首要,咱们的子线程履行 lock.lock() 办法获取锁资源,将 AQS 中的 state0 修正为 1
  • 咱们的主线程履行 lock.lock() 办法,发觉当时的 state1,封装成 Node 节点放至 AQS 行列中,随后 park 挂起;
  • 当子线程履行 condition.await() 办法时,将该线程封装成 Node 扔到 Condition行列 中并放弃锁资源。咱们的主线程被唤醒且将 state0 修正为 1,拿到锁资源;
  • 主线程履行 condition.signal() 将咱们的子线程让 ConditionObject 里边扔到 AQS 里边,等候被被唤醒;
  • 主线程履行 lock.unlock() 办法让出锁资源,唤醒子线程履行后续的事务逻辑;

三、源码

1、newCondition

首要肯定是咱们 Condition 的结构办法了,咱们主要是通过 lock.newCondition() 来获取,该办法是不区别公正锁、非公正锁的

public Condition newCondition() {
    return sync.newCondition();
}
final ConditionObject newCondition() {
    return new ConditionObject();
}

这儿咱们能够看到,朴实无华的 new 了一个 ConditionObject 回来,咱们看下 ConditionObject 里边的参数

public class ConditionObject implements Condition {
    // 头节点
    private transient Node firstWaiter;
    // 尾结点
    private transient Node lastWaiter;
}

咱们看到这儿,或许感觉和咱们上一篇 AQS 行列中的双向链表差不多,但要记住:这儿是一个单向链表,他的指针Node nextWaiter 并非 prevnext

2、await-挂起前的操作

咱们在讲 await 办法时,会分两部分讲:

  • 榜首部分:咱们履行 await 办法直到 park

  • 第二部分:unpark 后续的操作

public final void await() throws InterruptedException {
    // 判断当时线程是不是处于中止,假如是中止,则抛出反常
    if (Thread.interrupted())
        throw new InterruptedException();
    // 干掉一切标识非CONDITION(-2)的节点并将该线程封装成Node节点放到Condition行列中
    Node node = addConditionWaiter();
    // 开释当时的锁资源并唤醒AQS行列中的榜首个节点(虚拟头节点的下一个)
    long savedState = fullyRelease(node);
    int interruptMode = 0;
    // isOnSyncQueue:检测当时的节点是不是在AQS行列中、true(在AQS行列中)/false(不在AQS行列中)
    while (!isOnSyncQueue(node)) {
        // 节点不在AQS直接挂起当时线程
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

2.1 addConditionWaiter

  • 删去 Condition行列 一切标识非 CONDITION(-2) 的节点

  • 将该线程封装成 Node 节点放到 Condition 行列中

private Node addConditionWaiter() {
    // 引用指向尾节点
    Node t = lastWaiter;
    // 假如当时的尾节点不等于null && 尾节点的标识不等于CONDITION(-2)
    // 证明咱们当时的尾节点是有问题的
    // 因为你只要在Condition行列中,只要CONDITION(-2)是有用的
    if (t != null && t.waitStatus != Node.CONDITION()) {
        // 删去非CONDITION(-2)的节点
        unlinkCancelledWaiters();
        // 最终重新赋值一下
        t = lastWaiter;
    }
    // 将当时线程封装成Node节点,标识为CONDITION(-2)
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    // 假如为null,阐明Condition行列为空,头尾指针都指向当时节点即可
    if (t == null)
        firstWaiter = node;
    else
        // 将最终的指向当时节点
        t.nextWaiter = node;
	// 尾指针指向当时节点
    lastWaiter = node;
    // 完成刺进并回来
    return node;
}
// 遍历当时的Condition行列,删去掉那些标识不为CONDITION(-2)的节点
// 这段代码逻辑有点绕,不熟悉链表的同学建议能够直接不看了,记住其功用就能够了
// 关键是用三个引用来删去链表,有兴趣的同学能够自己画一下流程
private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
    Node trail = null;
    while (t != null) {
        Node next = t.nextWaiter;
        if (t.waitStatus != Node.CONDITION) {
            t.nextWaiter = null;
            if (trail == null)
                firstWaiter = next;
            else
                trail.nextWaiter = next;
            if (next == null)
                lastWaiter = trail;
        }
        else
            trail = t;
        t = next;
    }
}

2.2 fullyRelease

  • 判断当时的线程是否是持有锁的线程,假如不是则抛出反常
  • 假如当时线程是持有锁的线程,则一次性开释掉一切的锁资源(可重入一次性开释)并将持有锁线程置为 null
  • 假如上述操作出现反常,则将当时节点置为报废节点(CANCELLED),后续进行铲除
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        // 拿到当时的state
        int savedState = getState();
        // 开释当时的锁资源并唤醒AQS行列中的榜首个节点
        if (release(savedState)) {
            // 没有失利,直接回来即可
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        // 失利了
        if (failed)
            // 这个节点报废了,置为1,后续直接铲除去
            node.waitStatus = Node.CANCELLED;
    }
}
// 这个办法咱们上篇文章中讲过
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        // 假如能够放弃锁
        Node h = head;
        // 直接唤醒AQS行列里边榜首个节点
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
// Step1:检测当时线程是否是占用锁的线程,不是则抛出反常
// Step2:假如是占用锁的线程,将state置为0并将占用锁的线程置为null
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

2.3 isOnSyncQueue

  • 检测当时的节点是不是在AQS行列中
final boolean isOnSyncQueue(Node node) {
    // 假如这个节点是CONDITION或者前继节点为null,那肯定是Condition行列
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    // 假如他的next指针不为空,证明这哥们一定在AQS中(因为Condition行列是用nextWaiter衔接的)
    if (node.next != null) 
        return true;
    // 暴力查询
    return findNodeFromTail(node);
}
// 朴实无华在AQS中遍历寻觅
private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (;;) {
        if (t == node)
            return true;
        if (t == null)
            return false;
        t = t.prev;
    }
}

2.4 问题考查

咱们在上面能够看到这一段代码:

// isOnSyncQueue:检测当时的节点是不是在AQS行列中
// true:在AQS行列中
// false:不在AQS行列中
while (!isOnSyncQueue(node)) {
    // 节点不在AQS直接挂起当时线程
    LockSupport.park(this);
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0){
         break;
    }
}

这时候咱们或许会有一个疑问,咱们上面分明已经把当时线程给封装成 Node 放到 Condition行列 里边了,这儿为什么还要判断其有没有在 AQS 行列中呢?

这儿考虑到另外一个原因,因为咱们在 封装成 Node 放到 Condition行列 里边 到 LockSupport.park(this) 这个外围的判断,这段时间有或许咱们当时的线程被其他线程履行 signal 办法直接唤醒了,这样咱们当时节点已经不会在 Condition行列 中了。

那么咱们这儿挂起之后该线程已经停止了,咱们去剖析 signal 唤醒办法

3、signal

  • 判断其是不是持有锁的线程,假如不是抛出反常
  • 将节点从 Condition行列 中删去掉而且放入到 AQS 行列中,等候唤醒
public final void signal() {
    // 当时线程是不是持有锁的线程,不是则抛出反常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // 设置一个引用
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

3.1 isHeldExclusively

  • 当时线程是不是持有锁的线程
protected final boolean isHeldExclusively() {
    return getExclusiveOwnerThread() == Thread.currentThread();
}

3.2 doSignal

  • 把头节点直接删去掉而且将其状态修正为0,放入到 AQS 行列中
  • 假如当时头节点修正标识失利的话,则去修正 Condition行列 中的下一个节点
  • 假如放入到 AQS 行列中的该节点的前继节点无效,则需求当即唤醒该节点,去铲除无效的节点
private void doSignal(Node first) {
    do {
        // 假如这个条件能够建立的话,阐明当时的Condition行列只要一个数据
        // 直接置空,唤醒即可
        if ( (firstWaiter = first.nextWaiter) == null){
            lastWaiter = null;
        }
        // 假如有多个的话,把榜首个头节点给删去掉
        first.nextWaiter = null;
        // 这儿假如回来true的话,则退出循环
        // 假如当时节点修正标识失利之后,需求履行后边的`first = firstWaiter`,相当于唤醒后边的节点
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}
// 入参:node(Condition行列中的榜首个节点)
final boolean transferForSignal(Node node()) {
    // 测验将当时的标识从CONDITION(-2)修正为0,为放入AQS行列做准备
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    // 将该节点放入到AQS行列中,这儿的p是前继节点
    Node p = enq(node);
    // 拿到当时的标识
    int ws = p.waitStatus;
    // 这一段if语句主要是做了兼容处理
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        // 假如这儿能够进来,那么只要两个状况
        // ws > 0:证明当时是无效的节点,那么我排在后边的节点或许永远都不会唤醒,那么我不行呀,我得当即唤醒该节点
        // 		   唤醒之后,履行咱们的acquireQueued.shouldParkAfterFailedAcquire办法,铲除一切的无效节点并挂起
        // CAS失利:假如前面节点正常,但是咱们CAS将其修正为SIGNAL失利了,阐明前继节点有问题,和上面类似,需求重新唤醒该节点
        LockSupport.unpark(node.thread);
    return true;
}

4、await-唤醒后的操作

  • 唤醒之后会判断唤醒的办法,这儿不需求纠结
  • 保证该节点在 AQS 行列中,取出 AQS 行列中的榜首个节点获取锁资源,假如不是榜首个节点则挂起
public final void await() throws InterruptedException {
    // 判断当时线程是不是处于中止,假如是中止,则抛出反常
    if (Thread.interrupted())
        throw new InterruptedException();
    // 干掉一切标识非CONDITION(-2)的节点并将该线程封装成Node节点放到Condition行列中
    Node node = addConditionWaiter();
    // 开释当时的锁资源并唤醒AQS行列中的榜首个节点(虚拟头节点的下一个)
    long savedState = fullyRelease(node);
    int interruptMode = 0;
    // isOnSyncQueue:检测当时的节点是不是在AQS行列中、true(在AQS行列中)/false(不在AQS行列中)
    while (!isOnSyncQueue(node)) {
        // 节点不在AQS直接挂起当时线程
        LockSupport.park(this);
        // 假如线程履行到这,阐明现在被唤醒了。
        // 线程能够被signal唤醒。(假如是signal唤醒,能够承认线程已经在AQS行列中)
        // 线程能够被interrupt唤醒,线程被唤醒后,没有在AQS行列中。
        // 假如线程先被signal唤醒,然后线程中止了。。。。(做一些额外处理)
        // checkInterruptWhileWaiting能够承认当时中如何唤醒的。
        // 回来的值,有三种
        // 0:正常signal唤醒,没其他事(不知道Node是否在AQS行列)
        // THROW_IE(-1):中止唤醒,而且能够保证在AQS行列
        // REINTERRUPT(1):signal唤醒,但是线程被中止了,而且能够保证在AQS行列
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 你就当上面的终究成果,便是唤醒后退出循环履行后续的唤醒操作即可
    // 假如保证在AQS中的话,将AQS中的榜首个节点获取锁资源,假如不是榜首个节点的话,则会堕入挂起状态
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    // 假如当时节点还有nextWaiter的话,需求删去
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

4.1 checkInterruptWhileWaiting

private int checkInterruptWhileWaiting(Node node) {
    // Thread.interrupted():这个办法很经典,上篇咱们讲过,获取该线程的中止状态并铲除
    return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;
}
// 
final boolean transferAfterCancelledWait(Node node) {
    // CAS将当时的状态修正为0
    // 假如能够修正成功,阐明这个节点是被中止唤醒的,不是正常唤醒的
    // 已然不是正常唤醒的,那么就得放到AQS行列中
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        enq(node);
        return true;
    }
    // 假如这个节点不在AQS行列中,则履行Thread.yield()
    // 这儿也是一个小细节,咱们前面说到会将这个节点放入到AQS行列中,但是有或许这个哥们还没在AQS行列中
    // 或许由于CPU的一些原因,总之做了一个保证
    // 假如没在里边,则让线程停一停,等一等
    while (!isOnSyncQueue(node))
        Thread.yield();
    // signal唤醒的,终究回来false
    return false;
}

四、流程图

阅读完synchronized和ReentrantLock的源码后,我竟发现其非常相似

五、写在最终

鲁迅先生曾说:独行难,众行易,和情投意合的人一同前进。彼此毫无保留的共享经验,才是对抗互联网隆冬的最佳挑选。

其实许多时候,并不是咱们不行尽力,很或许便是自己尽力的方向不对,假如有一个人能稍微点拨你一下,你真的或许会少走几年弯路。

假如你也对 后端架构和中间件源码 有兴趣,欢迎添加博主微信:hls1793929520,一同学习,一同成长

我是爱敲代码的小黄,独角兽企业的Java开发工程师,CSDN博客专家,喜欢后端架构和中间件源码。

咱们下期再会。

我从清晨走过,也拥抱夜晚的星斗,人生没有捷径,你我皆平凡,你好,陌生人,一同共勉。

往期文章引荐:

  • 《吊打面试官系列》从根上剖析ReentrantLock的来龙去脉
  • 《吊打面试官系列》从源码全面解析 ThreadLocal 关键字的来龙去脉
  • 《吊打面试官系列》从源码全面解析 synchronized 关键字的来龙去脉
  • 《吊打面试官系列》阿里边试官让我讲讲volatile,我直接从HotSpot开端讲起,一套组合拳拿下面试