1. 前语

我这边从一个问题引出这次的话题,咱们或许会在开中碰到一种OOM问题,java.lang.OutOfMemoryError: pthread_create (1040KB stack) failed: Try again

相信许多人碰到过这个错误,很简略从网上搜索到呈现这个问题的原因是线程过多,那线程过多为什么会导致OOM?线程什么状况下会开释资源?你又能怎样做到让它不开释资源?

有的人或许会想到,那既然创立线程过多会导致OOM,那我用线程池不就行了。可是有没有想过,线程池,也或许会造成OOM。其实这儿有个很经典的场景,你运用OkHttp的时分不留意,每次请求都创立OkHttpClient,导致线程池过多呈现OOM

2. 简略了解线程池

怎样去了解线程池,看源码,直接去看是很难看得懂的,要先了解线程池的原理,对它的规划思维有个大约的掌握之后,再去看源码,就会轻松许多,当然这儿只了解根底的原理还不够,还需要有一些多线程相关的根底常识。

本篇文章只从部分源码的视点去剖析,线程池怎样导致OOM的,而不会悉数去看一切线程池的源码细节,由于太多了

首要,要了解线程池,首要需要从它的参数入手:

  • corePoolSize:中心线程数量
  • maximumPoolSize:最大线程数量
  • keepAliveTime,unit:非中心线程的存活时间和单位
  • workQueue:堵塞行列
  • ThreadFactory:线程工厂
  • RejectedExecutionHandler:饱满战略

然后你从网上任何一个当地搜都能知道它大致的工作流程是,当一个使命开端履行时,先判别当时线程池数量是否到达中心线程数,没到达则创立一个中心线程来履行使命,假如超越,放到堵塞行列中等待,假如堵塞行列满了,未到达最大线程数,创立一条非中心线程履行使命,假如到达最大线程数,履行饱满战略。在这个进程中,中心线程不会收回,非中心线程会依据keepAliveTime和unit进行收回。

**这儿能够多提一嘴,这个进程用了工厂形式ThreadFactory和战略形式RejectedExecutionHandler,关于战略形式能够看我这篇文章 ** /post/719502…

其实从这儿就能够看出为什么线程池也会导致OOM了:中心线程不会收回,非中心线程运用完之后会依据keepAliveTime和unit进行收回 ,那中心线程就会一向存活(我这不考虑shutdown()和shutdownNow()这些状况),一向存活就会占用内存,那你假如创立许多线程池,就会OOM。

所以我这篇文章要剖析:中心线程不会开释资源的进程,它内部怎样做到的。 只从这部分的源码去进行剖析,不会悉数都具体讲。

先别急,为了照顾一些根底不太好的朋友,触及一些根底常识感觉仍是要多讲一下。上面提到的线程收回和shutdown办法这些是什么意思?线程履行完它内部的代码后会自动开释资源吗?

咱们都知道开发中有个概念叫生命周期,当然线程池和线程也有生命周期(这很重要),在开发中,咱们称之为lifecycle。

生命周期当然是规划这个东西的开发者所定义的,咱们先看线程池的生命周期,在ThreadPoolExecutor的注释中有写:

*
* The runState provides the main lifecycle control, taking on values:
*
*   RUNNING:  Accept new tasks and process queued tasks
*   SHUTDOWN: Don't accept new tasks, but process queued tasks
*   STOP:     Don't accept new tasks, don't process queued tasks,
*             and interrupt in-progress tasks
*   TIDYING:  All tasks have terminated, workerCount is zero,
*             the thread transitioning to state TIDYING
*             will run the terminated() hook method
*   TERMINATED: terminated() has completed
*

看得出它的生命周期有RUNNING,SHUTDOWN,STOP,TIDYING和TERMINATED。而shutdown()和shutdownNow()办法会改变生命周期,这儿不是对线程池做全面解析,所以先有个大约了解就行,能够暂时了解成这篇文章的一切剖析都是针对RUNNING状况下的。

看完线程池的,再看看线程的生命周期。线程的生命周期有:

  • NEW:创立,简略来说便是new出来没start
  • RUNNABLE:运转,简略来说便是start后履行run办法
  • TERMINATED:间断,简略来说便是履行完run办法或许进行中止操作之后会变成这个状况
  • BLOCKED:堵塞,便是加锁之后竞赛锁会进入到这个状况
  • WAITING、TIMED_WAITING:休眠,比方sleep办法

这个很重要,需要了解,你要学会线程这块相关的常识点的话,这些生命周期要深刻了解 。比方BLOCKED和WAITING有什么不同?然后学这块又会触及到锁那一块的常识。以后有时间能够独自写几篇这类的文章,这儿先大约有个概念,只需要能先看懂后面的源码就行。

从生命周期的概念你就能知道线程履行完它内部的代码后会自动开释资源,由于它run履行完之后生命周期会到TERMINATED,那这又触及到了一个常识点,为什么主线程(ActivityThread),履行完run的代码后不会生命周期变成TERMINATED,这又触及到Looper,就得了解Handler机制,能够看我这篇文章 /post/715882…

扯远了,现在进入正题,先想想,假如是你,你怎样做让中心线程履行完run之后不开释资源,很明显,只要让它不履行到TERMINATED生命周期就行,怎样让它不变成TERMINATED状况,只需要让它进入BLOCKED或许WAITING状况就行。所以我的主意是这样的,当这个中心线程履行完这个使命之后,我让它WAITING,等到有新的使命进来的时分我再唤醒它进入RUNNABLE状况。 这是我从理论这个视点去剖析的做法,那看看实际ThreadPoolExecutor是怎样做的

3. 线程池部分源码剖析

前面说了,不会悉数都讲,这儿触及到文章相关内容的流程便是中心线程的使命履行进程,所以这儿首要剖析中心线程。

当咱们运用线程池履行一个使命时,会调用ThreadPoolExecutor的execute办法

public void execute(Runnable command) {
    ......
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 咱们只看中心线程的流程,所以后面的代码不必管
    ......
}

这个ctl是一个状况相关的代码,能够先不必管,我后面会简略一致做个解说,这儿不去管它会比较简略了解,咱们现在首要是为了看中心线程的流程。从这儿能够看出,当时线程的数量小于中心线程的话履行addWorker办法

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

这个addWorker分为上下两部分,咱们分别来做解析

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    // 下半部分
    ......
}

这儿首要是做了状况判别的一些操作,我说过状况相关的咱们能够先不管,可是这儿的写法我觉得要独自讲一下为什么会这么写。否则它内部许多代码是这样的,我怕劝退许多人。

首要retry: …… break retry; 这个语法糖,平常咱们开发很少用到,能够去了解一下,这儿便是为了跳出循环。 其次,这儿的compareAndIncrementWorkerCount内部的代码是AtomicInteger ctl.compareAndSet(expect, expect + 1) ,Atomic的compareAndSet操作调配死循环,这叫自旋,所以说要看懂这个需要必定的java多线程相关的根底。自旋的目的是为了什么?这就又触及到了锁的分类中有达观锁,有失望锁。不清楚的能够去学一下这些常识,你就知道为什么它要这么做了,这儿就不逐个解说。包含你看它的源码,能看到,它会许多当地用自旋,许多当地用ReentrantLock,但它便是不必synchronized ,这些都是多线程这块根底的常识,这儿不多说了。

看看下半部分

private boolean addWorker(Runnable firstTask, boolean core) {
    // 上半部分
    ......
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            ......
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        ......
    }
    return workerStarted;
}

看到它先创立一个Worker目标,再调用Worker目标内部的线程的start办法,咱们看看Worker

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    private static final long serialVersionUID = 6138294804551838833L;
    final Thread thread;
    Runnable firstTask;
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }
    public void run() {
        runWorker(this);
    }
    // 其它办法
   ......
}

看到它内部首要有两个目标firstTask便是使命,thread便是履行这个使命的线程,而这个线程是经过getThreadFactory().newThread(this)创立出来的,这个便是咱们创立ThreadPoolExecutor时传的“线程工厂”
外部调t.start();之后就会履行这儿的run办法,由于newThread传了this进去,你能够先简略了解调这个线程start会履行到这个run,然后run中调用runWorker(this);

留意,你想想runWorker(this)办法,包含之后的流程,都是履行在哪个线程中?都是履行在子线程中,由于这个run办法中的代码,都是履行在这个线程中。你必定要了解这一步,否则你自己看源码会或许看懵。 由于有些人长时间不触摸多线程环境的状况下,你会习惯单线程的思维去看问题,那就很简略呈现了解上的错误。

咱们持续看看runWorker,时刻提示你自己,之后的流程都是在子线程中进行,这条子线程的生命周期变为RUNNABLE

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {s
            w.lock();
            // 中止相关的操作
            ......
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    ......
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                ......
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

先讲讲这儿的一个开发技巧,task.run()便是履行使命,它前面的beforeExecute和afterExecute便是模板办法规划形式,便利扩展用。
履行完使命后,最后履行processWorkerExit办法

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    ......
}

workers.remove(w)后履行tryTerminate办法测验将线程池的生命周期变为TERMINATED

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated();
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

先不必管状况的改变,一般一眼都能看得出这儿是完毕的操作了,咱们追踪的中心线程正常在RUNNING状况下是不会履行到这儿的。 那咱们期望的没使命状况下让线程休眠的操作在哪里?
看回runWorker办法

final void runWorker(Worker w) {
    ......
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {s
            ......
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

看到它的while中有个getTask()办法,认真看runWorker办法其实能看出,中心线程履行完一个使命之后会getTask()拿下一个使命去履行,这便是当中心线程满的时分使命会放到堵塞行列中,中心线程履行完使命之后会从堵塞行列中拿下一个使命履行。 getTask()从抽象上来看,便是从行列中拿使命。

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        ......
        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

先把timed当成正常状况下为false,然后会履行workQueue.take(),这个workQueue是堵塞行列BlockingQueue, 留意,这儿又需要有点根底了。正常有点根底的人看到这儿,现已知道这儿便是当没有使命会让中心线程休眠的操作,看不懂的,能够先了解下什么是AQS,能够看看我这篇文章 /post/716801…

假如你说你懒得看,行吧,我随意拿个ArrayBlockingQueue给你举例

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

notEmpty是Condition,这儿调用了Condition的await()办法,然后想想履行这步操作的是在哪条线程上?线程进入WAITING状况了吧,不会进入TERMINATED了吧。

然后当有使命添加之后会唤醒它,它持续在循环中去履行使命。

这就验证了咱们的猜测,经过让中心线程进入WAITING状况以此来到达履行完run办法中的使命也不会自动TERMINATED而开释线程。所以中心线程一向占用资源,这儿说的资源指的是空间,而cpu的时间片是会让出的。

4. 部分线程池的操作解读

为什么线程池也会导致OOM,上面现已经过源码告知你,中心线程不会开释内存空间,导致线程池多的状况下也会导致OOM。这儿为了便利新手阅览ThreadPoolExecutor相关的代码,仍是觉得写一些它内部的规划思维,否则没点根底的话确实很难看懂。

首要便是状况,上面源码中都有关线程池的生命中周期状况(ctl字段),能够看看它怎样规划的

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;  // Integer.SIZE是32
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

它这儿用了两个规划思维,第一个便是用位来表明状况,关于这类型的规划,能够看我这2篇文章 /post/715547… 和 /post/720550…

别的一个规划思维是:用一个变量的高方位表明状况,低位表明数量。 这儿便是用高3位来表明生命周期,剩余的低位表明线程的数量。和这个相似的操作有view中的MeasureSpec,也是一个变量表明两个状况。

然后关于规划形式,能够看到它这儿最经典的便是用了战略形式,假如你看饱满战略那块的源码,能够好好看看它是怎样规划的。其它的还有工厂、模板之类的,这些也不难,便是战略仍是主张学下它怎样去规划的。

然后多线程相关的根底,这个仍是比较重要的,这块的根底欠好,看ThreadPoolExecutor的源码会相对吃力。比方我上面提过的,线程的生命周期,锁相关的常识,还有AQS等等。假如你了解这些,再看这个源码就会轻松许多。

关于整体的规划,你第一看会觉得它的源码很绕,为什么会这样?由于有中止操作+自旋锁+状况的规划 ,它的这种规划就根本能够说是优化代码到极致,比方说状况的规划,就比普通的能省内存,能更便利经过CAS操作。用自旋便是达观锁,能节省资源等。有中止操作,能让整个系统更灵活。相对的缺陷便是不安全,什么意思呢?已是便是这样写代码很简略出BUG,所以这儿的让人觉得很绕的代码,便是许多的状况的判别,这些都是为了保证这个流程的安全。

5. 总结

从部分源码的视点去剖析,得到的结论是线程池也或许会导致OOM

那再考虑一个问题:不断的创立线程池,“必定”会导致OOM吗? 假如你对线程池现已有必定的了解,相信你也知道这个问题的答案。

本文正在参加「金石方案」