本文正在参与「金石计划 . 瓜分6万现金大奖」

在作业中,咱们经常运用线程池,可是你真的了解线程池的原理吗?一同,线程池作业原理和底层完成原理也是面试经常问的考题,所以,今日咱们一同聊聊线程池的原理吧。

为什么要用线程池

运用线程池主要有以下三个原因:

  1. 下降资源耗费。经过重复利用已创立的线程下降线程创立和毁掉形成的耗费。
  2. 提升响应速度。当使命到达时,使命可以不需求比及线程创立就能立即履行。
  3. 可以对线程做统一办理。线程是稀缺资源,假如无限制地创立,不仅会耗费体系资源,还会下降体系稳定性,运用线程池可以进行统一分配、调优和监控。

线程池的原理

Java中的线程池顶层接口是Executor接口,ThreadPoolExecutor是这个接口的完成类。

咱们先看看ThreadPoolExecutor类。

ThreadPoolExecutor提供的结构办法

// 七个参数的结构函数
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

咱们先看看这些参数是什么意思:

  • int corePoolSize:该线程池中中心线程数最大值

中心线程:线程池中有两类线程,中心线程和非中心线程。中心线程默许情况下会一向存在于线程池中,即便这个中心线程什么都不干(铁饭碗),而非中心线程假如长时间的搁置,就会被毁掉(临时工)。

  • int maximumPoolSize:该线程池中线程总数最大值

该值等于中心线程数量 + 非中心线程数量。

  • long keepAliveTime非中心线程搁置超时时长

非中心线程假如处于搁置状况超越该值,就会被毁掉。假如设置allowCoreThreadTimeOut(true),则会也作用于中心线程。

  • TimeUnit unit:keepAliveTime的单位。

TimeUnit是一个枚举类型。

  • BlockingQueue workQueue:堵塞行列,维护着等候履行的Runnable使命目标

    常用的几个堵塞行列:

    1. LinkedBlockingQueue:链式堵塞行列,底层数据结构是链表,默许巨细是Integer.MAX_VALUE,也可以指定巨细。

    2. ArrayBlockingQueue:数组堵塞行列,底层数据结构是数组,需求指定行列的巨细。

    3. SynchronousQueue:同步行列,内部容量为0,每个put操作必须等候一个take操作,反之亦然。

    4. DelayQueue:延迟行列,该行列中的元素只有当其指定的延迟时间到了,才可以从行列中获取到该元素 。

  • ThreadFactory threadFactory

    创立线程的工厂 ,用于批量创立线程,统一在创立线程时设置一些参数,如是否看护线程、线程的优先级等。假如不指定,会新建一个默许的线程工厂。

static class DefaultThreadFactory implements ThreadFactory {
    // 省掉特点
    // 结构函数
    DefaultThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
        Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
            poolNumber.getAndIncrement() +
            "-thread-";
    }
    // 省掉
}
  • RejectedExecutionHandler handler

    回绝处理战略,线程数量大于最大线程数就会采用回绝处理战略,四种回绝处理的战略为 :

    1. ThreadPoolExecutor.AbortPolicy默许回绝处理战略,丢掉使命并抛出RejectedExecutionException异常。
    2. ThreadPoolExecutor.DiscardPolicy:丢掉新来的使命,可是不抛出异常。
    3. ThreadPoolExecutor.DiscardOldestPolicy:丢掉行列头部(最旧的)的使命,然后重新测验履行程序(假如再次失利,重复此进程)。
    4. ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该使命。

ThreadPoolExecutor的战略

线程池本身有一个调度线程,这个线程便是用于办理布控整个线程池里的各种使命和事务,例如创立线程、毁掉线程、使命行列办理、线程行列办理等等。

故线程池也有自己的状况。ThreadPoolExecutor类中运用了一些final int常量变量来表明线程池的状况 ,分别为RUNNING、SHUTDOWN、STOP、TIDYING 、TERMINATED。

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
  • 线程池创立后处于RUNNING状况。

  • 调用shutdown()办法后处于SHUTDOWN状况,线程池不能承受新的使命,铲除一些闲暇worker,不会等候堵塞行列的使命完成。

  • 调用shutdownNow()办法后处于STOP状况,线程池不能承受新的使命,中止一切线程,堵塞行列中没有被履行的使命悉数丢掉。此时,poolsize=0,堵塞行列的size也为0。

  • 当一切的使命已终止,ctl记录的”使命数量”为0,线程池会变为TIDYING状况。接着会履行terminated()函数。

  • 线程池处在TIDYING状况时,履行完terminated()办法之后,就会由 TIDYING -> TERMINATED, 线程池被设置为TERMINATED状况。

线程池主要的使命处理流程

处理使命的中心办法是execute,咱们看看 JDK 1.8 源码ThreadPoolExecutor是怎么处理线程使命的:

// JDK 1.8 
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();   
    int c = ctl.get();
    // 1.当时线程数小于corePoolSize,则调用addWorker创立中心线程履行使命
    if (workerCountOf(c) < corePoolSize) {
       if (addWorker(command, true))
           return;
       c = ctl.get();
    }
    // 2.假如不小于corePoolSize,则将使命添加到workQueue行列。
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 2.1 假如isRunning回来false(状况查看),则remove这个使命,然后履行回绝战略。
        if (! isRunning(recheck) && remove(command))
            reject(command);
            // 2.2 线程池处于running状况,可是没有线程,则创立线程
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 3.假如放入workQueue失利,则创立非中心线程履行使命,
    // 假如这时创立非中心线程失利(当时线程总数不小于maximumPoolSize时),就会履行回绝战略。
    else if (!addWorker(command, false))
         reject(command);
}

ctl.get()是获取线程池状况,用int类型表明。第二步中,入队行进行了一次isRunning判别,入队之后,又进行了一次isRunning判别。

为什么要二次查看线程池的状况?

多线程的环境下,线程池的状况是时间发生变化的。很有可能刚获取线程池状况后线程池状况就改变了。判别是否将command加入workqueue是线程池之前的状况。倘若没有二次查看,假如线程池处于非RUNNING状况(在多线程环境下很有可能发生),那么command永远不会履行。

总结一下处理流程

  1. 线程总数量 < corePoolSize,无论线程是否闲暇,都会新建一个中心线程履行使命(让中心线程数量快速到达corePoolSize,在中心线程数量 < corePoolSize时)。注意,这一步需求取得大局锁。
  2. 线程总数量 >= corePoolSize时,新来的线程使命会进入使命行列中等候,然后闲暇的中心线程会依次去缓存行列中取使命来履行(表现了线程复用)。
  3. 当缓存行列满了,阐明这个时分使命现已多到爆棚,需求一些“临时工”来履行这些使命了。于是会创立非中心线程去履行这个使命。注意,这一步需求取得大局锁。
  4. 缓存行列满了, 且总线程数到达了maximumPoolSize,则会采纳上面说到的回绝战略进行处理。

整个进程如图所示:

一文弄懂Java中线程池原理

ThreadPoolExecutor怎么做到线程复用的?

咱们知道,一个线程在创立的时分会指定一个线程使命,当履行完这个线程使命之后,线程自动毁掉。可是线程池却可以复用线程,即一个线程履行完线程使命后不毁掉,持续履行别的的线程使命。那么,线程池怎么做到线程复用呢?

本来,ThreadPoolExecutor在创立线程时,会将线程封装成作业线程worker,并放入作业线程组中,然后这个worker反复从堵塞行列中拿使命去履行。

这儿的addWorker办法是在上面说到的execute办法里面调用的,先看看上半部分:

// ThreadPoolExecutor.addWorker办法源码上半部分
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                // 1.假如core是ture,证明需求创立的线程为中心线程,则先判别当时线程是否大于中心线程
                // 假如core是false,证明需求创立的是非中心线程,则先判别当时线程数是否大于总线程数
                // 假如不小于,则回来false
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

上半部分主要是判别线程数量是否超出阈值,超越了就回来false。咱们持续看下半部分:

    // ThreadPoolExecutor.addWorker办法源码下半部分
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 1.创立一个worker目标
        w = new Worker(firstTask);
        // 2.实例化一个Thread目标
        final Thread t = w.thread;
        if (t != null) {
            // 3.线程池大局锁
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 4.发动这个线程
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

创立worker目标,并初始化一个Thread目标,然后发动这个线程目标。

咱们接着看看Worker类,仅展示部分源码:

// Worker类部分源码
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
    final Thread thread;
    Runnable firstTask;
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }
    public void run() {
            runWorker(this);
    }
    //其余代码略...
}

Worker类完成了Runnable接口,所以Worker也是一个线程使命。在结构办法中,创立了一个线程,线程的使命便是自己。故addWorker办法调用addWorker办法源码下半部分中的第4步t.start,会触发Worker类的run办法被JVM调用。

咱们再看看runWorker的逻辑:

// Worker.runWorker办法源代码
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    // 1.线程发动之后,经过unlock办法开释锁
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // 2.Worker履行firstTask或从workQueue中获取使命,假如getTask办法不回来null,循环不退出
        while (task != null || (task = getTask()) != null) {
            // 2.1进行加锁操作,保证thread不被其他线程中止(除非线程池被中止)
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            // 2.2查看线程池状况,倘若线程池处于中止状况,当时线程将中止。 
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // 2.3履行beforeExecute 
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 2.4履行使命
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 2.5履行afterExecute办法 
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                // 2.6解锁操作
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

首先去履行创立这个worker时就有的使命,当履行完这个使命后,worker的生命周期并没有完毕,在while循环中,worker会不断地调用getTask办法从堵塞行列中获取使命然后调用task.run()履行使命,然后到达复用线程的目的。只需getTask办法不回来null,此线程就不会退出。

当然,中心线程池中创立的线程想要拿到堵塞行列中的使命,先要判别线程池的状况,假如STOP或者TERMINATED,回来null

最终看看getTask办法的完成:

// Worker.getTask办法源码
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        // Are workers subject to culling?
        // 1.allowCoreThreadTimeOut变量默许是false,中心线程即便闲暇也不会被毁掉
        // 假如为true,中心线程在keepAliveTime内仍闲暇则会被毁掉。 
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        // 2.假如运转线程数超越了最大线程数,可是缓存行列现已空了,这时递减worker数量。 
     // 假如有设置允许线程超时或者线程数量超越了中心线程数量,
        // 并且线程在规守时间内均未poll到使命且行列为空则递减worker数量
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            // 3.假如timed为true(想想哪些情况下timed为true),则会调用workQueue的poll办法获取使命.
            // 超时时间是keepAliveTime。假如超越keepAliveTime时长,
            // poll回来了null,上边说到的while循序就会退出,线程也就履行完了。
            // 假如timed为false(allowCoreThreadTimeOut为false
            // 且wc > corePoolSize为false),则会调用workQueue的take办法堵塞在当时。
            // 行列中有使命加入时,线程被唤醒,take办法回来使命,并履行。
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

中心线程的会一向卡在workQueue.take办法,被堵塞并挂起,不会占用CPU资源,直到拿到Runnable 然后回来(当然假如allowCoreThreadTimeOut设置为true,那么中心线程就会去调用poll办法,由于poll可能会回来null,所以这时分中心线程满足超时条件也会被毁掉)。

非中心线程会workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) ,假如超时还没有拿到,下一次循环判别compareAndDecrementWorkerCount就会回来null,Worker目标的run()办法循环体的判别为null,使命完毕,然后线程被体系收回 。

四种常见的线程池

Executors类中提供的几个静态办法来创立线程池。

newCachedThreadPool

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

CacheThreadPool运转流程如下:

  1. 提交使命进线程池。
  2. 由于corePoolSize为0的关系,不创立中心线程,线程池最大为Integer.MAX_VALUE。
  3. 测验将使命添加到SynchronousQueue行列。
  4. 假如SynchronousQueue入列成功,等候被当时运转的线程闲暇后拉取履行。假如当时没有闲暇线程,那么就创立一个非中心线程,然后从SynchronousQueue拉取使命并在当时线程履行。
  5. 假如SynchronousQueue已有使命在等候,入列操作将会堵塞。

当需求履行很多短时间的使命时,CacheThreadPool的线程复用率比较高, 会明显的提高性能。而且线程60s后会收回,意味着即便没有使命进来,CacheThreadPool并不会占用很多资源。

newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}

中心线程数量和总线程数量相等,都是传入的参数nThreads,所以只能创立中心线程,不能创立非中心线程。由于LinkedBlockingQueue的默许巨细是Integer.MAX_VALUE,故假如中心线程闲暇,则交给中心线程处理;假如中心线程不闲暇,则入列等候,直到中心线程闲暇。

与CachedThreadPool的区别

  • 由于 corePoolSize == maximumPoolSize ,所以FixedThreadPool只会创立中心线程。 而CachedThreadPool由于corePoolSize=0,所以只会创立非中心线程。
  • 在 getTask() 办法,假如行列里没有使命可取,线程会一向堵塞在 LinkedBlockingQueue.take() ,线程不会被收回。 CachedThreadPool会在60s后收回。
  • 由于线程不会被收回,会一向卡在堵塞,所以没有使命的情况下, FixedThreadPool占用资源更多
  • 都简直不会触发回绝战略,可是原理不同。FixedThreadPool是由于堵塞行列可以很大(最大为Integer最大值),故简直不会触发回绝战略;CachedThreadPool是由于线程池很大(最大为Integer最大值),简直不会导致线程数量大于最大线程数,故简直不会触发回绝战略。

newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

有且仅有一个中心线程( corePoolSize == maximumPoolSize=1),运用了LinkedBlockingQueue(容量很大),所以,不会创立非中心线程。一切使命依照先来先履行的顺序履行。假如这个唯一的线程不闲暇,那么新来的使命会存储在使命行列里等候履行。

newScheduledThreadPool

创立一个定长线程池,支撑守时及周期性使命履行。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}
//ScheduledThreadPoolExecutor():
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
          new DelayedWorkQueue());
}

四种常见的线程池基本够咱们运用了,可是《阿里巴巴开发手册》不主张咱们直接运用Executors类中的线程池,而是经过ThreadPoolExecutor的方式,这样的处理方式让写的同学需求更加明确线程池的运转规则,规避资源耗尽的危险。

但假如你及团队本身对线程池十分熟悉,又确定事务规划不会大到资源耗尽的程度(比如线程数量或使命行列长度可能到达Integer.MAX_VALUE)时,其实是可以运用JDK提供的这几个接口的,它能让咱们的代码具有更强的可读性。

小结

在作业中,很多人由于不了解线程池的完成原理,把线程池配置过错,然后导致各种问题。希望你们阅读完本文,可以学会合理的运用线程池。

对于真正想弄懂java并发编程的小伙伴,网上的文章还有视频缺少体系性,我主张大家仍是买点书本看看,我推荐两本我看过的书。

《Java并发编程实战》:这本书浅显易懂地介绍了Java线程和并发,是一本十分棒的Java并发参考手册。

《Java并发编程艺术》:Java并发编程的概念本来就比较复杂,咱们需求的是一本可以把原理解释清楚的书本,而这本《Java并发编程的艺术》书是国内作者写的Java并发书本,刚好就比上面那一本更简单易懂,至少我自己看下来是这样的感觉。