前语:只要在那崎岖的小路上不畏艰险英勇攀爬的人,才有期望达到光芒的顶点。 ——马克思

前语

经过前面两篇协程的学习,我相信咱们对协程的运用现已非常了解了。本着知其然更要知其之所以然的心态,很想知道它里边是怎样可以让异步代码同步化的?协程它是如何完结线程的调度的?协程的挂起和康复实质是什么?今日在这儿逐个为咱们解答。

整个Kotlin 协程学习分为三部曲,本文是第三篇:

Kotlin 协程实战进阶(一、筑基篇)

Kotlin 协程实战进阶(二、进阶篇)

Kotlin 协程实战进阶(三、原理篇)

(本文需求前面两篇文章协程的知识点作为基础)

本文大纲

Kotlin协程原理.png

协程最中心的便是挂起与康复,可是这两个名称在一定程度上面迷惑了咱们,由于这两个名词并不可以让咱们在源码上面和它的完结原理有清晰的认知。

协程的挂起实质上是办法的挂起,而办法的挂起实质上是 return,协程的康复实质上办法的康复,而康复的实质是 callback 回调。

可是咱们在Kotlin协程源码里边看不到 returncallback 回调的,其实这些都是kotlin编译器帮咱们做了,单单看kotlin的源码是看不出所以然的,需求反编译成Java文件,才干看到实质之处。

经过AS的工具栏中 Tools->kotlin->show kotlin ByteCode,得到的是java字节码,需求再点击Decompile按钮反编译成java源码:

image.png

一、协程首要结构

1.suspend fun

再来复习一下挂起函数:

  • suspend 是 Kotlin 协程最中心的关键字;
  • 运用suspend关键字润饰的函数叫作挂起函数挂起函数只能在协程体内或许在其他挂起函数内调用;
  • 被关键字 suspend 润饰的办法在编译阶段,编译器会修正办法的签名,包含回来值,润饰符,入参,办法体完结。
@GET("users/{login}")
suspend fun getUserSuspend(@Path("login") login: String): User

将上面的挂起函数反编译:

@GET("users/{login}")
@Nullable
Object getUserSuspend(@Path("login") @NotNull String var1, @NotNull Continuation var2);
  1. 反编译后你会发现多了一个Continuation参数(它便是 callback),也便是说调用挂起函数的时分需求传递一个Continuation,仅仅传递这个参数是由编译器悄然传,而不是咱们传递的。这便是挂起函数为什么只能在协程或许其他挂起函数中履行,由于只要挂起函数或许协程中才有Continuation
  2. 可是编译器怎样判别哪些办法需求 callback 呢?便是经过 suspend 关键字来区其他。suspend 润饰的办法会在编译期间被Kotlin编译器做特殊处理,编译器会认为一旦一个办法增加 suspend 关键字,有可能会导致协程暂停往下履行,所以此刻会给办法传递要给 Continuation。等办法履行完结后,经过 Continuation 回调回去,然后让协程康复,持续往下履行。
  3. 它还把回来值 User改成了 Object

2.Continuation

Continuation 是 Kotlin 协程中非常重要的一个概念,它表明一个挂起点之后的连续操作

//Continuation接口表明挂起点之后的连续,该挂起点回来类型为“T”的值。
public interface Continuation<in T> {
    //对应这个Continuation的协程上下文
    public val context: CoroutineContext
    //康复相应协程的履行,传递一个成功或失败的成果作为最终一个挂起点的回来值。
    public fun resumeWith(result: Result<T>)
}
//将[value]作为最终一个挂起点的回来值,康复相应协程的履行。
fun <T> Continuation<T>.resume(value: T): Unit =
    resumeWith(Result.success(value))
//康复相应协程的履行,以便在最终一个挂起点之后从头抛出[反常]。
fun <T> Continuation<T>.resumeWithException(exception: Throwable): Unit =
    resumeWith(Result.failure(exception))

Continuation 有一个 resumeWith 函数可以接纳 Result 类型的参数。在成果成功获取时,调用resumeWith(Result.success(value))或许调用拓宽函数resume(value);出现反常时,调用resumeWith(Result.failure(exception))或许调用拓宽函数resumeWithException(exception),这便是 Continuation 的康复调用。

Continuation相似于网络恳求回调Callback,也是一个恳求成功或失败的回调:

public interface Callback {
  //恳求失败回调
  void onFailure(Call call, IOException e);
  //恳求成功回调
  void onResponse(Call call, Response response) throws IOException;
}

3.SuspendLambda

suspend{}其实便是协程的本体,它是协程实在履行的逻辑,会创立一个SuspendLambda类,它是Continuation的完结类。

QQ图片20210724173917.png

二、协程的创立流程

1.协程的创立

规范库给咱们供给的创立协程最原始的api:

public fun <T> (suspend () -> T).createCoroutine(
    completion: Continuation<T>
): Continuation<Unit>
public fun <R, T> (suspend R.() -> T).createCoroutine(
    receiver: R,
    completion: Continuation<T>
): Continuation<Unit> 

协程创立后会有两个 Contiunation,需求分清楚:

  • completion:     表明协程本体,协程履行完需求一个 Continuation 实例康复时调用
  • Contiunation<Unit>: 它是创立出来的协程的载体(suspend () -> T) 函数会被传给该实例作为协程的实践履行体

这两个Contiunation是不同的东西。传进来的 completion 实践上是协程的本体,协程履行完需求一个 Contiunation 回调履行的,所以它叫 completion;还有一个回来的 Contiunation ,它便是协程创立出来的载体,当它里边一切的resume都履行完结之后就会调用上面的 completionresumeWith()办法康复协程。

2.协程的效果域

在Androidx的Activity中模仿创立网络恳求,经过这个例子来深挖协程的原理:

lifecycleScope.launch(Dispatchers.IO) {
    val result = requestUserInfo()
    tvName.text = result
}
/**
 * 模仿恳求,2秒后回来数据
 */
suspend fun requestUserInfo(): String {
    delay(2000)
    return "result form userInfo"
}

跟进lifecycleScope源码:

val LifecycleOwner.lifecycleScope: LifecycleCoroutineScope
    get() = lifecycle.coroutineScope
val Lifecycle.coroutineScope: LifecycleCoroutineScope
    get() {
        while (true) {
            ······
            //相关声明周期的效果域完结类
            val newScope = LifecycleCoroutineScopeImpl(
                this,
                SupervisorJob() + Dispatchers.Main.immediate
            )
            //注册生命周期
            newScope.register()
            return newScope
        }
    }

效果域也是其实便是为协程界说的效果范围,为了保证一切的协程都会被追踪,Kotlin 不允许在没有运用CoroutineScope的情况下发动新的协程。
lifecycleScope经过lifecycle,SupervisorJob(),Dispatchers.Main创立一个LifecycleCoroutineScopeImpl,它是一个相关宿主生命周期的效果域。CoroutineScope绑定到这个LifecycleOwnerLifecycle 。当宿主被销毁时,这个效果域也被撤销。

3.协程的发动

进入launch()

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    //创立新的上下文
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        //推迟履行的协程
        LazyStandaloneCoroutine(newContext, block) else
        //独立的协程
        StandaloneCoroutine(newContext, active = true)
        //发动协程
    coroutine.start(start, coroutine, block)
    //回来coroutine,coroutine中完结了job接口
    return coroutine
}

参数 block: suspend CoroutineSope.() -> Unit 表明协程代码,实践上便是闭包代码块。
这儿边做了三件事:

  1. 依据context参数创立一个新的协程上下文CoroutineContext
  2. 创立Coroutine,假如发动形式为Lazy则创立LazyStandaloneCoroutine,不然创立StandaloneCoroutine
  3. coroutine.start()发动协程。

newCoroutineContext

public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
    //将效果域的上下文与传入的参数合并为新的上下文
    val combined = coroutineContext + context
    val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
    return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
        debug + Dispatchers.Default else debug
}

为新协程创立上下文。经过 + 将效果域的上下文 coroutineContext 与传入的上下文 context 合并为新的上下文。它在没有指定其他调度器或 ContinuationInterceptor 时则默许运用 Dispatchers.Default

StandaloneCoroutine

private open class StandaloneCoroutine(
    parentContext: CoroutineContext,
    active: Boolean
) : AbstractCoroutine<Unit>(parentContext, active) {
    override fun handleJobException(exception: Throwable): Boolean {
        //处理反常
        handleCoroutineException(context, exception)
        return true
    }
}
public abstract class AbstractCoroutine<in T>(
    protected val parentContext: CoroutineContext,
    active: Boolean = true
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
    ······
    //发动协程
    public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
        initParentJob()
        //依据[start]参数发动协程
        start(block, receiver, this)
    }
}

假如不指定发动形式,则默许运用 CoroutineStart.DEFAULT,创立一个独立协程 StandaloneCoroutine,而
StandaloneCoroutine 承继了 AbstractCoroutine 类,并重写了父类的 handleJobException() 办法。AbstractCoroutine 用于在协程构建器中完结协程的笼统基类,
完结了 ContinuationJobCoroutineScope 等接口。所以 AbstractCoroutine 自身也是一个 Continuation

Coroutine承继联系.pngcoroutine.start()

start(block, receiver, this)

从上面的源码中,协程发动 coroutine.start() 办法是 AbstractCoroutine 类中完结的,这儿涉及到运算符重载,然后该办法实践上会调用 CoroutineStart#invoke() 办法 ,并把代码块和接纳者、completion等参数传到 CoroutineStart 中。

public enum class CoroutineStart {
    //···
    fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
    when (this) {
        DEFAULT -> block.startCoroutineCancellable(receiver, completion)
        ATOMIC -> block.startCoroutine(receiver, completion)
        UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
        LAZY -> Unit
    }
}

运用此协程战略将相应的块 [block] 作为协程发动。这儿的 [block] 便是协程里边履行的代码块。

  • block:    协程里边履行的代码块;
  • receiver:   接纳者;
  • completion:  协程的本体,协程履行完需求一个 Continuation 实例康复时调用

在上面 AbstractCoroutine 咱们看到 completion 传递的是 this ,也便是 AbstractCoroutine 自己,也便是 Coroutine 协程自身。所以这个 completion 便是协程本体。(这是Continuation三层包装的第一层包装)

接着进入 startCoroutineCancellable(),可以以可撤销的办法发动协程,以便在等候调度时撤销协程:

internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
    runSafely(completion) {
        // 1.创立一个没有被阻拦的 Continuation
        createCoroutineUnintercepted(receiver, completion)
            // 2.增加阻拦器
            .intercepted()
            // 3.履行协程,也是调用continuation.resumeWith(result)
            .resumeCancellableWith(Result.success(Unit))
    }

这儿首要做了三件事:

  1. 创立一个新的 Continuation
  2. Continuation 加上 ContinuationInterceptor 阻拦器,也是线程调度的关键。
  3. resumeCancellableWith终究调用 continuation.resumeWith(result) 履行协程。

4.创立Continuation<Unit>

createCoroutineUnintercepted() 每次调用此函数时,都会创立一个新的可暂停核算实例。
经过回来的 Continuation<Unit> 实例上调用 resumeWith(Unit) 开端履行创立的协程。

#IntrinsicsJvm.kt
public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
    receiver: R,
    completion: Continuation<T>
): Continuation<Unit> {//回来Continuation<Unit>,它便是协程的载体
    val probeCompletion = probeCoroutineCreated(completion)
    return if (this is BaseContinuationImpl)
        //假如调用者是 `BaseContinuationImpl` 或许其子类
        create(receiver, probeCompletion)
    else {
        createCoroutineFromSuspendFunction(probeCompletion) {
            (this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it)
        }
    }
}

要点创立并回来一个未阻拦的Continuation,它便是协程的载体。(这是Continuation三层包装的第二层包装)(suspend () -> T) 函数会被传给该实例作为协程的实践履行体。这个 Continuation 封装了协程的代码运转逻辑和康复接口,下面会讲到。

由于this便是(suspend () -> T)SuspendLambda 又是 BaseContinuationImpl 的完结类,则履行 create() 办法创立协程载体:

abstract class BaseContinuationImpl {
    //···
    public open fun create(completion: Continuation<*>): Continuation<Unit> {
        throw UnsupportedOperationException("create(Continuation) has not been overridden")
    }
    public open fun create(value: Any?, completion: Continuation<*>): Continuation<Unit> {
        throw UnsupportedOperationException("create(Any?;Continuation) has not been overridden")
    }
    //···
}

create()BaseContinuationImpl 类中的一个公开办法。那么是谁完结了这个办法呢? 看看 SuspendLambdaBaseContinuationImplContinuation 之间的联系讲解。

5.SuspendLambda及其父类

上面说到 suspend{} 便是 (suspend R.() -> T) ,它是协程实在需求履行的逻辑,传入的lambda表达式被编译成了承继 SuspendLambda 的子类,SuspendLambdaContinuation 的完结类。

internal abstract class SuspendLambda(
    public override val arity: Int,
    completion: Continuation<Any?>?
) : ContinuationImpl(completion), FunctionBase<Any?>, SuspendFunction {
    constructor(arity: Int) : this(arity, null)
    public override fun toString(): String =
        if (completion == null)
            Reflection.renderLambdaToString(this) //这是 lambda
        else
            super.toString() //这是 continuation
}

SuspendLambda 承继自 ContinuationImpl

internal abstract class ContinuationImpl(
    completion: Continuation<Any?>?,
) : BaseContinuationImpl(completion) {
    private var intercepted: Continuation<Any?>? = null
    public fun intercepted(): Continuation<Any?> =
        intercepted
            ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
                .also { intercepted = it }
    protected override fun releaseIntercepted() { ···  }
}

ContinuationImpl又承继自BaseContinuationImplSuspendLambda 的 resume() 办法的具体完结为 BaseContinuationImpl 的 resumeWith() 办法:

internal abstract class BaseContinuationImpl(
    //每个BaseContinuationImpl实例都会引证一个完结Continuation,用来在当时状况机流通完毕时康复这个Continuation
    public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
    //resumeWith() 中经过循环由里到外康复Continuation
    public final override fun resumeWith(result: Result<Any?>) {
        var current = this
        var param = result
        while (true) {
            probeCoroutineResumed(current)
            with(current) {
                val completion = completion!! //completion为null则会抛出反常
                val outcome: Result<Any?> =
                    try {
                        // 1.调用 invokeSuspend 办法履行,履行协程的实在运算逻辑
                        val outcome = invokeSuspend(param)
                        // 2.假如现已挂起则提前完毕
                        if (outcome === COROUTINE_SUSPENDED) return
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        Result.failure(exception)
                    }
                //当invokeSuspend办法没有回来COROUTINE_SUSPENDED,那么当时状况机流通完毕,即当时suspend办法履行完毕,释放阻拦
                releaseIntercepted()
                if (completion is BaseContinuationImpl) {
                    //3.假如 completion 是 BaseContinuationImpl,内部还有suspend办法,则会进入循环递归
                    current = completion
                    param = outcome
                } else {
                    //4.不然是最顶层的completion,则会调用resumeWith康复上一层而且return
                    // 这儿实践调用的是其父类 AbstractCoroutine 的 resumeWith 办法
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }
    //·····
}

这儿首要做了四件事:

  1. 调用 invokeSuspend 办法,履行协程的实在运算逻辑,并回来一个成果;
  2. 假如 outcomeCOROUTINE_SUSPENDED ,代码块里边履行了挂起办法,则持续挂起;
  3. 假如 completionBaseContinuationImpl,内部还有suspend办法,则会进入循环递归,持续履行挂起;
  4. 假如 completion 不是 BaseContinuationImpl,则实践调用父类 AbstractCoroutineresumeWith 办法。

接下来再来看 AbstractCoroutineresumeWith 完结:

public abstract class AbstractCoroutine<in T>(
    protected val parentContext: CoroutineContext,
    active: Boolean = true
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
    //以指定的成果完结履行协程
    public final override fun resumeWith(result: Result<T>) {
        // 1. 获取当时协程的技术状况
        val state = makeCompletingOnce(result.toState())
        // 2. 假如当时还在等候完结,阐明还有子协程没有完毕
        if (state === COMPLETING_WAITING_CHILDREN) return
        // 3. 履行完毕康复的办法,默许为空
        afterResume(state)
    }
    //···
}

其间一类 completionBaseContinuationImpl,每个实例就代表一个suspend办法状况机。resumeWith()封装了协程的运算逻辑,用以协程的发动和康复;而另一类 completionAbstractCoroutine,首要是担任保护协程的状况和办理,它的resumeWith则是完结协程,康复调用者协程。

其承继联系为: SuspendLambda -> ContinuationImpl -> BaseContinuationImpl -> Continuation

suspendLambda承继联系.png
因此 create() 办法创立的 Continuation 是一个 SuspendLambda 目标。
回到上面的 createCoroutineUnintercepted() 办法:

//IntrinsicsJvm.kt
public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
    receiver: R,
    completion: Continuation<T>
): Continuation<Unit> {//回来Continuation<Unit>,它便是协程的载体
    val probeCompletion = probeCoroutineCreated(completion)
    return if (this is BaseContinuationImpl)
        //假如调用者是 `BaseContinuationImpl` 或许其子类
        create(receiver, probeCompletion)
    else {
        createCoroutineFromSuspendFunction(probeCompletion) {
            (this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it)
        }
    }
}

其实这段代码是在JVM平台中找到的,在 IntrinsicsJvm.kt 类中,可是在 Android 源码中是这姿态的:

fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
    receiver: R,
    completion: Continuation<T>
): Continuation<kotlin.Unit> { /* compiled code */ }

compiled code 就现已提示里边的代码是编译后的代码。

前面也说到了,在协程源码里边是看不到完整的协程原理的,有一部分代码是kotlin编译器处理的,所以在研讨协程的运转流程时,单单看kotlin的源码是看不出实质的,需求反编译成Java文件,看看反编译之后的代码被修正成什么姿态了。

6.Function 的创立

将协程模仿网络恳求的代码反编译:

fun getData() {
    lifecycleScope.launch {
        val result = requestUserInfo()
        tvName.text = result
    }
}

反编译后的代码如下(代码有删减),你会发现发生了巨大的改变,而这些作业都是kotlin编译器帮咱们完结的:

public final void getData() {
   BuildersKt.launch$default((CoroutineScope)LifecycleOwnerKt.getLifecycleScope(this), (CoroutineContext)null, (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
      int label; // 初始值为0
      @Nullable
      public final Object invokeSuspend(@NotNull Object $result) {
         //···
         Object var10000 = requestUserInfo(this); //履行挂起函数
         //···
         String result = (String)var10000;
         CoroutinesActivity.this.getTvName().setText((CharSequence)result);
         return Unit.INSTANCE;
      }
      @NotNull
      public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
         Intrinsics.checkParameterIsNotNull(completion, "completion");
         Function2 var3 = new <anonymous constructor>(completion);
         return var3;
      }
      public final Object invoke(Object var1, Object var2) {
         return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
      }
   }), 3, (Object)null);
}

lifecycleScope.launch {} 在反编译后增加了 CoroutineScopeCoroutineContextCoroutineStartFunction2,3,Object等参数,这些都是kotlin编译器帮咱们做了。这儿便是最顶层的completion处理协程挂起与康复的当地。 这儿一旦康复了,那么阐明整个协程康复了。

这儿创立了一个Function2,里边有三个重要的办法:

  • invokeSuspend(result):  里边履行luanch{}里边的代码,所以它履行了协程实在的运转逻辑;
  • create(value, completion):经过传递的completion参数创立一个 Function2 并回来,实践是一个Continuation
  • invoke(var1, var2):    复写了Funtion.invoke() 办法,经过传递过来的参数链式调用create().invokeSuspend()
  1. 从上面知道 (suspend R.() -> T)BaseContinuationImpl 的完结类,所以会走 onCreate() 办法创立 Continuation, 经过 completion 参数创立一个新的 Function2,作为Continuation回来,这便是创立出来的协程载体;

  2. 然后调用 resumeWith() 发动协程,那么就会履行BaseContinuationImplresumeWith() 办法,此刻就会履行 invokeSuspend() 办法,履行协程实在的运转逻辑

协程的创立流程如下:

协程的创立流程图.png

三、 协程的挂起与康复

协程作业的中心便是它内部的状况机,invokeSuspend() 函数。 requestUserInfo() 办法是一个挂起函数,这儿经过反编译它来论述协程状况机的原理,逆向分析协程的挂起和康复。

1.办法的挂起

//延时2000毫秒,回来一个String成果
suspend fun requestUserInfo(): String {
    delay(2000)
    return "result form userInfo"
}

反编译后的代码如下(代码有删减),相同发现发生了巨大的改变,而这些作业都是kotlin编译器帮咱们完结的:

//1.函数回来值由String变成Object,入参也增加了Continuation参数
public final Object requestUserInfo(@NotNull Continuation completion) {
   //2.经过completion创立一个ContinuationImpl,而且复写了invokeSuspend()
   Object continuation = new ContinuationImpl(completion) {
       Object result;
       int label; //初始值为0
       @Nullable
       public final Object invokeSuspend(@NotNull Object $result) {
          this.result = $result;
          this.label |= Integer.MIN_VALUE;
          return requestUserInfo(this);//又调用了requestUserInfo()办法
       }
    };
   Object $result = (continuation).result;
   Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
   //状况机  
   //3.办法被康复的时分又会走到这儿,第一次进入case 0分支,label的值从0变为1,第二次进入就会走case 1分支
   switch(continuation.label) {
       case 0:
          ResultKt.throwOnFailure($result);
          continuation.label = 1;
          //4.delay()办法被suspend润饰,传入一个continuation回调,回来一个object成果。这个成果要么是`COROUTINE_SUSPENDED`,不然便是实在成果。
          Object delay = DelayKt.delay(2000L, continuation)
          if (delay == var4) {//假如是COROUTINE_SUSPENDED则直接return,就不会往下履行了,requestUserInfo()被暂停了。
             return var4;
          }
          break;
       case 1:
          ResultKt.throwOnFailure($result);
          break;
       default:
          throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
       }
   return "result form userInfo";
}

上面首要过程为:

  1. 函数回来值由 String 变成 Object,函数没有入参的编译后也增加了 Continuation 参数。本来需求咱们做的 callback,现在编译器帮咱们完结了。
  2. 依据 completion 创立了一个 ContinuationImpl ,复写了 invokeSuspend() 办法,在这个办法里边它又调用了 requestUserInfo() 办法,这儿又调用了一次自己(是不是很奇特),而且把 continuation 传递进去。
  3. 在 switch 句子中,label 的默许初始值为 0,第一次会进入 case 0 分支,delay() 是一个挂起函数,传入上面的 continuation 参数,会有一个 Object 类型的回来值。这个成果要么是COROUTINE_SUSPENDED,不然便是实在成果。
  4. DelayKt.delay(2000, continuation)的回来成果假如是 COROUTINE_SUSPENDED, 则直接 return ,那么办法履行就被完毕了,办法就被挂起了。

这便是挂起的实在原理。所以函数即使被 suspend 润饰了,可是也未必会挂起。需求里边的代码编译后有回来值为 COROUTINE_SUSPENDED 这样的符号位才干够,所以程序履行到 case 0 的时分就 return 了。那就意味着办法被暂停了,那么协程也被暂停了。所以说协成的挂起实践上是办法的挂起,办法的挂起实质是 return。

2.COROUTINE_SUSPENDED

Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
//履行已暂停,而且不会当即回来任何成果
public val COROUTINE_SUSPENDED: Any get() = CoroutineSingletons.COROUTINE_SUSPENDED
internal enum class CoroutineSingletons { COROUTINE_SUSPENDED, UNDECIDED, RESUMED }

var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED()中, COROUTINE_SUSPENDED 便是一个枚举常量,表明协程现已挂起,而且不会当即回来任何成果。那么 DelayKt.delay() 回来值是 COROUTINE_SUSPENDED 就被 return 了。

跟进 DelayKT 看看 COROUTINE_SUSPENDED 是如何被获取的:
image.png

找到 DelayKT 类(注意:不是Delay.kt哈,别搞错了),Decomplie to java反编译成java源码:

public final class DelayKt {
   //增加了Object回来值,而且追加了一个Continuation参数,
   @Nullable
   public static final Object delay(long timeMillis, @NotNull Continuation $completion) {
      if (timeMillis <= 0L) {
         return Unit.INSTANCE;
      } else {
         //···
         getDelay(cont.getContext()).scheduleResumeAfterDelay(timeMillis, cont);
         Object var10000 = cancellable$iv.getResult();
         //···
         return var10000;
      }
   }
}

可以看到 DelayKt.delay() 增加了 Object 回来值,而且追加了一个 completion 参数,这个回来值是 var10000 ,它是在 cancellable$iv.getResult() 得到的:

@PublishedApi
internal fun getResult(): Any? {
    setupCancellation()
    //测验挂起,假如回来TRUE则回来COROUTINE_SUSPENDED
    if (trySuspend()) return COROUTINE_SUSPENDED
    //···
    return getSuccessfulResult(state)
}

trySuspend() 测验把办法挂起,假如回来 true 则回来 COROUTINE_SUSPENDED:

private val _decision = atomic(UNDECIDED)
private fun trySuspend(): Boolean {
    //循环遍历里边的值,而_decision初始值为UNDECIDED,那么第一次肯定回来true
    _decision.loop { decision ->
        when (decision) {
            //回来true,而且把当时状况更改为SUSPENDED,代表它以经被挂起了
            UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, SUSPENDED)) return true
            RESUMED -> return false
            else -> error("Already suspended")
        }
    }
}

trySuspend() 里边循环遍历了 _decision 的值, _decision 初始值为 UNDECIDED,那么第一次会进入UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, SUSPENDED)) return true分支,这儿就回来true了,而且把当时状况更改为 SUSPENDED,代表着它现已被挂起了。

//办法的状况只要在调用tryResume时才会把状况更改为RESUMED
private fun tryResume(): Boolean {
    _decision.loop { decision ->
        when (decision) {
            //回来true,而且把状况改为RESUMED
            UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, RESUMED)) return true
            SUSPENDED -> return false
            else -> error("Already resumed")
        }
    }
}

这个办法的状况会在它康复的时分调用 tryResume() 把状况更改为 RESUMED这便是决议计划状况机

image.png

那么 trySuspend() 回来 truegetResult() 回来了 COROUTINE_SUSPENDED 枚举常量,那么 DelayKt.delay() 就回来了 COROUTINE_SUSPENDED,所以下面的判别条件就会满足,会直接return。delay() 办法是一个实在在正的挂起函数,可以导致协程被暂停。

所以 requestUserInfo() 办法在 delay(2000) 被暂停了,在协程中调用,那么协程也就暂停了,后边的成果 result form userInfo 也没有被回来。所以这便是被 suspend 润饰的函数不一定能导致协程被挂起,还需求里边的完结经过编译之后有回来值而且为 COROUTINE_SUSPENDED 才干够。

3.办法的康复

持续回到 requestUserInfo() 分析康复的原理:

@Nullable
public final Object requestUserInfo(@NotNull Continuation completion) {
   //···
   Object continuation = new ContinuationImpl(completion) {
       Object result;
       int label; //初始值为0
       @Nullable
       public final Object invokeSuspend(@NotNull Object $result) {
          this.result = $result;
          this.label |= Integer.MIN_VALUE;
          return requestUserInfo(this);//又调用了requestUserInfo()办法
       }
    };
   Object $result = (continuation).result;
   Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
   //办法被康复的时分又会走到这儿,第一次进入case 0,第二次 ontinuation.label = 1,所以能持续履行下去回来成果
   switch(continuation.label) {
       case 0:
          ResultKt.throwOnFailure($result);
          continuation.label = 1;
          //delay()办法被suspend润饰,传入一个continuation回调,回来一个object成果。这个成果要么是`COROUTINE_SUSPENDED`,不然便是实在成果。
          Object delay = DelayKt.delay(2000L, continuation)
          if (delay == var4) {//假如是COROUTINE_SUSPENDED则直接return,就不会往下履行了。
             return var4;
          }
          break;
       case 1:
          ResultKt.throwOnFailure($result);
          break;
       default:
          throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
       }
   return "result form userInfo";
}
  1. 由于 delay()io 操作,在2000毫米后就会经过传递给它的 continuation 回调回来。
  2. 回调到 ContinuationImpl 这个类里边的 resumeWith() 办法,会再次调用 invokeSuspend() 办法,从而再次调用 requestUserInfo() 办法。
  3. 它又会进入switch句子,由于第一次在 case 0 时把 label = 1 赋值为1,所以这次会进入 case 1 分支,而且回来了成果result form userInfo
  4. 而且 requestUserInfo() 的回来值作为 invokeSuspend() 的回来值回来。从头被履行的时分就代表着办法被康复了

那么 invokeSuspend() 办法是怎样被触发回调的呢?它拿到回来值有什么用呢?

上面说到 ContinuationImpl 承继自 BaseContinuationImpl,而它又完结了 continuation 接口而且复写了 resumeWith() 办法,里边就调用了 val outcome = invokeSuspend(param) 办法。(源码有删减)

internal abstract class BaseContinuationImpl(
    public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
    //这个完结是终究的,用于展开 resumeWith 递归。
    public final override fun resumeWith(result: Result<Any?>) {
        var current = this
        var param = result
        while (true) {
            with(current) {
                val completion = completion!!
                val outcome: Result<Any?> =
                    try {
                        // 1.调用 invokeSuspend()办法履行,履行协程的实在运算逻辑,拿到回来值
                        val outcome = invokeSuspend(param)
                        // 2.假如回来的还是COROUTINE_SUSPENDED则提前完毕
                        if (outcome == COROUTINE_SUSPENDED) return 
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        Result.failure(exception)
                    }
                if (completion is BaseContinuationImpl) {
                    //3.假如 completion 是 BaseContinuationImpl,内部还有suspend办法,则会进入循环递归,持续履行和康复
                    current = completion
                    param = outcome
                } else {
                    //4.不然是最顶层的completion,则会调用resumeWith康复上一层而且return
                    // 这儿实践调用的是其父类 AbstractCoroutine 的 resumeWith 办法
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }

实践上任何一个挂起函数它在康复的时分都会调到 BaseContinuationImplresumeWith() 办法里边。

  1. 一但 invokeSuspend() 办法被履行,那么 requestUserInfo() 又会再次被调用, invokeSuspend() 就会拿到 requestUserInfo() 的回来值,在 ContinuationImpl 里边依据 val outcome = invokeSuspend() 的回来值来判别咱们的 requestUserInfo() 办法康复了之后的操作。

  2. 假如 outcomeCOROUTINE_SUSPENDED 常量,阐明你即使被康复了,履行了一下, if (outcome == COROUTINE_SUSPENDED) return 可是立马又被挂起了,所以又 return 了。

  3. 假如本次康复 outcome 是一个正常的成果,就会走到 completion.resumeWith(outcome),当时被挂起的办法现已被履行完了,实践调用的是其父类 AbstractCoroutineresumeWith 办法,那么协程就康复了。

咱们知道 requestUserInfo() 肯定是会被协程调用的(从上面反编译代码知道会传递一个Continuation completion参数),requestUserInfo() 办法康复完了就会让协程completion.resumeWith()去康复,所以说协程的康复实质上是办法的康复。

这是在android studio傍边经过反编译kotlin源码来分析协程挂起与康复的流程。流程图如下:

协程挂起与康复.png

4.在协程中运转的挂起与康复

那么 requestUserInfo() 办法在协程里边履行的整个挂起和康复流程是怎样样的呢?

fun getData() {
    lifecycleScope.launch {
        val result = requestUserInfo()
        tvName.text = result
    }
}
//模仿恳求,2秒后回来数据
suspend fun requestUserInfo(): String {
    delay(2000)
    return "result form userInfo"
}

反编译代码:

public final void getData() {
   BuildersKt.launch$default((CoroutineScope)LifecycleOwnerKt.getLifecycleScope(this), (CoroutineContext)null, (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
      int label; // 初始值为0
      @Nullable
      public final Object invokeSuspend(@NotNull Object $result) {
         Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED(); //挂起状况
         Object var10000;
         //状况机
         switch(this.label) {
         case 0:
            ResultKt.throwOnFailure($result);
            this.label = 1; //修正label
            var10000 = requestUserInfo(this); //履行挂起函数
            if (var10000 == var3) {
               return var3; //假如var10000是COROUTINE_SUSPENDED则直接挂起协程
            }
            break;
         case 1:
            ResultKt.throwOnFailure($result);
            var10000 = $result; //回来实践成果
            break;
           //···
         }
         String result = (String)var10000;
         CoroutinesActivity.this.getTvName().setText((CharSequence)result);
         return Unit.INSTANCE;
      }
      @NotNull
      public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
         Intrinsics.checkParameterIsNotNull(completion, "completion");
         Function2 var3 = new <anonymous constructor>(completion);
         return var3;
      }
      public final Object invoke(Object var1, Object var2) {
         return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
      }
   }), 3, (Object)null);
}
@Nullable
public final Object requestUserInfo(@NotNull Continuation completion) {
   //···
   Object continuation = new ContinuationImpl(completion) {
       Object result;
       int label; //初始值为0
       @Nullable
       public final Object invokeSuspend(@NotNull Object $result) {
          this.result = $result;
          this.label |= Integer.MIN_VALUE;
          return requestUserInfo(this); //又调用了requestUserInfo()办法
       }
    };
   Object $result = (continuation).result;
   Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
   //状况机
   //办法被康复的时分又会走到这儿,第一次进入case 0,第二次 ontinuation.label = 1,所以就打印了日志输出
   switch(label) {
       case 0:
          ResultKt.throwOnFailure($result);
          continuation.label = 1;
          //delay()办法被suspend润饰,传入一个continuation回调,回来一个object成果。这个成果要么是`COROUTINE_SUSPENDED`,不然便是实在成果。
          Object delay = DelayKt.delay(2000L, continuation)
          if (delay == var4) {//假如是COROUTINE_SUSPENDED则直接return,就不会往下履行了,requestUserInfo()被暂停了。
             return var4;
          }
          break;
       case 1:
          ResultKt.throwOnFailure($result);
          break;
       default:
          throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
       }
   return "result form userInfo";
}
  1. 可以看到协程里边反编译后的代码和 requestUserInfo() 办法反编译后的代码相似,Function2 里边也复写了invokeSuspend() 办法,状况机也相似,
  2. case 0 处判别 requestUserInfo() 回来值是否为COROUTINE_SUSPENDED, 假如是则挂起协程。咱们在上面分析知道, requestUserInfo() 第一次回来的值是COROUTINE_SUSPENDED,所以 requestUserInfo() 被挂起了,协程也被挂起了。所以说协程的挂起实践上是办法的挂起。
  3. 协程康复的原理也和 requestUserInfo() 康复的原理大致相同。在调用 requestUserInfo(this) 的时分把 Continuation 传递了进去。
  4. 那么 requestUserInfo() 函数2000毫秒后在康复时将成果经过 invokeSuspend() 回调给上一层 completionresumeWith() 里边,那么协程的 invokeSuspend(result) 便是被回调。
  5. 经过状况机流通履行之前挂起逻辑之后的代码。此刻 lable = 1 进入 case 1 赋值给 var10000,然后履行剩余的代码。 所以 requestUserInfo() 办法康复后,调用它的协程也跟着康复了,所以说协程的康复实质上是办法的康复。

协程的挂起与康复流程简图.png

四、协程的调度

1.协程阻拦

协程的线程调度是经过阻拦器完结的,回到前面的 startCoroutineCancellable:

internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
receiver: R, 
completion: Continuation<T>) =
    runSafely(completion) {
        // 创立一个没有被阻拦的 Coroutine
        createCoroutineUnintercepted(receiver, completion)
            // 增加阻拦器
            .intercepted()
            // 履行协程 
            .resumeCancellableWith(Result.success(Unit))
    }

看看 intercepted() 的具体完结:

//运用[ContinuationInterceptor]阻拦Continuation
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
   (this as? ContinuationImpl)?.intercepted() ?: this

阻拦器在每次(康复)履行协程体的时分都会阻拦协程本体SuspendLambdainterceptContinuation()办法中阻拦了一个Continuation<T>而且再回来一个Continuation<T>,阻拦到Continuation后就可以做一些作业,比如线程切换等。

internal abstract class ContinuationImpl(
    completion: Continuation<Any?>?,
    private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
    @Transient
    private var intercepted: Continuation<Any?>? = null // 阻拦到的Continuation
    public fun intercepted(): Continuation<Any?> =
        intercepted
            ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
                .also { intercepted = it }
    //······
}

interceptContinuation() 办法的完结是在 CoroutineDispatcher 中,它是一切协程调度程序完结扩展的基类:

public abstract class CoroutineDispatcher :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
    //是否需求调度
    public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true
    //将可运转块的履行分派到给定上下文中的另一个线程上
    public abstract fun dispatch(context: CoroutineContext, block: Runnable)
    //经过DispatchedContinuation回来包装原始continuation的 continuation
    public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        DispatchedContinuation(this, continuation)
}

注意:[block] 是一个 Runnable 类型。

假如传递了协程调度器,那么协程中的闭包代码块就决议了所运转的线程环境,CoroutineDispatcher 有三个重要的办法:

  • isDispatchNeeded():协程的发动需不需求分发到其他线程上面去。
  • dispatch():      将可运转块的履行分派到给定上下文中的另一个线程上,由子类去完结具体的调度。
  • interceptContinuation:阻拦协程本体,包装成一个 DispatchedContinuation

阻拦协程本体,包装成一个 DispatchedContinuation,它在履行使命的时分会经过 needDispatch() 来判别本次协程发动需不需求分发到其他线程上面,假如回来了true,那么就会调用子类的 dispatch(runnable) 办法,来完结协程的本次发动作业,假如回来false,就会由 CoroutineDispatcher 在当时线程马上履行。

2.协程分发

在截获的 Continuation 上调用resume(Unit) 保证协程和完结的履行都发生在由 ContinuationInterceptor 建立的调用上下文中。而阻拦后的 continuationDispatchedContinuation包装了一层:(这是Continuation三层包装的第三层包装)

internal class DispatchedContinuation<in T>(
    @JvmField val dispatcher: CoroutineDispatcher,
    @JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {
    //·····
    override val delegate: Continuation<T>
        get() = this
    inline fun resumeCancellable(value: T) {
        // 是否需求线程调度
        if (dispatcher.isDispatchNeeded(context)) {
            // 将协程的运算分发到另一个线程
            dispatcher.dispatch(context, this)
        } else {
            // 假如不需求调度,当即康复在在当时线程履行协程运算
            withCoroutineContext(this.context, countOrElement) {
                continuation.resumeWith(result)
            }
        }
    }
    override fun resumeWith(result: Result<T>) {
        // 是否需求线程调度
        if (dispatcher.isDispatchNeeded(context)) {
            // 将协程的运算分发到另一个线程
            dispatcher.dispatch(context, this)
        } else {
            // 假如不需求调度,当即康复在当时线程履行协程运算
            continuation.resumeWith(result)
        }
    }
//·····
}

DispatchedContinuation阻拦了协程的发动和康复,分别是resumeCancellable(Unit)和重写的resumeWith(Result)

当需求分发时,就调用 dispatcher 的 dispatch(context, this) 办法,this是一个 DispatchedTask

internal abstract class DispatchedTask<in T>(
    @JvmField public var resumeMode: Int
) : SchedulerTask() {
    public final override fun run() {
        //·····
        withContinuationContext(continuation, delegate.countOrElement) {
            //康复协程履行 终究调用resumeWith
           continuation.resume(getSuccessfulResult(state))
       }
     //·····
    }
}

DispatchedTask 实践上是一个Runnable

  1. 当需求线程调度时,则在调度后会调用 DispatchedContinuation.continuation.resumeWith() 来发动协程,其间 continuationSuspendLambda 实例;
  2. 当不需求线程调度时,则直接调用 continuation.resumeWith() 来直接发动协程。

也便是说对创立的 ContinuationresumeWith() 增加阻拦操作,阻拦协程的运转操作:

协程的分发机制.png
分别分析一下四种调度形式的具体完结:

3.Dispatchers.Unconfined

public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined
internal object Unconfined : CoroutineDispatcher() {
    //回来false, 不阻拦协程
    override fun isDispatchNeeded(context: CoroutineContext): Boolean = false
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        //····
    }
}

Dispatchers.Unconfined:对应的是Unconfined,它里边的isDispatchNeeded()回来的是false,不限于任何特定线程的协程调度程序。那么它的父类ContinuationInterceptor就不会把本次使命的调度交给子类来履行,而是由父类在当时线程马上履行。

4.Dispatchers.Main

Dispatchers.Main 承继自 MainCoroutineDispatcher 经过 MainDispatcherLoader.dispatcher 完结调度器:

public actual object Dispatchers {
    @JvmStatic
    public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
}

MainDispatcherLoader 经过工厂形式创立 MainCoroutineDispatcher

internal object MainDispatcherLoader {
    @JvmField
    val dispatcher: MainCoroutineDispatcher = loadMainDispatcher()
    private fun loadMainDispatcher(): MainCoroutineDispatcher {
        //···
        val factories = FastServiceLoader.loadMainDispatcherFactory()
        //···
        return factories.maxByOrNull { it.loadPriority }?.tryCreateDispatcher(factories)
    }
}
public fun tryCreateDispatcher(factories: List<MainDispatcherFactory>): MainCoroutineDispatcher =
    try {
        createDispatcher(factories)
    } catch (cause: Throwable) {
        //假如出现反常则创立一个MissingDispatcher
        createMissingDispatcher(cause, hintOnError())
    }

MainDispatcherFactory 是一个接口,经过完结类来创立Dispatcher

public interface MainDispatcherFactory {
    //子类完结
    public fun createDispatcher(allFactories: List<MainDispatcherFactory>): MainCoroutineDispatcher
}
internal class AndroidDispatcherFactory : MainDispatcherFactory {
    //由AndroidDispatcherFactory创立HandlerContext,可以看到它是一个主线程调度器
    override fun createDispatcher(allFactories: List<MainDispatcherFactory>) =
        HandlerContext(Looper.getMainLooper().asHandler(async = true), "Main")
    //···
}

咱们看到了AndroidDispatcherFactory, Looper.getMainLooper(),Main 等关键字,毫无疑问这便是主线程调度器:

internal class HandlerContext private constructor(
    private val handler: Handler,
    private val name: String?,
    private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
    //···
    override val immediate: HandlerContext = _immediate ?:
        HandlerContext(handler, name, true).also { _immediate = it }
    //invokeImmediately默许为false, Looper.myLooper() != handler.looper判别当时线程looper
    override fun isDispatchNeeded(context: CoroutineContext): Boolean {
        return !invokeImmediately || Looper.myLooper() != handler.looper
    }
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        handler.post(block)
    }
    //···
}

它们三者的承继联系:
HandlerContext->HandlerDispatcher->MainCoroutineDispatcher()->CoroutineDispatcher

它里边的isDispatchNeeded()回来的是true,当协程发动的时分则由HandlerContext来分发,而它里边的分发作业是经过 handler.post(runnable) 分发给主线程来完结的。在康复的时分也是经过Dispatchers.Main这个调度器来康复。当完结使命之后就会经过HandlerDispatcher把协程中的代码再次切换到主线程。

5.Dispatchers.IO

public actual object Dispatchers {
    @JvmStatic
    public val IO: CoroutineDispatcher = DefaultScheduler.IO
}

DefaultScheduler协程调度器的默许调度器,是一个线程调度器,履行堵塞使命,此调度程序与Dispatcher.Default调度程序同享线程。

internal object DefaultScheduler : ExperimentalCoroutineDispatcher() {
    val IO: CoroutineDispatcher = LimitingDispatcher(
        this,
        systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)),
        "Dispatchers.IO",
        TASK_PROBABLY_BLOCKING
    )
    //···
}
private class LimitingDispatcher(
    private val dispatcher: ExperimentalCoroutineDispatcher,
    //···
) : ExecutorCoroutineDispatcher(), TaskContext, Executor {
    override val executor: Executor
        get() = this
    override fun execute(command: Runnable) = dispatch(command, false)
    override fun dispatch(context: CoroutineContext, block: Runnable) = dispatch(block, false)
    private fun dispatch(block: Runnable, tailDispatch: Boolean) {
        var taskToSchedule = block
        while (true) {
            //没有超过限制,当即分发使命
            if (inFlight <= parallelism) {
                dispatcher.dispatchWithContext(taskToSchedule, this, tailDispatch)
                return
            }
            //使命超过限制,则加入等候行列
            queue.add(taskToSchedule)
            //···
        }
    }
}

dispatcher.dispatchWithContext()当即分发使命由ExperimentalCoroutineDispatcher完结:

public open class ExperimentalCoroutineDispatcher(
    private val corePoolSize: Int,
    private val maxPoolSize: Int,
    //···
) : ExecutorCoroutineDispatcher() {
    override val executor: Executor
        get() = coroutineScheduler
    private var coroutineScheduler = createScheduler()
    //分发
    override fun dispatch(context: CoroutineContext, block: Runnable): Unit = coroutineScheduler.dispatch(block)
    //···
    internal fun dispatchWithContext(block: Runnable, context: TaskContext, tailDispatch: Boolean) {
       coroutineScheduler.dispatch(block, context, tailDispatch)  
    }
    //创立调度器
    private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
    //···
}

ExperimentalCoroutineDispatcher将使命分发到coroutineScheduler.dispatch() 完结。
CoroutineScheduler 便是一个线程池Executor。

//协程调度器的首要方针是在作业线程上分配调度的协程,包含 CPU 密集型使命和堵塞使命。
internal class CoroutineScheduler(
    val corePoolSize: Int,
    val maxPoolSize: Int,
    //···
) : Executor, Closeable {
    override fun execute(command: Runnable) = dispatch(command)
    //调度可运转块的履行,并提示调度程序该块是否可以履行堵塞操作(IO、体系调用、确定原语等)
    fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
        //1.构建Task,Task完结了Runnable接口
        val task = createTask(block, taskContext)
        //2.取当时线程转为Worker目标,Worker是一个承继自Thread的类,循环履行使命
        val currentWorker = currentWorker()
        //3.增加使命到本地行列
        val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
        if (notAdded != null) {
            //4.notAdded不为null,则再将notAdded(Task)增加到大局行列中
            if (!addToGlobalQueue(notAdded)) {
                throw RejectedExecutionException("$schedulerName was terminated")
            }
        }
        if (task.mode == TASK_NON_BLOCKING) {
            if (skipUnpark) return
            //5.创立Worker并开端履行该线程
            signalCpuWork()
        } else {
            // Increment blocking tasks anyway
            signalBlockingWork(skipUnpark = skipUnpark)
        }
    }
}

上面代码首要做了以下几件事:

  1. 首先是经过Runnable构建了一个Task,这个Task其实也是完结了Runnable接口;
  2. 将当时线程取出来转换成Worker,这个Worker是承继自Thread的一个类;
  3. task提交到本地行列中;
  4. 假如task提交到本地行列的过程中没有成功,那么会增加到大局行列中;
  5. 创立Worker线程,并开端履行使命。
class Worker private constructor() : Thread() {
    //woeker行列
    val localQueue: WorkQueue = WorkQueue()
    override fun run() = runWorker()
    private fun runWorker() {
        var rescanned = false
        while (!isTerminated && state != WorkerState.TERMINATED) {
            val task = findTask(mayHaveLocalTasks)
            //找到使命,履行使命,重复循环
            if (task != null) {
                executeTask(task)
                continue
            } else {
                mayHaveLocalTasks = false
            }
        }
    }
    private fun executeTask(task: Task) {
        val taskMode = task.mode
        idleReset(taskMode)
        beforeTask(taskMode)
        runSafely(task)
        afterTask(taskMode)
    }
}
//履行使命
fun runSafely(task: Task) {
    try {
        task.run()
    }
    //···
}

run办法直接调用的runWorker(),在里边是一个while循环,不断从行列中取Task来履行,调用task.run()

  1. 从本地行列或许大局行列中取出Task
  2. 履行这个task,终究其实便是调用这个Runnable的run办法。

也便是说,在Worker这个线程中,履行了这个Runnable的run办法。还记得这个Runnable是谁么?它便是上面咱们看过的DispatchedTask,这儿的run办法履行的便是协程使命,那这块具体的run办法的完结逻辑,咱们应该到DispatchedTask中去找。

internal abstract class DispatchedTask<in T>(
    @JvmField public var resumeMode: Int
) : SchedulerTask() {
    public final override fun run() {
        //·····
        withContinuationContext(continuation, delegate.countOrElement) {
            //康复协程履行,终究调用resumeWith
           continuation.resume(getSuccessfulResult(state))
       }
     //·····
    }
}

run办法履行continuation.resume康复协程履行。最终经过executor.execute()发动线程池。

internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispatcher(), Delay {
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        try {
            executor.execute(wrapTask(block))
        } catch (e: RejectedExecutionException) {
            unTrackTask()
            DefaultExecutor.enqueue(block)
        }
    }
}

6.Dispatchers.Default

假如不指定调度器,则会默许 DefaultScheduler,它实践和Dispatchers.IO是同一个线程调度器,这个是线程调度器:

public actual object Dispatchers {
    public actual val Default: CoroutineDispatcher = createDefaultDispatcher()
}
internal actual fun createDefaultDispatcher(): CoroutineDispatcher =
    if (useCoroutinesScheduler) DefaultScheduler else CommonPool

假如指定了调度器则运用 CommonPool,表明同享线程的公共池作为核算密集型使命的协程调度程序。

internal object CommonPool : ExecutorCoroutineDispatcher() {
    override val executor: Executor
        get() = pool ?: getOrCreatePoolSync()
    //履行使命
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        (pool ?: getOrCreatePoolSync()).execute(wrapTask(block))
    }
    //创立一个固定大小的线程池
    private fun createPlainPool(): ExecutorService {
        val threadId = AtomicInteger()
        return Executors.newFixedThreadPool(parallelism) {
                Thread(it, "CommonPool-worker-${threadId.incrementAndGet()}").apply { isDaemon = true }
    }
}

CommonPool中也是创立了一个固定大小的线程池,dispatch()经过execute()履行协程使命。

协程线程调度.png

总结如下:

类型 调度器完结类 阐明
Dispatchers.Main HandlerContext 它里边的isDispatchNeeded()回来的是true,当协程发动的时分则由HandlerDispatcher来分发,而它里边的分发作业是经过 handler.post(runnable) 来完结的。
Dispatchers.IO DefaultScheduler 它是线程调度器,它里边的isDispatchNeeded()回来的是true,而它调度使命的时分是经过 executors.execute(runnable) 来履行runnable使命。也便是把协程中的代码块运转到IO线程。
Dispatchers.Default DefaultSchedulerCommonPool 假如不指定调度器,则会默许DefaultScheduler,它实践和Dispatchers.IO是同一个线程调度器;假如指定调度器,则是CommonPool同享线程池。isDispatchNeeded()都是true,经过 executors.execute(runnable) 来履行runnable使命。
Dispatchers.Unconfined Unconfined 它里边的isDispatchNeeded()回来的是false,那么它的父类ContinuationInterceptor就不会把本次使命的调度交给子类来履行,而是由父类在当时线程马上履行。

五、总结

1.协程的三层包装

经过一步步的分析,慢慢发现协程其实有三层包装:

  • 常用的launchasync回来的JobDeferred,里边封装了协程状况,供给了撤销协程接口,而它们的实例都是承继自AbstractCoroutine,它是协程的第一层包装。

  • 第二层包装是编译器生成的 SuspendLambda 的子类,封装了协程的实在运算逻辑,承继自BaseContinuationImpl,包含了第一层包装,其间completion便是协程的第一层包装。

  • 第三层包装是协程的线程调度时的DispatchedContinuation,封装了线程调度逻辑,包含了协程的第二层包装。

三层包装都完结了Continuation接口,经过署理形式将协程的各层包装组合在一同,每层担任不同的功用。

image.png

2.协程的挂起与康复原理

  1. 在研讨协程原理时需求反编译成Java文件,才干看到实质之处。由于有一部分代码是kotlin编译器生成的,在协程源码里是看不出来的。

  2. 每个挂起点对应于一个case分支(状况机),每调用一次 label 加1;label 的默许初始值为 0,第一次会进入 case 0 分支,挂起函数在回来 COROUTINE_SUSPENDED 时直接 return ,那么办法履行就被完毕了,办法就被挂起了。

  3. 协程体内的代码都是经过 continuation.resumeWith() 调用;获取到实在成果后,回调到 ContinuationImpl 这个类里边的 resumeWith() 办法,会再次调用 invokeSuspend(result) 办法,进入状况机case分支,回来实在成果,办法康复后,接着康复协程。

  4. 所以说,协程的挂起实质上是办法的挂起,而办法的挂起实质上是 return,协程的康复实质上办法的康复,而康复的实质是 callback 回调。

3.协程的调度原理

  • 阻拦器在每次(康复)履行协程体的时分都会阻拦协程本体SuspendLambda,然后会经过协程分发器的 interceptContinuation() 办法阻拦了一个Continuation<T>而且再回来一个Continuation<T>
  • 把阻拦的代码块封装为使命 DispatchedContinuation ,会经过 CoroutineDispatcherneedDispatch() 来判别需不需求分发,由子类的 dispatch(runnable) 办法来完结协程的本次调度作业。

4.协程面试常见问题

  • 面试官:什么是协程?

协程是一种解决方案,是一种解决嵌套,并发,弱化线程概念的方案。能让多个使命之间更好协作,可以以同步的办法完结异步作业,将异步代码像同步代码相同直观。

  • 面试官:协程与线程有什么区别?

协程就像轻量级的线程,协程是依赖于线程,一个线程中可以创立多个协程。协程挂起时不会堵塞线程。线程进程都是同步机制,而协程则是异步。

  • 面试官:协程的调度原理

依据创立协程指定调度器HandlerContext,DefaultScheduler,UnconfinedDispatcher来履行使命,以解决协程中的代码运转在那个线程上。HandlerContext经过handler.post(runnable)分发到主线程,DefaultScheduler实质是经过excutor.excute(runnable)分发到IO线程。

  • 面试官:协程是线程结构吗?

协程的实质是编译时return+callback,只不过在调度使命时供给了可以运转在IO线程的调度器和主线程的调度器。把协程称为线程结构不行精确。

  • 面试官:什么时分运用协程?

多使命并发流程操控场景,流程操控比较简单,不会涉及线程堵塞和唤醒,性能比Java并发操控手段高。

点重视,不迷路


好了各位,以上便是这篇文章的全部内容了,很感谢您阅览这篇文章。我是suming,感谢支撑和认可,您的点赞便是我创作的最大动力。山水有相逢,咱们下篇文章见!

本人水平有限,文章不免会有错误,请批评指正,不胜感激 !

Kotlin协程学习三部曲:

  • 《Kotlin 协程实战进阶(一、筑基篇)》
  • 《Kotlin 协程实战进阶(二、进阶篇)》
  • 《Kotlin 协程实战进阶(三、原理篇)》

参阅链接:

  • Kotlin官网
  • 《深化了解Kotlin协程》
  • 慕课网之《新版Kotlin从入门到精通》
  • 慕课网之《大白话分析Kotlin协程机制》
  • Kotlin Coroutines(协程) 彻底解析(二),深化了解协程的挂起、康复与调度》

期望咱们能成为朋友,在 Github、 上一同分享知识,一同共勉!Keep Moving!