线程的创立和毁掉是昂贵的操作,会耗费许多的系统资源。线程池使命行列履行完结后会保存 corePoolSize 数量的线程,以备重复使用。这降低了线程创立和毁掉的开支,提高了系统的功能和呼应时刻。
ThreadPoolExecutor
结构函数
线程池结构函数参数
- corePoolSize : 中心线程数量 ,线程池里边会一向保存,没有使命时休眠,有使命时唤醒
- maximumPoolSize : 最大线程数量 ,当中心线程数量不行完结使命,而外敞开新线程进行工作,整个线程池的线程总数。
- keepAliveTime : 当新建线程超越了中心线程数,当没有使命时,非中心线程存活多久进行才毁掉线程
- unit : keepAliveTime 的时刻单位
- workQueue : 使命行列,当线程来不及处理使命时,存储在此行列里 这是一个堵塞行列的数据结构 关于堵塞行列 Java 行列介绍
- threadFactory : 线程工厂
- handler : 回绝战略,当线程池线程数量现已到达最大值 maximumPoolSize ,且当 workQueue 使命行列无法增加使命时,假如持续再向线程池中增加使命时,决定线程池的操作战略。
线程工厂
线程池创立时线程时会调用 newThread() 办法 ,用于线程池创立线程时对线程进行一致装备 ,如给线程设置线程名,优先级等
public interface ThreadFactory {
Thread newThread(Runnable r);
}
回绝战略
- CallerRunsPolicy : 假如线程池没有中止,就在当时增加使命到线程池的线程直接运转
- AbortPolicy : 抛出 RejectedExecutionException 反常,线程池默许战略
- DiscardPolicy : 直接抛弃使命,当什么都没产生
- DiscardOldestPolicy : 假如线程池没有中止,把使命行列里边一个老的使命删去,新使命增加上去
public interface RejectedExecutionHandler {
// r 为增加的 Runnable, executor 如当时线程池
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
线程池的状况
线程池使用一个 int 类型参数的前 3 位表明线程池的状况,后 29 位表明线程池的线程数量。而且经过原子类 AtomicInteger 包装,ctl 初始化为 RUNNING 状况,线程数量为 0.
经过打印成二机制,理解起来会更清楚。
/* 核算后的数值 二进制表明
COUNT_MASK : 00011111111111111111111111111111
RUNNING : 11100000000000000000000000000000
SHUTDOWN : 00000000000000000000000000000000
STOP : 00100000000000000000000000000000
TIDYING : 01000000000000000000000000000000
TERMINATED : 01100000000000000000000000000000
*/
线程池状况只看前 3位,
- 只要 RUNNING 为负数,
- SHUTDOWN 为 0.
- STOP,TIDYING,TERMINATED 是正数的最高位 所以能够经过数值核算进行状况比较。
如 小于 SHUTDOWN 的只要 RUNNING ,经过这个能够判别线程池正在运转。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// 获取线程数量,只取后面 29位即可
private static int workerCountOf(int c) { return c & COUNT_MASK; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// 参数正确性校验
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
- RUNNING: 接纳新的使命,而且处理入队的使命
- SHUTDOWN: 不接纳新的使命但是处理现已入队的使命
- STOP: 不接纳新的使命,也不处理入队的使命,中止正在处理使命的线程
- TIDYING: 当一切的使命已被中止 ,线程数量为 0。线程池的状况为 TIDYING 然后会履行 terminated() 办法 ,terminated() 办法履行完结,状况会转移到 TERMINATED
- TERMINATED: 线程池彻底中止状况
---
title: 线程池状况时序图
---
sequenceDiagram
RUNNING->>SHUTDOWN: shutdown()
RUNNING->>STOP: shutdownNow()
SHUTDOWN->>TIDYING: 当线程池中一切线程已被终止而且使命行列为空
STOP->>TIDYING: 当线程池中一切线程已被终止而且使命行列为空
TIDYING->>TERMINATED: 当 terminated() 履行完结
把使命增加到线程池进行履行
public void execute(Runnable command) {
// 使命为空直接抛反常
if (command == null)
throw new NullPointerException();
// 获取线程池当时状况
int c = ctl.get();
// 假如线程个数小于中心线程数量,直接调用 addWorker 敞开新中心线程
if (workerCountOf(c) < corePoolSize) {
// 敞开新线程成功直接回来 ,留意 addWorker 参数 command 不为 null ,第二个参数为 true 表明中心线程
if (addWorker(command, true))
return;
c = ctl.get();
}
// 线程池处于 RUNNING 状况 ,而且使命行列增加使命成功
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get(); // 再次获取线程池状况
// 线程池不是 RUNNING 状况,而且移除成功 remove 的时候会测验终止线程池
if (! isRunning(recheck) && remove(command))
reject(command); // 回绝履行使命
// 线程池正在运转中,中心线程设置为 0 ,command 已加入到使命行列
// 当线程池运转线程数量为 0 时 , 需求敞开一个线程进行处理刚增加到使命行列的 command
else if (workerCountOf(recheck) == 0)
addWorker(null, false); // 敞开一个没有初始运转使命的非中心线程
}
else if (!addWorker(command, false)) // 无法增加,可能意味着使命行列达到上限,敞开非中心线程
reject(command); // 非中心线程无法创立时,回绝履行使命
}
创立线程
/*
修改ctl失利时 需求从头加在 ctl的值 。有两层无限循环进行查看
第一层无限循环 有 retry 符号,查看了线程池的状况,是否中止
第二层无限循环 查看的是线程池数量是否超越了设定值
*/
// firstTask 代表新建线程的第一个使命,core 代表是否中心线程
private boolean addWorker(Runnable firstTask, boolean core) {
// retry 符号,用于跳出多重循环
// break continue 加 retry 代表操作的是 retry: 紧接着的for循环
retry:
for (int c = ctl.get();;) {
// 大于等于 SHUTDOWN ,只能是 SHUTDOWN STOP,TIDYING,TERMINATED 这几种
// 当线程池状况是 SHUTDOWN 时 ,firstTask 为 null,而且使命行列不为空,可能才会敞开线程进行完结使命行列中已增加的使命
// 当线程池状况是 STOP,TIDYING,TERMINATED 时直接回来 false ,不会敞开新线程。
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;
for (;;) {
// 请留意:咱们初始化线程参数的时候并没有校验 corePoolSize,maximumPoolSize 数值最大值规模,可能超越29位,
// 所以必须 与上 COUNT_MASK 去除前3位
// 判别线程数量是否超越中心线程数或许最大线程数,超越了直接回来 false
// 没有超越的话,进行新开线程。
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
// 线程数量加 1成功 ,跳出 retry 循环
if (compareAndIncrementWorkerCount(c))
break retry;
// 原子类加 1 失利,阐明其他线程已更改 ctl 值 从头获取
c = ctl.get(); // Re-read ctl
//当线程池状况大于等于 SHUTDOWN compareAndIncrementWorkerCount CAS失利
// 是 因为线程池状况产生了改变,直接跳到第一层循环
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
// 不然 compareAndIncrementWorkerCount CAS失利 因为线程数量产生了改变,重试第二层循环
}
}
// 经过了线程池的状况和数量查看,进行敞开线程
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 创立 Worker 目标,结构函数里边会调用 ThreadFactory.newThread()办法创立线程
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int c = ctl.get();
if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
// 怎么线程不是刚创立的线程,抛出反常
if (t.getState() != Thread.State.NEW)
throw new IllegalThreadStateException();
workers.add(w); // 把 w 增加到调集中
workerAdded = true;
int s = workers.size();
if (s > largestPoolSize) // 记载线程池中开的最大线程数
largestPoolSize = s;
}
} finally {
mainLock.unlock();
}
// 假如 worker增加了,进行发动线程
if (workerAdded) {
t.start(); // 发动线程会履行,Worker 的 run 办法
workerStarted = true;
}
}
} finally {
// 假如未发动,调用 addWorkerFailed 办法
if (! workerStarted)
addWorkerFailed(w);
}
// 回来发动状况
return workerStarted;
}
// 假如创立线程失利
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w); // 移除发动失利的线程
decrementWorkerCount(); // 线程池数量 -1
tryTerminate(); // 测验终止线程池
} finally {
mainLock.unlock();
}
}
履行线程
Worker 承继 了 AbstractQueuedSynchronizer
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
// 当发动线程时会调用此办法
public void run() {
runWorker(this);
}
// 0 代表未被确定状况
// 1 代表被确定状况
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true; // 线程是否反常退出
try {
// task 为 firstTask ,经过 getTask()去使命行列中拿 task ,
// 假如 task 为空会退出 while 循环 ,线程会终止 。
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task); // 使命履行之前
try {
task.run(); // 开端履行使命
afterExecute(task, null); // 使命履行之后
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
task = null; // task 设置为 null
w.completedTasks++; // 记载线程完结的 completedTasks
w.unlock();
}
}
completedAbruptly = false; // 非反常退出
} finally {
// 处理线程退出
processWorkerExit(w, completedAbruptly);
}
}
// 最大线程数,中心线程数,是否答应中心线程超时 都是能够动态设置的
// 获取使命,当回来的task 为 null,就能够正常退出线程
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
// 大于等于 SHUTDOWN ,只能是 SHUTDOWN STOP,TIDYING,TERMINATED 这几种
// 当线程池状况是 SHUTDOWN 时 ,而且使命行列为空 回来 null
// 当线程池状况是 STOP,TIDYING,TERMINATED 时直接回来 null
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
decrementWorkerCount(); // 回来 null 之前 ,线程数 -1
return null; // 回来 null 代表线程是 正常退出的
}
int wc = workerCountOf(c); // 获取线程数
// 答应中心线程超时,或许当时线程数大于中心线程数
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 1.当时线程数大于最大线程数
// 2.当时线程数大于中心线程数,而且使命行列为空
// 3.中心线程运转超时,当时现已超时 ,而且使命行列为空
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c)) // 线程数 -1 ,回来 null
return null;
continue;
}
try {
Runnable r = timed ? // 假如答应中心线程超时,使命行列没有使命会等候 keepAliveTime 时刻
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take(); // 假如使命行列没有使命会一向堵塞等候
if (r != null) // 假如拿到的 使命 ,回来 r
return r;
timedOut = true; // 假如 r 为 null ,阐明超时
} catch (InterruptedException retry) {
timedOut = false; // 或许 等候使命行列时,被 interrupt() ,没有超时
}
}
}
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // 反常退出 ,计算线程数 -1,正常退出 ,现已减过 1
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks; // 计算已完结的使命数
workers.remove(w); // 去除 Worker
} finally {
mainLock.unlock();
}
// 测验终止线程池
tryTerminate();
int c = ctl.get();
// 线程池状况为 RUNNING,SHUTDOWN
if (runStateLessThan(c, STOP)) {
// 正常退出
if (!completedAbruptly) {
// 运转中心线程超时 min 为 0 ,不然为 corePoolSize
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 当 min 为 0 ,使命行列不为空
if (min == 0 && ! workQueue.isEmpty())
min = 1;
// 假如当时线程池仍是其他线程存活 直接回来
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 当时线程池状况为 SHUTDOWN ,线程池没有其他线程 ,而且使命行列不为空
// 敞开一个线程来履行剩下未完结的使命
addWorker(null, false);
}
}
// 测验终止线程池
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 当以下几种情况下 直接回来,不更改线程池状况
// 1. 线程池状况为 RUNNING
// 2. 线程池状况为 TIDYING ,TERMINATED
// 3. 线程池状况为 SHUTDOWN 而且使命行列不为空
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateLessThan(c, STOP) && ! workQueue.isEmpty()))
return;
// 线程数不为 0 时 中止闲暇的线程 直接回来,不更改线程池状况
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 当时线程池状况设置为 TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated(); // 履行 terminated() 办法
} finally {
ctl.set(ctlOf(TERMINATED, 0));// 当时线程池状况设置为 TERMINATED
termination.signalAll(); // 通知等候线程池终止的线程线程已终止
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
中止线程池
// 中止线程池,但是有不会铲除使命行列,等候使命行列履行完结
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN); // 把线程池状况置为 SHUTDOWN
interruptIdleWorkers(); // 中止闲暇的线程
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate(); // 测验中止线程池
}
// 当即中止线程池,未完结的使命列表会被铲除并回来
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP); // 把线程池状况置为 STOP
interruptWorkers(); // 中止一切的线程
tasks = drainQueue(); // 移除未完结的使命行列
} finally {
mainLock.unlock();
}
tryTerminate(); // 测验中止线程池
return tasks;
}
// 设置线程池的状况
private void advanceRunState(int targetState) {
// assert targetState == SHUTDOWN || targetState == STOP;
for (;;) {
int c = ctl.get();
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
// 没有调用过 interrupt(),而且能够获取锁 ,阐明在getTask()办法中等候获取使命行列的使命
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt(); // 调用 interrupt() 会抛出 InterruptedException
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
private void interruptWorkers() {
// assert mainLock.isHeldByCurrentThread();
for (Worker w : workers)
w.interruptIfStarted(); // 只要线程发动了 调用 interrupt()
}
Executors
Executors 东西类提供了几个静态办法快速创立各种类型的线程池。
- newCachedThreadPool()
- 创立一个 0 中心线程数 ,最大线程数为 Integer.MAX_VALUE ,没有使命时线程保持存活 60 秒。
SynchronousQueue<Runnable>()
是一个零容量的行列,不会保存元素用于后续的拜访,当一个线程试图向中插入元素时,它会被堵塞,直到另一个线程从行列中取走这个元素,同样地,当一个线程试图从 中取走元素时,它也会被堵塞,直到另一个线程将元素放入行列中 。当有许多使命耗时使命增加到此线程池时,会创立许多线程,不会限制创立线程的数量,会耗费许多系统资源,乃至OOM 需求特别留意
- 创立一个 0 中心线程数 ,最大线程数为 Integer.MAX_VALUE ,没有使命时线程保持存活 60 秒。
- newSingleThreadScheduledExecutor()
- 创立单中心线程数为 1 ,最大线程数也为 1 ,堵塞行列
LinkedBlockingQueue<Runnable>()
能够保存 Integer.MAX_VALUE 使命 ,当有许多使命耗时使命增加到此线程池时,需求留意使命行列 OOM
- 创立单中心线程数为 1 ,最大线程数也为 1 ,堵塞行列
- newFixedThreadPool(int nThreads)
- 创立固定线程数量的线程池 ,单中心线程数为 nThreads ,最大线程数也为 nThreads ,堵塞行列 也是
LinkedBlockingQueue<Runnable>()
需求留意使命行列 OOM
- 创立固定线程数量的线程池 ,单中心线程数为 nThreads ,最大线程数也为 nThreads ,堵塞行列 也是
- newScheduledThreadPool()
- 创立 ScheduledExecutorService ,可设置守时使命,周期使命的线程池 。堵塞行列
DelayedWorkQueue()
也是能够保存Integer.MAX_VALUE 使命 ,当有许多使命耗时使命增加到此线程池时,需求留意使命行列 OOM
- 创立 ScheduledExecutorService ,可设置守时使命,周期使命的线程池 。堵塞行列