参阅文章:/post/698321…
一、Java线程池的根本运用
在Java中能够经过如下两种方式运用线程池,不过终究都是运用到ThreadPoolExecutor。
1、直接运用ThreadPoolExecutor
可经过ThreadPoolExecutor的结构函数创立出线程池的实例目标,并调用其execute函数进行使命添加。代码完成如下:
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
10,//corePoolSize
20,//maximumPoolSize
120,//keepAliveTime
TimeUnit.SECONDS,//keepAliveTime时刻单位
new ArrayBlockingQueue<>(10)//等候行列
);
//将使命添加到该线程池中
threadPoolExecutor.execute(() -> {
//do something
});//留意这儿运用时lambda表达式
这儿简单说一下lambda表达式,留意遵从如下几个准则即可,详细可参阅该篇博客。
(1)需求函数式接口(即一个接口中只要一个函数);留意必定要是接口;
(2)lambada表达式->前面的是形参,形参可省掉掉参数类型;假如只要一个形参则能够省掉掉括号;
(3)->后边的是函数体,假如只要一行代码则可省掉掉大括号,假如该一行代码是回来语句则省掉掉大括号的一起也需求省掉掉return关键词。
2、运用Executors
经过该类供给的函数,能够创立如下几种类型的线程池;不过其底层依然运用的是ThreadPoolExecutor。
//该线程池只包含一个线程,一切使命顺序履行;适用于需求顺序履行使命的场景
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
//创立固定线程数量线程池,即该线程池中线程数量固定,假如线程池中线程都处于忙碌状况,则新进来的使命需求进入到堵塞行列中,直到线程闲暇停止。
//适用于需求约束线程数量的场景
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
//可缓存线程池是一个依据需求主动调整巨细的线程池;适用于履行大量的短期异步使命的场景
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
//定时使命线程池,底层运用的是ScheduledThreadPoolExecutor,不过该类也继承至ThreadPoolExecutor;
//用于履行定时使命和周期性使命
ExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
如下以函数newCachedThreadPool为例子,学习一下Executors创立线程池的底层完成。从其源码能够知道其也是经过ThreadPoolExecutor类创立的线程池目标,仅仅其中的各个参数现已定义好了而已。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
二、ThreadPoolExecutor源码
1、ThreadPoolExecutor结构函数
/**
* corePoolSize:线程池中中心线程数量,一般来说中心线程会一向存在,即便处于闲暇状况;
* maximumPoolSize:线程池中最大线程数量;当使命量超越中心线程数量并且使命行列已满,线程池还能创立新的线程来处理使命,可是线程数(包含中心线程数量)不会超越该值;
* keepAliveTime:非中心线程在闲暇状况下能够存活的时刻;
* unit:存活时刻单位(ms、s、h等);
* workQueue:等候履行的使命列表(先进先出);
* threadFactory:线程创立工厂,可运用默许的,也能够自定义;
* handler:使命超越 maximumPoolSize+使命行列巨细回调函数,即无法处理新的使命;也称回绝战略。
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
//各个参数合法性判别
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
2、execute函数
经过调用该函数以提交新的使命到线程池中进行处理。因此以该函数的源码完成作为学习线程池源码的进口点。代码如下:
public void execute(Runnable command) {
//不答应提交的使命为空
if (command == null)
throw new NullPointerException();
//获取当时线程池状况+线程数量(详细可阅览开头的参阅文章)
int c = ctl.get();
//判别当时线程数是否小于中心线程数
if (workerCountOf(c) < corePoolSize) {
//假如小于中心线程数则创立新的线程并直接履行
//这儿参数true表明当时需求创立的线程是中心线程(用于判别当时答应的最大线程数量)
if (addWorker(command, true))
return;
//使命履行失利,则从头获取线程池状况+当时线程数
c = ctl.get();
}
//首先判别线程池是否处于运转状况,假如是则将当时使命添加到使命行列中
if (isRunning(c) && workQueue.offer(command)) {
//从头获取线程池状况标识
int recheck = ctl.get();
//假如线程池没有运转,则从行列中移除该使命,并履行回绝战略
if (! isRunning(recheck) && remove(command))
reject(command);
//当时正在运转线程为0
//则需求发动一个中心线程以运转使命行列中的使命
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//使命行列已满,则发动非中心线程,false标识发动非中心线程
//假如使命运转失利,则履行回绝战略
else if (!addWorker(command, false))
reject(command);
}
如上代码可缩减为如下几个步骤:
- 判别是否能够创立中心线程以履行使命
- 将使命添加到使命行列
- 创立非中心线程履行使命
- 使命履行失利则履行回绝战略
3、addWorker函数
该函数主要有两个功用:(1)经过CAS将当时线程数量+1;(2)创立新的线程并添加到作业行列中,一起立马开端履行当时传递进来的使命;
private boolean addWorker(Runnable firstTask, boolean core) {
retry://类似于goto
//获取当时线程池状况+线程数量
for (int c = ctl.get();;) {
//假如当时线程池状况是RUNNING则能够持续履行使命
//不然假如当时线池状况是STOP、TIDYING或许TERMINATED或许待履行使命为空或许作业行列为空,则直接回来false
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;
//经过CAS将线程数量+1,直到修正成功停止
for (;;) {
//依据当时是否是中心线程挑选当时答应的最大线程数量
//假如超越答应的最大线程数量则直接回来false
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
//经过CAS将线程数量+1
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();
//CAS修正线程数量失利,再次判别线程池状况
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//该类为ThreadPoolExecutor内部类,继承了AbstractQueuedSynchronizer类并且完成了Runnable类
//并且在其结构函数中会调用所供给的工厂创立新的线程
w = new Worker(firstTask);
//获取创立的新线程
final Thread t = w.thread;
if (t != null) {
//加锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int c = ctl.get();
//判别线程池是否处于运转状况或许可履行使命状况
if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
//假如线程状况不是处于新创立状况则直接抛出反常
if (t.getState() != Thread.State.NEW)
throw new IllegalThreadStateException();
//将新创立的worker添加到作业行列中
workers.add(w);
workerAdded = true;
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
}
//释放锁
} finally {
mainLock.unlock();
}
//假如是新创立的线程则直接开端履行使命
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
//没有发动新的使命或许发动失利,则将新的使命从使命行列移除,并将线程数量-1
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
3、Worker类源码
该类比较简单,其结构函数中会存储当时需求运转的使命(可为空);并经过供给的工厂创立新的线程,一起将当时目标引证作为Runnable使命传递给该线程;当上述addWorker函数中调用t.start()就会调用到该类的run函数中以履行后续的逻辑。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
private static final long serialVersionUID = 6138294804551838833L;
@SuppressWarnings("serial") // Unlikely to be serializable
final Thread thread;
@SuppressWarnings("serial") // Not statically typed as Serializable
Runnable firstTask;
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
//当时需求履行的使命
this.firstTask = firstTask;
//经过供给的工厂创立新的线程,留意这儿将当时类的引证传递给了Thread作为Runnable使命
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
//省掉掉部分源码
......
}
4、runWorker函数
如下代码便是当时线程开端履行使命了;(1)首先判别worker是否存在需求履行的使命,假如有则优先履行该使命;(2)不然从行列中获取使命进行履行;
final void runWorker(Worker w) {
//获取当时线程
Thread wt = Thread.currentThread();
//获取当时worker中需求履行的使命(优先级最高)
Runnable task = w.firstTask;
//将worker中需求履行使命设置为空,避免后续重复履行
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
//首先判别当时worker中需求履行的使命是否为空,假如不为空则履行该使命
//不然从使命列表中获取使命进行履行,留意getTask会堵塞当时线程
while (task != null || (task = getTask()) != null) {
//加锁
w.lock();
//判别当时线程池以及线程是否能够持续履行使命
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//该函数未完成
beforeExecute(wt, task);
try {
//履行使命
task.run();
//该函数未完成
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//完毕当时线程
processWorkerExit(w, completedAbruptly);
}
}
5、getTask函数
在该函数中会依据是否答应超时收回线程以设置使命获取堵塞时刻,假如是中心线程则会一向堵塞,非中心线程假如超时依然没有获取到可履行使命则会被收回。
private Runnable getTask() {
boolean timedOut = false;
for (;;) {
int c = ctl.get();
//判别线程池是否处于可履行使命状况,假如不是则回来null
//留意这儿SHUTDOWN状况是能够持续履行使命的
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
//获取当时线程数量
int wc = workerCountOf(c);
//allowCoreThreadTimeOut表明是否答应闲暇的中心线程超时收回
//或许假如当时线程数量大于中心线程数量,则超越中心线程数量的线程能够超时收回
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//假如某个线程超时获取task为null,则会履行到该处
//留意这儿修正线程数量是经过CAS,也便是同一时刻只要一个线程能够修正成功,其他线程假如同步在修正则会失利
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
//经过CAS将线程数量-1,并直接回来null,即直接完毕当时线程
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//经过堵塞的方式等候行列使命
//假如timed为true则说明能够超时收回线程,即假如在特定时刻内没有新的使命,那么当时线程就需求收回
//take函数会一向等候直到有新的使命呈现需求处理
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
三、总结
如上代码剖析下来,答复如下几个问题则没有什么难度了:
问题1:线程池使命处理根本流程
答复:经过execute函数将使命提交到线程池,假如中心线程数未超越则直接创立中心线程履行使命,不然添加到使命行列;
假如使命行列满了则创立非中心线程履行使命,不然履行回绝战略。
问题2:怎么复用现有线程
答复:从上述代码剖析能够知道,每个线程会优先履行当时worker中的task,假如当时worker的task为空,则会去使命行列中获取使命进行履行;
假如使命行列为空则会堵塞当时线程;即典型的出产者-消费者模型,上层事务会向使命行列中添加使命(出产),线程会从使命行列中获取使命进行履行(消费),这样线程就能够不断复用了。
问题3:中心线程与非中心线程是否存在差异
答复:没有啥差异,假如当时既有中心线程也有非中心线程,那么终究被保留下来的线程便是中心线程,详细的可参阅上述getTask函数源码。