开启成长之旅!这是我参与「日新计划 12 月更文挑战」的第19天,点击查看活动概况

1.CoroutineScope

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}
public fun <T> CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): Deferred<T> {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyDeferredCoroutine(newContext, block) else
        DeferredCoroutine<T>(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}
fun coroutineScopeTest() {
    val scope = CoroutineScope(Job())
    scope.launch {
    }
    scope.async {
    }
}

asynclaunch的扩展接收者都是CoroutineScope,这就意味着他们等价于CoroutineScope的成员办法,假如要调用就必须先获取到CoroutineScope的目标。

public interface CoroutineScope {
	/**
	 * 此效果域的上下文
	 * Context被效果域封装,用于完结作为效果域扩展的协程构建器
	 * 不主张在普通代码中访问此特点,除非访问[Job]实例以取得高档用法
	 */
    public val coroutineContext: CoroutineContext
}

CoroutineScope是一个接口,这个接口所做的也仅仅对CoroutineContext做了一层封装而已。CoroutineScope最大的效果便是能够方便的批量的操控协程,例如结构化并发。

2.CoroutineScope与结构化并发

fun coroutineScopeTest() {
	val scope = CoroutineScope(Job())
	scope.launch {
		launch {
			delay(1000000L)
			logX("ChildLaunch 1")
		}
		logX("Hello 1")
		delay(1000000L)
		logX("Launch 1")
	}
	scope.launch {
		launch {
			delay(1000000L)
			logX("ChildLaunch 2")
		}
		logX("Hello 2")
		delay(1000000L)
		logX("Launch 2")
	}
	Thread.sleep(1000L)
	scope.cancel()
}
//输出成果:
//================================
//Hello 2 
//Thread:DefaultDispatcher-worker-2
//================================
//================================
//Hello 1 
//Thread:DefaultDispatcher-worker-1
//================================

上面的代码完结了结构化,仅仅创立了CoroutineScope(Job())和利用launch发动了几个协程就完结了结构化,结构如图所示,那么它的父子结构是怎么树立的?

【Kotlin回顾】19.Kotlin协程—CoroutineScope是如何管理协程的

3.父子联系是怎么树立的

这儿要先阐明一下为什么CoroutineScope是一个接口,可是在创立的时分却能够以结构函数的办法运用。在Kotlin中的命名规则是以【驼峰法】为主的,在特别状况下是能够打破这个规则的,CoroutineScope便是一个特别的状况,它是一个顶层函数但它发挥的效果却是结构函数,相同的还有Job(),它也是顶层函数,在Kotlin中当顶层函数被用作结构函数的时分首字母都是大写的。

再来看一下CoroutineScope作为结构函数运用时的源码

/**
 * 创立一个[CoroutineScope],包装给定的协程[context]。
 * 
 * 假如给定的[context]不包括[Job]元素,则创立一个默认的' Job() '。
 * 
 * 这样,任何子协程在这个范围或[撤销][协程]失利。就像在[coroutineScope]块中一样,
 * 效果域自身会撤销效果域的所有子效果域。
 */
public fun CoroutineScope(context: CoroutineContext): CoroutineScope =
	ContextScope(if (context[Job] != null) context else context + Job())

结构函数的CoroutineScope传入一个参数,这个参数假如包括Job元素则直接运用,假如不包括Job则会创立一个新的Job,这就阐明每一个coroutineScope目标中的 Context中必定会存在一个Job目标。而在创立一个CoroutineScope目标时这个Job()是一定要传入的,由于CoroutineScope便是经过这个Job()目标办理协程的。

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

上面的代码是launch的源码,剖析一下LazyStandaloneCoroutineStandaloneCoroutine

private open class StandaloneCoroutine(
    parentContext: CoroutineContext,
    active: Boolean
) : AbstractCoroutine<Unit>(parentContext, initParentJob = true, active = active) {
    override fun handleJobException(exception: Throwable): Boolean {
        handleCoroutineException(context, exception)
        return true
    }
}
private class LazyStandaloneCoroutine(
    parentContext: CoroutineContext,
    block: suspend CoroutineScope.() -> Unit
) : StandaloneCoroutine(parentContext, active = false) {
    private val continuation = block.createCoroutineUnintercepted(this, this)
    override fun onStart() {
        continuation.startCoroutineCancellable(this)
    }
}

StandaloneCoroutineAbstractCoroutine子类,AbstractCoroutine协程的抽象类, 里面的参数initParentJob = true表明协程创立之后需求初始化协程的父子联系。LazyStandaloneCoroutineStandaloneCoroutine的子类,active=false使命它是以懒加载的办法创立协程。

public abstract class AbstractCoroutine<in T>(
	parentContext: CoroutineContext,
	initParentJob: Boolean,
	active: Boolean
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
	init {
		/**
		 * 在上下文中的父协程和当时协程之间树立父子联系
		 * 假如父协程现已被撤销他可能导致当时协程也被撤销
		 * 假如协程从onCancelled或许onCancelling内部操作其状况,
		 * 那么此时树立父子联系是风险的
		 */
		if (initParentJob) initParentJob(parentContext[Job])
	}
}

AbstractCoroutine是一个抽象类他承继了JobSupport,而JobSupportJob的详细完结。

init函数中依据initParentJob判别是否树立父子联系,initParentJob的默认值是true因而if中的initParentJob()函数是一定会履行的,这儿的parentContext[Job]取出的的Job便是在Demo中传入的Job

initParentJobJobSupport中的办法,由于AbstractCoroutine承继自JobSupport,所以进入JobSupport剖析这个办法。

public open class JobSupport constructor(active: Boolean) : Job, ChildJob, ParentJob, SelectClause0 {
	final override val key: CoroutineContext.Key<*> get() = Job
	/**
	 * 初始化父类的Job
	 * 在所有初始化之后最多调用一次
	 */
	protected fun initParentJob(parent: Job?) {
		assert { parentHandle == null }
		//①
		if (parent == null) {
			parentHandle = NonDisposableHandle
			return
		}
		//②
		parent.start() // 确保父协程现已发动
		@Suppress("DEPRECATION")
		//③
		val handle = parent.attachChild(this)
		parentHandle = handle
		// 查看注册的状况
		if (isCompleted) {
			handle.dispose()
			parentHandle = NonDisposableHandle 
		}
	}
}

上面的源码initParentJob中添加了三处注释,现在分别对这三处注释进行剖析:

  • if (parent == null): 这儿是对是否存在父Job的判别,假如不存在则不再进行后边的作业,也就谈不上树立父子联系了。由于在Demo中传递了Job()因而这儿的父Job是存在的,所以代码能够继续履行。
  • parent.start(): 这儿确保parent对应的Job发动了;
  • parent.attachChild(this): 这儿便是将子Job添加到父Job中,使其成为parent的子Job这儿其实便是树立了父子联系。

用一句话来归纳这个联系便是:每一个协程都有一个Job,每一个Job又有一个父Job和多个子Job,能够看做是一个树状结构。这个联系能够用下面这张图表明:

【Kotlin回顾】19.Kotlin协程—CoroutineScope是如何管理协程的

4.结构化是怎么撤销的

结构化能够被创立的一起CoroutineScope还提供了可撤销的函数,Demo中经过scope.cancel()撤销了协程,它的流程又是怎样的呢?先从scope.cancel中的cancel看起

/**
 * 撤销这个scope,包括当时Job和子Job
 * 假如没有Job,可抛出反常IllegalStateException
 */
public fun CoroutineScope.cancel(cause: CancellationException? = null) {
    val job = coroutineContext[Job] ?: error("Scope cannot be cancelled because it does not have a job: $this")
    job.cancel(cause)
}

scope.cancel又是经过job.cancel撤销的,这个cancel详细完结是在JobSupport

public open class JobSupport constructor(active: Boolean) : Job, ChildJob, ParentJob, SelectClause0 {
	...
	public override fun cancel(cause: CancellationException?) {
		cancelInternal(cause ?: defaultCancellationException())
	}
	public open fun cancelInternal(cause: Throwable) {
		cancelImpl(cause)
	}
	/**
	 * 当cancelChild被调用的时分cause是Throwable或许ParentJob
	 * 假如反常现已被处理则回来true,不然回来false
	 */
	internal fun cancelImpl(cause: Any?): Boolean {
		var finalState: Any? = COMPLETING_ALREADY
		if (onCancelComplete) {
			// 确保它正在完结,假如回来状况是 cancelMakeCompleting 阐明它现已完结
			finalState = cancelMakeCompleting(cause)
			if (finalState === COMPLETING_WAITING_CHILDREN) return true
		}
		if (finalState === COMPLETING_ALREADY) {
			//转换到撤销状况,当完结时调用afterCompletion
			finalState = makeCancelling(cause)
		}
		return when {
			finalState === COMPLETING_ALREADY -> true
			finalState === COMPLETING_WAITING_CHILDREN -> true
			finalState === TOO_LATE_TO_CANCEL -> false
			else -> {
				afterCompletion(finalState)
				true
			}
		}
	}
	/**
	 * 假如没有需求协程体完结的任务回来true并立即进入完结状况等候子类完结
	 * 这儿代表的是当时Job是否有协程体需求履行
	 */
	internal open val onCancelComplete: Boolean get() = false
}

job.cancel终究调用的是JobSupport中的cancelImpl。这儿它分为两种状况,判别依据是onCancelComplete,代表的便是当时Job是否有协程体需求履行,假如没有则回来true。这儿的Job是自己创立的且没有需求履行的协程代码因而回来成果是true,所以就履行cancelMakeCompleting表达式。

private fun cancelMakeCompleting(cause: Any?): Any? {
	loopOnState { state ->
		...
		val finalState = tryMakeCompleting(state, proposedUpdate)
		if (finalState !== COMPLETING_RETRY) return finalState
	}
}
private fun tryMakeCompleting(state: Any?, proposedUpdate: Any?): Any? {
	...
	return tryMakeCompletingSlowPath(state, proposedUpdate)
}
private fun tryMakeCompletingSlowPath(state: Incomplete, proposedUpdate: Any?): Any? {
	//获取状况列表或提升为列表以正确操作子列表
	val list = getOrPromoteCancellingList(state) ?: return COMPLETING_RETRY
	...
	notifyRootCause?.let { notifyCancelling(list, it) }
	...
	return finalizeFinishingState(finishing, proposedUpdate)
}

进入cancelMakeCompleting后经过多次流转终究会调用tryMakeCompletingSlowPath中的notifyCancelling,在这个函数中才是履行子Job和父Job撤销的终究流程

private fun notifyCancelling(list: NodeList, cause: Throwable) {
	//首先撤销子Job
	onCancelling(cause)
	//告诉子Job
	notifyHandlers<JobCancellingNode>(list, cause)
	// 之后撤销父Job
	cancelParent(cause) // 试探性撤销——假如没有parent也没联系
}
private inline fun <reified T: JobNode> notifyHandlers(list: NodeList, cause: Throwable?) {
	var exception: Throwable? = null
	list.forEach<T> { node ->
		try {
			node.invoke(cause)
		} catch (ex: Throwable) {
			exception?.apply { addSuppressedThrowable(ex) } ?: run {
				exception =  CompletionHandlerException("Exception in completion handler $node for $this", ex)
			}
		}
	}
	exception?.let { handleOnCompletionException(it) }
}

notifyHandlers中的流程便是遍历当时Job的子Job,并将撤销的cause传递曩昔,这儿的invoke()终究会调用 ChildHandleNodeinvoke()办法

public open class JobSupport constructor(active: Boolean) : Job, ChildJob, ParentJob, SelectClause0 {
	...
	internal class ChildHandleNode(
		@JvmField val childJob: ChildJob
	) : JobCancellingNode(), ChildHandle {
		override val parent: Job get() = job
		override fun invoke(cause: Throwable?) = childJob.parentCancelled(job)
		override fun childCancelled(cause: Throwable): Boolean = job.childCancelled(cause)
	}
	public final override fun parentCancelled(parentJob: ParentJob) {
		cancelImpl(parentJob)
	}
}

childJob.parentCancelled(job)的调用终究调用的是JobSupport中的parentCanceled()函数,然后又回到了cancelImpl()中,也便是 Job 撤销的进口函数。这实际上就相当于在做递归调用

Job撤销完结后接着便是撤销父Job了,进入到cancelParent()函数中

/**
 * 撤销Job时调用的办法,以便可能将撤销传播到父类。
 * 假如父协程负责处理反常,则回来' true ',不然回来' false '。
 */ 
private fun cancelParent(cause: Throwable): Boolean {
	// Is scoped coroutine -- don't propagate, will be rethrown
	if (isScopedCoroutine) return true
	/* 
	* CancellationException被认为是“正常的”,当子协程发生它时父协程通常不会被撤销。
    * 这允许父协程撤销它的子协程(通常状况下),而自身不会被撤销,
    * 除非子协程在其完结期间溃散并发生其他反常。
    */
	val isCancellation = cause is CancellationException
	val parent = parentHandle
	if (parent === null || parent === NonDisposableHandle) {
		return isCancellation
	}
	// 职责链形式
	return parent.childCancelled(cause) || isCancellation
}
/**
 * 在这个办法中,父类决定是否撤销自己(例如在重大故障上)以及是否处理子类的反常。
 * 假如反常被处理,则回来' true ',不然回来' false '(调用者负责处理反常)
 */
public open fun childCancelled(cause: Throwable): Boolean {
	if (cause is CancellationException) return true
	return cancelImpl(cause) && handlesException
}

cancelParent的回来成果运用了职责链形式, 假如回来【true】表明父协程处理了反常,回来【false】则表明父协程没有处理反常。

当反常是CancellationException时假如是子协程发生的父协程不会撤销,或许说父协程会疏忽子协程的撤销反常,假如是其他反常父协程就会响应子协程的撤销了。

5.总结

  • 每次创立CoroutineScope时都会确保CoroutineContext中一定存在Job元素,而CoroutineScope便是经过Job来办理协程的;
  • 每次经过launchasync发动(创立)协程时都会创立AbstractCoroutine的子类,然后经过initParentJob()函数树立协程的父子联系。每个协程都会对应一个Job,每个Job都会由一个父Job以及多个子Job,这是一个N叉树结构。
  • 由于是一个树结构因而协程的撤销以及反常的传播都是按照这个结构进行传递。当撤销Job时都会告诉自己的父Job和子Job,撤销子Job终究是以递归的办法传递给每一个Job。协程在向上撤销父Job时经过职责链形式一步一步的传递到最顶层的协程,一起假如子Job发生CancellationException反常时父Job会将其疏忽,假如是其他反常父Job则会响应这个反常。
  • 关于CancellationException引起的撤销只会向下传递撤销子协程;关于其他反常引起的撤销既向上传递也向下传递,终究会使所有的协程被撤销。