一、背景
前段时刻,做了一个用户主动领券的功能,进入小程序主动给用户发放符合条件的券。因为该接口是用户进入小程序必调用的接口,且不需求用户有任何的感知,考虑用户体验觉得这个场景非常适合做异步,然后在多考虑一点性能,线程的创立和毁掉等带来的一点开支大家很简单想到线程池来统一管理,运用线程池都逃脱不了那几个参数的界说,当时设置参数时,只清晰的知道它是吃IO的和回绝战略,然后当被问道我界说这几个数字的依据是什么,其实最初我只是凭一种感觉,觉得这样问题不大、够用,没有去了解过体系用户通常的活跃量,也更没有去好好读过线程池的源码。
二、线程池中心参数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
- corePoolSize: 中心线程数, 常驻在线程池中的作业线程,默许状况(
allowCoreThreadTimeOut = false
)不会被收回
allowCoreThreadTimeOut=true,可动态的收回中心线程资源,此刻空闲收回时刻由keepAliveTime操控,这种状况下,当有新使命到达时,或许需求从头创立一个新的中心线程来处理使命,存在体系不端的创立和毁掉中心线程
-
maximumPoolSize:最大线程数,线程池中最大容纳的线程数量(扩容机制)
-
keepAliveTime:
allowCoreThreadTimeOut = false
时,非中心线程收回的条件 -
TimeUnit :合作keepAliveTime一同运用
-
ThreadFactory:创立线程的工厂,可设置线程组、线程优先级、特别姓名便利排错
-
RejectedExecutionHandler:超过线程池可以处理的容量维护机制 中心线程都满,使命行列也满,也到达了最大线程,提交的使命会走回绝战略
注:且java 供给
Executors
创立线程池办法,不要运用,了解即可,因为它对线程的操控粒度很低,许多重要参数比方中心线程数,最大线程数,回绝战略等自己无法操控,主张手动创立线程池
三、线程池特点标识及状况流转
//int类型32bit 1、高三位代表线程池的状况;2、低29位线程池中的线程个数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//29 便利后边做位运行
private static final int COUNT_BITS = Integer.SIZE - 3;
//位运算得出线程池最大容量
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 线程池的5种状况
//运行状况,可以接受新的使命且会处理堵塞行列中的使命
private static final int RUNNING = -1 << COUNT_BITS;
//封闭状况,不接受新使命,但是会处理堵塞行列中的使命
private static final int SHUTDOWN = 0 << COUNT_BITS;
//中止状况,不接受新的使命,也不会处理等候行列中的使命而且会中止正在履行的使命
private static final int STOP = 1 << COUNT_BITS;
// 收拾,即一切的使命都中止了,线程池中线程数量等于0
private static final int TIDYING = 2 << COUNT_BITS;
//完毕状况,terminated()办法履行完了
private static final int TERMINATED = 3 << COUNT_BITS;
// 线程池的状况
private static int runStateOf(int c) { return c & ~CAPACITY; }
//线程池线程数量
private static int workerCountOf(int c) { return c & CAPACITY; }
四、JDK线程池履行流程
/**
* 提交使命
*
* @param command 使命
*/
public void execute(Runnable command) {
//.....省掉空判别校验代码、健壮性
//int类型32bit 1、高三位代表线程池的状况;2、低29位线程池中的线程个数
int c = ctl.get();
//1、创立中心线程数,作业线程数<中心线程数
if (workerCountOf(c) < corePoolSize) {
//作业线程数<中心线程数,增加一个作业线程为core=true 中心线程,履行提交的使命
if (addWorker(command, true)) {
//此提交使命完毕
return;
}
//(前面if失利,没有return完毕,并发问题)中心线程创立失利,从头获取32位的值
c = ctl.get();
}
//2、线程池中的线程数量大于等于中心线程数
//先判别线程池为running状况(因为shutdown和stop状况线程池是不接纳使命的),将使命增加到堵塞行列中
if (isRunning(c) && workQueue.offer(command)) {
//获取 ctl
int recheck = ctl.get();
// 判别是否是RUNNING状况,不是则移除此次提交的使命,(并发问题,running进来的,非running不接纳此次提交的使命)
if (!isRunning(recheck) && remove(command)) {
//走回绝战略
reject(command);
//假如处于RUNNING状况,而且线程池中正在作业线程数为0,但堵塞行列有使命
} else if (workerCountOf(recheck) == 0) {
//(并发,if running状况进来的,且参加堵塞行列)的 使命,堵塞行列有使命但没有作业线程,则增加一个使命为空的作业线程处理,防止堵塞行列中的使命没有线程处理
addWorker(null, false);
}
// 行列已满,创立非中心线程core=false处理使命
} else if (!addWorker(command, false)) {
//创立非中心线程失利(最大线程已满),走回绝战略
reject(command);
}
}
jdk线程池和tomcat线程池区别
- jdk线程池由源码分析提交使命的第一个第一个if判别,当new 一个线程池后,没有提交使命时,池里边是没有线程的,懒创立过程,tomcat线程池在项目发动时,中心线程数会默许创立的
- 当中心线程数已满都在作业时,有新的使命提交过来时,jdk线程池在使命行列没满时会优先将改使命放进行列,行列已满才创立最大线程;而tomcat线程池在作业线程没有到达最大线程数时会优先创立最大线程数,最大线程也满了在放入行列,相反过程
- jdk线程池尽量保证中心线程处于作业状况,中心线程处理使命,真实不行放进行列,tomcat线程池主要处理网络恳求,归于IO密集型,支撑更改吞吐量,尽量去保证一切的恳求都能得到当即处理
五、Worker封装
线程池中的线程,都会被封装成一个Worker类目标,ThreadPoolExecutor维护的其实就是一组Worker目标,其中用调集workers存储这些Worker目标; Worker 的有参结构传入了我们的使命,并用线程工厂的办法创立了线程,将work自己传入,线程发动start,会调用run办法,run() 办法调用了 runWorker(this),将自己传入进去
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
}
考虑:线程常驻与毁掉
- 当提交一个使命,作业线程数量未到达中心线程时会创立一个中心线程,使命履行完毕,没有新的使命提交,此中心线程怎么常驻在线程池中?
- 当中心线程没有空闲且堵塞行列已满且作业线程未到达最大线程时,会创立一个非中心线程履行该使命,使命履行完毕,没有新的使命提交过来,该非中心线程空闲到达超时时刻,此非中心线程怎么毁掉?
1、addWorker
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
//双层for循环意图主要是作业线程数量标识+1
for (;;) {
int c = ctl.get();
//线程池状况
int rs = runStateOf(c);
// 除了RUNNING状况,
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()) ){
// 线程池假如不是SHUTDOHN,就代表是STOP或许更高的状况,这时,不需求增加线程处
//假如使命为nul1, 而且线程池状况不是RUNNING,不需求处理
// 假如堵塞行列为空,返同false,外侧的!再次取反,获取true,不需求处理
//创立作业线程失利
return false;
}
for (;;) {
//作业线程个数
int wc = workerCountOf(c);
//当时作业线程>=最大线程数 || wc >=最大线程数或许中心线程数
if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize)){
//创立作业线程失利
return false;
}
//CAS成功 作业线程+1
if (compareAndIncrementWorkerCount(c))
//退出外侧循环
break retry;
//CAS失利从头判别线程池状况
c = ctl.get(); // Re-read ctl
//和最开端外侧获取的状况发生变化,完毕此外侧循环,持续下一次外侧循环
if (runStateOf(c) != rs)
//完毕外侧循环
continue retry;
//不然持续下一次内侧循环
// else CAS failed due to workerCount change; retry inner loop
}
}
//双层for循环作业线程数量+1 成功
//作业线程默许开端失利
boolean workerStarted = false;
//作业线程默许增加失利
boolean workerAdded = false;
//作业线程
Worker w = null;
try {
//将我们的使命传入作业线程
w = new Worker(firstTask);
//从worker目标活动线程目标
final Thread t = w.thread;
if (t != null) {
//线程池大局锁,防止增加使命时,其它线程干掉该线程池
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
//线程池状况RUNNING
if (rs < SHUTDOWN ||
//线程池SHUTDOWN创立一个空使命处理堵塞行列中的使命时
(rs == SHUTDOWN && firstTask == null)) {
//线程是运行状况
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//作业线程增加到ashSet<Worker>
workers.add(w);
int s = workers.size();
//作业线程数大于记载的最大作业线程数,交换值
if (s > largestPoolSize)
largestPoolSize = s;
//作业线程增加成功
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//作业线程增加成功
if (workerAdded) {
//发动作业线程
t.start();
//发动作业线程成功
workerStarted = true;
}
}
} finally {
//作业线程发动失利
if (! workerStarted)
addWorkerFailed(w);
}
//返回作业线程是否发动成功
return workerStarted;
}
2、runWorker
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
//当时作业线程的使命
Runnable task = w.firstTask;
w.firstTask = null;
//WORk结构涉及的AQS,暂不扩展
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//循环读取线程池中使命行列并履行
//第一次循环 task不为空,直接进入循环体,履行使命,履行完毕将 task = null
//第2次循环 task=null,||线程池的中的使命履行完毕 履行getTask()办法,从线程池堵塞使命行列中获取使命,或许会堵塞完成线程的复用,并履行该使命
while (task != null || (task = getTask()) != null) {
//加锁,防止线程池shutdown,使命也不中止
w.lock();
//状况TIDYING或许TERMINATED
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//履行使命前的操作,依据自己事务重写扩展
beforeExecute(wt, task);
Throwable thrown = null;
try {
//履行使命
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//履行使命前的操作,依据自己事务重写扩展
afterExecute(task, thrown);
}
} finally {
//履行完毕最终将task=null
task = null;
w.completedTasks++;
w.unlock();
}
}
// getTask()=null 中心线程>最大线程时,以超时的时刻堵塞住获取不到使命,超时时刻一过跳出循环
completedAbruptly = false;
} finally {
//获取不到使命的线程,履行该办法
processWorkerExit(w, completedAbruptly);
}
}
3、getTask
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
//线程池中的线程数
int wc = workerCountOf(c);
// Are workers subject to culling?
//线程池中的线程数>大于中心线程数,timed=true
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
//线程池中的线程数>中心线程数,带有超时时刻的办法堵塞住
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
//线程池中的线程数<=中心线程数,行列中没有使命拿取不到会无限堵塞,直到行列有使命,拿取持续履行使命
workQueue.take();
if (r != null){
return r;
}
//线程池中的线程数>中心线程数,带有超时时刻,超时获取不到使命,此刻getTask获取不到使命
timedOut = true;
} catch (InterruptedException retry) {
//堵塞的线程,被中止
timedOut = false;
}
}
}
4、processWorkerExit
/**
* 获取不到使命的线程履行该办法
*/
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
//线程池的状况,不是stop状况,即线程中止,直接完毕,退出,作业线程清理
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
//拿取中心线程数
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty()){
min = 1;
}
//当时线程数>=中心线程数
if (workerCountOf(c) >= min){
//直接毁掉本线程,不会创立新的线程
return; // replacement not needed
}
}
//本线程直接毁掉,新开一个没有使命的线程
addWorker(null, false);
}
}