运用

线程池的运用有许多种不同的办法,JDK中也给咱们供给了许多现成的封装好的完成,例如:

Executors.newCachedThreadPool();

可是依据各个开发者的实践,特别是阿里巴巴的Java最佳实践,咱们仍是应该直接运用ThreadPoolExecutor的结构器来构建满足特定需求的线程池目标。例如,咱们能够自己经过代码来完成跟上面相同的缓存特性的线程池目标。

new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                              60L, TimeUnit.SECONDS,
                              new SynchronousQueue<Runnable>());

基础理论

最中心的几个特点

在上述的构建代码中,咱们能够看到结构器中需求咱们传入好几个参数,而便是这几个参数的不同,就能够构建出满足不同事务需求的线程池目标,下面就先来解释一下这几个参数的含义。

corePoolSize

中心线程的数量。中心线程的意思便是,即便是线程的作业现已完毕,进入闲暇状况,除非咱们其他设置了allowCoreThreadTimeOut为true,那么这些线程目标将一向存在于线程池中。默许allowCoreThreadTimeOut为false,即:中心线程将一向存在于线程池中。

maximumPoolSize

线程池能容纳的线程数量的最大值。

keepAliveTime

这个参数用于设置处于闲暇状况的线程经过多久今后会被开释。既包括非中心线程完成作业今后进入闲暇状况,也包括中心线程在敞开了allowCoreThreadTimeOut参数今后进入闲暇状况。

unit

keepAliveTime参数的时刻单位

workQueue

作业线程的堵塞行列。这个参数是一个堵塞行列的容器,用于寄存还没轮到被履行的使命。这个行列接受的元素的类型便是完成了Runnable接口的示例啦。

threadFactory

用于创立新线程的工厂实例

handler

当用于存储使命的堵塞行列满了之后,应该怎么处理新进入的使命的战略完成实例,默许完成为AbortPolicy,直接扔掉使命,而且抛出反常RejectedExecutionException

使命是怎么流通的

首要简单的对咱们向线程池中提交使命有一个开端的认识,下图便是当咱们调用execute办法今后使命的流经过程。

Java并发包中的线程池解析

线程池的内部状况切换

和使命一样,线程池本身也是有状况的,而且会在不同的状况下流通,仍是先建立一个直观的印象。下图是一个线程池本身状况的流通图。

Java并发包中的线程池解析

代码解读

线程池的状况是用什么数据结构完成的

线程池的运转是建立在不同的状况切换上的,不同的状况下,关于使命和线程的调度也是不同的,因而搞清楚线程池的状况就理解了一半。下面是和线程池状况相关的主要特点和办法,咱们一个一个来解释。

这个ctl便是线程池状况和持有的线程的数量,也便是一个整形记载了2个逻辑信息
假如看框架比较多的话,对这种运用一个整形的高位和低位来存储数据的应该不会陌生
比方Android中的丈量参数,MeasureSpec,高2位表明丈量模式,低30位保存实践的数值
其他需求留意的是,这儿用到了原子类来确保这个参数在多个线程同享中的安全性
初始化的时分,能够看到,设置了线程池的默许状况为 RUNNING,线程数量为0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
获取当时系统中整形是多少位的,当然整形是4个字节,32位来保存的
这儿计算取得的29,意思便是,咱们要用低29位来保存线程池中线程的数量
高3位用来保存线程池的状况值
private static final int COUNT_BITS = Integer.SIZE - 3;
这个值便是咱们用于获取ctl中存储的线程数量的掩码,二进制的值便是:
00011111 11111111 11111111 11111111
这儿像不熟悉微操作的同学解释一下计算的过程:见注释1
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
二进制表明为:11100000 00000000 00000000 00000000
private static final int RUNNING    = -1 << COUNT_BITS;
二进制表明为:00000000 00000000 00000000 00000000
private static final int SHUTDOWN   =  0 << COUNT_BITS;
二进制表明为:00100000 00000000 00000000 00000000
private static final int STOP       =  1 << COUNT_BITS;
二进制表明为:01000000 00000000 00000000 00000000
private static final int TIDYING    =  2 << COUNT_BITS;
二进制表明为:01100000 00000000 00000000 00000000
private static final int TERMINATED =  3 << COUNT_BITS;
从ctl中分离出高3位表明的线程池的状况码
位的与运算规则为,都是1才是1,否则为0
对COUNT_MASK取反便是:1110000....
所以说,低29位不管传入的是什么值,都会变成0,高3位则传入的数高三位是什么便是什么
private static int runStateOf(int c)     { return c & ~COUNT_MASK; }
从ctl中取得线程数量
private static int workerCountOf(int c)  { return c & COUNT_MASK; }
将rs的高三位和wc的后29位组合成一个新的整形
private static int ctlOf(int rs, int wc) { return rs | wc; }

注释1:

首要是1的二进制表明:00000000 00000000 00000000 00000001

左移29位,那么便是: 00100000 00000000 00000000 00000000

减1,那么便是: 00011111 11111111 11111111 11111111

小结一下: 线程池运用了一个线程安全的整形来保存当时状况和作业线程数量,高3位位状况,后29位位作业线程数量,默许线程池处于RUNNING状况,作业线程数量为0。除了RUNNING状况,其他状况都对错负数。

execute办法

这个办法官方是自带了阐明的,描绘了往线程池中增加新使命需求处理的3种景象,原理咱们上面现已用流程图说的比较清楚了。

public void execute(Runnable command) {
    // 健壮性查看,确保传入的使命不为空
    if (command == null)
        throw new NullPointerException();
    // 由于ctl是一个原子类,因而需求运用get办法获取其间的值
    int c = ctl.get();
    // 假如当时线程池中的作业线程数量还没抵达中心线程的数量
    if (workerCountOf(c) < corePoolSize) {
        // 直接增加一个中心线程来履行使命,addWorker的第二个参数的含义便是是否创立的是中心线程
        if (addWorker(command, true))
            创立成功的话那么就完毕这个流程了
            return;
        // 假如创立线程失利,那么要从头更新一下当时的ctl值,由于这期间或许其他线程也会有修改行为
        c = ctl.get();
    }
    // 走到这儿阐明当时线程池中的作业线程数量现已超越中心线程数量了
    // 判别当时是否能够往堵塞行列中增加这个使命,线程池状况必须是RUNNING状况
    if (isRunning(c) && workQueue.offer(command)) {
        // 能进来阐明使命现已被成功增加进堵塞行列了,等候被履行
        // 获取ctl的最新值
        int recheck = ctl.get();
        // 假如线程池的状况现已不是RUNNING,那么咱们就不能让这个使命进入行列或许被履行了
        // 因而要先从行列中移除该使命
        if (! isRunning(recheck) && remove(command))
            // 然后调用回绝战略
            reject(command);
        // 假如当时依然是RUNNING状况,可是作业线程现已没有了
        else if (workerCountOf(recheck) == 0)
            // 能进来的意思是,咱们成功将传入的使命放进了堵塞行列中,
            // 可是乜有线程能履行了,
            // 那么咱们就需求创立一个空使命的非中心线程来履行行列中的使命
            addWorker(null, false);
    }
    // 到这儿代表当时作业线程数量现已超越中心线程数量,而且堵塞行列也插入失利,
    // 一般便是行列满了,那么咱们就要测验创立非中心线程来履行当时使命
    else if (!addWorker(command, false))
        // 假如非中心线程也不允许创立了,那么就直接走回绝战略了
        reject(command);
}

小结一下: 能够看到,execute办法的逻辑十分清晰,紧密的环绕咱们在构建线程池的时分构建的那几个参数打开。先判别中心线程数量是否还足够,假如中心线程数量还有充裕,那么直接创立中心线程而且履行当时使命。假如中心线程数量现已满了,那么就会先看堵塞行列还有没有位置,假如有位置的话,那么就会把使命存进堵塞行列中,等候被履行。假如堵塞行列也满了,那么就查看能不能创立非中心线程来履行使命。假如仍是不可,那么就走回绝战略了。 引申一下: 从上面的剖析咱们能够看到,非中心线程的优先级是在堵塞行列之后的,可是假定咱们希望提升履行效率,缩短使命履行时刻,优先运用非中心线程来处理使命呢?依据源码,咱们只要控制作业行列的offer办法才干达到这个意图。因而,一个可行的方案便是承继运用的堵塞行列,重写offer办法,在某些景象下让该办法回来false,也便是不让使命入队,而优先走创立非中心线程的分支。

addWorker

这个办法很长,咱们分红2部分,依次来看看这个办法做了一些什么操作。这个办法有2个参数,第一个参数为咱们需求被履行的使命,第二个代表咱们希望是中心线程来履行仍对错中心线程。当然,关于真实履行使命的线程目标来说,并没有什么差异,他们唯一的差异便是存活时刻长短的差异。

private boolean addWorker(Runnable firstTask, boolean core) {
    // 这是一个很风趣的特性,这是java的代码tag,你其实能够运用任何的单词作为tag
    // 然后在循环中,break或许continue就能够直接直接跳转到这些标签的位置
    // 类似goto句子
    retry:
    // 敞开一个无限循环,给临时变量c复制为最新的ctl的值
    // 无限让自己循环,直到操作成功,这便是很典型的自旋锁确保线程安全履行的办法
    for (int c = ctl.get();;) {
        // 线程池的状况至少要是SHUTDOWN,也便是能够是:除了RUNNING以外的状况
        // 也便是说当时线程池现已没有处于运转中状况了,那么依照规划,此时就
        // 不该该再接纳新使命了
        // 这儿咱们分状况来讨论,假如当时是SHUTDOWN之后的状况,那么就直接回来false
        // 假如当时是SHUTDOWN状况,尽管不接纳新使命,可是咱们要继续让线程池处理行列中的使命
        // 因而要越过这个return的办法只要,传入的使命为空或许行列不为空,还记得上一部分
        // 咱们发现没有线程了,可是行列中海油使命待处理的场景吗,便是这儿要刨除的场景
        // 好让咱们创立一个非中心线程来接着处理行列中的使命
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP)
                || firstTask != null
                || workQueue.isEmpty()))
            return false;
        for (;;) {
            // 这儿便是判别当时线程池中的作业线程数量是否现已超越最大限度
            if (workerCountOf(c)
                >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                // 假如现已超越了,那么就不要再增加了
                return false;
            // 运用CAS来新增线程池中的线程数量
            if (compareAndIncrementWorkerCount(c))
                // 假如更新成功,那么就跳出循环,回到这个办法打retry标记的地方
                break retry;
            // 走到这儿,阐明线程数量未满可是更新数量失利了
            // 更新ctl最新的数据给局部变量c
            c = ctl.get();
            // 假如当时状况现已对错运转中状况
            if (runStateAtLeast(c, SHUTDOWN))
                // 那么就跳转到retry标签,也便是代码的最开端除从头运转
                continue retry;
            // 假如当时依然是RUNNING状况,那么就回到内部循环出再次更新作业线程数量
        }
    }
}

小结一下: 前半部分说白了便是在测验新增一个作业线程的数量,那么就需求线程池满足2个条件。第一,处于运转中,即:RUNNING状况;第二,线程池中的作业线程数量还没满。

private boolean addWorker(Runnable firstTask, boolean core) {
    // ......省掉上一部分
    // 上一部分咱们主要是决议要不要开新线程来履行新加入的使命
    // 那么接下来,就代表的确需求新开线程来履行使命了,下面便是详细敞开的办法
    // 作业线程是否被发动
    boolean workerStarted = false;
    // 作业线程是否现已被增加
    boolean workerAdded = false;
    // 作业线程封装目标,简单的介绍一下这个类
    // Worker类承继自AQS,完成了Runnable接口,因而主要便是作为Runnable增强功用
    // 承继AQS是为了经过AQS中的state来更便利的呼应锁和中止
    // 在new Worker的时分,会默许将Worker父类AQS中的state设置为-1,
    // 线程开端运转今后就设置为0
    Worker w = null;
    try {
        // 构建一个作业线程目标
        w = new Worker(firstTask);
        // 取得真实的线程目标
        final Thread t = w.thread;
        if (t != null) {
            // 构建一个可重入锁目标
            final ReentrantLock mainLock = this.mainLock;
            // 获取可履行凭证,获取不到就会自旋或许被挂起
            mainLock.lock();
            try {
                // 再次获取ctl的最新值,以防被其他线程更新过了
                int c = ctl.get();
                // 判别条件有2个
                // 1. 线程池状况是否处于运转中状况
                // 2. 运转状况为SHUTDOWN而且传入的使命为空
                // 还记得上面咱们见过的那个addWorker(null, false)的特别处理吗?
                if (isRunning(c) ||
                    (runStateLessThan(c, STOP) && firstTask == null)) {
                    // 进来就代表线程池状况是支持咱们新开作业线程的
                    // 判别新创立的线程目标的状况是否是初始状况,不是的话就抛出反常
                    if (t.getState() != Thread.State.NEW)
                        throw new IllegalThreadStateException();
                    // 增加到作业行列的调集中
                    // workers是一个HashSet,确保作业线程的唯一性
                    workers.add(w);
                    // 记载作业线程现已被增加成功
                    workerAdded = true;
                    // 获取当时作业线程的数量
                    int s = workers.size();
                    // 更新线程池拥有过的最多的线程数量
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                }
            } finally {
                // 开释锁
                mainLock.unlock();
            }
            // 假如增加作业线程成功
            if (workerAdded) {
                // 发动线程,传入的Runnable就会被履行
                t.start();
                // 记载作业线程被成功发动
                workerStarted = true;
            }
        }
    } finally {
        // 假如没有增加作业线程成功
        if (! workerStarted)
            // 那么就要回滚增加作业线程的操作,这个办法很重要,咱们下面会独自解释
            addWorkerFailed(w);
    }
    // 回来作业线程有没有被成功发动
    return workerStarted;
}
// Worker类的结构器
Worker(Runnable firstTask) {
    // 设置为-1表明默许让线程处于堵塞状况,唤醒今后将被设置为0
    setState(-1);
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}
// 回滚增加作业线程失利的一切操作
private void addWorkerFailed(Worker w) {
    // 获取线程操作锁
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (w != null)
            // 从作业线程调集中移除该目标
            workers.remove(w);
        // 给当时的线程数量-1
        decrementWorkerCount();
        // 测验将线程池切入TIDYING状况,然后完毕线程池
        tryTerminate();
    } finally {
        // 开释锁
        mainLock.unlock();
    }
}
final void tryTerminate() {
    // 自旋确保线程使命被履行
    for (;;) {
        // 获取最新的ctl的值
        int c = ctl.get();
        // 这儿判其他是不需求进入TIDYING状况的状况
        // 假如当时线程池的状况还在运转中
        // 或许现已是TIDYING及之后的状况了
        // 或许是SHUTDOWN状况而且堵塞行列中还有使命没处理完,咱们知道线程池的规划要求
        // 处于SHUTDOWN状况下只是不接纳新使命,但关于现已处于行列中的使命要继续履行的
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateLessThan(c, STOP) && ! workQueue.isEmpty()))
            return;
        // 走到这儿就代表当时现已满足进入TERMINATED状况的条件
        // 判别作业线程的数量是不是不为0
        if (workerCountOf(c) != 0) {
            // 中止一个作业线程
            interruptIdleWorkers(ONLY_ONE);
            return;
        }
        // 走到这儿就代表线程池中的作业线程数量也没有了,那么依照规划,就能够正式
        // 停止线程池了,进入TERMINATED状况
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 将线程池的状况转换为TIDYING状况,代表线程池的清理要开端了
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    // 需求由子类来完成的回调,默许为空完成
                    terminated();
                } finally {
                    // 将线程池状况过渡到TERMINATED状况,表明线程池被终结了
                    ctl.set(ctlOf(TERMINATED, 0));
                    // 开释清理相关的线程锁
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // 假如操作失利了,那么就回到循环的开端处,再来重试一次
    }
}

小结一下:addWorker办法的第二阶段,咱们需求测验创立一个作业线程来处理使命,而这个作业线程会被封装在Worker类中,这个类承继自AQS,能够经过state参数来控制是否中止绑定的线程。作业线程创立成功今后就会马上开端履行,而且增加到数据结构为HashSetworkers调集中。假如线程运转失利,那么就阐明或许当时线程池现已进入了封闭阶段,就会调用tryTerminate办法测验停止线程池。

至此,增加作业线程来处理新使命的流程咱们现已讲清楚了。

总结

至此,咱们就对线程池的完成逻辑从源代码层面有了一个比较清晰的认知。

  1. 线程池的内部状况由一个AtomicInteger来存储,确保线程的安全性。其间的高3位存储线程池的状况,低29位存储作业线程的数量。运转中RUNNING状况为-1,其他跟停止相关的状况都对错负数。
  2. 线程池初始状况为RUNNING。
  3. 当有新使命进来时,首要判别是否还有中心线程的坑位,假如有,那么就直接创立中心线程来履行使命。假如中心线程满了,那么就先看看堵塞行列是否有坑位,有的话就放进堵塞行列中,等候被履行。假如堵塞行列也满了,那么就看看非中心线程还有没有坑位,有的话就创立非中心线程来履行使命。最终都不可,那就只能走回绝战略了。
  4. 默许的回绝战略是AbortPolicy,抛出反常。