本文正在参与「金石计划 . 瓜分6万现金大奖」
在作业中,咱们经常运用线程池,可是你真的了解线程池的原理吗?一同,线程池作业原理和底层完成原理也是面试经常问的考题,所以,今日咱们一同聊聊线程池的原理吧。
为什么要用线程池
运用线程池主要有以下三个原因:
- 下降资源耗费。经过重复利用已创立的线程下降线程创立和毁掉形成的耗费。
- 提升响应速度。当使命到达时,使命可以不需求比及线程创立就能立即履行。
- 可以对线程做统一办理。线程是稀缺资源,假如无限制地创立,不仅会耗费体系资源,还会下降体系稳定性,运用线程池可以进行统一分配、调优和监控。
线程池的原理
Java中的线程池顶层接口是Executor
接口,ThreadPoolExecutor
是这个接口的完成类。
咱们先看看ThreadPoolExecutor
类。
ThreadPoolExecutor提供的结构办法
// 七个参数的结构函数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
咱们先看看这些参数是什么意思:
- int corePoolSize:该线程池中中心线程数最大值
中心线程:线程池中有两类线程,中心线程和非中心线程。中心线程默许情况下会一向存在于线程池中,即便这个中心线程什么都不干(铁饭碗),而非中心线程假如长时间的搁置,就会被毁掉(临时工)。
- int maximumPoolSize:该线程池中线程总数最大值 。
该值等于中心线程数量 + 非中心线程数量。
- long keepAliveTime:非中心线程搁置超时时长。
非中心线程假如处于搁置状况超越该值,就会被毁掉。假如设置allowCoreThreadTimeOut(true),则会也作用于中心线程。
- TimeUnit unit:keepAliveTime的单位。
TimeUnit是一个枚举类型。
-
BlockingQueue workQueue:堵塞行列,维护着等候履行的Runnable使命目标。
常用的几个堵塞行列:
-
LinkedBlockingQueue:链式堵塞行列,底层数据结构是链表,默许巨细是
Integer.MAX_VALUE
,也可以指定巨细。 -
ArrayBlockingQueue:数组堵塞行列,底层数据结构是数组,需求指定行列的巨细。
-
SynchronousQueue:同步行列,内部容量为0,每个put操作必须等候一个take操作,反之亦然。
-
DelayQueue:延迟行列,该行列中的元素只有当其指定的延迟时间到了,才可以从行列中获取到该元素 。
-
-
ThreadFactory threadFactory
创立线程的工厂 ,用于批量创立线程,统一在创立线程时设置一些参数,如是否看护线程、线程的优先级等。假如不指定,会新建一个默许的线程工厂。
static class DefaultThreadFactory implements ThreadFactory {
// 省掉特点
// 结构函数
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
// 省掉
}
-
RejectedExecutionHandler handler
回绝处理战略,线程数量大于最大线程数就会采用回绝处理战略,四种回绝处理的战略为 :
- ThreadPoolExecutor.AbortPolicy:默许回绝处理战略,丢掉使命并抛出RejectedExecutionException异常。
- ThreadPoolExecutor.DiscardPolicy:丢掉新来的使命,可是不抛出异常。
- ThreadPoolExecutor.DiscardOldestPolicy:丢掉行列头部(最旧的)的使命,然后重新测验履行程序(假如再次失利,重复此进程)。
- ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该使命。
ThreadPoolExecutor的战略
线程池本身有一个调度线程,这个线程便是用于办理布控整个线程池里的各种使命和事务,例如创立线程、毁掉线程、使命行列办理、线程行列办理等等。
故线程池也有自己的状况。ThreadPoolExecutor
类中运用了一些final int
常量变量来表明线程池的状况 ,分别为RUNNING、SHUTDOWN、STOP、TIDYING 、TERMINATED。
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
-
线程池创立后处于RUNNING状况。
-
调用shutdown()办法后处于SHUTDOWN状况,线程池不能承受新的使命,铲除一些闲暇worker,不会等候堵塞行列的使命完成。
-
调用shutdownNow()办法后处于STOP状况,线程池不能承受新的使命,中止一切线程,堵塞行列中没有被履行的使命悉数丢掉。此时,poolsize=0,堵塞行列的size也为0。
-
当一切的使命已终止,ctl记录的”使命数量”为0,线程池会变为TIDYING状况。接着会履行terminated()函数。
-
线程池处在TIDYING状况时,履行完terminated()办法之后,就会由 TIDYING -> TERMINATED, 线程池被设置为TERMINATED状况。
线程池主要的使命处理流程
处理使命的中心办法是execute
,咱们看看 JDK 1.8 源码中ThreadPoolExecutor
是怎么处理线程使命的:
// JDK 1.8
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 1.当时线程数小于corePoolSize,则调用addWorker创立中心线程履行使命
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2.假如不小于corePoolSize,则将使命添加到workQueue行列。
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 2.1 假如isRunning回来false(状况查看),则remove这个使命,然后履行回绝战略。
if (! isRunning(recheck) && remove(command))
reject(command);
// 2.2 线程池处于running状况,可是没有线程,则创立线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 3.假如放入workQueue失利,则创立非中心线程履行使命,
// 假如这时创立非中心线程失利(当时线程总数不小于maximumPoolSize时),就会履行回绝战略。
else if (!addWorker(command, false))
reject(command);
}
ctl.get()
是获取线程池状况,用int
类型表明。第二步中,入队行进行了一次isRunning
判别,入队之后,又进行了一次isRunning
判别。
为什么要二次查看线程池的状况?
在多线程的环境下,线程池的状况是时间发生变化的。很有可能刚获取线程池状况后线程池状况就改变了。判别是否将command
加入workqueue
是线程池之前的状况。倘若没有二次查看,假如线程池处于非RUNNING状况(在多线程环境下很有可能发生),那么command
永远不会履行。
总结一下处理流程
- 线程总数量 < corePoolSize,无论线程是否闲暇,都会新建一个中心线程履行使命(让中心线程数量快速到达corePoolSize,在中心线程数量 < corePoolSize时)。注意,这一步需求取得大局锁。
- 线程总数量 >= corePoolSize时,新来的线程使命会进入使命行列中等候,然后闲暇的中心线程会依次去缓存行列中取使命来履行(表现了线程复用)。
- 当缓存行列满了,阐明这个时分使命现已多到爆棚,需求一些“临时工”来履行这些使命了。于是会创立非中心线程去履行这个使命。注意,这一步需求取得大局锁。
- 缓存行列满了, 且总线程数到达了maximumPoolSize,则会采纳上面说到的回绝战略进行处理。
整个进程如图所示:
ThreadPoolExecutor怎么做到线程复用的?
咱们知道,一个线程在创立的时分会指定一个线程使命,当履行完这个线程使命之后,线程自动毁掉。可是线程池却可以复用线程,即一个线程履行完线程使命后不毁掉,持续履行别的的线程使命。那么,线程池怎么做到线程复用呢?
本来,ThreadPoolExecutor在创立线程时,会将线程封装成作业线程worker,并放入作业线程组中,然后这个worker反复从堵塞行列中拿使命去履行。
这儿的addWorker
办法是在上面说到的execute
办法里面调用的,先看看上半部分:
// ThreadPoolExecutor.addWorker办法源码上半部分
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
// 1.假如core是ture,证明需求创立的线程为中心线程,则先判别当时线程是否大于中心线程
// 假如core是false,证明需求创立的是非中心线程,则先判别当时线程数是否大于总线程数
// 假如不小于,则回来false
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
上半部分主要是判别线程数量是否超出阈值,超越了就回来false。咱们持续看下半部分:
// ThreadPoolExecutor.addWorker办法源码下半部分
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 1.创立一个worker目标
w = new Worker(firstTask);
// 2.实例化一个Thread目标
final Thread t = w.thread;
if (t != null) {
// 3.线程池大局锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 4.发动这个线程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
创立worker
目标,并初始化一个Thread
目标,然后发动这个线程目标。
咱们接着看看Worker
类,仅展示部分源码:
// Worker类部分源码
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
final Thread thread;
Runnable firstTask;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
//其余代码略...
}
Worker
类完成了Runnable
接口,所以Worker
也是一个线程使命。在结构办法中,创立了一个线程,线程的使命便是自己。故addWorker
办法调用addWorker办法源码下半部分中的第4步t.start
,会触发Worker
类的run
办法被JVM调用。
咱们再看看runWorker
的逻辑:
// Worker.runWorker办法源代码
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// 1.线程发动之后,经过unlock办法开释锁
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 2.Worker履行firstTask或从workQueue中获取使命,假如getTask办法不回来null,循环不退出
while (task != null || (task = getTask()) != null) {
// 2.1进行加锁操作,保证thread不被其他线程中止(除非线程池被中止)
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
// 2.2查看线程池状况,倘若线程池处于中止状况,当时线程将中止。
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 2.3履行beforeExecute
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 2.4履行使命
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 2.5履行afterExecute办法
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
// 2.6解锁操作
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
首先去履行创立这个worker时就有的使命,当履行完这个使命后,worker的生命周期并没有完毕,在while
循环中,worker会不断地调用getTask
办法从堵塞行列中获取使命然后调用task.run()
履行使命,然后到达复用线程的目的。只需getTask
办法不回来null
,此线程就不会退出。
当然,中心线程池中创立的线程想要拿到堵塞行列中的使命,先要判别线程池的状况,假如STOP或者TERMINATED,回来null
。
最终看看getTask
办法的完成:
// Worker.getTask办法源码
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
// 1.allowCoreThreadTimeOut变量默许是false,中心线程即便闲暇也不会被毁掉
// 假如为true,中心线程在keepAliveTime内仍闲暇则会被毁掉。
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 2.假如运转线程数超越了最大线程数,可是缓存行列现已空了,这时递减worker数量。
// 假如有设置允许线程超时或者线程数量超越了中心线程数量,
// 并且线程在规守时间内均未poll到使命且行列为空则递减worker数量
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 3.假如timed为true(想想哪些情况下timed为true),则会调用workQueue的poll办法获取使命.
// 超时时间是keepAliveTime。假如超越keepAliveTime时长,
// poll回来了null,上边说到的while循序就会退出,线程也就履行完了。
// 假如timed为false(allowCoreThreadTimeOut为false
// 且wc > corePoolSize为false),则会调用workQueue的take办法堵塞在当时。
// 行列中有使命加入时,线程被唤醒,take办法回来使命,并履行。
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
中心线程的会一向卡在workQueue.take
办法,被堵塞并挂起,不会占用CPU资源,直到拿到Runnable
然后回来(当然假如allowCoreThreadTimeOut设置为true
,那么中心线程就会去调用poll
办法,由于poll
可能会回来null
,所以这时分中心线程满足超时条件也会被毁掉)。
非中心线程会workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) ,假如超时还没有拿到,下一次循环判别compareAndDecrementWorkerCount就会回来null
,Worker目标的run()
办法循环体的判别为null
,使命完毕,然后线程被体系收回 。
四种常见的线程池
Executors
类中提供的几个静态办法来创立线程池。
newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
CacheThreadPool
的运转流程如下:
- 提交使命进线程池。
- 由于corePoolSize为0的关系,不创立中心线程,线程池最大为Integer.MAX_VALUE。
- 测验将使命添加到SynchronousQueue行列。
- 假如SynchronousQueue入列成功,等候被当时运转的线程闲暇后拉取履行。假如当时没有闲暇线程,那么就创立一个非中心线程,然后从SynchronousQueue拉取使命并在当时线程履行。
- 假如SynchronousQueue已有使命在等候,入列操作将会堵塞。
当需求履行很多短时间的使命时,CacheThreadPool的线程复用率比较高, 会明显的提高性能。而且线程60s后会收回,意味着即便没有使命进来,CacheThreadPool并不会占用很多资源。
newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
中心线程数量和总线程数量相等,都是传入的参数nThreads,所以只能创立中心线程,不能创立非中心线程。由于LinkedBlockingQueue的默许巨细是Integer.MAX_VALUE,故假如中心线程闲暇,则交给中心线程处理;假如中心线程不闲暇,则入列等候,直到中心线程闲暇。
与CachedThreadPool的区别:
- 由于 corePoolSize == maximumPoolSize ,所以FixedThreadPool只会创立中心线程。 而CachedThreadPool由于corePoolSize=0,所以只会创立非中心线程。
- 在 getTask() 办法,假如行列里没有使命可取,线程会一向堵塞在 LinkedBlockingQueue.take() ,线程不会被收回。 CachedThreadPool会在60s后收回。
- 由于线程不会被收回,会一向卡在堵塞,所以没有使命的情况下, FixedThreadPool占用资源更多。
- 都简直不会触发回绝战略,可是原理不同。FixedThreadPool是由于堵塞行列可以很大(最大为Integer最大值),故简直不会触发回绝战略;CachedThreadPool是由于线程池很大(最大为Integer最大值),简直不会导致线程数量大于最大线程数,故简直不会触发回绝战略。
newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
有且仅有一个中心线程( corePoolSize == maximumPoolSize=1),运用了LinkedBlockingQueue(容量很大),所以,不会创立非中心线程。一切使命依照先来先履行的顺序履行。假如这个唯一的线程不闲暇,那么新来的使命会存储在使命行列里等候履行。
newScheduledThreadPool
创立一个定长线程池,支撑守时及周期性使命履行。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
//ScheduledThreadPoolExecutor():
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}
四种常见的线程池基本够咱们运用了,可是《阿里巴巴开发手册》不主张咱们直接运用Executors类中的线程池,而是经过ThreadPoolExecutor
的方式,这样的处理方式让写的同学需求更加明确线程池的运转规则,规避资源耗尽的危险。
但假如你及团队本身对线程池十分熟悉,又确定事务规划不会大到资源耗尽的程度(比如线程数量或使命行列长度可能到达Integer.MAX_VALUE)时,其实是可以运用JDK提供的这几个接口的,它能让咱们的代码具有更强的可读性。
小结
在作业中,很多人由于不了解线程池的完成原理,把线程池配置过错,然后导致各种问题。希望你们阅读完本文,可以学会合理的运用线程池。
对于真正想弄懂java并发编程的小伙伴,网上的文章还有视频缺少体系性,我主张大家仍是买点书本看看,我推荐两本我看过的书。
《Java并发编程实战》:这本书浅显易懂地介绍了Java线程和并发,是一本十分棒的Java并发参考手册。
《Java并发编程艺术》:Java并发编程的概念本来就比较复杂,咱们需求的是一本可以把原理解释清楚的书本,而这本《Java并发编程的艺术》书是国内作者写的Java并发书本,刚好就比上面那一本更简单易懂,至少我自己看下来是这样的感觉。