java结构中例如Tomcat、Dubbo等都离不开线程池,这些结构用到线程的当地,都会用线程池来负责。咱们在运用这些结构的时分,会设置线程池参数,用于进步功用。那么开多少线程合适?今天咱们将围绕这个问题来学习一下线程池。
为什么运用线程池
往常咱们运用java线程的时分,都是直接创立一个Thread
目标,java线程的创立和毁掉都会涉及到Thread
目标的创立和毁掉,线程切换等问题。创立Thread
目标,仅仅是在 JVM 的堆里分配一块内存而已;而创立一个线程,却需求调用操作体系内核的 API,然后操作体系要为线程分配一系列的资源,这个成本就很高了。所以线程是一个重量级的目标,应该防止频频创立和毁掉。
一般能够经过“池化”思想来处理上述的问题,而JDK中供给的线程池完结是根据ThreadPoolExecutor
。
运用线程池能够带来一系列优点:
- 降低资源耗费:经过池化技术重复运用已创立的线程,降低线程创立和毁掉造成的损耗。
- 进步响应速度:使命抵达时,无需等候线程创当即可当即履行。
- 进步线程的可管理性:线程是稀缺资源,假如无限制创立,不只会耗费体系资源,还会因为线程的不合理散布导致资源调度失衡,降低体系的稳定性。运用线程池能够进行共同的分配、调优和监控。
-
供给更多更强大的功用:线程池具有可拓展性,答应开发人员向其中添加更多的功用。比方延时守时线程池
ScheduledThreadPoolExecutor
,就答应使命延期履行或定期履行。
线程池中心设计与完结
总体设计
-
顶层接口是
Executor
,java.util.concurrent.Executor#execute
,用户只需供给Runnable
目标,将使命的运转逻辑提交到履行器(Executor
)中,由Executor
结构完结线程的调配和使命的履行部分。 -
ExecutorService
接口扩展了Executor
并添加了一些能力:- 扩充履行使命的能力,经过调用
submit()
或许invokeAll()
办法能够为一个或一批异步使命生成Future的办法; - 供给了管控线程池的办法,比方调用
shutdown()
等办法中止线程池的运转。
- 扩充履行使命的能力,经过调用
-
AbstractExecutorService
则是上层的抽象类,将履行使命的流程串联了起来,确保下层的完结只需重视一个履行使命的办法即可。 -
详细完结类是
ThreadPoolExecutor
,ThreadPoolExecutor
将会一方面保护本身的生命周期,另一方面一起管理线程和使命,使两者杰出的结合从而履行并行使命。 -
ScheduledThreadPoolExecutor
又扩展了ThreadPoolExecutor
和ScheduledExecutorService
接口,添加了调度能力,使使命能够延时守时履行。 -
别的还有一个供给了线程池创立的工厂办法的类
Executors
,用来创立线程池。
本章首要阐明ThreadPoolExecutor
的完结原理,ScheduledThreadPoolExecutor
下篇会讨论。
ThreadPoolExecutor完结原理
ThreadPoolExecutor结构参数阐明
ThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
-
corePoolSize:表明线程池保有的最小线程数。中心线程数,这些中心线程一旦被创立,就不会被毁掉。相反,假如对错中心线程,等使命履行完并长期未被运用则会被毁掉。
-
maximumPoolSize:表明线程池创立的最大线程数。
-
keepAliveTime&unit:一个线程假如在一段时间内,都没有履行使命,阐明很闲,
keepAliveTime
和unit
便是用来界说这个一段时间的参数。也便是说,假如线程现已闲暇了keepAliveTime
和unit
这么久了,而且线程数大于corePoolSize
,那么这个闲暇线程就要被收回。 -
workQueue:用来存储使命,当有新的使命恳求线程处理时,假如中心线程池已满,那么新来的使命会加入workQueue行列中,workQueue是一个堵塞行列。
-
threadFactory:经过这个参数能够自界说怎么创立线程。
-
handler:经过这个参数能够自界说使命的回绝战略。假如线程池中一切的线程都在繁忙,而且作业行列也满了(前提是作业行列是有界行列),那么此时提交使命,线程池就会回绝接纳。至于回绝的战略,能够经过这个参数来指定
ThreadPoolExecutor
现已供给了四种战略。- CallerRunsPolicy:提交使命的线程自己去履行该使命。
- AbortPolicy:默许的回绝战略,会throws RejectedExecutionException.
- DiscardPolicy:直接丢掉使命,没有任何反常输出。
- DiscardOldestPolicy:丢掉最老的使命,其实便是把最早进入作业行列的使命丢掉,然后把新使命加入到作业行列。
ThreadPoolExecutor履行流程
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
- 首要检测线程池运转状况,假如不是RUNNING,则直接回绝,线程池要确保在RUNNING的状况下履行使命。
- 假如workerCount < corePoolSize,则创立并发动一个线程来履行新提交的使命。
- 假如workerCount >= corePoolSize,且线程池内的堵塞行列未满,则将使命添加到该堵塞行列中。
- 假如workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的堵塞行列已满,则创立并发动一个线程来履行新提交的使命。
- 假如workerCount >= maximumPoolSize,而且线程池内的堵塞行列已满, 则依据回绝战略来处理该使命, 默许的处理方式是直接抛反常。
线程池运转状况
线程池的运转状况,由线程池内部保护,线程池内部运用AtomicInteger
变量,用于保护运转状况runState
和作业线程数workerCount
,高3位保存runState
,低29位保存workerCount
,两个变量之间互不搅扰。用一个变量去存储两个值,可防止在做相关决策时,出现不共同的状况,不用为了保护两者的共同,而占用锁资源。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// COUNT_BITS=29,(对于int长度为32来说)表明线程数量的字节位数
private static final int COUNT_BITS = Integer.SIZE - 3;
// 状况掩码,高三位是1,低29位满是0,能够经过 ctl&COUNT_MASK 运算来获取线程池状况
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
private static final int RUNNING = -1 << COUNT_BITS; // 111 00000 00000000 00000000 00000000;
private static final int SHUTDOWN = 0 << COUNT_BITS; // 000 00000 00000000 00000000 00000000;
private static final int STOP = 1 << COUNT_BITS; // 001 00000 00000000 00000000 00000000;
private static final int TIDYING = 2 << COUNT_BITS; // 010 00000 00000000 00000000 00000000;
private static final int TERMINATED = 3 << COUNT_BITS; // 011 00000 00000000 00000000 00000000;
// 核算当时运转状况
private static int runStateOf(int c) { return c & ~COUNT_MASK; }
// 核算当时线程数量
private static int workerCountOf(int c) { return c & COUNT_MASK; }
//经过状况和线程数生成ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }
状况 | 描绘 |
---|---|
RUNNING | 能接受新的使命,也能处理堵塞行列中的使命 |
SHUTDOWN | 封闭状况,不能接受新的使命,只能处理堵塞行列中的使命 |
STOP | 不能接受新的使命,也不能处理堵塞行列中的使命,会中止正在处理使命的线程 |
TIDYING | 一切使命都中止了,workerCount为0 |
TERMINATED | 在履行terminated()办法会进入到这个状况 |
状况搬运:
堵塞行列
再介绍线程池总体设计的时分,说过线程池的设计,采用的都是生产者 – 顾客形式,其完结首要便是经过BlockingQueue
来完结的,意图是将使命和线程两者解耦,堵塞行列缓存使命,作业线程从堵塞行列中获取使命。
运用不同的行列能够完结不一样的使命存取战略。在这里,咱们能够再介绍下堵塞行列的成员:
堵塞行列 | 描绘 |
---|---|
ArrayBlockingQueue |
根据数组完结的有界行列,支撑公正锁和非公正锁 |
LinkedBlockingQueue |
根据链表完结的有界行列,行列巨细默许为Integer.MAX_VALUE ,所以默许创立该行列会有容量危险 |
PriorityBlockingQueue |
支撑优先级排序的无界行列,不能确保同优先级的顺序 |
DelayQueue |
根据PriorityBlockingQueue 完结的延期行列,只有当延时期满了,才能从中取出元素 |
SynchronousQueue |
同步行列,不存储任何元素,调用一次put() 就必须等候take() 调用完。支撑公正锁和非公正锁 |
LinkedTransferQueue |
根据链表完结的无界行列,多了transfer() 和tryTransfer() 办法 |
LinkedBlockingDeque |
根据双向链表完结的行列,多线程并发时,能够将锁的竞赛最多降到一半 |
Worker
Worker整体设计
-
Worker
继承了AQS
,运用AQS来完结独占锁这个功用。没有运用可重入锁ReentrantLock,而是运用AQS,为的便是完结不可重入的特性去反应线程现在的履行状况。
-
Worker
完结了Runnable
接口,持有一个线程thread
,一个初始化的使命firstTask
。thread
是在调用结构办法时经过ThreadFactory
来创立的线程,能够用来履行使命;
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
final Thread thread;//Worker持有的线程
Runnable firstTask;//初始化的使命,能够为null
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
// ...省略其他代码
}
Worker怎么添加使命
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (int c = ctl.get();;) {
// Check if queue empty only if necessary.
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;
for (;;) {
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
if (t.getState() != Thread.State.NEW)
throw new IllegalThreadStateException();
workers.add(w);
workerAdded = true;
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
addWorker()
办法有两个参数:
-
firstTask
用它来保存传入的第一个使命,这个使命能够有也能够为null
。假如这个值对错空的,那么线程就会在发动初期当即履行这个使命,也就对应中心线程创立时的状况;假如这个值是null
,那么就需求创立一个线程去履行workQueue
中的使命,也就对错中心线程的创立。 -
core
参数为true
表明在新增线程时会判断当时活动线程数是否少于corePoolSize
,false
表明新增线程前需求判断当时活动线程数是否少于maximumPoolSize
。
详细流程如下:
Worker怎么获取使命
使命的履行有两种可能:一种是使命直接由新创立的线程履行。另一种是线程从使命行列中获取使命然后履行,履行完使命的闲暇线程会再次去从行列中恳求使命再去履行。
第一种在上述addWorker()
办法中,假如firstTask
不为空的话,会直接运转。第二种firstTask
为空,使命将从workQueue
中获取,调用getTask()
办法
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
// Check if queue empty only if necessary.
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
详细流程:
Worker怎么运转使命
// java.util.concurrent.ThreadPoolExecutor#runWorker
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
try {
task.run();
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
详细流程:
- while循环不断地经过
getTask()
办法获取使命。 - 假如线程池正在中止,那么要确保当时线程是中止状况,否则要确保当时线程不是中止状况。
- 履行使命。
- 假如
getTask
结果为null
则跳出循环,履行processWorkerExit()
办法,毁掉线程。
Worker线程怎么收回
线程的毁掉依靠JVM自动的收回,但线程池中中心线程是不能被jvm收回的,所以当线程池决议哪些线程需求收回时,只需求将其引证消除即可。Worker被创立出来后,就会不断地进行轮询,然后获取使命去履行,中心线程能够无限等候获取使命,非中心线程要限时获取使命。当Worker无法获取到使命,也便是获取的使命为空时,循环会结束,Worker会主动消除本身在线程池内的引证。
其首要逻辑在processWorkerExit()
办法中
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
详细流程:
运用线程池最佳实践
Executors
考虑到ThreadPoolExecutor
的结构函数完结有些复杂,所以java供给了一个线程池的静态工厂类,Executors
,运用Executors
能够快速创立线程池。但是大厂都不主张运用Executors
,原因:Executors
的很多办法默许运用的是无参结构的LinkedBlockQueue
,默许巨细为Integer.MAX_VALUE
,高负载状况下,行列很简单导致OOM。而OOM了就会导致一切恳求都无法处理。强烈主张运用ArrayBlockingQueue有界行列。
运用有界行列,当使命过多时,线程池会触发履行回绝战略,线程池默许的回绝战略会throw RejectedExecutionException
这个运转时反常,所以开发人员很简单疏忽,因此默许回绝战略需求慎重运用。假如线程处理的使命非常重要,主张自界说回绝战略,实际开发中,自界说回绝战略往往和降级战略配合运用。
下面介绍常用的办法:
newFixedThreadPool()
-
newFixedThreadPool()
函数用来创立巨细固定的线程池。 -
ThreadPoolExecutor
中的maximumPoolSize
跟corePoolSize
相等,因此,线程池中的线程都是中心线程,一旦创立便不会毁掉。 -
workQueue为LinkedBlockingQueue
,默许巨细为Integer.MAX_VALUE
,巨细非常大,相当于无界堵塞行列。使命能够无限的往workQueue
中提交,永久都不会触发回绝战略。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
newSingleThreadExecutor()
-
newSingleThreadExecutor()
函数用来创立单线程履行器。 -
ThreadPoolExecutor
中的maximumPoolSize
跟corePoolSize
都等于1。 -
workQueue
同样是巨细为Integer.MAX_VALUE
的LinkedBlockingQueue
。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
newCachedThreadPool()
-
newCachedThreadPool()
函数创立的线程池只包括非中心线程,线程闲暇60秒以上便会毁掉。 -
workQueue
是SynchronousQueue
类型的,而SynchronousQueue
是长度为0的堵塞行列,所以,workQueue
不存储任何等候履行的使命。- 假如线程池内存在闲暇线程,那么新提交的使命会被闲暇线程履行
- 假如线程池内没有闲暇线程,那么线程池会创立新的线程来履行新提交的使命。
-
线程池巨细为
Integer.MAX_VALUE
,因此,线程池中创立的线程个数能够非常多。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
反常捕获
运用线程池,还需求注意反常处理的问题,经过ThreadPoolExecutor
目标的execute()
办法履行使命时,假如在使命履行期间出现运转时反常,会导致使命的线程终止,但是你却获取不到任何告诉,这会让你误认为使命都履行得很正常。虽说线程池供给了很多用于反常处理的办法,但是最保险和简单的方案还是捕获反常信息,并按需处理。
配置线程池参数
从使命的优先级,使命的履行时间长短,使命的性质(CPU密布/ IO密布),使命的依靠关系这四个视点来剖析。而且近可能地运用有界的作业行列。
性质不同的使命可用运用不同规模的线程池分开处理:
- CPU密布型: 尽可能少的线程,Ncpu+1
- IO密布型: 尽可能多的线程, Ncpu*2,比方数据库连接池
- 混合型: CPU密布型的使命与IO密布型使命的履行时间差别较小,拆分为两个线程池;否则没有必要拆分。
参阅动态参数化github.com/shawn-happy…
参阅文档
美团线程池