前言
在之前文章介绍协程时,咱们说过协程有个特性便是结构化并发,这是因为协程是具有父子联系的,撤销父协程,会让一切子协程都撤销,这能够有用避免内存走漏。
那本篇文章就来看看结构化并发的原理。
正文
在协程结构的中层概念中,CoroutineScope
便是完结结构化并发的要害,其实从字面意思也非常好理解,协程效果域,也便是规定了一个效果域,能够批量办理一个效果域内的一切协程。
为什么有CoroutineScope
其实越是到后边,越简单串起来整个协程结构的知识,让知识构成体系。
咱们这儿回忆一下发动协程的2个API:launch
和async
:
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
...
}
public fun <T> CoroutineScope.async(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> T
): Deferred<T> {
...
}
这儿发现它们都是CoroutineScope
的扩展函数,这儿为什么要规划为CoroutineScope
的扩展函数呢?
其实不然,在早期的协程API,这2个函数还真不是CoroutineScope
的扩展函数,假如是运用早期API,伪代码如下:
// 运用协程最初的API,仅仅伪代码
private fun testScopeJob() {
val job = Job()
launch(job){
launch {
delay(1000000L)
logX("Inner")
}
logX("Hello!")
delay(1000000L)
logX("World!") // 不会执行
}
launch(job){
launch {
delay(1000000L)
logX("Inner!!!")
}
logX("Hello!!!")
delay(1000000L)
logX("World1!!!") // 不会执行
}
Thread.sleep(500L)
job.cancel()
}
这儿想完结结构化并发,咱们不得不创立一个Job
目标,然后传入launch
中作为参数,可是开发者可能会忘记传输这个参数,所以就会打破结构化联系。
所今后边开展就专门规划出CoroutineScope
来办理协程批量处理,并且把launch
和async
都作为该类的扩展函数,这样就不会有前面所说的忘记传递参数然后导致的非结构联系。
原理剖析
从前面协程API的迭代就能够看出,其实起效果的还是Job
,而CoroutineScope
中包含了协程上下文,协程上下文又包含了Job
,所以咱们还是以launch{}
发动协程为例,来剖析其结构化并发的原理。
创立父子联系
这儿咱们写出下面示例代码:
private fun testScope() {
//新建一个CoroutineScope
val scope = CoroutineScope(Job())
//因为launch是CoroutineScope的扩展函数
scope.launch{
//block函数类型参数的接收者是CoroutineScope
launch {
delay(1000000L)
logX("Inner") // 不会执行
}
logX("Hello!")
delay(1000000L)
logX("World!") // 不会执行
}
Thread.sleep(500L)
// 2
scope.cancel()
}
在上面代码中,有个值得留意的地方,便是launch
办法不只仅是CoroutineScope
的扩展函数,它的block
类型是:suspend CoroutineScope.() -> Unit
,所以在协程体中,咱们依旧能够调用launch
办法。
这儿咱们创立了一个CoroutineScope
,这儿咱们来看一下这个办法源码:
public fun CoroutineScope(context: CoroutineContext): CoroutineScope =
ContextScope(if (context[Job] != null) context else context + Job())
internal class ContextScope(context: CoroutineContext) : CoroutineScope {
override val coroutineContext: CoroutineContext = context
override fun toString(): String = "CoroutineScope(coroutineContext=$coroutineContext)"
}
能够发现CoroutineScope()
是一个顶层函数,同理函数体内部的Job()
也是一个顶层函数,这儿还有一个小知识点:当顶层函数作为”结构函数”来运用时,这个函数的命名能够不运用驼峰命名法,而是以大写开端。
这儿回来的是CoroutineScope
,在前面文章咱们知道它是对CoroutineContext
的封装:
public interface CoroutineScope
public val coroutineContext: CoroutineContext
}
在CoroutineScope()
办法中,经过context[Job]
就能够取出保存在context
中的Job
目标,假如没有Job
目标的话,就创立一个Job
目标传入到context
中,这阐明一件事,每一个CoroutineScope
目标,它的context
当中必定存在一个Job
目标。
一起也阐明在调用CroutineScope()
办法时,也能够不传Job
目标。
接着咱们持续看launch
的源码:
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
//注释1 承继父协程的上下文
val newContext = newCoroutineContext(context)
//注释2 创立协程目标
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
//注释3 开端协程
coroutine.start(start, coroutine, block)
return coroutine
}
这儿的注释1和3,分别在前面文章剖析launch
发动以及线程调度都剖析过了,现在轮到了注释2:
//Standalone翻译便是独立,即独立运转的协程
private open class StandaloneCoroutine(
parentContext: CoroutineContext,
active: Boolean
//承继协程抽象类,泛型为Unit,initParentJob为true
) : AbstractCoroutine<Unit>(parentContext, initParentJob = true, active = active) {
override fun handleJobException(exception: Throwable): Boolean {
handleCoroutineException(context, exception)
return true
}
}
//懒惰的协程
private class LazyStandaloneCoroutine(
parentContext: CoroutineContext,
block: suspend CoroutineScope.() -> Unit
//承继至上面的类,initParentJob还是为true
) : StandaloneCoroutine(parentContext, active = false) {
private val continuation = block.createCoroutineUnintercepted(this, this)
override fun onStart() {
continuation.startCoroutineCancellable(this)
}
}
能够发现StandaloneCoroutine
是AbstractCoroutine
的子类,在前面文章中咱们说过这个能够当作是代表协程的抽象类,在调用其结构函数时,第二个参数initParentJob
参数,一向为true
,其实便是代表了协程创立今后,需求初始化协程的父子联系。
AbstractCoroutine
结构函数如下:
public abstract class AbstractCoroutine<in T>(
parentContext: CoroutineContext,
initParentJob: Boolean,
active: Boolean
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
init {
if (initParentJob) initParentJob(parentContext[Job])
}
}
该类的承继联系,在上一篇文章中咱们要点剖析了承继Continuation
分支的,主要是用来调用intercepted()
来拦截其线程调度器,本篇文章要点便是其JobSupport
类。
这儿initParentJob
参数咱们从前面可知,这儿必为true
,即需求初始化父子联系,其间initParentJob()
函数界说在JobSupport
类中:
public open class JobSupport constructor(active: Boolean) : Job, ChildJob, ParentJob, SelectClause0
JobSupport
内容比较多,能够把它当作一个具体化的Job
完结,这是因为关于Job
的各种操作,都是在该类中完结的。
下面是initParentJob
办法:
//parent便是父协程的Job
protected fun initParentJob(parent: Job?) {
assert { parentHandle == null }
//当没有父协程时,不需求创立和父协程的联系
if (parent == null) {
parentHandle = NonDisposableHandle
return
}
//保证父协程已经发动了
parent.start()
//把当时Job添加到父Job中
val handle = parent.attachChild(this)
parentHandle = handle
// now check our state _after_ registering (see tryFinalizeSimpleState order of actions)
if (isCompleted) {
handle.dispose()
parentHandle = NonDisposableHandle // release it just in case, to aid GC
}
}
上面代码比较简单,看注释即可,所以咱们能够把协程当作一颗N叉树,每一个协程都对应一个Job
目标,而每一个Job
能够有一个父Jo
b和多个多个子Job
。
结构化撤销
已然Job
的联系如上图中的N叉树,所以结构化撤销原理其实也便是事情传递了。
当某个Job
收到撤销事情时,需求告知其上下级。这个规划思路,就和咱们公司架构一样,当某个人需求告知重要事情时,能够先告知其部属,再告知其领导,经过循环迭代然后能够让整个公司都知道。
咱们能够想象出其撤销协程的代码应该如下:
fun Job.cancelJob() {
//告知子Job撤销
children.forEach {
cancelJob()
}
//告知父Job撤销
notifyParentCancel()
}
当然这是仅仅简化的伪代码,实在代码杂乱很多,可是原理差不多。
咱们先来看一下CoroutineScope
的cancel
函数的代码:
public fun CoroutineScope.cancel(cause: CancellationException? = null) {
val job = coroutineContext[Job] ?: error("Scope cannot be cancelled because it does not have a job: $this")
job.cancel(cause)
}
正常状况下,咱们调用scope.cancel()
时,一般都不会传递参数,假如要传递额外阐明参数,这儿必须是CancellationException
类型的。
在办法完结中,咱们会发现实在是调用Job
的cancel()
办法,该办法的完结便是在前面所说的JobSupport
类中:
//外部带原因的撤销,内部不能隐式调用
public override fun cancel(cause: CancellationException?) {
cancelInternal(cause ?: defaultCancellationException())
}
这个办法是供外部来调用的,这儿留意当cause
为空时,这儿在调用cancelInternal
时会传入一个默许的CancellationException
完结:
public open fun cancelInternal(cause: Throwable) {
cancelImpl(cause)
}
该办法由办法名能够看出是内部调用,这种规划思路,咱们在平常也能够运用,办法的拜访权限要严厉分隔,该办法的参数类型是Throwable
类型,会调用下面办法:
internal fun cancelImpl(cause: Any?): Boolean {
var finalState: Any? = COMPLETING_ALREADY
if (onCancelComplete) {
//1
finalState = cancelMakeCompleting(cause)
if (finalState === COMPLETING_WAITING_CHILDREN) return true
}
if (finalState === COMPLETING_ALREADY) {
//2
finalState = makeCancelling(cause)
}
return when {
finalState === COMPLETING_ALREADY -> true
finalState === COMPLETING_WAITING_CHILDREN -> true
finalState === TOO_LATE_TO_CANCEL -> false
else -> {
afterCompletion(finalState)
true
}
}
}
在该办法中,cause
的类型是Any?
,其实从源码注释咱们能够知道该参数可能是Throwable
,也可能是一个ParentJob
,第二种状况只会在cancelChild
办法被调用时传入。
并且该办法回来true
则表明反常被处理,不然表明没有被处理。
那么为什么该类中有这么多状况判断呢?原因非常简单,因为Job
的状况改变是一个持续进程,只要子Job
都撤销完结后,该Job
才能算完结了。
所以这儿会调用注释1的cancelMakeComplting
办法:
private fun cancelMakeCompleting(cause: Any?): Any? {
loopOnState { state ->
// 省掉部分
val finalState = tryMakeCompleting(state, proposedUpdate)
if (finalState !== COMPLETING_RETRY) return finalState
}
}
从办法名中的Completing
为完结、完整的意思就能够看出,这个进程是一个持续的进程,这儿有一个循环办法loopOnState
,咱们能够在日常项目中学习一下:
private inline fun loopOnState(block: (Any?) -> Unit): Nothing {
while (true) {
block(state)
}
}
这儿的中心还是调用tryMakeCompleting
办法:
private fun tryMakeCompleting(state: Any?, proposedUpdate: Any?): Any? {
if (state !is Incomplete)
return COMPLETING_ALREADY
...
// 省掉部分
return COMPLETING_RETRY
}
...
return tryMakeCompletingSlowPath(state, proposedUpdate)
}
经过源码注释,咱们可知该办法会回来状况,并且是已完结的状况,比如这儿的COMPLETING_ALREADY
、COMPLETING_RETRY
等,一起在该办法平分出了2个分支。
一个是快速回来分支,当该Job
没有子Job
,能够立即回来。当有子Job
时,才会调用tryMakeCompletingSlowPath
办法,这也是简化函数逻辑的一种常见手段,办法如下:
private fun tryMakeCompletingSlowPath(state: Incomplete, proposedUpdate: Any?): Any? {
// 省掉部分
notifyRootCause?.let { notifyCancelling(list, it) }
return finalizeFinishingState(finishing, proposedUpdate)
}
这儿代码调用比较杂乱,咱们能够不用重视,终究会调用notifyCancelling
办法,这个才是最要害的代码。
前面为什么调用一个撤销要附带这么多状况判断,也是因为Job
需求办理协程的状况,即只要子Job
都完结时,父Job
才算完结,所以这是一个持续进程。
咱们看一下这个中心办法:
private fun notifyCancelling(list: NodeList, cause: Throwable) {
onCancelling(cause)
// 1,告知子Job
notifyHandlers<JobCancellingNode>(list, cause)
// 2,告知父Job
cancelParent(cause)
}
这个办法和咱们前面所说的伪代码逻辑基本共同了,咱们分别来看看其间的逻辑:
//告知子Job进行撤销
private inline fun <reified T: JobNode> notifyHandlers(list: NodeList, cause: Throwable?) {
var exception: Throwable? = null
list.forEach<T> { node ->
try {
//调用每个子Job的invoke办法
node.invoke(cause)
} catch (ex: Throwable) {
exception?.apply { addSuppressedThrowable(ex) } ?: run {
exception = CompletionHandlerException("Exception in completion handler $node for $this", ex)
}
}
}
exception?.let { handleOnCompletionException(it) }
}
这儿便是遍历当时Job
的子Job
,并且将撤销的case
传递曩昔,这儿的invoke()
终究会调用ChildHandleNode
的invoke()
办法:
//这儿是Node类型,也旁边面阐明了Job是树结构
internal class ChildHandleNode(
@JvmField val childJob: ChildJob
) : JobCancellingNode(), ChildHandle {
override val parent: Job get() = job
//调用parentCancelled办法
override fun invoke(cause: Throwable?) = childJob.parentCancelled(job)
override fun childCancelled(cause: Throwable): Boolean = job.childCancelled(cause)
}
//JobSupport中完结
public final override fun parentCancelled(parentJob: ParentJob) {
cancelImpl(parentJob)
}
而这儿代码终究会调用cancelImpl()
办法,即对应了前面所说的该办法参数可能是一个Job
,一起也阐明这是一个递归调用,一向会调用到没有子Job
的Job
。
咱们接着看一下如何告知父Job
:
private fun cancelParent(cause: Throwable): Boolean {
if (isScopedCoroutine) return true
//是否是CancellationException反常
val isCancellation = cause is CancellationException
val parent = parentHandle
if (parent === null || parent === NonDisposableHandle) {
return isCancellation
}
// 1
return parent.childCancelled(cause) || isCancellation
}
留意注释1的回来值,这个回来值是有意义的,回来true
代表父协程处理了反常,而回来false
,代表父协程没有处理反常。
该办法代码如下:
public open fun childCancelled(cause: Throwable): Boolean {
//特别处理撤销反常
if (cause is CancellationException) return true
return cancelImpl(cause) && handlesException
}
这儿咱们发现当反常是CancellationException
的时候,协程是会进行特别处理的。一般来说,父协程会疏忽子协程的撤销反常,当是其他反常时,那么父协程就会呼应子协程的撤销了。这时又会调用cancelImpl()
,来持续递归调用。
这儿咱们再结合前面文章所说的协程反常处理,咱们就说过关于CancellationException
反常要特别处理,一般都是要抛出去,这儿咱们就能够看到原因了,原来协程的结构化撤销,是需求依靠这个反常的。
这也就阐明一件事,当出现CancellationException
反常时,只会向下传达,来到达结构化撤销的效果;可是当是其他反常时,则会双向传递,如下图:
SupervisorJob
原理
在之前文章,咱们说过一个特别的Job
,便是SupervisorJob
,它能够避免子协程反常的蔓延,这时咱们就能够知道其完结原理了:
//顶层函数当结构函数运用
public fun SupervisorJob(parent: Job? = null) : CompletableJob = SupervisorJobImpl(parent)
//这儿一向回来false
private class SupervisorJobImpl(parent: Job?) : JobImpl(parent) {
override fun childCancelled(cause: Throwable): Boolean = false
}
根据上面源码剖析,这个childCancelled
办法是用来对上陈述的,这儿直接回来false
且不处理,也便是不论是什么反常都不会蔓延到其他兄弟Job
。
总结
本篇文章触及的代码跳转较多,咱们做个总结:
- 每次创立
CoroutineScope
的时候,它的内部会保证CoroutineContext
当中一定有Job
元素,而CoroutineScope
便是经过这个Job
目标来办理协程的。 - 在咱们经过
launch
、async
创立协程的时候,会一起创立AbstractCoroutine
的子类,在它的initParentJob()
办法中,会树立父子联系。每个协程都会对应一个Job
,而每个Job
都会有一个父Job
,多个子Job
。终究他们会构成一个N叉树的结构。 - 因为协程是一个N叉树的结构,因而协程的撤销事情以及反常传达,也会按照这个结构进行传递。每个
Job
撤销的时候,都会告知自己的子Job
和父Job
,终究以递归的形式传递给每一个子协程。 - 协程向上撤销父
Job
的时候,还利用了职责链形式,保证撤销事情能够一步步传递到顶层的协程。这儿还有一个细节便是,默许状况下,父协程会疏忽子协程的CancellationException
。 - 关于其他反常,父协程不只会呼应,还会造成其他兄弟
Job
出现反常,所以这儿能够运用SupervisorJob
来阻断反常的向上传递。