写在前面
文章的前面部分是介绍四种线程池现已怎么运用线程池,后边会对线程池的机制做一个详细的介绍,假如只是为了知道怎么运用线程池,只需求看前面部分即可。
线程池的原理
其实线程池运用的是一种池化的思想,能自动地帮咱们去办理线程,线程的创立和毁掉都依赖于线程池去完成。假如咱们每次都独立地去创立线程,咱们很难对咱们创立过的线程进行一个高效的办理,比如说当咱们的线程履行完之后该怎么毁掉等等问题,而线程池就能够处理这个问题,它是一个容器,里边有一或多个线程在履行,有一个统一的战略去办理里边的线程。官方一点讲便是:线程池的中心思想是经过维护一组线程,合理地办理使命的履行,提高体系的功能和稳定性。经过恰当调整线程池的参数,能够使得线程池适应不同规模和负载的使命需求。
四种线程池的介绍以及运用场景分析
newFixedThreadPool
特色:
- 没有非中心线程数,
中心线程数和最大线程数一致
,由传入参数决议。 -
keepAliveTime(非中心线程存活时刻):0s
,也便是说非中心线程一旦没有使命就立即被毁掉。不过newFixedThreadPool也没有非中心线程,所以线程闲暇也不会被毁掉。 - 堵塞行列:
LinkBlockingQueue
能够创立容纳一定数量的线程池,池子里的一切线程都是中心线程,每个线程存活的时刻都是无限的,当线程数达到中心线程数,就不在增加新的线程,假如中心线程都在繁忙中,新增加的使命会参加堵塞行列。
运用场景:适用于履行长期使命,功能比较好这种线程池,适用于需求限制并发线程数量的场景,能够防止因为线程过多而导致体系负载过大的问题。
newCachedThreadPool
特色:
- 中心线程数:0
- 最大中心线程数:Integer.MAX_VALUE
- keepAliveTime: 60s
- 堵塞行列:SynchrnousQueue
因为线程池的中心线程数为0,所以当一个新的使命交给线程池去履行的时分会刺进SyncchrnousQueue行列中,假如有闲暇的线程就交给闲暇的线程,没有则新建一个线程去履行使命。需求留意的是在一些极点的情况下,太多的使命可能会导致线程池新建太多的线程而导致耗尽cpu的资源。相同的,因为中心线程数为0,所以闲暇的newCachedThreadPool不会占用任何cpu资源。
运用场景: 履行许多短期异步的小程序或许负载较轻的服务器
newSingleThreadPool
特色:
- 中心线程数:1
- 最大线程数:1
- keepAliveTime: 0s
- 堵塞行列:LinkBlockingQueue
创立一个只要一个线程
的线程池,且线程的存活时刻为无限。当线程池现已在履行使命的时分,有新使命参加则参加堵塞行列。
运用场景:适用于需求一个个履行的使命场景
newScheduledThreadPool
特色:
- 中心线程数:由传入的参数决议
- 最大中心线程数:Interger.MAX_VALUE
- keepAliveTime: 0s,非中心线程一旦闲暇就被毁掉
- 堵塞行列:推迟行列
DelayedWorkQueue
,按超时时刻升序排序的行列。
创立一个固定巨细的线程池,线程池内非中心线程一旦闲暇就会被毁掉,线程池能够支持守时及周期性使命履行,假如一切线程均处于繁忙状况,对于新使命会进入DelayedWorkQueue
行列中,这是一种按照超时时刻排序的行列结构,使命履行完后将time修改位下次被履行时刻,然后将使命放回推迟行列中。
运用场景:周期性履行使命的场景
线程池的运用
创立线程池: 运用Executors
类供给的静态办法创立不同类型的线程池选择适合你需求的线程池类型和巨细。
public static ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
提交使命
execute()办法
cachedThreadPool.execute(() -> {
......
});
}
运用线程池的submit()
办法提交需求履行的使命,能够是Runnable
接口或Callable
接口的完成类。
javaCopy code
executor.submit(new MyRunnableTask());
封闭线程池: 在程序结束时,需求显式地封闭线程池,释放资源。
executor.shutdown();
submit() 办法与execute() 办法的差异:
在Java的Executor结构中,Executor接口供给了两种首要的使命提交办法:submit()
和execute()
。它们之间的首要差异在于回来值和反常处理:
1. submit()
办法:
submit()
办法用于提交一个完成了Callable
接口的使命或许一个Runnable
接口的使命。它有以下特色:
-
回来值:
submit()
办法回来一个Future
对象,你能够经过这个Future
对象获取使命的履行成果,或许等候使命履行完成。javaCopy codeExecutorService executor = Executors.newFixedThreadPool(10); Future<String> futureResult = executor.submit(new CallableTask());
-
反常处理: 假如使命抛出了反常,能够经过调用
Future
对象的get()
办法获取到反常,或许运用isDone()
办法来判别使命是否完成。javaCopy codetry { String result = futureResult.get(); // 处理使命履行成果 } catch (InterruptedException | ExecutionException e) { // 处理使命履行过程中的反常 }
2. execute()
办法:
execute()
办法用于提交一个完成了Runnable
接口的使命。它有以下特色:
-
回来值:
execute()
办法没有回来值,它是void
类型的,不供给任何关于使命履行情况的回来信息。javaCopy codeExecutorService executor = Executors.newFixedThreadPool(10); executor.execute(new RunnableTask());
-
反常处理: 假如使命抛出了反常,无法经过
execute()
办法获取到反常。需求在Runnable
的具体完成中进行反常处理,或许运用try-catch
块来捕获反常,避免使命抛出反常后导致线程池中的线程中止。javaCopy codepublic class RunnableTask implements Runnable { @Override public void run() { try { // 使命逻辑 } catch (Exception e) { // 处理使命履行过程中的反常 } } }
综上所述,首要差异在于submit()
办法供给了对使命履行成果的获取和反常处理的才能,而execute()
办法则不供给这些才能,只能用于提交不需求获取成果的简略使命。选择办法应该依据使命的需求来决议是否需求获取回来值和处理反常。
线程池的结构参数
ThreadPoolExecutor结构函数参数:
-
corePoolSize
:中心线程数,即便在闲暇状况,也会在线程池中存在的线程数量,除非设置了allowCoreThreadTimeOut
。 -
maximumPoolSize
:答应在线程池中的最大线程数量。当作业行列已满且池中线程数小于最大线程数时,线程池会创立新线程来处理使命,直抵达到最大线程数为止。 -
keepAliveTime
:非中心线程最长闲暇时刻,超越这个时刻,闲暇的非中心线程会被回收,设置allowCoreThreadTimeOut=true
,相同也会作用在中心线程中。 -
unit
:时刻单位,用于指定keepAliveTime的时刻单位,能够是TimeUnit.MILLISECONDS
、TimeUnit.SECONDS
等。 -
workQueue
:存储将被execute
办法履行的Runnable
使命的堵塞行列。 -
threadFactory
:线程工厂,用于创立线程的工厂。能够自定义线程的名称、优先级等特点。 -
RejectedExecutionHandler
:回绝战略,线程池的饱满战略事件。
ctl
在ThreadPoolExecutor
中维护了一个叫做ctl的AtomicInteger,ctl包含两部分,高三位代表线程池的状况,低29位代表线程池作业线程的数量
线程池共有五种状况:
- 运转状况(RUNNING): 这个状况下的线程池,既接纳新的使命,也处理已增加使命。
- 封闭状况(SHUTDOWN): 这个状况不再承受新的使命,可是会处理现已增加到作业行列中的使命,假如调用了
shutdown()
办法,线程池会切换到这个状况。 - 中止状况(STOPING): 这个状况不再接纳新的使命,也不处理作业行列中的使命,中止正在运转的使命。假如调用了
shutdownNow()
办法,线程池会切换到这个状况。 - 整理状况(TIDYING): 这个状况下的作业线程数量为0,一切使命都现已中止,当线程池切换到这个状况后,会调用
terminated()
办法,这个办法能够被重写以在线程池彻底中止之前履行一些整理操作。 - 中止状况(TERMINATED):线程池彻底中止,不再处理任何使命。
这些状况首要是经过ThreadPoolExecutor
类的内部状况变量来表明和办理的。在线程池的生命周期中,它会依次经历这些状况。具体的状况切换由线程池的内部逻辑来办理,开发者一般无需手动干涉状况的切换。一般,咱们只需求关心线程池的使命提交和封闭操作,线程池会依据使命的提交和封闭来自动办理状况。
堵塞行列
-
ArrayBlockingQueue
: 有界行列,依据数组完成。承受一个int参数来指定行列的巨细,经过FIFO(First-In-First-Out:先入先出)方式读取行列中的使命。 -
LinkedBlockingQueue
: 依据链表完成的堵塞行列,容量能够设置,假如不设置则为无界行列。最大值为Integer.MAX_VALUE。 -
LinkedTransferQueue
:依据链表完成的无界堵塞行列 -
LinkedBlockingDeque
:一个由链表结构组成的双向堵塞行列 -
DelayQueue
:推迟行列,是一个使命守时周期的推迟履行的行列,依据指定的履行时刻从小到大排序,不然依据刺进行列的先后排序。 -
PriorityBlockingQueue
:优先级行列,具有优先级的无界堵塞行列。存入PriorityBlockingQueue
的元素需求完成Comparator
接口,Comparator
接口完成决议了元素存取次序。 -
SynchronousQueue
:同步行列,一个不存储元素的堵塞行列。生产者-消费者模式。在SynchronousQueue
中,一个使命被刺进之后,有必要等候另一个线程调用移除操作,不然刺进操作一向处于堵塞状况。吞吐量一般要高于LinkedBlockingQueue
,newCachedThreadPool
线程池运用了这个行列。
回绝战略
以下是Java中内置的几种回绝战略:
-
AbortPolicy(默许战略): 当使命无法被履行时,会抛出
RejectedExecutionException
反常。javaCopy codeThreadPoolExecutor executor = new ThreadPoolExecutor( corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy() );
-
CallerRunsPolicy: 当使命无法被履行时,会由提交使命的线程履行该使命。这种战略能够降低向线程池提交使命的速度。
javaCopy codeThreadPoolExecutor executor = new ThreadPoolExecutor( corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy() );
-
DiscardPolicy: 当使命无法被履行时,会默默地丢弃该使命,不会抛出反常也不会履行使命。
javaCopy codeThreadPoolExecutor executor = new ThreadPoolExecutor( corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy() );
-
DiscardOldestPolicy: 当使命无法被履行时,会丢弃行列中等候时刻最长的使命,然后将新使命参加行列。
javaCopy codeThreadPoolExecutor executor = new ThreadPoolExecutor( corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardOldestPolicy() );
-
自定义回绝战略: 你也能够完成
RejectedExecutionHandler
接口,定义自己的回绝战略。例如:javaCopy codepublic class CustomRejectedExecutionHandler implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { // 自定义的回绝战略逻辑 // ... } } ThreadPoolExecutor executor = new ThreadPoolExecutor( corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), Executors.defaultThreadFactory(), new CustomRejectedExecutionHandler() );
使命履行
线程池履行使命,调用execute()办法:
// ThreadPoolExecutor # execute()
public void execute(Runnable command) {
// 使命为空,抛出空指针反常
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 1. 判别当时正在运转线程数是否小于中心线程池,是则新创立一个线程履行使命,不然将使命放到使命行列中
if (workerCountOf(c) < corePoolSize) {
// 在addWorker中创立作业线程履行使命
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2. 判别线程池是否处于运转状况,且判别向使命行列中刺进使命是否成功
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 假如线程池不处于运转状况,将刚刚增加的使命出队
if (! isRunning(recheck) && remove(command))
reject(command);
// 没有作业线程则创立一个新线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 刺进行列不成功,且当时线程数数量小于最大线程池数量,此刻则创立新线程履行使命,创立失利抛出反常
else if (!addWorker(command, false))
reject(command);
}
线程池履行使命流程如下:
- 当时作业线程数未超越中心线程数时,就直接创立一个中心线程去履行使命
- 当线程池中中心线程数已满,就将使命增加到使命行列
workQueue
中排队等候履行 - 当线程池中中心线程数已满且使命行列workQueue也满时,判别作业线程数是否抵达最大线程数,假如没抵达,创立一个非中心线程去履行使命
- 当线程数量超越最大线程数时,直接选用回绝战略处理,也便是经过
RejectedExecutionHandler
通知使命调用者
流程图如下:
增加作业线程
在线程池使命履行的时分,咱们知道经过调用addWorker()
办法能够向线程池中增加中心线程或非中心线程。
// ThreadPoolExecutor # addWorker()
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
// 1. 首要进行了一次线程池状况的检测:
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
//判别当时线程池的状况是不是现已shutdown,假如shutdown了回绝线程参加
//(rs!=SHUTDOWN || first!=null || workQueue.isEmpty())
// 假如rs不为SHUTDOWN,此刻状况是STOP、TIDYING或TERMINATED,所以此刻要回绝请求
// 假如此刻状况为SHUTDOWN,而传入一个不为null的使命,那么需求回绝请求
// 假如状况为SHUTDOWN,同时行列中现已没使命了,那么需求回绝请求
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
// 2. 假如当时的数量超越了CAPACITY,或许超越了corePoolSize和maximumPoolSize(试core而定),那么需求回绝请求
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 3. compareAndIncrementWorkerCount先将正在运转的线程数 1,数量自增成功则跳出循环,自增失利则继续从头继续循环
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 4. 将使命封装为一个Worker对象
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// workers是HashSet线程不安全的,那么此刻需求加锁
final ReentrantLock mainLock = this.mainLock; //大局锁
//获取大局锁
mainLock.lock();
try {
// 当持有了大局锁的时分,还需求再次查看线程池的运转状况等
int rs = runStateOf(ctl.get());
// 线程池处于运转状况,或许线程池封闭且使命为空
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 线程处于活跃状况,即线程现已开端履行或许还未逝世,正确的线程在这儿应该是还未开端履行的
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//private final HashSet<Worker> workers = new HashSet<Worker>();
// 包含线程池中一切的作业线程,只要在获取了大局锁的时分才能访问它。将新结构的作业线程参加到作业线程调集中
workers.add(w);
// 作业线程数量
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
// 新结构的作业线程参加成功
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 在被结构为Worker作业线程,且被参加到作业线程调集中后,履行线程使命,留意这儿的start实际上履行Worker中run办法
t.start();
workerStarted = true;
}
}
} finally {
//未能成功创立履行作业线程
if (! workerStarted)
//在发动作业线程失利后,将作业线程从调集中移除
addWorkerFailed(w);
}
return workerStarted;
}
首要完成如下:
- 查看线程池状况,假如线程池不是
RUNNING
状况,则不答应增加作业线程 - 判别线程数是否最大容量,中心线程数是否已满或最大线程数是否已满(依据传入的
core
参数而定),若其间一个条件满意则不答应增加作业线程 - 经过
CAS
操作将作业线程数 1,若失利则重试上述操作 - 将使命封装成一个
Worker
对象 - 将
worker
增加到workers
调集中,因为workers
是一个HashSet,因而需求加锁确保线程安全,然后查看线程池状况是否为RUNNING
状况,或SHUTDOWN
状况且使命为空,然后将worker
增加到workers
调集中,释放锁。假如worker
增加成功,调用对应线程的start()
办法。
Worker
类是ThreadPoolExecutor
的一个内部类,继承AQS
,完成了Runnable
接口,看看Worker
的结构办法:
// ThreadPoolExecutor # Worker
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
// 保存履行使命的线程,从线程工厂获取,创立失利则为Null
final Thread thread;
// 要履行的初始使命,可能为Null
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
// Worker是对firstTask的包装,而且Worker自身便是Runnable
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
//经过线程工厂来创立一个线程,将自身作为Runnable传递传递
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker. */
public void run() {
runWorker(this);
}
首要保存了传入的使命,然后从线程工厂中创立一个线程。
获取使命
线程发动后,最终会调用到Worker#run()
办法,然后调用runWorker()
办法:
// ThreadPoolExecutor # runWorker()
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
//标识线程是不是反常中止的
boolean completedAbruptly = true;
try {
//task不为null情况是初始化worker时,假如task为null,则去行列中取线程--->getTask()
while (task != null || (task = getTask()) != null) {
//获取worker的锁,防止线程被其他线程中止
w.lock();
// 假如线程池中止,确保线程被中止,不然,确保线程不被中止
// 需求二次查看
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 线程开端履行之前履行此办法,能够完成Worker未履行退出,本类中未完成
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 履行使命
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//使命履行后履行,能够完成标识Worker反常中止的功能,本类中未完成
afterExecute(task, thrown);
}
} finally {
//运转过的task标null
task = null;
w.completedTasks ;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//处理worker退出的逻辑
processWorkerExit(w, completedAbruptly);
}
}
这儿首要查看了线程池的状况,假如线程池处于STOP
状况,则有必要确保线程处于中止状况,首要履行Worker
中的首个使命firstTask
,假如没有或许现已履行过了则调用getTask()
办法从使命行列中获取使命,然后履行。
下面看看getTask()
办法怎么获取使命行列中的使命:
// ThreadPoolExecutor # getTask()
// 行列中获取线程
private Runnable getTask() {
// 判别后边的poll是否要超时
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
// 标识着当时Worker超时是否要退出
// wc > corePoolSize时需求减小闲暇的Worker数,那么timed为true
// 可是wc <= corePoolSize时,不能减小中心线程数timed为false
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
// 假如timed为true那么运用poll取使命。假如正常回来,那么回来取到的task
// 假如超时,证明worker闲暇,同时worker超越了corePoolSize,需求删去,回来r=null
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}