1.什么是堵塞行列

堵塞行列–BlockingQueue,它是一个接口

public interface BlockingQueue<E> extends Queue<E>

BlcokingQueue承继了Queue接口,是行列的一种,Queue和BlockingQueue都是在Java5中参加的,BlockingQueue是线程安全的,咱们在许多场景下都能够运用线程安全的行列来优雅地处理咱们业务自身的线程安全问题。
首要并发行列关系图

Java阻塞队列详解
上图展现了Queue最首要的完成类,能够看出Java提供的线程安全的行列(也成为并发行列)首要分为堵塞行列和非堵塞行列俩大类。

  • 堵塞行列的典型比如便是BlockingQueue接口的完成类,首要有6种完成:ArrayBlcokingQueue,LinkedBlockingQueue,SynchronousQueue,DelayQueue,PriorityBlockingQueue和LinkedTransherQueue,它们各自有不同不同的特色。
  • 非堵塞并发行列最典型的便是ConcurrentLinkedQueue,这个类不会让线程堵塞,运用CAS确保了线程安全。
  • 从上图还能够看到Deque,便是双端行列,双端行列从头和尾都能增加和删去元素,而一般的Queue只能从一端进入,另一端出去。这是Queue和Deque最首要的差异,其它方面基本都差不多。

2.堵塞行列的特色

堵塞行列的特色便是堵塞两个字,哈哈哈哈。像是废话。堵塞功用使得生产者和顾客两头的才能得以平衡,当有任何一端速度过快时,堵塞行列便会把过快的速度降下来。最重要的两个办法:take()和put();
take()
take办法的功用是获取并移除行列的头节点,通常在行列里有数据的时分是能够正常移除的,当行列里无数据,则堵塞,直到行列里边有数据。一旦有数据了,就马上免除堵塞状态,而且获取到数据 put()
put办法刺进元素时,假如行列没有满能够正常刺进,假如行列满了,则堵塞,直到行列里边有了闲暇空间,就会消除堵塞状态,并把数据增加进去。

堵塞行列容量问题

堵塞行列分为两种有界和无界。无界行列意味着里边能够包容非常多的元素,例如LinkedBlcokingQueue的上限是Integer.MAX_VALUE。学过Java的人都知道这个数其实挺大的。但有界行列例如ArrayBlockingQueue假如行列满了,也不会扩容。

3.堵塞行列常用办法

在堵塞行列中有许多办法,而且都非常相似,这儿我把常用的8个办法总结了一下以增加、删去为主。首要分为三类:

  • 抛出反常:add、remove、element
  • 回来成果可是不抛出反常:offer、poll、peek、
  • 堵塞:take、put、

3.1 抛出反常:add、remove、element

add办法是往行列里边增加一个元素,假如行列满了,就会抛出反常来提示咱们行列已满。

public static void main(String[] args) {
    BlockingQueue<Integer> blockingQueue=new ArrayBlockingQueue<>(2);
    blockingQueue.add(1);
    blockingQueue.add(1);
    blockingQueue.add(1);
}

这儿咱们指定行列容量为2,而且尝试参加3个值。超过了容量上限就会报IllegalStateException的过错。

Java阻塞队列详解
remove办法是删去元素,假如咱们行列为空的时分又进行了删去操作,相同会报NoSuchElementException。

public static void main(String[] args) {
    BlockingQueue<Integer> blockingQueue=new ArrayBlockingQueue<>(2);
    blockingQueue.add(1);
    blockingQueue.add(1);
    blockingQueue.remove();
    blockingQueue.remove();
    blockingQueue.remove();
}

这儿咱们指定容量为2,而且增加两个元素,然后删去三个元素。成果如下

Java阻塞队列详解
element办法是回来行列的头节点,可是不会删去这个元素。当行列为空时相同会报NoSuchElementException的过错

public static void main(String[] args) {
    BlockingQueue<Integer> blockingQueue=new ArrayBlockingQueue<>(2);
    blockingQueue.element();
}

Java阻塞队列详解

3.2回来成果可是不抛出反常offer、poll、peek

offer办法用来刺进一个元素,假如刺进成功会回来true,假如行列满了,再刺进元素不会抛出反常可是会回来false。

public static void main(String[] args) {
    BlockingQueue<Integer> blockingQueue=new ArrayBlockingQueue<>(2);
    System.out.println(blockingQueue.offer(1));
    System.out.println(blockingQueue.offer(1));
    System.out.println(blockingQueue.offer(1));
}

Java阻塞队列详解
poll办法和remove办法是对应的都是删去元素,都会回来删去的元素,可是当行列为空时则会回来null

public static void main(String[] args) {
    BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(2);
    blockingQueue.offer(1);
    blockingQueue.offer(1);
    System.out.println(blockingQueue.poll());
    System.out.println(blockingQueue.poll());
    System.out.println(blockingQueue.poll());
}

Java阻塞队列详解
peek办法和element办法对应,回来行列的头节点但并不删去,假如行列为空则直接回来null

public static void main(String[] args) {
    BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(2);
    System.out.println(blockingQueue.peek());
}

Java阻塞队列详解
带超时时刻的offer和poll

offer(E e, long timeout, TimeUnit unit)

它有三个参数,分别是元素、超时时长和时刻单位。通常情况下,这个办法会刺进成功而且回来true;假如行列满了导致刺进不成功,在调用带超时时刻重载办法的offer的时分,则会等候指定的超时时刻,假如到了时刻依然没有刺进成功,则回来false。

E poll(long timeout, TimeUnit unit)

这个带参数的poll和上面的offer相似。假如能够移除,便会当即回来这个节点的内容;假如超过了咱们界说的超时时刻依然没有元素能够移除,便会回来null作为提示。

3.2 堵塞put和take

put办法的作用是刺进元素,通常在行列没有满的时分是正常刺进。假如行列满了无法持续刺进,这时它不会马上回来false和抛出反常,而是让刺进的线程进入堵塞状态,直到行列里边有闲暇空间了。此时行列就会让之前的线程触摸堵塞状态,并把方才那个元素增加进去。 take办法的作用是获取并移除行列的头节点。通常行列里边有元素会正常取出数据并移除;可是假如履行take的时分行列里无数据,则堵塞,直到行列里边有数据今后,就会当即免除堵塞状态,而且取到数据。 搞了个总结能够看一下

Java阻塞队列详解

4.常见的堵塞行列

4.1 ArrayBlockingQueue

ArrayBlockingQueue是最典型的有界行列,其内部是用数组存储元素的,运用Reentrant完成线程安全

public ArrayBlockingQueue(int capacity) {
   this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
   if (capacity <= 0)
       throw new IllegalArgumentException();
   this.items = new Object[capacity];
   lock = new ReentrantLock(fair);
   notEmpty = lock.newCondition();
   notFull =  lock.newCondition();
}

咱们在创立它的时分就需要指定它的容量,之后也不能够在扩容了,在结构函数中咱们相同能够指定是否是公正的。假如ArrayBlockingQueue被设置为非公正的,那么就存在插队的可能;假如设置为公正的,那么等候了最长时刻的线程会优先被处理,其它线程不允许插队。

4.2 LinkedBlockingQueue

LinkedBlockingQueue内部运用链表完成的,假如咱们不指定它的初始容量,那么它的默认容量就为整形的最大值Integer.MAX_VALUE,因为这个数特别特别的大,所以它也被称为无界行列。

4.3 SynchronousQueue

SynchronousQueue最大的不同之处在于,它的容量不同,所以没有当地来暂存元素,导致每次取数据都要先堵塞,直到有数据放入;同理,每次放数据的时分也会堵塞,直到有顾客来取。SynchronousQueue的容量不是1而是0,因为SynchronousQueue不需要去持有元素,它做的便是直接传递。

4.4 PriorityBlockingQueue

PriorityBlockingQueue是一个支持优先级的无界堵塞行列,能够经过自界说类完成compareTo()办法来拟定元素排序规矩,或许初始化时经过结构器参数Comparator来拟定排序规矩。一起,刺进行列的对象有必要是可比较大小的,也便是Comparable的,不然就会抛出ClasscastException反常。
它的take办法在行列为空时会堵塞,可是正因为它是无界行列,而且会自动扩容,所以它的行列永久不会满,所以它的put()办法永久不会堵塞,增加操作一直都会成功。

4.5 DelayQueue

Delay这个行列比较特殊,具有推迟的功用,咱们能够设定在行列中的使命推迟多久之后履行。它是无界行列,可是放入的元素有必要完成Delayed接口,而Delayed接口又承继了Comparable接口,所以天然就拥有了比较和排序的才能.

public interface Delayed extends Comparable<Delayed> {
    /**
     * Returns the remaining delay associated with this object, in the
     * given time unit.
     *
     * @param unit the time unit
     * @return the remaining delay; zero or negative values indicate
     * that the delay has already elapsed
     */
    long getDelay(TimeUnit unit);
}

能够看出这个Delayed接口承继自Comparable,里边需要涉嫌getDelay.这儿的getDelay办法回来的是还剩下多长的推迟时刻才会被履行。假如回来0或许负数则代表使命已过期。元素会根据推迟时刻的长短被放到行列的不同方位,越接近行列头代表越早过期。

5 堵塞和非堵塞行列的并发安全原理是什么

5.1堵塞行列-以ArrayBlockingQueue为例

先来看一下ArrayBlockingQueue的几个重要特点

//用于存放元素的数组
final Object[] items;
//下一次读取的方位
int takeIndex;
// 下一次写入的方位
int putIndex;
// 行列中元素的数量
int count;
/*
 * 用于操控并发的东西类
 */
/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;

ArrayBlockingQueue完成并发同步的原理便是运用ReentrantLock和它的俩个Condition,读操作和写操作都需要先获取到ReentrantLock独占锁才能进行下一步操作。进行读操作时假如行列为空,线程就会进入到读线程专属的notEmpty的Condition的行列中去排队,等候写线程写入新的元素;同理行列已满,这个时分写操作的线程进入到写线程专属的notFull行列中去排队,等候读线程将行列元素移除并腾出空间。
看一下ArrayBlockingQueue的put办法。

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

在put办法中,首要用checkNotNull办法查看刺进的元素是不是null,假如不是null,咱们会用Reentrantlock上锁,而且运用的上锁办法是lock.lockInterruptibly()。这个办法在获取锁的一起是能够相应中止的,这也正是咱们的堵塞行列调用put办法时,在尝试获取锁但还没拿到锁的期间能够呼应中止的底层原因。紧接着在while循环中,它会查看当时行列是不是已经满了,也便是count的长度是否等于数组长度。假如等于代表已经满了,于是咱们便会进行等候,直到有空余的时分,咱们才会履行下一步操作,调用enqueue办法让元素进入行列,最后用unlock办法解锁。

5.2 非堵塞行列ConcurrentLinkedQueue

public boolean offer(E e) {
    checkNotNull(e);
    final Node<E> newNode = new Node<E>(e);
    for (Node<E> t = tail, p = t;;) {
        Node<E> q = p.next;
        if (q == null) {
            // p is last node
            if (p.casNext(null, newNode)) {
                // Successful CAS is the linearization point
                // for e to become an element of this queue,
                // and for newNode to become "live".
                if (p != t) // hop two nodes at a time
                    casTail(t, newNode);  // Failure is OK.
                return true;
            }
            // Lost CAS race to another thread; re-read next
        }
        else if (p == q)
            // We have fallen off list.  If tail is unchanged, it
            // will also be off-list, in which case we need to
            // jump to head, from which all live nodes are always
            // reachable.  Else the new tail is a better bet.
            p = (t != (t = tail)) ? t : head;
        else
            // Check for tail updates after two hops.
            p = (p != t && t != (t = tail)) ? t : q;
    }
}

经过源码咱们能够清晰的看出,非堵塞行列ConcurrentLinkedQueue它的首要流程便是在判空今后,会进入一个大循环中,p.casNext()办法,这个办法正是运用了CAS来操作的,这个死循环去合作CAS其实便是咱们平时说的乐观锁的思维。其实非堵塞行列ConcurrentLickedQueue运用CAS非堵塞算法+不断重试的实际来完成线程安全的

6 线程池对于堵塞行列的挑选

Java阻塞队列详解

  • FixedThreadPool选取的是LinkedBlcokingQueue(同理SingleThreadExecutor) 首要咱们知道LinkedBlockingQueu默认是无限长的,而FixedThreadPool的线程数是固定的,当核心线程数都在被运用时,这个时分假如进来新的使命会被放进堵塞行列中。因为行列是没有容量上限的,行列永久不会被填满,这样就确保了线程池FixedThreadPool和SingleThreadExecutor,不会回绝新使命的提交,也不会丢失数据。
  • CachedThreadPool选取的是SynchronousQueue 首要CachedThreadPool的线程最大数量是无限的,也就意味着它的线程数不会受限制,那么它就不需要额定的空间来存储那些Task,因为每个使命都能够经过新建线程来处理。SynchronousQueue会直接把使命交给线程,不保存它们,效率更好。
  • ScheduledThreadPool选取的是推迟行列
  • 对于ScneduledThreadPool而言,它运用的是DelayedWorkQueue,推迟行列的特色是:不是先进先出,而是会依照推迟时刻的长短来排序,下一个即将履行的使命会排到行列的最前面。挑选运用推迟行列的原因是,ScheduledThreadPool处理的是基于时刻而履行的Task,而推迟行列有才能把Task依照履行时刻的