ThreadPoolExecutor
线程池详解
1. 布景
项目最近的迭代中运用到了ThreadPoolExecutor
线程池,之前都只是知道怎样用,没有了解过线程池的底层原理,项目刚上线,有时刻整理一下线程池的用法,学习一下线程池的底层实现与作业原理。
2. ThreadPoolExecutor
作业原理
2.1 构造办法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
2.2 线程池的运用
worker
/**
* @author itender
* @date 2023/8/7 14:41
* @desc
*/
public class Worker implements Runnable {
private String command;
public Worker(String s) {
this.command = s;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + command + " startTie = " + DateUtil.now());
processCommand();
System.out.println(Thread.currentThread().getName() + command + " endTime = " + DateUtil.now());
}
private void processCommand() {
try {
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + command +" 处理使命逻辑。。。。。。。。");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
e.printStackTrace();
}
}
@Override
public String toString() {
return this.command;
}
}
- 线程池
/**
* @author itender
* @date 2023/8/7 14:37
* @desc
*/
public class ThreadPoolExecutorDemo {
private static final int CORE_POOL_SIZE = 5;
private static final int MAX_POOL_SIZE = 10;
private static final int QUEUE_CAPACITY = 100;
private static final Long KEEP_ALIVE_TIME = 1L;
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
// 中心线程数 5
CORE_POOL_SIZE,
// 最大线程数 10
MAX_POOL_SIZE,
// 超越中心线程数,线程最大存活时刻
KEEP_ALIVE_TIME,
// 时刻单位
TimeUnit.MINUTES,
// 作业行列最大值
new ArrayBlockingQueue<>(QUEUE_CAPACITY),
// 线程工厂,创立线程的时分运用
r -> {
Thread thread = new Thread(r);
thread.setName("pool-");
return thread;
},
new ThreadPoolExecutor.CallerRunsPolicy()
);
for (int i = 0; i < 10; i++) {
// 创立使命
Worker myRunnable = new Worker("" + i);
// 履行使命
threadPoolExecutor.execute(myRunnable);
}
// 栽培线程池,不承受新使命,但是有作业线程处理行列中的使命
threadPoolExecutor.shutdown();
while (!threadPoolExecutor.isTerminated()) {
}
System.out.println("Finished All Threads!");
}
}
2.3 中心参数
2.3.1 中心参数详解
- corePoolSize:中心线程数,使命行列没有到达行列最大容量时,最大能够一起运转的线程数。
- maximumPoolSize:最大线程数。当使命行列中存储的使命到达行列的容量时,当时能够一起运转的线程数量变为最大线程数。
-
keepAliveTime:线程池中的线程数量超越
corePoolSize
时,假如没有新使命提交,中心线程外的线程不会当即销毁,而是等候,直到等候的时刻超越了keepAliveTime
才会被销毁收回。 -
unit:
keepAliveTime
参数的时刻单位。 - workQueue:作业行列。当有新的使命提交的时分,会先判别当时运转的线程数是否到达中心线程数,假如到达中心线程数,则会把新提交的使命放到作业行列中。
- threadFactory:线程工厂,创立新的线程时会运用。
- handler:回绝战略。
2.3.2 回绝战略
假如当时一起运转的线程数量到达最大线程数量而且行列也现已被放满了使命时,ThreadPoolTaskExecutor
界说一些战略:
-
AbortPolicy:默许回绝战略。抛出
RejectExecutionException
来回绝新使命的处理。 - CallerRunsPolicy:调用当时提交使命的线程来履行使命。一般不希望使命丢掉会选用这种战略,但从实践视点来看,原来的异步调用意图会退化为同步调用。
- DiscardPolicy:不处理新使命,直接丢掉。
- DiscardOldestPolicy:丢掉最早的未处理的使命。
2.4 履行流程
2.5 线程池状况
2.5.1 线程池中心特点ctl
// ctl实质是 Integer 型变量,进行了原子性的封装
// ctl表明两种状况:
// 高3位:线程池当时的状况
// 低29位:线程池当时作业线程的数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// COUNT_BITS 的值为 29(整型Integer.SIZE = 32 位);
private static final int COUNT_BITS = Integer.SIZE - 3;
// CAPACITY = (1 << 29) - 1; 1左移29位,减去1;即1*2^29-1;
// 0001 1111 1111 1111 1111 1111 1111 1111
// 低29位用来表明线程池的最大线程容量
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 高3位用来表明线程池5种状况
// 111 运转状况
private static final int RUNNING = -1 << COUNT_BITS;
// 000 shutdown状况
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 001 中止状况
private static final int STOP = 1 << COUNT_BITS;
// 010 过渡状况
private static final int TIDYING = 2 << COUNT_BITS;
// 011 中介状况
private static final int TERMINATED = 3 << COUNT_BITS;
// 依据ctl的值,核算当时线程池的状况
// 核算方式:c 与 非capacity
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 依据ctl的值,核算线程池当时运转的线程的容量
private static int workerCountOf(int c) { return c & CAPACITY; }
// 经过运转状况和作业线程数核算ctl的值,或运算
private static int ctlOf(int rs, int wc) { return rs | wc; }
private static boolean runStateLessThan(int c, int s) {
return c < s;
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
/**
* Attempts to CAS-increment the workerCount field of ctl.
*/
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
/**
* Attempts to CAS-decrement the workerCount field of ctl.
*/
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
/**
* Decrements the workerCount field of ctl. This is called only on
* abrupt termination of a thread (see processWorkerExit). Other
* decrements are performed within getTask.
*/
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
2.5.2 状况切换
- RUNNING :能承受新提交的使命,而且也能处理堵塞行列中的使命。
-
SHUTDOWN:封闭状况,不再承受新提交的使命,但却能够持续处理堵塞行列中已保存的使命。在线程池处于 RUNNING 状况时,调用
shutdown()
办法会使线程池进入到该状况。(finalize()
办法在履行过程中也会调用shutdown()
办法进入该状况)。 -
STOP:不能承受新使命,也不处理行列中的使命,会中止正在处理使命的线程。在线程池处于 RUNNING 或 SHUTDOWN 状况时,调用
shutdownNow()
办法会使线程池进入到该状况。 -
TIDYING:假如所有的使命都已终止了,
workerCount
(有效线程数) 为0,线程池进入该状况后会调用terminated()
办法进入 TERMINATED 状况。 -
TERMINATED:在
terminated()
办法履行完后进入该状况,默许terminated()
办法中什么也没有做。
3. 源码分析
3.1 execute
办法
- 源码
public void execute(Runnable command) {
// 判别使命是否为空,假如使命为空,抛出空指针异常
if (command == null)
throw new NullPointerException();
// 获取ctl特点
int c = ctl.get();
// 判别当时作业线程数量是否小于中心线程的数量
if (workerCountOf(c) < corePoolSize) {
// 作业线程数小于中心线程数,创立一个中心线程履行command使命
if (addWorker(command, true))
// 创立中心线程成功,直接回来
return;
// 并发状况下增加中心线程失利,需求从头获取ctl特点
c = ctl.get();
}
// 创立中心线程失利,当时作业线程数量大于或等于中心线程数量corePoolSize
// 判别线程池的状况是否为running,假如是增加使命到作业行列中(放入使命失利回来false)
if (isRunning(c) && workQueue.offer(command)) {
// 使命增加到行列成功,再次获取ctl特点
int recheck = ctl.get();
// 二次查看,判别线程池的状况是否为running,假如不是行列中移除刚刚增加的使命
if (!isRunning(recheck) && remove(command))
// 履行回绝战略
reject(command);
// 1.使命增加到行列
// 2.线程池或许是running状况
// 3.传入的使命或许从使命行列中移除失利(移除失利的唯一或许就是使命现已被履行了)
// 判别作业线程数量是否为0
else if (workerCountOf(recheck) == 0)
// 作业线程数量为0
// 作业行列中有使命在排队,增加一个空使命,创立非中心线程履行行列中等候的使命
addWorker(null, false);
}
// 创立中心线程失利,
// 线程池状况不是running状况
// 线程池或许是running状况,但是使命行列现已满了
// 增加使命到作业行列失利,创立非中心线程履行使命
else if (!addWorker(command, false))
// 创立非中心线程失利,履行回绝战略
reject(command);
}
第一点中心:经过execute办法源码能够看出线程池详细的履行流程,以及一些避免并发状况的判别。
第二点中心:线程池为什么会增加空使命非中心线程到线程池。
这里是一个疑惑点:为什么需求二次查看线程池的运转状况,当时作业线程数量为0,尝试创立一个非中心线程而且传入的使命目标为null?这个能够看API注释:
假如一个使命成功加入使命行列,咱们仍然需求二次查看是否需求增加一个作业线程(由于所有存活的作业线程有或许在最后一次查看之后现已完结)或许履行当时办法的时分线程池是否现已shutdown了。所以咱们需求二次查看线程池的状况,有必要时把使命从使命行列中移除或许在没有可用的作业线程的前提下新建一个作业线程。
3.2 addWorker
办法
- 源码
private boolean addWorker(Runnable firstTask, boolean core) {
// for循环标识
// 对线程池当时状况和当时作业线程数量的判别
retry:
for (;;) {
// 获取线程池的状况
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 获取线程池作业线程的数量
int wc = workerCountOf(c);
// 1. 假如传入的core为true,表明即将创立中心线程,经过wc和corePoolSize判别,假如wc >= corePoolSize,则回来false表明创立中心线程失利
// 2. 假如传入的core为false,表明即将创非建中心线程,经过wc和maximumPoolSize判别,假如wc >= maximumPoolSize,则回来false表明创立非中心线程失利
// core参数为false阐明作业行列现已满了,线程池巨细变为maximumPoolSize最大线程数
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// CAS更新作业线程数wc,原子操作将workCount的数量加1,更新成功则直接跳出最外层循环
if (compareAndIncrementWorkerCount(c))
break retry;
// CAS更新作业线程数失利,判别线程池的状况是否从running编程shutdown,假如线程池的状况改动了在履行上面的操作
c = ctl.get(); // Re-read ctl
// 假如线程池状况现已变成shutdown,越过最外层本次循环,履行下一次循环
if (runStateOf(c) != rs)
continue retry;
// 假如线程池状况仍然是RUNNING,CAS更新作业线程数wc失利阐明有或许是并发更新导致的失利,则在内层循环重试即可
// else CAS failed due to workerCount change; retry inner loop
}
}
// 作业线程是否发动成功
boolean workerStarted = false;
// 作业线程是否创立成功
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 加锁,由于会改动一些指标值和非线程安全的调集
final ReentrantLock mainLock = this.mainLock;
// 加锁
mainLock.lock();
try {
// 获取线程池状况
int rs = runStateOf(ctl.get());
//rs < SHUTDOWN 假如线程池状况仍然为RUNNING,而且线程的状况是存活的话,就会将作业线程增加到作业线程调集中
//(rs=SHUTDOWN && firstTask == null)假如线程池状况小于STOP,也就是RUNNING或许SHUTDOWN状况下,一起传入的使命实例firstTask为null,则需求增加到作业线程调集和发动新的Worker
// 对于2,换言之,假如线程池处于SHUTDOWN状况下,一起传入的使命实例firstTask不为null,则不会增加到作业线程调集和发动新的Worker
// 这一步其实有或许创立了新的Worker实例但是并不发动(暂时目标,没有任何强引用),这种Worker有或许成功下一轮GC被收集的垃圾目标
// firstTask == null证明只新建线程而不履行使命
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 将新建的作业线程增加到作业线程的调集
workers.add(w);
// 更新当时作业线程的最大容量
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
// 作业线程是否增加成功
workerAdded = true;
}
} finally {
// 释放锁
mainLock.unlock();
}
// 假如成功增加作业线程,则调用Worker内部的线程实例t的Thread#start()办法发动真实的线程实例
if (workerAdded) {
// 发动线程,标识线程发动成功
t.start();
workerStarted = true;
}
}
} finally {
// 线程发动失利,需求从作业线程中移除对应的Worker
if (!workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
4. 线程池常见问题
4.1 execute()
和submit()
的区别
-
execute()
办法用于提交不需求回来值的使命,所以无法判别使命是否被线程池履行成功与否; -
submit()
办法用于提交需求回来值的使命。线程池会回来一个Future
类型的目标,经过这个Future
目标能够判别使命是否履行成功,而且能够经过Future
的get()
办法来获取回来值,get()
办法会堵塞当时线程直到使命完成,而运用get(long timeout,TimeUnit unit)
办法的话,假如在timeout
时刻内使命还没有履行完,就会抛出java.util.concurrent.TimeoutException
。
4.3 堵塞行列的效果
- 一般的行列只能是有限长度的缓冲区,一旦超出缓冲长度,就无法保留了。堵塞行列经过堵塞能够保留住当时想要持续入队的使命。
- 堵塞行列能够在行列中没有使命时,堵塞想要获取使命的线程,使其进入wait状况,释放cpu资源。
- 堵塞行列带有堵塞和唤醒的功用,不需求额外处理,无使命履行时,线程池使用堵塞行列的take办法挂起,然后维持中心线程的存活,不至于一直占用cpu资源。
4.2 为什么先增加行列而不是先创立最大线程
- 在创立新线程的时分,是要获取全局锁的,这时分其他线程会被堵塞,影响全体效率。
- 在中心线程已满时,假如使命持续增加那么放在行列中,等行列满了而使命还在增加那么就要创立暂时线程了,这样代价低。
5. 参考文章
www.throwx.cn/2020/08/23/…
javaguide.cn/java/concur…