前言

在之前文章介绍协程时,咱们说过协程有个特性便是结构化并发,这是因为协程是具有父子联系的,撤销父协程,会让一切子协程都撤销,这能够有用避免内存走漏。

那本篇文章就来看看结构化并发的原理。

正文

在协程结构的中层概念中,CoroutineScope便是完结结构化并发的要害,其实从字面意思也非常好理解,协程效果域,也便是规定了一个效果域,能够批量办理一个效果域内的一切协程。

为什么有CoroutineScope

其实越是到后边,越简单串起来整个协程结构的知识,让知识构成体系。

咱们这儿回忆一下发动协程的2个API:launchasync

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    ...
}
public fun <T> CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): Deferred<T> {
    ...
}

这儿发现它们都是CoroutineScope的扩展函数,这儿为什么要规划为CoroutineScope的扩展函数呢?

其实不然,在早期的协程API,这2个函数还真不是CoroutineScope的扩展函数,假如是运用早期API,伪代码如下:

// 运用协程最初的API,仅仅伪代码
private fun testScopeJob() {
    val job = Job()
    launch(job){
        launch {
            delay(1000000L)
            logX("Inner")
        }
        logX("Hello!")
        delay(1000000L)
        logX("World!")  // 不会执行
    }
    launch(job){
        launch {
            delay(1000000L)
            logX("Inner!!!")
        }
        logX("Hello!!!")
        delay(1000000L)
        logX("World1!!!")  // 不会执行
    }
    Thread.sleep(500L)
    job.cancel()
}

这儿想完结结构化并发,咱们不得不创立一个Job目标,然后传入launch中作为参数,可是开发者可能会忘记传输这个参数,所以就会打破结构化联系。

所今后边开展就专门规划出CoroutineScope来办理协程批量处理,并且把launchasync都作为该类的扩展函数,这样就不会有前面所说的忘记传递参数然后导致的非结构联系。

原理剖析

从前面协程API的迭代就能够看出,其实起效果的还是Job,而CoroutineScope中包含了协程上下文,协程上下文又包含了Job,所以咱们还是以launch{}发动协程为例,来剖析其结构化并发的原理。

创立父子联系

这儿咱们写出下面示例代码:

private fun testScope() {
    //新建一个CoroutineScope
    val scope = CoroutineScope(Job())
    //因为launch是CoroutineScope的扩展函数
    scope.launch{
        //block函数类型参数的接收者是CoroutineScope
        launch {
            delay(1000000L)
            logX("Inner")  // 不会执行
        }
        logX("Hello!")
        delay(1000000L)
        logX("World!")  // 不会执行
    }
    Thread.sleep(500L)
    // 2
    scope.cancel()
}

在上面代码中,有个值得留意的地方,便是launch办法不只仅是CoroutineScope的扩展函数,它的block类型是:suspend CoroutineScope.() -> Unit,所以在协程体中,咱们依旧能够调用launch办法。

这儿咱们创立了一个CoroutineScope,这儿咱们来看一下这个办法源码

public fun CoroutineScope(context: CoroutineContext): CoroutineScope =
    ContextScope(if (context[Job] != null) context else context + Job())
internal class ContextScope(context: CoroutineContext) : CoroutineScope {
    override val coroutineContext: CoroutineContext = context
    override fun toString(): String = "CoroutineScope(coroutineContext=$coroutineContext)"
}

能够发现CoroutineScope()是一个顶层函数,同理函数体内部的Job()也是一个顶层函数,这儿还有一个小知识点:当顶层函数作为”结构函数”来运用时,这个函数的命名能够不运用驼峰命名法,而是以大写开端。

这儿回来的是CoroutineScope,在前面文章咱们知道它是对CoroutineContext的封装:

public interface CoroutineScope 
    public val coroutineContext: CoroutineContext
}

CoroutineScope()办法中,经过context[Job]就能够取出保存在context中的Job目标,假如没有Job目标的话,就创立一个Job目标传入到context中,这阐明一件事,每一个CoroutineScope目标,它的context当中必定存在一个Job目标。

一起也阐明在调用CroutineScope()办法时,也能够不传Job目标。

接着咱们持续看launch的源码:

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    //注释1 承继父协程的上下文
    val newContext = newCoroutineContext(context)
    //注释2 创立协程目标
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    //注释3 开端协程
    coroutine.start(start, coroutine, block)
    return coroutine
}

这儿的注释1和3,分别在前面文章剖析launch发动以及线程调度都剖析过了,现在轮到了注释2:

//Standalone翻译便是独立,即独立运转的协程
private open class StandaloneCoroutine(
    parentContext: CoroutineContext,
    active: Boolean
    //承继协程抽象类泛型为Unit,initParentJob为true
) : AbstractCoroutine<Unit>(parentContext, initParentJob = true, active = active) {
    override fun handleJobException(exception: Throwable): Boolean {
        handleCoroutineException(context, exception)
        return true
    }
}
//懒惰的协程
private class LazyStandaloneCoroutine(
    parentContext: CoroutineContext,
    block: suspend CoroutineScope.() -> Unit
    //承继至上面的类,initParentJob还是为true
) : StandaloneCoroutine(parentContext, active = false) {
    private val continuation = block.createCoroutineUnintercepted(this, this)
    override fun onStart() {
        continuation.startCoroutineCancellable(this)
    }
}

能够发现StandaloneCoroutineAbstractCoroutine的子类,在前面文章中咱们说过这个能够当作是代表协程的抽象类,在调用其结构函数时,第二个参数initParentJob参数,一向为true,其实便是代表了协程创立今后,需求初始化协程的父子联系。

AbstractCoroutine结构函数如下:

public abstract class AbstractCoroutine<in T>(
    parentContext: CoroutineContext,
    initParentJob: Boolean,
    active: Boolean
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
    init {
        if (initParentJob) initParentJob(parentContext[Job])
    }
 }

该类的承继联系,在上一篇文章中咱们要点剖析了承继Continuation分支的,主要是用来调用intercepted()来拦截其线程调度器,本篇文章要点便是其JobSupport类。

这儿initParentJob参数咱们从前面可知,这儿必为true,即需求初始化父子联系,其间initParentJob()函数界说在JobSupport类中:

public open class JobSupport constructor(active: Boolean) : Job, ChildJob, ParentJob, SelectClause0

JobSupport内容比较多,能够把它当作一个具体化的Job完结,这是因为关于Job的各种操作,都是在该类中完结的。

下面是initParentJob办法:

//parent便是父协程的Job
protected fun initParentJob(parent: Job?) {
    assert { parentHandle == null }
    //当没有父协程时,不需求创立和父协程的联系
    if (parent == null) {
        parentHandle = NonDisposableHandle
        return
    }
    //保证父协程已经发动了
    parent.start()
    //把当时Job添加到父Job中
    val handle = parent.attachChild(this)
    parentHandle = handle
    // now check our state _after_ registering (see tryFinalizeSimpleState order of actions)
    if (isCompleted) {
        handle.dispose()
        parentHandle = NonDisposableHandle // release it just in case, to aid GC
    }
}

上面代码比较简单,看注释即可,所以咱们能够把协程当作一颗N叉树,每一个协程都对应一个Job目标,而每一个Job能够有一个父Job和多个多个子Job

协程(21) | 结构化并发原理解析

结构化撤销

已然Job的联系如上图中的N叉树,所以结构化撤销原理其实也便是事情传递了。

当某个Job收到撤销事情时,需求告知其上下级。这个规划思路,就和咱们公司架构一样,当某个人需求告知重要事情时,能够先告知其部属,再告知其领导,经过循环迭代然后能够让整个公司都知道。

咱们能够想象出其撤销协程的代码应该如下:

fun Job.cancelJob() {
    //告知子Job撤销
    children.forEach {
        cancelJob()
    }
    //告知父Job撤销
    notifyParentCancel()
}

当然这是仅仅简化的伪代码,实在代码杂乱很多,可是原理差不多。

咱们先来看一下CoroutineScopecancel函数的代码:

public fun CoroutineScope.cancel(cause: CancellationException? = null) {
    val job = coroutineContext[Job] ?: error("Scope cannot be cancelled because it does not have a job: $this")
    job.cancel(cause)
}

正常状况下,咱们调用scope.cancel()时,一般都不会传递参数,假如要传递额外阐明参数,这儿必须是CancellationException类型的。

在办法完结中,咱们会发现实在是调用Jobcancel()办法,该办法的完结便是在前面所说的JobSupport类中:

//外部带原因的撤销,内部不能隐式调用
public override fun cancel(cause: CancellationException?) {
    cancelInternal(cause ?: defaultCancellationException())
}

这个办法是供外部来调用的,这儿留意当cause为空时,这儿在调用cancelInternal时会传入一个默许的CancellationException完结:

public open fun cancelInternal(cause: Throwable) {
    cancelImpl(cause)
}

该办法由办法名能够看出是内部调用,这种规划思路,咱们在平常也能够运用,办法的拜访权限要严厉分隔,该办法的参数类型是Throwable类型,会调用下面办法:

internal fun cancelImpl(cause: Any?): Boolean {
    var finalState: Any? = COMPLETING_ALREADY
    if (onCancelComplete) {
        //1
        finalState = cancelMakeCompleting(cause)
        if (finalState === COMPLETING_WAITING_CHILDREN) return true
    }
    if (finalState === COMPLETING_ALREADY) {
        //2
        finalState = makeCancelling(cause)
    }
    return when {
        finalState === COMPLETING_ALREADY -> true
        finalState === COMPLETING_WAITING_CHILDREN -> true
        finalState === TOO_LATE_TO_CANCEL -> false
        else -> {
            afterCompletion(finalState)
            true
        }
    }
}

在该办法中,cause的类型是Any?,其实从源码注释咱们能够知道该参数可能是Throwable,也可能是一个ParentJob,第二种状况只会在cancelChild办法被调用时传入。

并且该办法回来true则表明反常被处理,不然表明没有被处理。

那么为什么该类中有这么多状况判断呢?原因非常简单,因为Job的状况改变是一个持续进程,只要子Job都撤销完结后,该Job才能算完结了。

所以这儿会调用注释1的cancelMakeComplting办法:

private fun cancelMakeCompleting(cause: Any?): Any? {
    loopOnState { state ->
        // 省掉部分
        val finalState = tryMakeCompleting(state, proposedUpdate)
        if (finalState !== COMPLETING_RETRY) return finalState
    }
}

从办法名中的Completing为完结、完整的意思就能够看出,这个进程是一个持续的进程,这儿有一个循环办法loopOnState,咱们能够在日常项目中学习一下:

private inline fun loopOnState(block: (Any?) -> Unit): Nothing {
    while (true) {
        block(state)
    }
}

这儿的中心还是调用tryMakeCompleting办法:

private fun tryMakeCompleting(state: Any?, proposedUpdate: Any?): Any? {
    if (state !is Incomplete)
        return COMPLETING_ALREADY
        ...
        // 省掉部分
        return COMPLETING_RETRY
    }
    ...
    return tryMakeCompletingSlowPath(state, proposedUpdate)
}

经过源码注释,咱们可知该办法会回来状况,并且是已完结的状况,比如这儿的COMPLETING_ALREADYCOMPLETING_RETRY等,一起在该办法平分出了2个分支。

一个是快速回来分支,当该Job没有子Job,能够立即回来。当有子Job时,才会调用tryMakeCompletingSlowPath办法,这也是简化函数逻辑的一种常见手段,办法如下:

private fun tryMakeCompletingSlowPath(state: Incomplete, proposedUpdate: Any?): Any? {
    // 省掉部分
    notifyRootCause?.let { notifyCancelling(list, it) }
    return finalizeFinishingState(finishing, proposedUpdate)
}

这儿代码调用比较杂乱,咱们能够不用重视,终究会调用notifyCancelling办法,这个才是最要害的代码。

前面为什么调用一个撤销要附带这么多状况判断,也是因为Job需求办理协程的状况,即只要子Job都完结时,父Job才算完结,所以这是一个持续进程。

咱们看一下这个中心办法:

private fun notifyCancelling(list: NodeList, cause: Throwable) {
    onCancelling(cause)
    // 1,告知子Job
    notifyHandlers<JobCancellingNode>(list, cause)
    // 2,告知父Job
    cancelParent(cause)
}

这个办法和咱们前面所说的伪代码逻辑基本共同了,咱们分别来看看其间的逻辑:

//告知子Job进行撤销
private inline fun <reified T: JobNode> notifyHandlers(list: NodeList, cause: Throwable?) {
    var exception: Throwable? = null
    list.forEach<T> { node ->
        try {
            //调用每个子Job的invoke办法
            node.invoke(cause)
        } catch (ex: Throwable) {
            exception?.apply { addSuppressedThrowable(ex) } ?: run {
                exception =  CompletionHandlerException("Exception in completion handler $node for $this", ex)
            }
        }
    }
    exception?.let { handleOnCompletionException(it) }
}

这儿便是遍历当时Job的子Job,并且将撤销的case传递曩昔,这儿的invoke()终究会调用ChildHandleNodeinvoke()办法:

//这儿是Node类型,也旁边面阐明了Job是树结构
internal class ChildHandleNode(
    @JvmField val childJob: ChildJob
) : JobCancellingNode(), ChildHandle {
    override val parent: Job get() = job
    //调用parentCancelled办法
    override fun invoke(cause: Throwable?) = childJob.parentCancelled(job)
    override fun childCancelled(cause: Throwable): Boolean = job.childCancelled(cause)
}
//JobSupport中完结
public final override fun parentCancelled(parentJob: ParentJob) {
    cancelImpl(parentJob)
}

而这儿代码终究会调用cancelImpl()办法,即对应了前面所说的该办法参数可能是一个Job,一起也阐明这是一个递归调用,一向会调用到没有子JobJob

咱们接着看一下如何告知父Job:

private fun cancelParent(cause: Throwable): Boolean {
    if (isScopedCoroutine) return true
    //是否是CancellationException反常
    val isCancellation = cause is CancellationException
    val parent = parentHandle
    if (parent === null || parent === NonDisposableHandle) {
        return isCancellation
    }
    // 1
    return parent.childCancelled(cause) || isCancellation
}

留意注释1的回来值,这个回来值是有意义的,回来true代表父协程处理了反常,而回来false,代表父协程没有处理反常。

该办法代码如下:

public open fun childCancelled(cause: Throwable): Boolean {
    //特别处理撤销反常
    if (cause is CancellationException) return true
    return cancelImpl(cause) && handlesException
}

这儿咱们发现当反常是CancellationException的时候,协程是会进行特别处理的。一般来说,父协程会疏忽子协程的撤销反常,当是其他反常时,那么父协程就会呼应子协程的撤销了。这时又会调用cancelImpl(),来持续递归调用。

这儿咱们再结合前面文章所说的协程反常处理,咱们就说过关于CancellationException反常要特别处理,一般都是要抛出去,这儿咱们就能够看到原因了,原来协程的结构化撤销,是需求依靠这个反常的。

这也就阐明一件事,当出现CancellationException反常时,只会向下传达,来到达结构化撤销的效果;可是当是其他反常时,则会双向传递,如下图:

协程(21) | 结构化并发原理解析

SupervisorJob原理

在之前文章,咱们说过一个特别的Job,便是SupervisorJob,它能够避免子协程反常的蔓延,这时咱们就能够知道其完结原理了:

//顶层函数当结构函数运用
public fun SupervisorJob(parent: Job? = null) : CompletableJob = SupervisorJobImpl(parent)
//这儿一向回来false
private class SupervisorJobImpl(parent: Job?) : JobImpl(parent) {
    override fun childCancelled(cause: Throwable): Boolean = false
}

根据上面源码剖析,这个childCancelled办法是用来对上陈述的,这儿直接回来false且不处理,也便是不论是什么反常都不会蔓延到其他兄弟Job

总结

本篇文章触及的代码跳转较多,咱们做个总结:

  1. 每次创立CoroutineScope的时候,它的内部会保证CoroutineContext当中一定有Job元素,而CoroutineScope便是经过这个Job目标来办理协程的。
  2. 在咱们经过launchasync创立协程的时候,会一起创立AbstractCoroutine的子类,在它的initParentJob()办法中,会树立父子联系。每个协程都会对应一个Job,而每个Job都会有一个父Job,多个子Job。终究他们会构成一个N叉树的结构。
  3. 因为协程是一个N叉树的结构,因而协程的撤销事情以及反常传达,也会按照这个结构进行传递。每个Job撤销的时候,都会告知自己的子Job和父Job,终究以递归的形式传递给每一个子协程。
  4. 协程向上撤销父Job的时候,还利用了职责链形式,保证撤销事情能够一步步传递到顶层的协程。这儿还有一个细节便是,默许状况下,父协程会疏忽子协程的CancellationException
  5. 关于其他反常,父协程不只会呼应,还会造成其他兄弟Job出现反常,所以这儿能够运用SupervisorJob来阻断反常的向上传递。