简介
LinkedBlockingQueue
是一个堵塞的有界行列,底层是经过一个个的Node
节点构成的链表完成的,链表行列中的头节点是一个空的Node
节点,在多线程下操作时会运用ReentrantLock
锁来保证数据的安全性,并运用ReentrantLock
下的Condition
对象来堵塞以及唤醒线程。
常量
/**
* 链表中的节点类
*/
static class Node<E> {
//节点中的元素
E item;
//下一个节点
Node<E> next;
Node(E x) { item = x; }
}
/** 链表行列的容量巨细,假如没有指定则运用Integer最大值 */
private final int capacity;
/** 记载链表中的节点的数量的原子类 */
private final AtomicInteger count = new AtomicInteger();
/**链表的头节点
*/
transient Node<E> head;
/**
* 链表的尾节点
*/
private transient Node<E> last;
/** 从链表行列中获取节点时避免多个线程一起操作所发生数据安全问题时所加的锁 */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** 增加节点到链表行列中避免多个线程一起操作所发生数据安全问题时所加的锁 */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
-
Node
:链表行列中的节点,用于寄存元素。 -
capacity
:链表行列中最多能寄存的节点数量,假如在创立LinkedBlockingQueue
的时分没有指定.则默许最多寄存的节点的数量为Integer
的最大值。 -
head
:链表行列中的头节点,一般来说头节点都是一个没有元素的空节点。 -
last
:链表行列中的尾节点。 -
takeLock
:在获取链表行列中的节点的时分所加的锁。 -
putLock
:在增加链表行列中的节点的时分所加的锁。 -
Condition
:当线程需求进行等候或许唤醒的时分则会调用该对象下的办法。
结构办法
/**
* 创立默许容量巨细的链表行列
*/
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
/**
* 创立指定容量巨细的链表行列
*/
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
//创立一个空节点,并将该节点设置为头尾节点
last = head = new Node<E>(null);
}
/**
* 依据指定集合中的元素创立一个默许容量巨细的链表行列
*/
public LinkedBlockingQueue(Collection<? extends E> c) {
//创立默许容量巨细的链表行列
this(Integer.MAX_VALUE);
//获取增加元素节点的锁
final ReentrantLock putLock = this.putLock;
//加锁
putLock.lock();
try {
//链表中节点的数量
int n = 0;
//遍历集合中的元素
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
//为元素创立一个节点,并将节点增加到链表的尾部,并设置节点为尾节点
enqueue(new Node<E>(e));
//链表中节点的数量自增
++n;
}
//记载链表中节点的数量
count.set(n);
} finally {
//开释锁
putLock.unlock();
}
}
第一个和第三个结构办法中都会调用第二个结构办法,而在第二个结构办法中会设置链表行列中容纳节点的数量以及创立一个空的头节点来填充,再看第三个结构办法中的代码,首先会获取putLock
锁,代表当时是一个需求增加节点的线程,再将指定集合中的元素封装成一个Node
节点,并依次将封装的节点追加到链表行列中的尾部,并运用AtomicInteger
来记载链表行列中节点的数量。
put
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
//为指定元素创立节点
Node<E> node = new Node<E>(e);
//获取增加元素节点的锁
final ReentrantLock putLock = this.putLock;
//获取记载链表节点数量的原子类
final AtomicInteger count = this.count;
//加锁,假如加锁的线程被中止了则抛出反常
putLock.lockInterruptibly();
try {
//校验链表中的节点数量是否抵达了指定的容量
//假如抵达了指定的容量就进行堵塞等候
//假如线程被唤醒了,可是链表中的节点数量还是未改变,则持续堵塞等候
//只要当头节点出队,新的节点才能持续增加
while (count.get() == capacity) {
notFull.await();
}
//将新节点增加到链表的尾部并设置为尾节点
enqueue(node);
//获取没有增加当时节点时链表中的节点数量
//并更新链表中的节点数量
c = count.getAndIncrement();
if (c + 1 < capacity)
//唤醒等候增加节点的线程
//或许当时线程在等候行列中等候的时分
//有新的线程要履行增加节点的操作
//可是链表的容量现已抵达最大,所以新的线程也会进行等候
//当时线程被唤醒了而且链表的容量没有抵达最大则测验去唤醒等候的线程
notFull.signal();
} finally {
//开释锁
putLock.unlock();
}
if (c == 0)
//c等于0说明增加当时节点的时分链表中没有节点
//或许有线程在获取节点,可是链表中没有节点
//然后一向进行等候,当增加了节点的时分就需求唤醒获取节点的线程
signalNotEmpty();
}
LinkedBlockingQueue
中的代码都比较简单,主要是ReentrantLock
下的Condition
中的办法比较复杂,我们先整体的了解一下put
办法,首先经过new Node
为将指定元素封装成一个节点,再获取putLock
锁,当链表行列中的节点数量现已抵达了capacity
巨细,那当时线程就需求调用Condition
下的await
办法进行等候将线程堵塞,直到有节点出队或许说有节点被删去或许当时线程被中止了,当时线程被中止了则会直接退出当时put
办法并抛出反常,假如节点出队了或许节点被删去了,那当时线程被唤醒了则会持续履行增加节点的操作。
enqueue
办法则会将封装的节点追加到链表行列中的尾部,经过getAndIncrement
办法先获取没有增加当时节点时链表行列中节点的数量,然后更新增加了当时节点之后链表行列中节点的数量,c
则是没有增加当时节点时链表行列中节点的数量,c+1
则是增加当时节点后链表行列中节点的数量,假如说c+1小于capacity
则说明线程在增加节点的时分,链表行列中的节点数量现已抵达了最大值,后续增加节点的线程都需求进行堵塞,当有节点被删去或出队的时分,最开端堵塞的线程被唤醒,被唤醒的线程则会去履行增加节点的操作,当增加完节点之后链表行列中的节点数量没有抵达最大值则会去唤醒后续被堵塞的线程履行增加节点的操作。
c等于0
说明在增加当时节点之前,或许有线程在获取链表行列中的节点,可是链表行列中没有节点,导致获取节点的线程处于堵塞状况,当增加完节点之后,链表行列中有了节点,此刻就需求唤醒堵塞的线程去获取节点。
增加元素的办法分为put和offer
,差异在于堵塞与非堵塞,当链表行列中的节点数量现已抵达最大值,put
办法则会堵塞,而offer
办法不会堵塞则是直接返回。
获取元素的办法分为take、poll、peek
,take
办法与put
办法类似,只不过一个是入队,一个是出队,poll
与peek
都对错堵塞的,可是差异在于poll
获取了节点之后,该节点会从链表行列中移除,而peek
不会移除节点。
await
public final void await() throws InterruptedException {
if (Thread.interrupted())
//线程被中止抛出反常
throw new InterruptedException();
//为当时线程创立一个等候模式的节点并入队,并将等候行列中现已撤销等候的节点移除掉
Node node = addConditionWaiter();
//开释当时线程的锁,避免当时线程加了锁,导致其它在等候的线程被唤醒之后不能获取到锁然后导致一向堵塞
int savedState = fullyRelease(node);
int interruptMode = 0;
//假如指定节点还在等候行列中等候则挂起
//假如指定节点被中止了则会将指定节点增加到同步等候行列中
//假如指定节点被唤醒了则会将指定节点增加到同步等候行列中
while (!isOnSyncQueue(node)) {
//节点在等候行列中则挂起
LockSupport.park(this);
//线程在等候行列中被中止则会增加到同步等候行列中
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//acquireQueued 指定节点中的线程被中止了或许被唤醒了则会测验去获取锁
//假如还未到指定节点中的线程获取锁的时分则会持续挂起
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
//指定节点的线程现已获取到了锁而且节点关联的下一个节点不为空
//此刻就需求将现已获取到锁的节点从等候行列中移除
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
首先经过addConditionWaiter
办法将当时线程封装成一个等候模式的节点,并将节点增加到等候行列中以及会将等候行列中现已撤销等候的线程节点从行列中移除,再经过fullyRelease
办法开释掉当时线程加的所有的锁,之所以开释锁是避免其它线程获取不到锁然后一向堵塞,再看isOnSyncQueue
办法,该办法是校验当时线程节点是否在等候行列中,假如在等候行列中那就将节点中的线程挂起等候。
isOnSyncQueue
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
//指定节点还在等候行列中此刻就需求持续等候
return false;
if (node.next != null)
//指定节点现已不在等候行列中了
return true;
//从等候行列中的尾节点开端向头节点遍历,校验指定的节点是否在其间
return findNodeFromTail(node);
}
当节点的状况为CONDITION
时,则说明该节点还在等候行列中,node.prev等于null
为什么说也是在等候行列中呢?因为等候行列中的节点是没有prev
指针和next
指针的,假如prev
指针和next
指针指向的节点不为空,那就说明该节点是在同步等候行列中的,假如在同步等候行列中的话,那节点中的线程就可以测验去获取锁并履行后续的操作。
当等候行列中的线程节点被唤醒和中止则会增加到同步等候行列中,假如是被中止的话则会经过checkInterruptWhileWaiting
办法增加一个中止标识,再经过acquireQueued
办法来获取锁,假如获取锁失利则持续等候,当获取锁成功之后则会该节点从等候行列中移除,假如说你是一个被中止的线程,最后会经过reportInterruptAfterWait
办法抛出中止反常。
signal
public final void signal() {
if (!isHeldExclusively())
//加锁的线程不是当时线程则抛出反常
throw new IllegalMonitorStateException();
//头节点
Node first = firstWaiter;
if (first != null)
//唤醒头节点
doSignal(first);
}
/**
* 唤醒等候行列中的头节点
* 假如等候行列中的头节点被撤销等候或现已被唤醒了
* 此刻就需求唤醒头节点的后续的一个节点
* 直到成功的唤醒一个节点中的线程
*/
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) && (first = firstWaiter) != null);
}
/**
* 将指定的节点增加到同步等候行列中
* 并依据前一个节点的等候状况来决议是否需求马上唤醒指定节点
*/
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
//更改节点状况失利说明该节点现已被唤醒了
return false;
//即将唤醒的节点增加到同步等候行列中
//并返回前一个节点
Node p = enq(node);
//前一个节点的等候状况
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
//假如前一个节点的等候状况大于0则说明现已被撤销加锁,此刻就需求唤醒后续的节点,就是当时节点
//前一个节点的等候状况不大于0可是更改前一个节点的等候状况时失利则说明前一个节点现已被唤醒了并更改了状况
//此刻就需求测验将当时节点中的线程唤醒
LockSupport.unpark(node.thread);
return true;
}
唤醒线程节点的办法主要还是看transferForSignal
办法,首先会经过cas
操作将需求唤醒的节点的状况设置为0,假如更改节点状况失利则说明该节点现已被唤醒了,更新节点状况成功则会经过enq
办法将节点增加到同步等候行列中,此刻就需求依据前一个节点来决议是否需求当即唤醒当时节点中的线程。
从下面的图片中能看出来其实同步等候行列和等候行列中运用的节点是共用的节点,并不会创立新的节点,同步等候行列中的节点运用next
指针和prev
指针来关联节点,而等候行列中则是运用nextWaiter
指针来关联节点的。