咱们好,我是王有志。重视王有志,一同聊技术,聊游戏,聊在外流浪的日子。

最近搞了个沟通群:共同富裕的Java人,中心功用是供给面试沟通场所,共享八股文或面试心得,宗旨是“Javaer help Javaer”,期望能够借他人之经历,霸占我之面试,欢迎各位参加咱们。

下面,咱们开端今日的主题:线程池。线程池是面试中必问的八股文,我将触及到到的问题分为3大类:

  • 根底运用

    • 线程池是什么?为什么要运用线程池?

    • Executor结构是什么?

    • Java供给了哪些线程池?

  • 完成原理

    • 线程池的底层原理是怎么完成的?

    • 创立线程池的参数有哪些?

    • 线程池中的线程是什么时刻创立的?

  • 系统规划

    • 怎么合理的设置线程池的大小?

    • 假如服务器宕机,怎么处理行列中的使命?

期望今日的内容能够帮你回答以上的问题。

Tips

  • 本文运用Java 11源码进行剖析;

  • 文章会在源码中添加注释,要害内容会有单独的剖析。

池化思想

在你的编程生涯中,必定遇到过各式各样的“池”,如:数据库连接池,常量池,以及今日的线程池。无一例外,它们都是凭借池化思想来办理计算机中的资源。

维基百科中是这样描绘“池化”的:

In resource management, pooling is the grouping together of resources (assets, equipment, personnel, effort, etc.) for the purposes of maximizing advantage or minimizing risk to the users. The term is used in finance, computing and equipment management.

“池化”指的是将资源汇聚到一同,以发挥优势或下降风险

接着来看维基百科中对“池”的描绘:

In computer science, a pool is a collection of resources that are kept, in memory, ready to use, rather than the memory acquired on use and the memory released afterwards.A pool client requests a resource from the pool and performs desired operations on the returned resource. When the client finishes its use of the resource, it is returned to the pool rather than released and lost.

计算机科学中的“池”,是内存中保存资源的调集,创立资源以备运用,停用时回收,而不是运用时创立,停用时丢掉。客户端从池中恳求资源,并履行操作,当不再运用资源时,将资源归还到池中,而不是释放或丢掉

为什么要运用“池”?

首要”池”是资源的调集,经过“池”能够完成对资源的统一办理

其次,“池”内寄存现已创立并初始化的资源,运用时直接从“池”内获取,跳过了创立及初始化的进程,进步了呼应速度

最终,资源运用完成后归还到“池”中,而非丢掉或毁掉,进步资源的利用率

线程池

池化思想的引入是为了处理资源办理中遇到的问题,线程池正是凭借池化思想完成的线程办理东西。那么线程池能够协助咱们处理哪些实际的问题呢?

最直接的是操控线程的创立,不加以限制的创立线程会耗尽系统资源。不信的话你能够试试下面的代码:

public static void main(String[] args) {
  while (true) {
    new Thread(()-> {
    }).start();
  }
}

Tips:卡顿警告~~

其次,线程的创立和毁掉是需求时刻的,凭借线程池能够有效的避免线程频繁的创立和毁掉线程,进步程的序呼应速度。

问题回答:线程池是什么?为什么要运用线程池?

Executor系统

Java中供给了功用完善的Executor系统,用于完成线程池。先来了解下Executor系统中的中心成员间的联系:

13.一文彻底了解线程池

Executor系统的最顶层是Executor接口ExecutorService接口,它们界说了Executor系统的中心功用。

Executor接口

Executor接口的注释:

An object that executes submitted Runnable tasks. This interface provides a way of decoupling task submission from the mechanics of how each task will be run, including details of thread use, scheduling, etc. An Executor is normally used instead of explicitly creating threads.

Executor接口十分简略,只界说了execute办法,首要目的是将Runnable使命与履行机制(线程,调度使命等)解耦,供给了履行Runnable使命的办法

public interface Executor {
  /**
   * Executes the given command at some time in the future.  The command
   * may execute in a new thread, in a pooled thread, or in the calling
   * thread, at the discretion of the {@code Executor} implementation.
   */
  void execute(Runnable command);
}

ExecutorService接口

ExecutorService接口承继了Executor接口,拓宽了Executor接口的才能。ExecutorService接口的注释:

An Executor that provides methods to manage termination and methods that can produce a Future for tracking progress of one or more asynchronous tasks.

ExecutorService接口要害办法的声明:

public interface ExecutorService extends Executor {
  /**
   * Initiates an orderly shutdown in which previously submitted
   * tasks are executed, but no new tasks will be accepted.
   * Invocation has no additional effect if already shut down.
   */
  void shutdown();
  /**
   * Attempts to stop all actively executing tasks, halts the
   * processing of waiting tasks, and returns a list of the tasks
   * that were awaiting execution.
   * This method does not wait for actively executing tasks to
   * terminate.  Use {@link #awaitTermination awaitTermination} to
   * do that.
   */
  List<Runnable> shutdownNow();
  boolean isShutdown();
  boolean isTerminated();
  /**
   * Blocks until all tasks have completed execution after a shutdown
   * request, or the timeout occurs, or the current thread is
   * interrupted, whichever happens first.
   */
  boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
  <T> Future<T> submit(Callable<T> task);
  <T> Future<T> submit(Runnable task, T result);
  /**
   * Submits a Runnable task for execution and returns a Future
   * representing that task. The Future's {@code get} method will
   * return {@code null} upon <em>successful</em> completion.
   */
  Future<?> submit(Runnable task);
}

对要害办法做一个阐明:

  • 承继自Executor接口:

    • execute:履行Runnable使命;
  • ExecutorService接口界说的办法:

    • submit:履行RunnableCallable使命,并回来Future

    • shutdown:答应已提交的使命履行结束,但不承受新使命的封闭;

    • shutdownNow:测验封闭所有使命(正在/等候履行),并回来等候履行的使命。

Tips:其余办法主张阅读源码中的注释,即便是说到的4个办法,也要阅读注释

问题回答:Executor结构是什么?

ThreadPoolExecutor中心流程

Executor系统中,咱们最了解的必定是ThreadPoolExecutor完成了,也是咱们能够完成自界说线程池的根底。接下来逐渐剖析ThreadPoolExecutor的完成原理。

结构线程池

ThreadPoolExecutor供给了4个结构办法,咱们来看参数最全的那个结构办法:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
  if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) 
    throw new IllegalArgumentException();
  if (workQueue == null || threadFactory == null || handler == null) 
    throw new NullPointerException();
  this.corePoolSize = corePoolSize;
  this.maximumPoolSize = maximumPoolSize;
  this.workQueue = workQueue;
  this.keepAliveTime = unit.toNanos(keepAliveTime);
  this.threadFactory = threadFactory;
  this.handler = handler;
}

ThreadPoolExecutor的结构办法供给了7个参数:

  • int corePoolSize线程池的中心线程数量,创立线程的数量小于等于corePoolSize时,会一向创立线程

  • int maximumPoolSize线程池的最大线程数量,当线程数量等于corePoolSize后且行列已满,答应持续创立(maximumPoolSize−corePoolSize)(maximumPoolSize-corePoolSize)个线程;

  • long keepAliveTime线程的最大闲暇时刻,当创立了超出corePoolSize数量的线程后,这些线程在不履行使命时能够存活的时刻,超出keepAliveTime后会被毁掉;

  • TimeUnit unitkeepAliveTime的单位;

  • BlockingQueue<Runnable> workQueue堵塞行列,用于保存等候履行的使命;

  • ThreadFactory threadFactory线程工厂,用于创立线程,默许运用Executors.defaultThreadFactory()

  • RejectedExecutionHandler handler回绝战略,当行列已满,且没有闲暇的线程时,履行的回绝使命的战略。

Tips:有些小伙伴会疑问,假如每次履行一个使命,履行结束后再履行新使命,线程池依旧会创立corePoolSize个线程吗?答案是会的,后文解说。

问题回答:创立线程池的参数有哪些?

主控状况CTL与线程池状况

ThreadPoolExecutor中界说了主控状况CTL线程池状况

/**
 * The main pool control state, ctl, is an atomic integer packing
 * two conceptual fields
 *   workerCount, indicating the effective number of threads
 *   runState,    indicating whether running, shutting down etc
 */
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3; // 29
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;// 0001 1111 1111 1111 1111 1111 1111 1111
private static final int RUNNING    = -1 << COUNT_BITS;// 111 0 0000 0000 0000 0000 0000 0000 0000
private static final int SHUTDOWN   =  0 << COUNT_BITS;// 000 0 0000 0000 0000 0000 0000 0000 0000
private static final int STOP       =  1 << COUNT_BITS;// 001 0 0000 0000 0000 0000 0000 0000 0000
private static final int TIDYING    =  2 << COUNT_BITS;// 010 0 0000 0000 0000 0000 0000 0000 0000
private static final int TERMINATED =  3 << COUNT_BITS;// 011 0 0000 0000 0000 0000 0000 0000 0000
private static int runStateOf(int c)     { return c & ~COUNT_MASK; }
private static int workerCountOf(int c)  { return c & COUNT_MASK; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

CTL包括了两部分内容:线程池状况(runState,源码中运用rs代替)作业线程数(workCount,源码中运用wc代替)。当看到位运算符和“MASK”一同呈现时,就应该想到应用了位掩码技术。

主控状况CTL的默许值是RUNNING | 0即:1110 0000 0000 0000 0000 0000 0000 0000。runStateOf办法回来低29位为0的CTL,与之对应的是线程池状况,workerCountOf办规律回来高3位为0的CTl,用低29位表明作业线程数量,所以线程池最多答应536870911个线程。

Tips

  • 作业线程指的是现已创立的线程,并不必定在履行使命,后文解说;

  • 位运算的能够参考编程技巧:“高端”的位运算;

  • Java中二进制运用补码,注意原码,反码和补码间的转化。

线程池的状况

注释中对线程池的状况做出了具体的阐明:

RUNNING: Accept new tasks and process queued tasks

SHUTDOWN: Don’t accept new tasks, but process queued tasks

STOP: Don’t accept new tasks, don’t process queued tasks, and interrupt in-progress tasks

TIDYING: All tasks have terminated, workerCount is zero, the thread transitioning to state TIDYING will run the terminated() hook method

TERMINATED: terminated() has completed

  • RUNNING:接纳新使命,处理行列中的使命;

  • SHUTDOWN:不接纳新使命,处理行列中的使命;

  • STOP:不接纳新使命,不处理行列中的使命,中止正在履行的使命;

  • TIDYING:所有使命现已履行结束,而且作业线程为0,转化到TIDYING状况后将履行Hook办法terminated()

  • TERMINATEDterminated()办法履行结束。

状况的转化

注释中也对线程池状况的转化做出了具体阐明:

RUNNING -> SHUTDOWN On invocation of shutdown()

(RUNNING or SHUTDOWN) -> STOP On invocation of shutdownNow()

SHUTDOWN -> TIDYING When both queue and pool are empty

STOP -> TIDYING When pool is empty

TIDYING -> TERMINATED When the terminated() hook method has completed

咱们经过一张状况转化图来了解线程池状况之间的转化:

13.一文彻底了解线程池

结合源码,能够看到线程池的状况从RUNNING到TERMINATED其数值是单调递加的,换句话说线程池从“活着”到“死透”所对应的数值是逐渐增大,所以能够运用数值间的比较去确认线程池处于哪一种状况。

运用线程池

咱们现已对ThreadPoolExecutor有了一个整体的认知,现在能够创立并运用线程池了:

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(6));
threadPoolExecutor.submit(() -> {
  // 事务逻辑
});

这儿我运用最“简略”的结构办法,咱们看到在线程池中提交使命运用的是submit办法,该办法在抽象类AbstractExecutorService中完成:

public abstract class AbstractExecutorService implements ExecutorService {
  public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
  }
  public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
  }
  public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
  }
}

submit的重载办法之间只要参数列表的不同,完成逻辑是相同的,均是先封装RunnableFuture目标,再调用ThreadPoolExecutor#execute办法。

问题回答submit()execute()办法有什么区别?

execute办法

承继自Executor接口的execute办法是线程池的要害办法:

public void execute(Runnable command) {
  // 检测待履行使命
  if (command == null) {
    throw new NullPointerException();
  }
  // 获取主控状况CTL
  int c = ctl.get();
  // STEP 1: 当作业线程数量小于中心线程时,履行addWorker办法
  if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true)) {
      return;
    }
    c = ctl.get();
  }
  // 当作业线程数量大于中心线程数量时
  // STEP 2: 首要判别线程池是否处于运转状况,接着测验添加到行列中
  if (isRunning(c) && workQueue.offer(command)) {
    // 再次查看线程池状况
    int recheck = ctl.get();
    // 不再处于RUNNING,则从行列中删去当时使命,并履行回绝战略
    if (!isRunning(recheck) && remove(command)) {
      reject(command);
    } else if (workerCountOf(recheck) == 0) {
      addWorker(null, false);
    }
  }
  // STEP 3: 无法添加到行列时,测验履行addWorker
  else if (!addWorker(command, false))
    // addWorker履行失败,则履行回绝战略
    reject(command);
}

阅读execute办法的源码时需求知道一个前提,addWorker办法会查看线程池状况和作业线程数量,并履行作业使命。接着来看execute办法的3种履行情况:

  • STEP 1:线程池状况:RUNNING,作业线程数:小于中心线程数,此刻履行addWorker(command, true)

  • STEP 2:线程池状况:RUNNING,作业线程数:等于中心线程数,行列:未饱满,添加到行列中;

  • STEP 3:线程池状况:RUNNING,作业线程数:等于中心线程数,行列:已饱满,履行addWorker(command, false)

需求要点重视STEP 1的部分,还记得结构线程池最终的问题吗?STEP 1便解说了为什么一个接一个的履行使命,依旧会创立出corePoolSize个线程。接着咱们经过一张流程图展现execute办法的履行流程:

13.一文彻底了解线程池

流程图画得比较“杂乱”,由于有些判别看似在一行中履行,实际上是凭借了&&运算符短路的特性来决议是否履行,例如isRunning(c) && workQueue.offer(command)中,假如isRunning(c) == false则不会履行workQueue.offer(command)

addWorker办法

private boolean addWorker(Runnable firstTask, boolean core)

回来值为布尔类型表明是否成功履行,参数列表中有两个参数:

  • Runnable firstTask,待履行使命;

  • boolean core,true表明最多答应创立corePoolSize个线程,false表明运用最多答应创立maximumPoolSize个线程。

在剖析execute办法的进程中,咱们提早“剧透”了addWorker办法的功用:

  • 查看线程池状况和作业线程数量

  • 履行作业使命

因而addWorker办法的源码部分咱们分红两部分来看。

Tips再次着重本文运用Java 11源码进行剖析,在addWorker办法的完成上Java 11与Java 8存在差异。

查看线程池状况和作业线程数量

第一部分是线程池状况和作业线程数量查看的源码:

retry:
// 获取主控状况CTL
for (int c = ctl.get();;) {
  // 注释1
  // Java 11相对友爱许多,削减了许多!的运用,看起来比较契合人的思想
  // 这部分判别能够分红两部分:
  //   1. 至少为SHUTDOWN状况
  //   2.条件3选1满意:
  //     2-1,至少为STOP状况
  //     2-2,firstTask不为空
  //     2-3,workQueue为空
  if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty())) {
    return false;
  }
  for (;;) {
    // core == true,确保作业线程数量小于中心线程数量
    // core == false,确保线程数量小于最大线程数量
    if (workerCountOf(c) >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK)) {
      return false;
    }
    // 增加作业线程数量并退出
    if (compareAndIncrementWorkerCount(c)) {
      break retry;
    }
    // 假如至少是SHUTDOWN状况,则重新履行
    c = ctl.get();
    if (runStateAtLeast(c, SHUTDOWN)) {
      continue retry;
    }   
  }
}

注释1的代码并不杂乱,只是需求结合线程池在不同状况下的处理逻辑来剖析:

  • 当状况“至少”为SHUTDOWN时,什么情况不需求处理?

    • 添加新的使命(对应条件2-2)

    • 行列为空(对应条件2-3)

  • 当状况“至少”为STOP时,线程池应当当即中止,不接纳,不处理。

Tips:线程池状况的部分说线程池状况从RUNNING到TERMINATED是单调递加的,因而在Java 11的完成中才会呈现runStateAtLeast办法。

履行作业使命

第二部分是履行作业使命的源码:

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
  // 创立Worker目标
  w = new Worker(firstTask);
  // 从worker目标中获取线程
  final Thread t = w.thread;
  if (t != null) {
    // 上锁
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
      int c = ctl.get();
      // 线程池状况查看
      // RUNNING状况,或许“小于”STOP状况(处理行列中的使命)
      if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask == null)) {
        // 线程状况查看
        if (t.getState() != Thread.State.NEW) {
          throw new IllegalThreadStateException();
        }
        // 将Worker目标添加到workers中
        workers.add(w);
        workerAdded = true;
        int s = workers.size();
        if (s > largestPoolSize) {
          // 记录线程池中呈现过的最大线程数
          largestPoolSize = s;
        }
      }
    } finally {
      mainLock.unlock();
    }
    if (workerAdded) {
      // 发动线程
      t.start();
      workerStarted = true;
    }
  }
} finally {
  if (! workerStarted) {
    // addWorker履行失败
    // addWorkerFailed中包括作业线程数减1的逻辑
    addWorkerFailed(w);
  }  
}
return workerStarted;

结合两部分代码,一个正向流程是这样的:

  • 查看状况:查看是否答应创立Worker,假如答应履行compareAndIncrementWorkerCount(c),CTL中作业线程数量+1;

  • 履行使命:创立Worker目标,经过Worker目标获取线程,添加到workers中,最终发动线程。

回过头看咱们之前一向说到的作业线程,实际上是Worker目标,咱们能够近似的将Worker目标和作业线程画上等号。

问题回答:线程池中的线程是什么时刻创立的?

三调addWorker

execute办法中,有3种情况调用addWorker办法:

  • STEP 1addWorker(command, true)

  • STEP 2addWorker(null, false)

  • STEP 3addWorker(command, false)

STEP 1和STEP 3很好了解,STEP 1最多答应创立corePoolSize个线程,STEP 3最多答应创立maximumPoolSize个线程。STEP 2就比较难了解了,传入了空使命然后调用addWorker办法。

什么情况下会履行到addWorker(null, false)

  • 第1个条件:workerCount≥corePoolSizeworkerCount \geq corePoolSize

  • 第2个条件:isRunning(c) && workQueue.offer(command)

  • 第3个条件:workerCountOf(recheck) == 0

处于RUNNING状况的条件不难了解,对立的是第1个条件和第3个条件。依据这两个条件能够得到:corePoolSize≤workCount=0corePoolSize \leq workCount = 0,也便是说答应创立中心线程数为0的线程池

接着咱们来看addWorker(null, false)做了什么?创立了Worker目标,添加到workers中,并调用了一次Thread.start,虽然没有任何待履行的使命。

为什么要创立一个Worker目标?别忘了,**现已履行过workQueue.offer(command)了,需求确保线程池中至少有一个Worker,才能履行workQueue**中的使命。

“东西人”Worker

实际上ThreadPoolExecutor保护的作业线程便是Worker目标,咱们来看Worker类的原码:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
  final Thread thread;
  Runnable firstTask;
  volatile long completedTasks;
  Worker(Runnable firstTask) {
    setState(-1);
    this.firstTask = firstTask;
    // 经过默许线程工厂创立线程
    this.thread = getThreadFactory().newThread(this);
  }
  public void run() {
    runWorker(this);
  }
}

Worker承继自AbstractQueuedSynchronizer,并完成了Runnable接口。

咱们要点重视结构办法,尤其是this.thread = getThreadFactory().newThread(this),经过线程工厂创立线程,传入的Runnable接口是谁?

Worker目标本身,也便是说假如有worker.getThread().start(),此刻会履行Worker.run办法。

Tips

  • AbstractQueuedSynchronizer便是大名鼎鼎的AQS,Worker凭借AQS完成非重入独占锁,不过这部分不是今日的要点;

  • Woker目标与本身的成员变量thread的联系可谓是难分难解,好好梳理下,否则会很混乱。

runWorker办法

runWorker办法传入的是Worker目标本身,来看办法完成:

final void runWorker(Worker w) {
  // 注释1
  Thread wt = Thread.currentThread();
  // Worker目标中获取履行使命
  Runnable task = w.firstTask;
  // 将Worker目标中的使命置空
  w.firstTask = null;
  w.unlock();
  boolean completedAbruptly = true;
  try {
    // 注释2
    while (task != null || (task = getTask()) != null) {
      w.lock();
      // 线程池的部分状况要中止正在履行的使命
      if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) {
        wt.interrupt();
      }
      try {
        beforeExecute(wt, task);
        try {
          // 履行使命
          task.run();
          afterExecute(task, null);
        } catch (Throwable ex) {
          afterExecute(task, ex);
          throw ex;
        }
      } finally {
        task = null;
        w.completedTasks++;
        w.unlock();
      }
    }
    completedAbruptly = false;
  } finally {
    processWorkerExit(w, completedAbruptly);
  }
}

咱们或许会对注释1的部分比较迷惑,这个Thread wt = Thread.currentThread()是什么鬼?别急,我带你从头梳理一下。

运用线程池中的代码为例,假定是首次履行,咱们看主线程做了什么:

13.一文彻底了解线程池

刚才也说了,Worker目标的线程在发动后履行worker.run,也便是在runWorker办法中Thread.currentThread()Worker目标的线程,并非主线程。

再来看注释2的部分,第一次进入循环时,履行的task是Runnable task = w.firstTask,即初度判别task != null,第二次进入循环时,task是经过task = getTask()获取的。

线程池中,除了当时Worker正在履行的使命,还有谁能够供给待履行使命?答案是行列,因而咱们能够合理得猜想getTask()是获取行列中的使命。

getTask办法

private Runnable getTask() {
  // 前次从行列中获取使命是否超时
  boolean timedOut = false;
  for (;;) {
    // 线程池状况判别,某些状况下不需求处理行列中的使命
    int c = ctl.get();
    if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
      decrementWorkerCount();
      return null;
    }
    int wc = workerCountOf(c);
    // allowCoreThreadTimeOut是否答应中心线程超时毁掉,默以为false
    // 经过allowCoreThreadTimeOut办法设置
    // wc > corePoolSize为true表明启用了非中心线程
    boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    // wc > maximumPoolSize,或许的情况是由于一同履行了setMaximumPoolSize办法
    // timed && timedOut为true时,表明前次获取使命超时,当时需求进行超时操控
    // wc > 1 || workQueue.isEmpty(),作业线程数量大于1或行列为空
    if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
      // 削减作业线程数量
      if (compareAndDecrementWorkerCount(c)) {
        return null;
      }
      continue;
    }
    try {
      // 注释1
      // 从行列中获取待履行使命
      Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
      if (r != null) {
        return r;
      }
      timedOut = true;
    } catch (InterruptedException retry) {
      timedOut = false;
    }
  }
}

注释1的部分有两种获取使命的方式:

  • workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)获取队首元素,假如当时行列为空,则等候指守时刻后回来null

  • workQueue.take()获取队首元素,假如行列为空,则一向等候,直到有回来值。

线程池只会在一种情况下运用workQueue.take,即不答应中心线程超时毁掉,一同线程池的作业线程数量小于中心线程数量,结合runWorker办法的源码咱们能够得知,此刻凭借了堵塞行列的才能,确保runsWoker办法一向停留在task = getTask()上,直到getTask()回来呼应的使命

而在挑选运用workQueue.poll时存在两种情况:

  • 答应中心线程超时毁掉,即allowCoreThreadTimeOut == true

  • 当时作业线程数大于中心线程数,即线程池现已创立满足数量的中心线程,而且行列现已饱满,开端创立非中心线程处理使命。

结合runWorker办法的源码咱们能够知道,假如行列中的使命现已被消耗结束,即getTask()回来null,则会跳出while循环,履行processWorkerExit办法。

processWorkerExit办法

private void processWorkerExit(Worker w, boolean completedAbruptly) {
  // runWorker履行失败的场景
  if (completedAbruptly) {
    decrementWorkerCount();
  }
  final ReentrantLock mainLock = this.mainLock;
  mainLock.lock();
  try {
    completedTaskCount += w.completedTasks;
    // 从workers中删去Worker
    workers.remove(w);
  } finally {
    mainLock.unlock();
  }
  // 依据线程池状况判别是否结束线程池
  tryTerminate();
  int c = ctl.get();
  // STOP之下的状况,runWorker正常结束时completedAbruptly == false
  // 确保至少有1个worker,用于处理行列中的使命
  if (runStateLessThan(c, STOP)) {
    if (!completedAbruptly) {
      int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
      if (min == 0 && ! workQueue.isEmpty()) {
        min = 1;
      }
      if (workerCountOf(c) >= min){
        return;
      }
    }
    // runWorker反常退出时,即completedAbruptly == true
    // 或许是workers存活少于1个
    addWorker(null, false);
  }
}

processWorkerExit办法做了3件事:

  • 移除“多余”的Worker目标(答应毁掉的中心线程或许非中心线程);

  • 测验修改线程池状况;

  • 确保在至少存活1个Worker目标。

Tips:我跳过了tryTerminate()办法的剖析,对,是成心的~~

问题回答:线程池的底层原理是怎么完成的?

毁掉非中心线程

设想一个场景:现已创立了满足数量的中心线程,而且行列现已饱满,仍然有使命提交时,会是怎样的履行流程?

线程池创立非中心线程处理使命,当非中心线程履行结束后并不会当即毁掉,而是和中心线程一同去处理行列中的使命。那么当所有的使命都处理结束之后呢?

回到runWorker中,当所有使命履行结束后再次进入循环,getTask中判别作业线程数大于和中心线程数,此刻启用workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS),而keepAliveTime便是构建线程池时设定的线程最大闲暇时刻,当超过keepAliveTime后仍旧没有取得使命回来null,跳出runWorker的循环,履行processWorkerExit毁掉非中心线程。

ThreadPoolExecutor拾遗

目前咱们现已具体剖析了线程池的履行流程,这儿我会弥补一些前文未触及到的内容,由于是弥补内容,所以触及不会具体的解说源码。

预创立线程

咱们在说到线程池的优点时会特别着重一句,池内保存了创立好的资源,运用时直接取出,但线程池好像依旧是首次接到使命后才会创立资源啊?

实际上,线程池供给prestartCoreThread办法,用于预创立中心线程:

public boolean prestartCoreThread() {
  return workerCountOf(ctl.get()) < corePoolSize && addWorker(null, true);
}

假如你的程序需求做出极致的优化,能够挑选预创立中心线程。

封闭和当即封闭

ThreadPoolExecutor供给了两个封闭的功用shutdownshutdownNow

public void shutdown() {
  final ReentrantLock mainLock = this.mainLock;
  mainLock.lock();
  try {
    checkShutdownAccess();
    advanceRunState(SHUTDOWN);
    // 中止闲暇线程
    interruptIdleWorkers();
    // ScheduledThreadPoolExecutor的hook
    onShutdown();
  } finally {
    mainLock.unlock();
  }
  tryTerminate();
}
public List<Runnable> shutdownNow() {
  List<Runnable> tasks;
  final ReentrantLock mainLock = this.mainLock;
  mainLock.lock();
  try {
    checkShutdownAccess();
    advanceRunState(STOP);
    // 中止所有线程
    interruptWorkers();
    tasks = drainQueue();
  } finally {
    mainLock.unlock();
  }
  tryTerminate();
  return tasks;
}

两者的不同仍是很明显的:

  • shutdown将线程池的状况改为SHUTDOWN,而shutdownNow则改为STOP;

  • shutdown不回来行列中的使命,而shutdownNow回来行列中的使命,由于STOP状况不会再去履行行列的使命;

  • shutdown中止闲暇线程,而shutdownNow则是中止所有线程。

从完成作用上来看封闭shutdown会更“温和”一些,而当即封闭shutdownNow则更为“激烈”,仿佛语气中带着不容置疑。

回绝战略

线程池不会无条件的接纳使命,有两种情况下它会回绝使命:

  • 中心线程已满,添加到行列后,线程池不再处于RUNNING状况,此刻从行列删去使命,并履行回绝战略;

  • 中心线程已满,行列已满,非中心线程已满,此刻履行回绝战略。

Java供给了RejectedExecutionHandler接口:

public interface RejectedExecutionHandler {
  void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

因而,咱们能够经过完成RejectedExecutionHandler接口,完成自界说回绝战略。另外,Java中也供给了4种默许回绝战略:

  • AbortPolicy:直接抛出反常;

  • CallerRunsPolicy:提交使命的线程履行;

  • DiscardOldestPolicy:丢掉行列中最早参加的线程;

  • DiscardPolicy:直接丢掉,便是啥也不干。

源码十分简略,咱们自行阅读即可。

Java供给了哪些线程池

假如不想自己界说线程池,Java也贴心的供给了4种内置线程池,默许线程池经过Executors获取。

Java的命名中,s后缀一般是对应东西类,一般供给很多静态办法,例如:Collections之于Collection。所以即便归于Executor系统中的一员,但却没办法在“族谱”上呈现,打工人的悲惨命运

FixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
  return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
  return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory);
}

固定大小线程池,中心线程数和最大线程数相同,看起来都还不错,首要的问题是经过无参结构器创立的LinkedBlockingQueue,它答应的最大长度是Integer.MAX_VALUE

**Tips:**这也便是为什么《阿里巴巴Java开发手册》中不推荐的原因。

CachedThreadPool

public static ExecutorService newCachedThreadPool() {
  return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
  return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory);
}

能够说是“无限大”的线程池,接到使命就创立新线程,另外**SynchronousQueue是十分特殊的行列,不存储数据,每个put操作对应一个take操作**。咱们来剖析下实际或许发生的情况:

  • 前提:很多并发涌入

  • 提交第一个使命,进入行列,判别中心线程数为0,履行addWorker(null, false),对应execute的SETP 2;

  • 提交第二个使命,假定第一个使命未结束,第二个使命直接提交到行列中;

  • 提交第三个使命,假定第一个使命未结束,无法添加到行列中,履行addWorker(command, false)对应execute的SETP 3。

也便是说,只要提交的够快,就会无限创立线程。

SingleThreadExecutor

public static ExecutorService newSingleThreadExecutor() {
  return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
  return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory));
}

只要一个线程的线程池,问题也是在于LinkedBlockingQueue,能够“无限”的接纳使命。

ScheduledExecutor

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
  return new ScheduledThreadPoolExecutor(corePoolSize);
}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) {
  return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
  return new DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
}
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
  return new DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1, threadFactory));
}

用来履行守时使命,DelegatedScheduledExecutorService是对ScheduledExecutorService的包装。

在Executor系统的“族谱”中,是有体现到ScheduledExecutorServiceScheduledThreadPoolExecutor的,这部分留给咱们自行剖析了。

除了以上4种内置线程池外,Java还供给了内置的ForkJoinPool

public static ExecutorService newWorkStealingPool(int parallelism) {
  return new ForkJoinPool(parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true);
}
public static ExecutorService newWorkStealingPool() {
  return new ForkJoinPool(Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true);
}

这部分是Java 8之后供给的,咱们暂时按下不表,放到后期关于Fork/Join结构中具体解说。

问题回答:Java供给了哪些线程池?

合理设置线程池

一般咱们在议论合理设置线程池的时候,指的是设置线程池的corePoolSizemaximumPoolSize,合理的设置能够最大化的发挥线程池的才能。

咱们先来看美团技术团队的调研结果:

13.一文彻底了解线程池

无论是哪种公式,都是根据理论得出的结果,但往往理论到工程还有很长得一段路要走。

按照我的经历,合理的设置线程池能够汇总成一句话:依据理论公式预估初始设置,随后对中心事务进行压测调整线程池设置

Java也供给了动态调整线程池的才能:

public void setThreadFactory(ThreadFactory threadFactory);
public void setRejectedExecutionHandler(RejectedExecutionHandler handler);
public void setCorePoolSize(int corePoolSize);
public void setMaximumPoolSize(int maximumPoolSize);
public void setKeepAliveTime(long time, TimeUnit unit);

除了workQueue都能调整,本文不评论线程池动态调整的完成。

Tips

  • 调研结果源自于《Java线程池完成原理及其在美团事务中的实践》;

  • 该篇文章中也具体的评论了动态化线程池的思路,推荐阅读。

结语

线程池的大部分内容就到这儿结束了,期望咱们够经过本篇文章回答绝大部分关于线程池的问题,带给咱们一些协助,假如有错误或许不同的主意,欢迎咱们留言评论。不过今日的仍是遗留了两点内容:

  • 堵塞行列

  • Fork/Join结构

这些我会在后续的文章中和咱们共享。


好了,今日就到这儿了,Bye~~