本文带着以下问题
❶ execute办法里为啥要用workQueue.offer(command)这个非堵塞办法呢,而不必put等堵塞办法呢?
❷ 线程池收到Runnable使命紧就start履行了,为什么还要将使命放入调集(workers.add(w))呢? workers调集存在的含义是什么呢?
❸ workers 与 workerQueue 与 ctl的联系?
❹ getTask办法中为啥运用workQueue.poll(num, timeUnit) 和 take()堵塞办法呢,为啥不运用非堵塞办法呢?
❺ 线程池里寄存的是线程吗?
❻ 线程池运用了两个锁lock,worker运用一个,reentrantLock运用一个,效果别离是什么?
❼ 线程池里怎样区别闲暇线程和履行中线程?
了解ThreadPoolExecutor,乃至任何其他的类或组件,我觉得从两个点动身会更顺滑:他的数据结构和结构中的特点改变
所以,现在咱们看下ThreadPoolExecutor的数据结构。如下代码
private final BlockingQueue<Runnable> workQueue;
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private final HashSet<Worker> workers;
class Worker
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
做个比方,在一个森林中,有个超高档的城堡,城堡太大了,为了防止迷路,客人来的时候,自进门那一刻起,就主动给客人装备一辆车和一个保姆。
现在,线程池便是城堡,worker 便是那辆车,thread 便是那个保姆,task 便是那个客人。
他们的联系是,线程池会有0或多个 worker,每个worker装备一个 thread 线程和一个 task 使命。workers 是城堡里的客房。城堡太火爆了,装不下了,城堡为这些人供给了临时露营帐子,workQueue便是那个露营帐子。露营帐子不是一般的帐子,自带堵塞
ctl是一个巧妙的设计,既表明 workerCount 又表明 runState
本文将ThreadPoolExecutor深邃的位运算转换为二进制,以便更直观的了解办法和特点的运用。对参加线程池,履行worker的线程,开释worker的线程,中止线程池等进行详尽的了解,以求每个判别,每行代码都能了解。
状况
NOTE: ThreadPoolExecutor代码中的采用了高效的位运算,但阅览源码时不好直观了解,所以我将他们转成十进制,直观大家便于了解。
转换时凭借以下办法 1、二进制 -> 十进制 Integer.parseInt(“00111100”, 2) 2、十进制 -> 二进制 Integer.toBinaryString(1)
ctl便是又是 既是workerCount:表明有用的线程数 又是runState: 表明线程池的状况
/**
*
* <pre>
* ctl便是又是
* workerCount:表明有用的线程数
* runState: 表明线程池的状况
* </pre>
* <pre>
* Run state is stored in the high-order bits; worker count is stored in the low-order bits. 解说如下:
* | --- 高位 --- | --- 低位 --- |
* | -536870912 --> 0 --> 536870912 |
* | --- 线程状况 --- | --- 线程数量 --- |
* 所以线程状况的改变轨迹是从 -536870912 开端递增,一向到 0;线程数量的改变轨迹是从0 开端递增,一向到 536870912
* 2. ctl直接等于 536870910,再界说两个线程,debug看效果 do it by practise
* </pre>
*
* <pre>
* ctl的初始值:-536870912
* ctl = ctlOf(RUNNING, 0): -536870912 -> 11100000000000000000000000000000 -> size: 32
* </pre>
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// COUNT_BITS = 29
private static final int COUNT_BITS = Integer.SIZE - 3;
/**
* <pre>
* (1 << COUNT_BITS) = (1 << 29) -> 536870912 -> 100000000000000000000000000000 -> size: 30
* CAPACITY = ((1 << 29) - 1) -> 536870911 -> 11111111111111111111111111111 -> size: 29
* ~CAPACITY = ~((1 << 29) - 1) -> -536870912 -> 11100000000000000000000000000000 -> size: 32
* </pre>
*/
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
/**
* NOTE: 这几个状况值是有数值次序的,所以这几个状况值才能够进行大于、小于等操作
*
* runState is stored in the high-order bits
* <pre>
* COUNT_BITS = 29
* RUNNING (-1 << 29): -536870912 -> 11100000000000000000000000000000 -> size: 32
* SHUTDOWN (0 << 29): 0 -> 0 -> -> size: 1
* STOP (1 << 29): 536870912 -> 100000000000000000000000000000 -> size: 30
* TIDYING (2 << 29): 1073741824 -> 1000000000000000000000000000000 -> size: 31
* TERMINATED (3 << 29): 1610612736 -> 1100000000000000000000000000000 -> size: 31
* </pre>
*
* 》》》重要知识点出现了:这几个状况值是有数值次序的,所以这几个状况值才能够进行大于、小于等操作 》》》
*
* 经过实践得出:这儿的常量仅仅状况的边界值。换句话说,每个状况其实是一个规模,详细如下
* runState: ------- RUNNING -------- )[ ---------- SHUTDOWN --------- )[ ------------ STOP ---------- )[ ------------- TIDYING -------- )[ TERMINATED
* 11100000000000000000000000000000 ~ 0 ~ 100000000000000000000000000000 ~ 1000000000000000000000000000000 ~ 1100000000000000000000000000000 ~ 无穷
*/
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
》》再次着重:runState的几个常量仅是状况的边界值。换句话说,每个状况其实是一个规模,
正是由于每个状况其实是一个规模,状况常量仅是状况规模的边界。介于次,状况办法的运用就简单明了了
状况办法
/**
* 这个办法:回来负数阐明线程状况是RUNNING;回来0阐明线程状况是SHUTDOWN;理论上不回来正数
*
* <pre>
* 由于
* ~CAPACITY = ~((1 << 29) - 1) -> -536870912 -> 11100000000000000000000000000000 -> size: 32
* ctl = ctlOf(RUNNING,0) -> -536870912 -> 11100000000000000000000000000000 -> size: 32
* 所以
* c=ctl时,ctl & ~CAPACITY -> -536870912 & -536870912 -> 11100000000000000000000000000000 = -536870912,
* 所以
* 跟着ctl ++,runStateOf办法成果也是负数,并从-536870912开端递 +1,一向到 0,所以也能够是说负数表明线程状况是RUNNING(运转状况)时
*
* 举例:
* 当第一次ctl ++后,ctl -> -536870911 -> 11100000000000000000000000000001
* 此刻,ctl & ~CAPACITY -> -536870911 & -536870912 -> 11100000000000000000000000000001 & 11100000000000000000000000000000
* NOTE: 当runStateOf等于0时,线程状况就变成了SHUTDOWN
* </pre>
*/
private static int runStateOf(int c) { return c & ~CAPACITY; }
/**
* <pre>
* 由于
* CAPACITY = (1 << 29) - 1 -> 536870911 -> 11111111111111111111111111111 -> size: 29
* ctl = ctlOf(RUNNING, 0) -> -536870912 -> 11100000000000000000000000000000 -> size: 32
* 所以
* c=ctl时,ctl & CAPACITY -> -536870912 & 536870911 -> 00000000000000000000000000000000 = 0
* 所以
* 跟着ctl ++,所以workerCountOf办法成果从0开端递 +1,一向到 536870911
* </pre>
*/
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
线程池的履行进程,这个网上说的很了解了
Tips => 正式开端前,再次着重 ctl其值为正数时表明线程数,其值为负数时表明线程状况
增加使命办法 – execute
public void execute(Runnable command) {
/*
* Proceed in 3 steps:
* 线程参加线程池的履行进程,见上图
*/
int c = ctl.get();
// 其时线程数小于中心线程数
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
// 在addWorker里履行时,假设其他线程对线程池调用shutdown or shutdownNow or terminate and so on,
// 那么addWorker回来false,然后走到这行代码
c = ctl.get();
}
// =》isRunning(c) = c < SHUTDOWN =》记住一点:ctl小于0便是Running状况
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 假设其他线程对线程池调用shutdown or terminate相关办法,对于刚才参加行列的使命要删去调
if (! isRunning(recheck) && remove(command))
reject(command);
// 假定线程池中只需一个运转着的线程:T1,当main线程走到这行代码时,T1运转完了,并对ctl履行了-1操作后便是0了,此刻这行判别为true
// 但是此刻addWorker传入的使命是null,疑问吗?这是由于代码履行到这时,使命task现已参加到workQueue行列了,而在runWorker办法中,假设worker的firstTask是null,那么会从workQueue行列里取使命task履行,所以此处传null给addWorker得以有机会履行t.start(),然后履行runWorker办法。__从这儿看出一点:work count是不包括行列中使命的__
else if (workerCountOf(recheck) == 0)
addWorker(null, false); // (1)
}
// 假设履行到这儿,两种状况:
// 1. 线程池是RUNNING状况,但workerCount >= corePoolSize并且workQueue已满。运用最大线程数的逻辑
// 2. 线程池现已不是RUNNING状况,即c >= SHUTDOWN。那为什么还要走addWorker办法呢,我的了解是:这是作者代码精简的成果,addWorker办法有c >= SHUTDOWN的判别逻辑
else if (!addWorker(command, false))
reject(command);
}
这儿抛出一个问题(问题A):这儿为啥要用workQueue.offer(command)这个非堵塞办法呢,而不必put等堵塞办法呢?先想想,文末一同说说
增加使命办法 – addWorker
记住一个条件:进入这个办法的条件是其时线程数<=中心线程数,或行列已满&其时线程数<=最大线程数。有了这个条件就好了解多了
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 线程池不是RUNNING状况时触发
// 且,(rs不是0 或 firstTask不是null 或 行列是空)
// 回来false
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// 对c进行CAS操作,直到成功
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
// (rs == SHUTDOWN && firstTask == null)这种状况仅仅适用于execute办法的(1)处状况
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start(); //(1)
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
addWorker办法一共做了两件事:1.ctl递增;2.Worker目标参加workers调集并start Worker.thread线程(即线程池中的线程)。再详细点说,这个办法主要是 start Worker.thread线程。既然是这样,那么有两个疑问:
-
问题B: 传入的Runnable类型的使命紧接着就进行start了,那么为什么还要workers.add(w)放入调集呢,workers调集存在的含义是什么呢?自己先想想,文末给出答案
-
请注意一个细节: addWorker办法在什么线程里履行的?这有助于问题A的了解
再问个问题(问题E):线程池(ThreadPoolExecutor)中寄存的是线程吗?不是,是一堆的Worker目标,Worker既不是thread线程也不是要履行的使命。那么它是做啥的呢
咱们来看下Worker的构造办法
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker (1) inhibit interrupts:抑制中止
this.firstTask = firstTask; // (2)
this.thread = getThreadFactory().newThread(this); // (3)
}
setState(-1):包含了一个很重要的点:worker中的thread线程能够被中止的条件:aqs.state=0 从构造办法知道,咱们要履行的使命成为Worker的一个字段,一同Worker还有一个thread字段,看Worker的(3)处代码,我觉得这行很关键,我改下它的同义写法:this.thread.target = this,即worker作为他本身的thread字段的值,从Worker的界说知道,Worker本身也是Runnable的。所以,当履行addWorker办法的(1)处t.start()时,咱们的使命也跟着履行了,这个流程如图
了解了这儿,Worker.run()和runWorker(Worker)怎样触发的就很容易了解了
履行使命办法 – runWorker
Worker的thread字段值,履行thread.start()办法,触发了此办法的履行。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// worker的lock用于区别线程是否闲暇的
w.unlock(); // allow interrupts (b)
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock(); // 这儿为啥要加锁,结合 interruptIdleWorkers 办法一同思考?先透漏一点:w.lock() 将 w.state 设置为1,然后确保 w.thread 线程不能被中止
// 假设线程池正在中止,那么要确保其时线程是中止状况;
// 假设不是的话,则要确保其时线程不是中止状况;
// ctl > STOP
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
&& !wt.isInterrupted())
wt.interrupt();
try {
// 业务人员自己完成
beforeExecute(wt, task);
try {
task.run();
} catch (Throwable x) {
throw new Error(x);
} finally {
// 业务人员自己完成
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
// completedAbruptly变量来表明在履行使命进程中是否出现了反常,在processWorkerExit办法中会对该变量的值进行判别。
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
- 请注意一个细节: runWorker办法是在什么线程里履行?这有助于问题D的了解,线程池的一个线程中履行
runWorker办法的目的便是履行使命(即task.run())。它首先履行Worker的firstTask,然后再从workQueue行列里取task持续履行。简单来说,便是取使命 履行,取使命 履行,取使命 履行。在这其中,怎样取,履行前中后会做什么作业,如ctl判别,线程中止检测,w.unlock()和w.lock()等都很重要
- 怎样取:这是getTask()办法的作业,稍后说
- ctl判别:runStateAtLeast(ctl.get(), STOP) => 是否 ctl.get() > STOP,当线程被中止了,这个办法才回来true
- w.unlock()和w.lock(),这两个办法操作是用于区别线程是否繁忙/闲暇的?为什么呢
由于:runWorker中线程在履行使命时,线程是时间被线程池管理和监控着呢。线程池能够视状况随时进行封闭(调用shutdown)、马上封闭(调用shutdownNow)、中止(调用tryTerminate)。咱们回头看线程池的状况,其中 SHUTDOWN:不接纳新使命,持续处理行列的使命;STOP:不接纳新使命,不处理行列的使命,中止正在进行的使命。SHUTDOWN状况下要完成的逻辑便是经过 worker 的 lock 完成的。
假定线程正在履行使命,此刻线程池进行封闭,即调用 shutdown 办法。假设没有 worker.lock(),那么正履行使命的线程就会被打断,使命无法持续履行。这样SHUTDOWN状况的逻辑就无法完成了。
一同,w.unlock()和w.lock(),为什么说他们可区别线程是否繁忙/闲暇的呢。试想,worker 相关的线程被 lock 了,阐明正在履行使命呢。所以说此线程是繁忙的线程
注意一点:getTask 办法是在 lock 之外的。所以,堵塞着的线程便是闲暇线程了
worker的lock用于区别线程是否闲暇的。 结合shutdown()办法一同了解,见下文shutdown的部分
从while条件可知:null值对于runWorker()来说有特别用途:告诉获取使命的作业线程完毕并退出,所以getTask办法回来null时是很特别的
履行使命办法 – getTask
private Runnable getTask() {
boolean timedOut = false; // 上一次的poll()的调用是否超时?
for (;;) {
int c = ctl.get();
// 这儿需要记住文章开头的那些状况字段值,才反应的快些
// => c & ~CAPACITY
int rs = runStateOf(c);
/*
* 假设线程池状况rs >= SHUTDOWN,也便是非RUNNING状况,再进行以下判别:
* 1. rs >= STOP,线程池是否正在stop;
* 2. 堵塞行列是否为空。
* 假设以上条件满意,则整个判别条件为true。阐明线程池突然中止,
* 由于假设其时线程池状况的值是SHUTDOWN或以上时,不允许再向堵塞行列中增加使命
*
* rs >= SHUTDOWN 阐明其时线程池至少处于待封闭状况,不再承受新的使命
* rs >= STOP: 阐明不需要在再处理使命了(即使使命在履行)
* 所以,阐明线程池在封闭,那就不履行使命task了
*/
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// 线程池要/正在封闭了,所以回来null,这样外层的runWorker办法就能够退出了。
// 所以代码走到这儿含义是此使命的作业线程就要退出了。
// 相应的,ctl当然随之要减一
decrementWorkerCount();
return null;
}
// => c & CAPACITY
int wc = workerCountOf(c);
// timed:其时线程池中的线程数量是否超过了最大数量 或 中心线程允许超时 时为 true
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// A:wc > maximumPoolSize什么场景下是true呢?答案是有人动态调整了最大线程数
// B:timeOut:poll超时 或 take被打断,再回到for循环时为 true;timed:如上
// C:wc > 1:线程数大于1
// D:workQueue.isEmpty():行列为空
/// 线程退出条件
// A C 为 true => 线程池里线程太多了,其时线程要退出了
// A D 为 true => 线程池里线程太多了,且 行列里没有使命了,其时线程要退出了
// B C 为 true => 线程池里线程数量大于中心线程数量,且 中心线程允许超时,其时线程要退出了
// B D 为 true => 线程池里线程数量大于中心线程数量,且 中心线程允许超时,且 行列里没有使命了,其时线程要退出了
// 所以线程数量减一。一同return null,意味着此作业线程要退出了
if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
// 此线程要退出了,ctl随之减一
if (compareAndDecrementWorkerCount(c))
return null;
// 假设减1失利,则for回来重试
continue;
}
/// 程序第一次履行到这,B:timedOut:必定false
// 假设A:false (C D是啥都行),那么其时线程池会堵塞,直到行列有使命或超时,然后再回到“线程退出条件”代码,依其时条件决议线程是否退出
try {
// 这行语句表达了一个主意:线程只需是存活着的,他就应该履行使命。没有使命时,就等着有使命
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take(); // (1)
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
这儿有个疑问(问题D):getTask 办法的(1)处代码为什么要用堵塞的办法呢,不堵塞的办法不行吗,答案文末给出
addWorker 办法与 runWorker 办法就像是蓄水池中进水管和出水管,剖析的时候需要两个结合的捋逻辑。而 getTask 办法是 runWorker办法中的中心逻辑。而 getTask 办法中的条件判别逻辑更中心。一同,剖析的时候,与shutdown,shutdownNow,tryTerminate,awaitTermination等办法的逻辑相互结合在一同会更清晰
以下是封闭相关的办法
shutdown和shutdownNow办法比较相似,类比图如下
shutdown
调用shutdown()办法会进入 SHUTDOWN 状况。在 SHUTDOWN 状况下,线程池不承受新的使命,但是会持续履行使命行列中已有的使命。 怎样证明它此刻不接纳新的使命了呢,you need to do it by practise
经过调用shutdown()封闭的线程池,封闭今后表现的行为便是不能再提交使命给线程池,但是在封闭前现已提交的使命仍旧会被履行。比及使命行列空了今后线程池才会进入封闭流程
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
shutdown办法中心由三部分组成:
- advanceRunState(SHUTDOWN):改线程池状况为SHUTDOWN
- interruptIdleWorkers():中止线程池中闲暇线程
- tryTerminate():Transitions to TERMINATED state if either (SHUTDOWN and pool and queue empty) or (STOP and pool empty)
advanceRunState
转换线程池状况为入参值,入参值只能是SHUTDOWN or STOP 假设是SHUTDOWN,那么履行完advanceRunState办法后ctl的值>=0,即>=SHUTDOWN。假设其时线程数是3,那么ctl便是3 假设是STOP,那么履行完advanceRunState办法后ctl的值>=536870912,即>=STOP。假设其时线程数是3,那么ctl便是536870912+3
所以,这也证明了线程池的状况是一个规模,而不是一个值,这个规模正如文档开头处所述
private void advanceRunState(int targetState) {
for (;;) {
int c = ctl.get();
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
interruptIdleWorkers
办法很清晰,便是将workers对应的线程中止。从办法的名称就能够知道功能是对闲暇的线程中止。那怎样知道哪些work的线程是闲暇的呢
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
注意这点:w.tryLock(),为啥要试图加锁呢。这时候就要看看runWorker办法了,runWorker履行时是要对worker加锁的(即调用lock)。 一切作业中的线程都需要 lock 加锁(state从0变为1),所以在这儿经过Worker.tryLock()来判别被查看的作业线程是否是闲暇状况(试图将state从0变为1),闲暇就能够发送interrupt()指令。所以,逻辑上看,w.lock => state从0变为1 => Worker.tryLock() 试图将state从0变为1,成果必定是false;成果上看,worker加锁了,便是作业状况,即不是闲暇状况,就不能中止了
tryTerminate
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 线程池为下面三种状况,直接return
// 1.线程池为RUNNING状况,线程池还在运转中,不能中止
// 2.线程池为TIDYING或TERMINATED,由于线程池现已中止了,不必再中止了
// 3.线程池为SHUTDOWN状况 & 线程池行列不为空,行列里有使命,不能中止
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 代码走到这,阐明此刻线程池是STOP状况或(SHUTDOWN状况且线程池行列是空), 线程池里还有线程
if (workerCountOf(c) != 0) { // Eligible to terminate
//这时候或许只需一个闲暇线程了,它是在getTask办法中履行workQueue.take()了的线程,此线程归于闲暇线程(在w.lock()外),它正在堵塞着等待着线程来呢。假设不履行中止会一向堵塞。你或许会说,在前面履行interruptIdleWorkers(false)办法时,会中止一切的闲暇线程,这儿重复履行了吧?试想下假设在履行interruptIdleWorkers(false)时恰好有个作业线程没有闲暇,你刚履行完interruptIdleWorkers(false),那个线程就回到while里去调用了getTask办法,这时workQueue中没有使命了,就会调用workQueue.take()一向堵塞。所以每次在作业线程完毕时调用tryTerminate办法来测验中止那个闲暇作业线程,防止在行列为空时取使命一向堵塞的状况
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 将线程池状况设置为TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
// 履行terminated后,将线程池状况设置为TERMINATED,线程池完毕
ctl.set(ctlOf(TERMINATED, 0));
// 告诉awaitTermination办法,线程池完毕
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
shutdownNow
调用shutdownNow()会进入 STOP 状况。在 STOP 状况下线程池既不承受新的使命,也不处理现已在行列中的使命。对于还在履行使命的作业线程,线程池会建议中止请求来中止正在履行的使命,一同会清空使命行列中还未被履行的使命。
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
shutdownNow办法中心由四部分组成:
- advanceRunState(STOP):改线程池状况为STOP,与advanceRunState(SHUTDOWN)逻辑相同
- interruptWorkers():中止线程池中一切线程,这个与interruptIdleWorkers的差异细细领会,这个办法中止一切现已启动的作业线程,即进行中的使命(履行了w.lock,但还没履行w.unlock),这些线程中止或许成功也或许不成功
- tryTerminate():前面已说完
- drainQueue():从使命行列中取出一切未被履行的使命,未被履行的使命列表会被作为回来值回来给应用程序
interruptWorkers
// Interrupts all threads, even if active
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
有个疑问:interruptWorkers是中止线程池中一切的线程(闲暇的和履行中的总和),但interruptIfStarted()办法仅仅中止履行中的线程。假设你有这个疑问的话,咱们一同看下getState() >= 0这个判别,咱们看下runWorker办法,先履行了w.unlock(),再履行w.lock(),在履行w.unlock(),unlock是把state设置为0,lock把state设置为1,又只需履行了runWorker,那么state的值便是>=等于0的了,所以不论闲暇与否,state总是>=0,所以interruptWorkers办法这时候履行interruptIfStarted办法,中止的便是一切的线程
drainQueue
将workerQueue行列里的worker回来
private List<Runnable> drainQueue() {
BlockingQueue<Runnable> q = workQueue;
ArrayList<Runnable> taskList = new ArrayList<Runnable>();
q.drainTo(taskList);
if (!q.isEmpty()) {
for (Runnable r : q.toArray(new Runnable[0])) {
if (q.remove(r))
taskList.add(r);
}
}
return taskList;
}
问答
❶ execute办法里为啥要用workQueue.offer(command)这个非堵塞办法呢,而不必put等堵塞办法呢?
answer:
以为execute办法是运转在main线程里的,假设运用堵塞办法,那么后边的使命就无法增加到线程池了
❷ 线程池收到Runnable使命紧就start履行了,为什么还要将使命放入调集(workers.add(w))呢? workers调集存在的含义是什么呢?
answer:
放入workers是为了保存其时的thread和worker,不然后边怎样对worker和thread进行加锁和中止啊,addWorker和runWorker本来便是并行的联系,还要时间监督着shutdown,shutdownNow,terminate之类的动作
❸ workers 与 workerQueue 与 ctl的联系
workers的size应该=ctl.get;workerQueue与ctl没有联系
❹ getTask办法中为啥运用workQueue.poll(num, timeUnit) 和 take()办法,为啥不运用非堵塞办法呢
answer:
getTask里有一个for自旋,一向找使命履行,假设不运用堵塞办法,那么for自旋将一向占着cpu,这个原理和synchronized的晋级原理是相同的
❺ 线程池里寄存的是线程吗
answer:
不是,是Worker,Worker是线程池中的线程和使命task之间的纽带
❻ 线程池运用了两个锁lock,worker运用一个,reentrantLock运用一个,效果别离是什么?
answer:
worker的lock是归于worker的,是为了区别线程是否为闲暇仍是运转中
reentrantLock mainLock 是归于线程池的,是为了完成线程池中各办法之间坚持不竞赛的
❼ 线程池里怎样区别闲暇线程和履行中线程
answer:
worker的thread是否被lock
两个发散性的问题:
❽ 线程池中如何运用ThreadLocal
❾ ThreadPoolExecutor运转在多线程环境中会怎样的
小结
对ThreadPoolExecutor的了解每次都会有新的收获,看似不经意的一行代码,一个判别,实践懂了之后都感叹和震慑作者的编程能力,每次领会之后,都感觉作者的完成技巧都敞开了我以前没见过的窗。
ThreadPoolExecutor特点源码和自己debug实践的比如:ThreadPoolExecutorTest0
参考
深入了解Java线程池:ThreadPoolExecutor
ThreadPoolExecutor完成原理
java-threadpoolexecutor