本文主要内容
- 什么是线程池
- 线程池的运用
- 线程池的原理
- 线程池中的位运算
- 源码解析
什么是线程池
假设一再创建线程,也是会影响全体资源或效率的,线程池的发生是为了防止重复的创建线程和收回线程。线程池有以下几个作用:
- 下降资源消耗。经过重复利用已创建的线程下降线程创建、销毁线程形成的消耗
- 前进响应速度。当任务抵达时,任务可以不需求比及线程创建就能当即实行。
- 前进线程的可管理性。线程是稀缺资源,假设入约束的创建,不只会消耗系统资源,还会下降系统的稳定性,运用线程池可以进行一致的分配、调优和监控。
线程池的运用
先来看看线程池的中心结构方法:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
//反常处理略过
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
需求用户指定以下几个要害变量:
- corePoolSize:中心线程数量
- maximumPoolSize:最大线程数量
- allowCoreThreadTimeOut:是否容许线程超时(设置为true时与keepAliveTime,TimeUnit一同起作用)
- keepAliveTime:线程存活时刻(当线程池容许线程超时且运转中的线程数量逾越- corePoolSize时,会按照此变量设置时刻关闭线程)
- TimeUnit:单位(一般与keepAliveTime一同运用,供线程池判别是否满足关闭线程的条件)
- workQueue:缓冲队伍
- RejectedExecutionHandler:回绝处理任务类(默许:AbortPolicy 会抛反常,见下方实例)
- threadFactory:线程工厂(默许:DefaultThreadFactory)
理解这些中心数据的作用,就可以随意运用线程池了
在Executors类中,封装好了几个接口用于初始化线程池,具体可参看源码,只处只罗列其间一个,不再胪陈
ExecutorService executor = Executors.newFixedThreadPool(5);
线程池的原理
假设让我们自己来规划一个线程池,我们应该怎么做呢?线程池最大的功能在于线程重复运用,不需求每实行一个任务就从头结构新线程。
假设用户提交的任务比较多,显然我们需求一个缓存队伍用于存储任务。
为了抵达线程重复运用的意图,线程应该不停地从缓存队伍中获取新的任务,并且实行它。
线程池中的位运算
线程池中运用一个AtomicInteger方针来表征线程池的情况和大小,为了抵达1个int型数据能保存2个数据的意图,源码采用了非常精妙的位运算来完成。
//线程池的各个情况
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
//两个常量
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
//获取情况、线程池大小的方法以及一个或方法
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
//要害变量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
从源码可知,其实RUNNING等5个情况的值的前4位分别是:
- 1110 、0000、 0010 、0100、 0110
各情况后边接了28个0。
CAPACITY的值,前4位为0,后28位为1。
假设要获取线程池情况,则调用runStateOf此方法,ctl的值将与 ~CAPACITY 作与操作,得到的正是ctl值的前4位。
获取其时线程池大小,则是与CAPACITY 作与操作,正是取ctl的低位值。
所以 ctl 的高位值用于表征线程池情况,而低们值用于表征线程池的大小。
ctl的初始化,调用ctlOf方法,正是拿RUNNING与0作或操作,成果仍是RUNNING,所以可知,在线程池初始化的时分,默许ctl的情况值为RUNNING,而线程池大小为0。
ctlOf方法的意义也可知,正是组合情况值与线程池大小。
源码解析
一般来说,线程池添加任务有两种方法,一种是execute方法,另一种是submit方法,submit方法其实也是会调用execute,所以在此只研究execute方法即可。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//获取线程池情况
int c = ctl.get();
//假设线程池大小少于中心线程,则直接添加新的Worker并回来
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//假设中心线程数量已满,则考虑将任务添加进入缓存队伍,然后再次检查情况
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//最终中心线程数量已满,缓存队伍已满,那么直接交给回绝战略处理
else if (!addWorker(command, false))
reject(command);
}
再来看非常要害的addWorker方法:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
//获取线程池情况
int rs = runStateOf(c);
// 对情况进行检测,假设线程池现已被调用shutDown方法时,回来false
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
//获取线程池的大小
int wc = workerCountOf(c);
//假设线程池大小现已逾越中心线程池大小,那么也直接回来,这种情况应该把任务缓存到队伍中去
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//正常情况下,线程池大小加1即可
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//新建一个Worker,在实行Worker结构方法时,创建了新的线程,即下面的t,具体可拜见Worker的结构方法。
Worker w = new Worker(firstTask);
Thread t = w.thread;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
int rs = runStateOf(c);
if (t == null ||
(rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null))) {
decrementWorkerCount();
tryTerminate();
return false;
}
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
} finally {
mainLock.unlock();
}
//发起线程
t.start();
// It is possible (but unlikely) for a thread to have been
// added to workers, but not yet started, during transition to
// STOP, which could result in a rare missed interrupt,
// because Thread.interrupt is not guaranteed to have any effect
// on a non-yet-started Thread (see Thread#interrupt).
if (runStateOf(ctl.get()) == STOP && ! t.isInterrupted())
t.interrupt();
return true;
}
在addWorker方法中,发起了一个新的线程,那我们来看看线程中的run方法,其实即是runWorker方法:
final void runWorker(Worker w) {
Runnable task = w.firstTask;
w.firstTask = null;
boolean completedAbruptly = true;
try {
//请注意,这是一个while循环,假设其时Worker的task不为空,则取其时Worker的task,实行任务
//假设其时task为空,则从缓存队伍中取任务来实行,这便是线程池中最重要概念线程复用的体现
while (task != null || (task = getTask()) != null) {
w.lock();
clearInterruptsForTaskRun();
try {
beforeExecute(w.thread, task);
Throwable thrown = null;
try {
//实行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
我们来看看getTask方法是否真的是从缓存队伍中取任务:
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
boolean timed; // Are workers subject to culling?
for (;;) {
int wc = workerCountOf(c);
timed = allowCoreThreadTimeOut || wc > corePoolSize;
//正常情况下,线程池数量应该少于最大值,并且也不会timedOut,
//假设真的大于了最大值,则应该删去一个线程
if (wc <= maximumPoolSize && ! (timedOut && timed))
break;
if (compareAndDecrementWorkerCount(c))
return null;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
try {
//从缓存队伍中取出任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
线程池分析到这,底子结束了,内中最多的细节,比如同步锁的运用,比如情况的判别,比如最终中止线程池等等,不再细说了,线程池介绍到这底子原理都清楚了,有点类似于android中的Looper,死循环中取消息,分发消息并实行对应操作等,线程池也是这样。
线程池中的位运算,刚开始或许看不懂,但在纸上写下各个数,底子就能理解各个方法的意义了。
最终再弥补一个小细节,负数在计算机中的表明,为了更便利完成2进制数的加减法,负数运用补码表明,具体则是正数部分取反加1,而正数的补码则是正数本身。正因为如此,RUNNING 前4位才是1110,具体的各位可以计算下,补码的意义可以自行搜索。