我正在参加「启航计划」

一 同步锁

在并发编程中,咱们常常用到的是synchronizedReentrantLock。其间,synchronizedjvm内置锁,而ReentrantLock坐落java.util.concurrent包下(以下简称JUC),ReentrantLock是依据AbstractQueuedSynchronizer(以下简称AQS)同步器框架完成的,本文主要来介绍AQS的内部完成及在JUC中依据AQS完成的相关类。

二 AQS内部完成

AQS是一个抽象类,内部保护一个state变量(代表同享资源)、一个FIFO等候行列(用来获取同享资源、线程排队办理等)。AQS界说了两种拜访同享资源的办法:Exclusive(独占办法,每次只要一个线程拜访资源,如:ReentrantLock)、Share(同享办法,多个线程能够一起拜访资源,如:SemaphoreCountDownLatch等)。不管是独占办法仍是同享办法,其具体完成只需求完成同享资源state的获取和开释即可,等候行列的相关操作(如获取锁失利入队、唤醒出队等),AQS现已完成好了。

注:尽管AQS提供了独占和同享两种办法拜访同享资源,但是两者并不一定是互斥的,还可所以独占和同享共存的办法拜访同享资源,如ReentrantReadWriteLock(后边单独分析)。

1、等候行列

NodeAQS中的一个静态内部类,上面提到AQS中保护了一个FIFO的双端等候行列,当获取锁失利时,AQS会将当时线程及等候状况等信息结构成一个节点Node,加入到等候行列的队尾,而且当时线程变成堵塞状况,等候唤醒。

//等候行列的Head节点 
private transient volatile Node head;
//等候行列的Tail节点
private transient volatile Node tail;
static final class Node {
    //同享锁对应节点
    static final Node SHARED = new Node();
    //独占锁对应节点
    static final Node EXCLUSIVE = null;
    //等候状况
    volatile int waitStatus;
    Node nextWaiter;
    static final int CANCELLED =  1;
    static final int SIGNAL    = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;
    //前驱节点
    volatile Node prev;
    //后继节点
    volatile Node next;
    //当时线程
    volatile Thread thread;
    final boolean isShared() {
        return nextWaiter == SHARED;
    }
    //找到当时节点的前驱节点
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }
    Node() {}
    //在addWaiter()时调用
    Node(Thread thread, Node mode) { 
        this.nextWaiter = mode;
        this.thread = thread;
    }
    //在Condition中调用
    Node(Thread thread, int waitStatus) {
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}
  • waitStatus:当时线程在等候行列中的状况
waitStatus 功用
SIGNAL -1 当时节点的后继节点被堵塞(经过park)时,设置其前驱节点的状况为SIGNAL。标记为SIGNAL节点的线程开释锁时就会通知后继节点,使得后继节点所在的线程被唤醒。这个状况一般是后继节点来设置前驱节点的。
CANCELLED 1 因为超时(timeout)或中止(interrupt),行列中此节点被撤销。节点进入此状况后不会再改变。
CONDITION -2 此节点当时坐落条件行列(condition queue)中。当其他线程对这个Condition调用signal办法后,它会被转移同步行列(sync queue)中。
PROPAGATE -3 已发布的节点应该传播到其他节点。这在doReleaseShared中设置(仅针对head节点),以确保传播持续进行,即使其他操作现已介入。
0 0 以上的都不是,默认初始无锁状况
  • prev :等候行列(Sync Queue)中前驱节点
  • next :等候行列(Sync Queue)中后继节点
  • thread : 当时线程
  • nextWaiter:条件行列(Condition Queue)的后继节点

注:等候行列(Sync Queue)在独占形式、同享形式中都会运用到,条件行列(Condition Queue)只会在独占锁中运用。如:

ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
condition.await();
condition.signal();

经过ReentrantLock能够结构Condition,调用condition的await/signal办法时会把当时线程在Condition Queue中履行入队/出队操作。

JUC系列学习(二):AbstractQueuedSynchronizer同步器框架及相关实现类
当调用condition.await()时,Node节点就会从Sync Queue中进入到Condition Queue中,且对应的pre、next都会被置为nullwaitStatus变为CONDITION,并经过nextWaiter构成链表;

而当调用condition.signal()时,Node又会从Condition Queue进入到Sync Queue中,pre、next从头设置,waitStatus依据当时状况进行设置,nextWaiter会被置为null

:当调用多个lock.newCondition()时,就会发生多个Condition行列,即一个ReentrantLock能够对应多个Condition Queue

2、状况指示器state

state表示当时锁的状况,state=0是初始状况,即无锁状况; 当state>0 表示现已有线程取得了锁,当同一个线程多次取得同步锁时(可重入性),state会递加,而在开释锁的时候,state会进行递减直到state=0时,其他线程才有资历获取锁。独占形式下只要一个线程能够获取同步锁,而同享形式下能够有多个线程能够获取同步锁

private volatile int state;
protected final int getState() {
    return state;
}
protected final void setState(int newState) {
    state = newState;
}
//经过CAS办法更新state值
protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

继承AQS的同步器主要完成:

独占锁需求完成

  • isHeldExclusively():是否是独占资源,当用到condition时需求完成它。
  • tryAcquire(int arg):独占锁办法,测验获取资源,成功回来true,不然回来false
  • tryRelease(int arg):独占锁办法,开释资源,假如开释操作使得所有获取同步器时被堵塞的线程康复履行,那么回来的是true,不然回来false

同享锁需求完成

  • tryAcquireShared(int arg):同享办法获取资源,负数表示获取操作失利;0代表成功,但无剩下资源;正数代表成功,且有剩下资源。
  • tryReleaseShared(int arg):同享办法测验开释资源,假如开释后答应唤醒后续节点回来true,不然回来false

咱们来看下上述办法在AQS中的完成:

protected boolean isHeldExclusively() {
    throw new UnsupportedOperationException();
}
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}
protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}
protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}
protected boolean tryReleaseShared(int arg) {
    throw new UnsupportedOperationException();
}

无论是独占锁仍是同享锁,获取锁和开释锁在AQS中都是没有完成的,需求在自界说同步器中完成

独占锁相关:

  • acquire(int arg): 独占形式下测验获取同享资源,疏忽中止操作。获取成功回来true,不然线程进入等候行列中,等候被唤醒后持续测验获取同享资源直到成功停止。本办法能够在完成Lock.lock()中运用。acquire()AQS中的源码:
//tryAcquire测验获取资源(在子类中完成),如成功直接回来,不然经过addWaiter及acquireQueued将该线程加入到等候行列的队尾,标记为独占形式,假如该线程在等候过程中中止过,acquireQueued办法会回来true,不然回来false。
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    //假如tail为空(即行列为空),或经过CAS设置node到队尾失利,持续经过enq()循环设置node到队尾,直至成功停止
    enq(node);
    return node;
}
//将node设置到行列尾部
private final boolean compareAndSetTail(Node expect, Node update) {
    return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
/**
 *1、当时node节点的前驱节点是SIGNAL状况,回来true,意味着当时线程会被挂起,堵塞等候。
 *2、假如前驱节点是CANCEL状况(waitStatus>0),越过此节点并删除,持续往上找,直到找到不是CANCEL状况的节点作为其前驱节点停止;假如前驱节点是0或许PROPAGATE状况,则将前驱节点设置为SIGNAL状况,则在下一次履行循环时shouldParkAfterFailedAcquire能够直接回来true,挂起线程。
 **/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}
static void selfInterrupt() {
    Thread.currentThread().interrupt();
}
  • acquireInterruptibly(int arg) : 同acquire办法,区别在于假如收到中止通知会直接中止。本办法能够在Lock.lockInterruptibly()中运用。
public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}
private void doAcquireInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

三 JUC中依据AQS的相关完成

依据AQS构建的同步器类中,最基本的操作便是各种形式的获取操作和开释操作。获取操作是一种依赖状况的操作,通常会堵塞;开释操作时被堵塞的线程会从头开端履行。前面也提到了,AQS经过办理state变量来进行相关操作,跟state相关的办法有getStatesetStatecompareAndSetState,而且state在不同完成AQS的子类中代表的意思也各不相同。如:state 在 ReentrantLock中代表的是锁所有者线程重复获取该锁的次数;Semaphore 中 state代表剩下的答应数量;FutureTask中state用来表示使命的状况(未开端、正在运转、已完成、已撤销)。

  • ReentrantLock:只支持独占办法获取锁,所以内部只完成了tryAcquiretryReleaseisHeldExclusivelyReentrantLock运用AQS对多个条件变量、多个等候线程集内置支持。Lock.newCondition回来一个新的ConditionObject实例。
  • CountDownLatch:用于保存当时答应的数量。获取操作意味着“等候并直到闭锁到达结束状况”
  • SemaphoreSemaphore翻译为信号量,是一种同享锁,能够一起答应一个或多个线程一起同享资源。
  • CyclicBarrierCyclicBarrier意为循环栅门,能够完成一组线程等候至某个状况之后再全部持续履行。当所有线程履行结束后,CyclicBarrier还能够持续被重用。
  • ReentrantReadWriteLockReentrantReadWriteLock是独占锁(写锁)、同享锁(读锁)能够一起存在的一种读写锁,在读操作远大于写操作的场景中,能完成更好的并发性。
  • ThreadPoolExecutor线程池,内部完成运用的ReentrantLock。别的AndroidAsyncTask内部完成便是ThreadPoolExecutor

四 总结

获取锁的流程:

  • AQS 的模板办法 acquire 经过调用子类自界说完成的 tryAcquire 获取锁;
  • 假如获取锁失利,经过 addWaiter 办法将线程结构成 Node 节点插入到同步行列队尾;
  • acquirQueued 办法中以自旋的办法测验获取锁,假如失利则判别是否需求将当时线程堵塞,假如需求堵塞则终究履行 LockSupport(Unsafe) 中的 native API 来完成线程堵塞。

开释锁的流程:

  • 首先获取当时节点(实际上传入的是 head 节点)的状况,假如 head 节点的下一个节点是 null,或许下一个节点的状况为 CANCEL(不用再唤醒了),则直接从等候行列的尾部开端遍历,一向遍历到最前面的 waitStatus 小于等于 0 的节点(大于0的状况是CANCEL状况)。
  • 假如终究遍历到的节点不为 null,再调用 LockSupport.unpark 办法,调用底层办法唤醒线程。

五 参考

【1】并发Lock之AQS(AbstractQueuedSynchronizer)详解
【2】逐行分析AQS源码(1)——独占锁的获取