前文说到线程的运用以及线程间通讯方式,一般情况下咱们经过new Thread或许new Runnable创立线程,这种情况下,需求开发者手动办理线程的创立和收回,线程目标没有复用,很多的线程目标创立与毁掉会引起频频GC,那么事否有机制主动进行线程的创立,办理和收回呢?线程池能够完成该能力。

线程池的长处:

  • 线程池中线程重用,避免线程创立和毁掉带来的功能开销
  • 能有用控制线程数量,避免很多线程抢占资源造成堵塞
  • 对线程进行简略办理,提供守时履行预计指定间隔履行等战略

线程池的封装完成

在java.util.concurrent包中提供了一系列的东西类以便利开发者创立和运用线程池,这些类的承继关系及阐明如下:

线程池封装及拒绝策略

类名 阐明 补白
Executor Executor接口提供了一种使命提交后的履行机制,包括线程的创立与运行,线程调度等,一般不直接运用该类 /
ExecutorService ExecutorService接口,提供了创立,办理,终止Future履行的办法,用于盯梢一个或多个异步使命的进度,一般不直接运用该类 /
ScheduledExecutorService ExecutorService的完成接口,提供延时,周期性履行Future的能力,一同具有ExecutorService的基础能力,一般不直接运用该类 /
AbstractExecutorService AbstractExecutorService是个虚类,对ExecutorService中办法进行了默许完成,其提供了newTaskFor函数,用于获取RunnableFuture目标,该目标完成了submit,invokeAny和invokeAll办法,一般不直接运用该类 /
ThreadPoolExecutor 经过创立该类目标就能够构建一个线程池,经过调用execute办法能够向该线程池提交使命。一般情况下,开发者经过自界说参数,结构该类目标就来获得一个契合事务需求的线程池 /
ScheduledThreadPoolExecutor 经过创立该类目标就能够构建一个能够周期性履行使命的线程池,经过调用schedule,scheduleWithFixedDelay等办法能够向该线程池提交使命并在指守时刻节点运行。一般情况下,开发者经过结构该类目标就来获得一个契合事务需求的可周期性履行使命的线程池 /

由上表可知,对于开发者而言,一般情况下咱们能够经过结构ThreadPoolExecutor目标来获取一个线程池目标,经过其界说的execute办法来向该线程池提交使命并履行,那么怎样创立线程池呢?让咱们一同看下

ThreadPoolExecutor

ThreadPoolExecutor完好参数的结构函数如下所示:

  /**
   * Creates a new {@code ThreadPoolExecutor} with the given initial
   * parameters.
   *
   * @param corePoolSize the number of threads to keep in the pool, even
   *     if they are idle, unless {@code allowCoreThreadTimeOut} is set
   * @param maximumPoolSize the maximum number of threads to allow in the
   *     pool
   * @param keepAliveTime when the number of threads is greater than
   *     the core, this is the maximum time that excess idle threads
   *     will wait for new tasks before terminating.
   * @param unit the time unit for the {@code keepAliveTime} argument
   * @param workQueue the queue to use for holding tasks before they are
   *     executed.  This queue will hold only the {@code Runnable}
   *     tasks submitted by the {@code execute} method.
   * @param threadFactory the factory to use when the executor
   *     creates a new thread
   * @param handler the handler to use when execution is blocked
   *     because the thread bounds and queue capacities are reached
   * @throws IllegalArgumentException if one of the following holds:<br>
   *     {@code corePoolSize < 0}<br>
   *     {@code keepAliveTime < 0}<br>
   *     {@code maximumPoolSize <= 0}<br>
   *     {@code maximumPoolSize < corePoolSize}
   * @throws NullPointerException if {@code workQueue}
   *     or {@code threadFactory} or {@code handler} is null
   */
  public ThreadPoolExecutor(int corePoolSize,
               int maximumPoolSize,
               long keepAliveTime,
               TimeUnit unit,
               BlockingQueue<Runnable> workQueue,
               ThreadFactory threadFactory,
               RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
      maximumPoolSize <= 0 ||
      maximumPoolSize < corePoolSize ||
      keepAliveTime < 0)
      throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
      throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
        null :
        AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
   }

从上述代码能够看出,在构建ThreadPoolExecutor时,主要触及以下参数:

  1. corePoolSize:中心线程个数,一般情况下能够运用 处理器个数/2 作为中心线程数的取值,能够经过Runtime.getRuntime().availableProcessors()来获取处理器个数
  2. maximumPoolSize:最大线程个数,该线程池支撑一同存在的最大线程数量
  3. keepAliveTime:非中心线程闲置时的超时时长,超过这个时长,非中心线程就会被收回,咱们也能够经过allowCoreThreadTimeOut(true)来设置中心线程闲置时,在超时时刻抵达后收回
  4. unit:keepAliveTime的时刻单位
  5. workQueue:线程池中的使命行列,当中心线程数满或最大线程数满时,经过线程池的execute办法提交的Runnable目标存储在这个参数中,遵从先进先出准则
  6. threadFactory:创立线程的工厂 ,用于批量创立线程,统一在创立线程时进行一些初始化设置,如是否看护线程、线程的优先级等。不指守时,默许运用Executors.defaultThreadFactory() 来创立线程,线程具有相同的NORM_PRIORITY优先级并且对错看护线程
  7. handler:使命回绝处理战略,当线程数量等于最大线程数且等候行列已满时,就会选用回绝处理战略处理新提交的使命,不指守时,默许的处理战略是AbortPolicy,即抛弃该使命

综上,咱们能够看出创立一个线程池最少需求清晰中心线程数,最大线程数,超时时刻及单位,等候行列这五个参数,下面咱们创立一个中心线程数为1,最大线程数为3,5s超时收回,等候行列最多能存放5个使命的线程池,代码如下:

ThreadPoolExecutor executor = new ThreadPoolExecutor(1,3,5,TimeUnit.SECONDS,new LinkedBlockingQueue<>(5));

随后咱们运用for循环向该executor中提交使命,代码如下:

public static void main(String[] args) {
  // 创立线程池
  ThreadPoolExecutor executor = new ThreadPoolExecutor(1,3,5,TimeUnit.SECONDS,new LinkedBlockingQueue<>(5));
  for (int i=0;i<10;i++) {
    int finalI = i;
    System.out.println("put runnable "+ finalI +"to executor");
    // 向线程池提交使命
    executor.execute(new Runnable() {
      @Override
      public void run() {
        System.out.println(Thread.currentThread().getName()+",runnable "+ finalI +"start");
        try {
          Thread.sleep(5000);
         } catch (InterruptedException e) {
          throw new RuntimeException(e);
         }
        System.out.println(Thread.currentThread().getName()+",runnable "+ finalI +"executed");
       }
     });
   }
}

输出如下:

线程池封装及拒绝策略

从输出能够看到,当提交一个使命到线程池时,其履行流程如下:

线程池封装及拒绝策略

线程池回绝战略

线程池回绝战略有四类,界说在ThreadPoolExecutor中,分别是:

  • AbortPolicy:默许回绝战略,丢掉提交的使命并抛出RejectedExecutionException,在该反常输出信息中,能够看到当时线程池状况
  • DiscardPolicy:丢掉新来的使命,可是不抛出反常
  • DiscardOldestPolicy:丢掉行列头部的旧使命,然后测验从头履行,假如再次失利,重复该过程
  • CallerRunsPolicy:由调用线程处理该使命

当然,假如上述回绝战略不能满意需求,咱们也能够自界说反常,完成RejectedExecutionHandler接口,即可创立自己的线程池回绝战略,下面是运用自界说回绝战略的示例代码:

public static void main(String[] args) {
  RejectedExecutionHandler handler = new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
      System.out.println("runnable " + r +" in executor "+executor+" is refused");
     }
   };
  ThreadPoolExecutor executor = new ThreadPoolExecutor(1,3,5,TimeUnit.SECONDS,new LinkedBlockingQueue<>(5),handler);
  for (int i=0;i<10;i++) {
    int finalI = i;
    Runnable runnable = new Runnable() {
      @Override
      public void run() {
        System.out.println(Thread.currentThread().getName()+",runnable "+ finalI +"start");
        try {
          Thread.sleep(5000);
         } catch (InterruptedException e) {
          throw new RuntimeException(e);
         }
        System.out.println(Thread.currentThread().getName()+",runnable "+ finalI +"executed");
       }
     };
    System.out.println("put runnable "+ runnable+"  index:"+finalI +" to executor:"+executor);
    executor.execute(runnable);
   }
}

输出如下:

线程池封装及拒绝策略

使命行列

对于线程池而言,使命行列需求是BlockingQueue的完成类,BlockingQueue接口的完成类类图如下:

线程池封装及拒绝策略

下面咱们针对常用行列做简略了解:

  • ArrayBlockingQueue:ArrayBlockingQueue是根据数组的堵塞行列,在其内部维护一个定长数组,所以运用ArrayBlockingQueue时必须指定使命行列长度,因为不论对数据的写入或许读取都运用的是同一个锁目标,所以没有完成读写别离,一同在创立时咱们能够指定锁内部是否选用公正锁,默许完成对错公正锁。

    非公正锁与公正锁

    公正锁:多个使命堵塞在同一锁时,等候时长长的优先获取锁

    非公正锁:多个使命堵塞在同一锁时,锁可获取时,一同抢锁,谁先抢到谁先履行

  • LinkedBlockingQueue:LinkedBlockingQueue是根据链表的堵塞行列,在创立时可不指定使命行列长度,默许值是Integer.MAX_VALUE,在LinkedBlockingQueue中读锁和写锁完成了分支,相对ArrayBlockingQueue而言,功率提升明显。

  • SynchronousQueue:SynchronousQueue是一个不存储元素的堵塞行列,也就是说当需求刺进元素时,必须等候上一个元素被移出,否则不能刺进,其适用于使命多可是履行比较快的场景。

  • PriorityBlockingQueue:PriorityBlockingQueue是一个支撑指定优先即的堵塞行列,默许初始化长度为11,最大长度为Integer.MAX_VALUE – 8,能够经过让装入行列的目标完成Comparable接口,界说目标排序规矩来指定行列中元素优先级,优先级高的元素会被优先取出。

  • DelayQueue:DelayQueue是一个带有延迟时刻的堵塞行列,行列中的元素,只有等候延时时刻到了才能够被取出,因为其内部用PriorityBlockingQueue维护数据,故其长度与PriorityBlockingQueue共同。一般用于守时调度类使命。

下表从一些视点对上述行列进行了比较:

行列名称 底层数据结构 默许长度 最大长度 是否读写别离 适用场景
ArrayBlockingQueue 数组 0 开发者指定大小 使命数量较少时运用
LinkedBlockingQueue 链表 Integer.MAX_VALUE Integer.MAX_VALUE 很多使命时运用
SynchronousQueue 公正锁-行列/非公正锁-栈 0 / 使命多可是履行速度快的场景
PriorityBlockingQueue 目标数组 11 Integer.MAX_VALUE-8 有使命需求优先处理的场景
DelayQueue 目标数组 11 Integer.MAX_VALUE-8 守时调度类场景