大家好,我是小趴菜,在面试中,咱们经常会被面试官问到线程池的一些问题,比如
- 1:线程池履行流程
- 2:线程池是怎么收回非中心线程
- 3:线程池怎么维护中心线程
- 4:………
今天咱们来剖析一下线程池的全体中心流程,协助大家完全了解线程池的底层中心原理
关于线程池的用法如下,咱们要剖析的进口便是submit()办法,这是将使命提交到线程池的办法
ThreadPoolExecutor threadPoolExecutor =
new ThreadPoolExecutor(1,2,3000,TimeUnit.MILLISECONDS,new ArrayBlockingQueue<>(1),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
threadPoolExecutor.submit(() -> {
try {
TimeUnit.SECONDS.sleep(120);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
TimeUnit.SECONDS.sleep(1);
threadPoolExecutor.submit(() -> {
try {
TimeUnit.SECONDS.sleep(120);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
TimeUnit.SECONDS.sleep(1);
threadPoolExecutor.submit(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
threadPoolExecutor.shutdown();
public Future<?> submit(Runnable task) {
//判别提交的使命是否为空,为空就直接抛出空指针反常
if (task == null) throw new NullPointerException();
//将咱们的使命封装成一个RunnableFuture,后续能够经过Future.get()办法获取咱们使命的回来成果
RunnableFuture<Void> ftask = newTaskFor(task, null);
//中心办法,将使命提交给线程池履行
execute(ftask);
return ftask;
}
execute(ftask);最终进入ThreadPoolExecutor这个类中
public void execute(Runnable command) {
//对咱们的使命再次进行判空处理
if (command == null)
throw new NullPointerException();
//拿到咱们线程池中此刻存活的线程数
int c = ctl.get();
//假如存活的线程数小于中心线程数
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//假如此刻线程池存活的线程数不小于中心线程数
//isRunning(c):判别此刻线程池的状况,假如是Running,就能够接纳新的使命
//workQueue.offer(command): 将使命放入行列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
//线程池的拒绝策略,也便是拒绝了这个使命
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//假如行列满了,也便是存活线程数不小于中心线程数而且行列也现已满了
//也便是判别线程数是否大于线程池最大的线程数
else if (!addWorker(command, false))
//线程池的拒绝策略,也便是拒绝了这个使命
reject(command);
}
在execute(ftask);办法中,呈现最多的便是addWorker()这个办法,接下来咱们看下这个办法的效果是什么
//firstTask:是咱们要提交给线程池的使命
//core:true表明的是这个线程是中心线程,也便是线程数小于中心线程数时分创立的线程,
false,表明是能够收回的线程,表明此刻线程数现已不小于中心线程数了
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
//关于咱们要了解线程池中心原理,这个for(;;)其实没必要去深化了解,咱们能够直接忽略
//咱们先把全体的流程搞理解,关于一些不重要的步骤咱们能够直接忽略
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 判别线程池的状况,有RUNNING,STOP,SHUTDOWN等一些状况,只需不是RUNNING状况
//那么线程池就不会接纳新的使命,所以这儿直接会回来false,可是咱们此刻的
//线程池状况是RUNNING,所以不会回来进入这个if分支
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
//拿到此刻线程池存活的线程数
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//咱们直接从这儿开始剖析
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//将咱们的使命封装成一个Worker目标,此刻这儿就创立了一个新的线程
w = new Worker(firstTask);
//从Worker目标中拿到履行当前使命的线程
final Thread t = w.thread;
//判别线程是否为空
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
//这儿仍是判别线程池的状况
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//将咱们创立的新的线程放入到workers中
//private final HashSet<Worker> workers = new HashSet<Worker>();
//其实workers便是一个Set调集
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
//开释锁
mainLock.unlock();
}
if (workerAdded) {
//调用线程的start()办法去真实履行咱们的事务
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
由于咱们的使命被封装成一个Worker的目标,而Worker又完成了Runnable接口,所以在调用start()办法时分,就会履行它的run()办法
// Worker中的run()办法
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
//拿到咱们提交给线程池的使命,注意:此刻task不为空!!!!!!
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//由于此刻的task不为空,所以会直接进入到while循环中去
while (task != null || (task = getTask()) != null) {
w.lock();
//判别线程是否被中断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//履行使命的前置事务逻辑,由于咱们没有完成,所以这儿不做使命处理
beforeExecute(wt, task);
Throwable thrown = null;
try {
//真实去调用咱们的事务办法
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//履行使命的后置事务逻辑,由于咱们没有完成,所以这儿不做使命处理
afterExecute(task, thrown);
}
} finally {
//此刻将task设置为null
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
第一次进入while循环就完毕了,完毕之后将task设置为null,由于这是个while循环,所以会再次进入这个while循环中去
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//再次进入到这个while循环中来,此刻的task现已为null了,所以第一个判别为false
//接下来就会履行task = getTask()办法
while (task != null || (task = getTask()) != null) {
w.lock();
//判别线程是否被中断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//履行使命的前置事务逻辑,由于咱们没有完成,所以这儿不做使命处理
beforeExecute(wt, task);
Throwable thrown = null;
try {
//真实去调用咱们的事务办法
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//履行使命的后置事务逻辑,由于咱们没有完成,所以这儿不做使命处理
afterExecute(task, thrown);
}
} finally {
//此刻将task设置为null
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
//这儿是个死循环
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 假如此刻线程池状况等于RUNING,而且行列现已为空了,就直接回来null
//可是此刻咱们的线程池状况是RUNNING,所以不会进入到这个if分支
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
//将线程池的线程数减1
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
//allowCoreThreadTimeOut:false 表明假如没有使命,那么中心线程在超时时刻到达之后就会收回
// true:表明中心线程不会被收回
// 咱们能够经过 threadPoolExecutor.allowCoreThreadTimeOut(false);来设置
// allowCoreThreadTimeOut默认值便是false,所以timed的值便是由wc > corePoolSize来操控
//假如此刻存活的线程数大于中心线程数,那么便是true,不然便是false
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 假如此刻存活的线程数大于最大线程数而且行列为空
// 就履行compareAndDecrementWorkerCount(c),将线程数减1,直接回来null,
// 回来null之后,上层while循环就直接退出,然后将这个线程收回
// 这个if的效果便是假如此刻线程数大于最大线程数了,此刻行列又没有使命,这时分就将
// 那么就直接将这些线程收回
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
//假如timed = true:表明此刻的线程数大于中心线程数,可是小于最大线程数,此刻直接调用
// 堵塞行列的poll办法,并在咱们设置的存活时刻之后假如还没有获取到元素,就直接纳回这条线程
// 假如timed = false:表明此刻的线程数小于或等于中心线程数,那么此刻就调用堵塞行列的
// take()堵塞办法,知道行列中有使命
//所以中心线程之所以能存活,便是由于调用了take()办法一直堵塞在这儿,
//非中心线程是有堵塞时刻的,超过这个时刻没有使命就会被收回
try {
Runnable r = timed ?
//非中心线程会收回的中心原理
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
//中心线程存活的中心原理
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}