拜读

# Kotlin协程之再次读懂协程工作原理-作者:苍耳叔叔

示例

协程中suspend的挂机机制使得异步wait\notify变成一如同步的丝滑体验,而关于异步咱们通常的做法是监听+回调的CPS方案,但假如嵌套得过深,会形成回调地域和栈帧溢出。那协程的的挂起究竟是怎么做的呢

Continuation Passing Style(续体传递风格): 约定一种编程标准,函数不直接回来结果值,而是在函数最终一个参数方位传入一个 callback 函数参数,并在函数履行完成时经过 callback 来处理结果。

fun getId(): Int {
    println("getId")
    return 1
}
suspend fun getName(id: Int) {
    withContext(Dispatchers.Default) { println("getName by id $id") }
}
suspend fun getAge(id: Int) {
    withContext(Dispatchers.Default) { println("getAge by id $id") }
}
fun main() {
    runBlocking {
        val id = getId()
        delay(300L)
        getName(id)
        getAge(id)
        /*suspendCancellableCoroutine<Unit> { continuation ->
            // 这是另一个检验,可先疏忽
            println("???")
            continuation.resume(Unit)
        }*/
        println("already stop")
    }
}
/* console输出
    getId
    getName by id 1
    getAge by id 1
    already stop
    */

协程的启动

launch办法开始,咱们以默许调用链深入,找到了该办法startCoroutineCancellable,其他withContextasync也都迥然不同,可自行发散探究

// 示例
GlobalScope.launch {
}
public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        // 简单看一下这个类,: JobSupport(active), Job, Continuation<T>, CoroutineScope
        // 完成了Continuation接口,其主要办法resumeWith
        // 前面咱们讲了CPS的界说,这儿留个心眼,也便是意味着coroutine可能会作为续体,且resumeWith极有可能是唤起的回调
        StandaloneCoroutine(newContext, active = true)
    // 留意这儿,它把自己传进去了
    coroutine.start(start, coroutine, block)
    return coroutine
}
// 这是上面 coroutine.start(start, coroutine, block)的调用
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
    // 这儿调用了CoroutineStart.invoke,需要留意
    // 然后方才讲了receiver是它coroutine,这回又传了个this,这儿参数类型是Continuation
    start(block, receiver, this)
}
public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
    when (this) {
        // 得,将coroutine看作两种类型,持续递传。但这儿是协程体调用的办法
        DEFAULT -> block.startCoroutineCancellable(receiver, completion)
        ATOMIC -> block.startCoroutine(receiver, completion)
        UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
        LAZY -> Unit // will start lazily
    }

现在看到了协程预备启动的地方了,先进行了create创立、intercepted线程调度(不打开)、resumeCancellableWith运转。但牢记,此时调用办法的是协程体

internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
    receiver: R, completion: Continuation<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
) =
    runSafely(completion) {
        createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
    }
// 留意看回来值,也便是说又有一个续体出来了
public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
    receiver: R,
    completion: Continuation<T>
): Continuation<Unit> {
    val probeCompletion = probeCoroutineCreated(completion)
    return if (this is BaseContinuationImpl)
        // 会走这个,至于为什么会走,等后边剖析解码的时分就知道承继关系了
        // 这儿传入了续体(协程),需留心
        create(receiver, probeCompletion)
    else {
        createCoroutineFromSuspendFunction(probeCompletion) {
            (this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it)
        }
    }
}
// 最终都会走resumeWith,留意是对续体接口的扩展
public fun <T> Continuation<T>.resumeCancellableWith(
    result: Result<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) {
    is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
    else -> resumeWith(result)
}

从上咱们得知协程内部的履行依赖于Continuation续体接口,那这段代码中有个疑问create回来的新的续体在哪呢,接着看下去

解码

以下内容源于GlobalScope.launch解码成java文件后的内容,咱们发现了create函数,但其应该是BaseContinuationImpl办法,这儿分明实例化的是Function2

BuildersKt.launch$default((CoroutineScope)GlobalScope.INSTANCE, (CoroutineContext)null, (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
   int label;
   @Nullable
   public final Object invokeSuspend(@NotNull Object var1) {
      Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
      switch(this.label) {
      case 0:
         ResultKt.throwOnFailure(var1);
         return Unit.INSTANCE;
      default:
         throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
      }
   }
   @NotNull
   public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
      Intrinsics.checkNotNullParameter(completion, "completion");
      Function2 var3 = new <anonymous constructor>(completion);
      return var3;
   }
   public final Object invoke(Object var1, Object var2) {
      return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
   }
}), 3, (Object)null);

经过AndroidStudio自带的Kotlin Bytecode检查字节码得知,这个多出来的东西PersonTestKt$main$2承继于SuspendLambda,完成了Function2接口罢了,而追踪承继咱们得到:SuspendLambda>>>ContinuationImpl>>>BaseContinuationImpl

L1
    LINENUMBER 33 L1
    GETSTATIC kotlinx/coroutines/GlobalScope.INSTANCE : Lkotlinx/coroutines/GlobalScope;
    CHECKCAST kotlinx/coroutines/CoroutineScope
    ACONST_NULL
    ACONST_NULL
    NEW com/wjf/self_demo/PersonTestKt$main$2
    DUP
    ACONST_NULL
    INVOKESPECIAL com/wjf/self_demo/PersonTestKt$main$2.<init> (Lkotlin/coroutines/Continuation;)V
    CHECKCAST kotlin/jvm/functions/Function2
    ICONST_3
    ACONST_NULL
    INVOKESTATIC kotlinx/coroutines/BuildersKt.launch$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlinx/coroutines/CoroutineStart;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/Job;
    POP
L2
final class com/wjf/self_demo/PersonTestKt$main$2 extends kotlin/coroutines/jvm/internal/SuspendLambda implements kotlin/jvm/functions/Function2 {

原来协程体本身完成了续体接口(且承继于BaseContinuationImpl),而协程本身也是个续体(但承继于AbstractCoroutine是协程,但完成了续体接口),并且协程体在create时,是把外部续体传入了的并持有了的,在类BaseContinuationImpl中即成员变量completion。从解码的内容来看,kotlin关于suspend关键词是进行了字节码增强的,均会被转化为SuspendLambda。那么剩余resume启动了,结合上面解码出来的.java看,先调用invokeSuspend完成其内部逻辑,完成后会调用parent也便是外部续体(协程)的resumeWith进行通知

// BaseContinuationImpl
public final override fun resumeWith(result: Result<Any?>) {
    // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
    var current = this
    var param = result
    while (true) {
        // Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
        // can precisely track what part of suspended callstack was already resumed
        probeCoroutineResumed(current)
        with(current) {
            val completion = completion!! // fail fast when trying to resume continuation without completion
            val outcome: Result<Any?> =
                try {
                    val outcome = invokeSuspend(param)
                    if (outcome === COROUTINE_SUSPENDED) return
                    Result.success(outcome)
                } catch (exception: Throwable) {
                    Result.failure(exception)
                }
            releaseIntercepted() // this state machine instance is terminating
            if (completion is BaseContinuationImpl) {
                // 这块需要搞清楚协程体和协程对象的承继关系,仅协程体,也便是Block闭包内的完成是BaseContinuationImpl,而其协程发起本身是个AbstractCoroutine
                // unrolling recursion via loop
                current = completion
                param = outcome
            } else {
                // top-level completion reached -- invoke and return
                completion.resumeWith(outcome)
                return
            }
        }
    }
}

那么到这,其实协程的挂起恢复完成大约有了思路,编译时会进行一定程度的封装,以经过CPS办法进行回调以达到同步作用,那么它是否也会存在CPS的缺点呢?接着看

状况机

还记得示例代码吗,这边重新贴一下。相同的,还是凭借Kotlin Bytecode看看这时都生成了些什么

fun getId(): Int {
    println("getId")
    return 1
}
suspend fun getName(id: Int) {
    withContext(Dispatchers.Default) { println("getName by id $id") }
}
suspend fun getAge(id: Int) {
    withContext(Dispatchers.Default) { println("getAge by id $id") }
}
fun main() {
    runBlocking {
        val id = getId()
        delay(300L)
        getName(id)
        getAge(id)
        /*suspendCancellableCoroutine<Unit> { continuation ->
            // 这是另一个检验,可先疏忽
            println("???")
            continuation.resume(Unit)
        }*/
        println("already stop")
    }
}
/* console输出
    getId
    getName by id 1
    getAge by id 1
    already stop
    */

咱们看协程内的主体内容,留意看代码注释和label的状况轮转

BuildersKt.runBlocking$default((CoroutineContext)null, (Function2)(new Function2((Continuation)null) {
   int I$0;
   int label;
   @Nullable
   public final Object invokeSuspend(@NotNull Object $result) {
      label26: {
         int id;
         Object var4;
         label25: {
            var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
            switch(this.label) {
            case 0:
               ResultKt.throwOnFailure($result);
               // 第一个挂起点
               /*
                   val id = getId()
                   delay(300L)
                   */
               id = PersonTestKt.getId();
               this.I$0 = id;
               // label是状况轮转的要点,即下次再进来的话就走下一个分支,会break出去
               this.label = 1;
               // 假如它是个挂起函数的话,那就先return完毕,等待其内部完毕后再唤醒,这块下面会再贴代码解说
               // 留意,这儿将本身协程体作为续体传入了进去,也便是假如调用了this.resumeWith,就会再次调用invokeSuspend,状况就动起来了
               if (DelayKt.delay(300L, this) == var4) {
                  return var4;
               }
               break;
            case 1:
               id = this.I$0;
               ResultKt.throwOnFailure($result);
               break;
            case 2:
               id = this.I$0;
               ResultKt.throwOnFailure($result);
               break label25;
            case 3:
               ResultKt.throwOnFailure($result);
               break label26;
            default:
               throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
            }
            this.I$0 = id;
            this.label = 2;
            // 这儿也是依样画葫芦,后边相同的就不再解说了
            if (PersonTestKt.getName(id, this) == var4) {
               return var4;
            }
         }
         this.label = 3;
         if (PersonTestKt.getAge(id, this) == var4) {
            return var4;
         }
      }
      String var3 = "already stop";
      System.out.println(var3);
      return Unit.INSTANCE;
   }
   @NotNull
   public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
      Intrinsics.checkNotNullParameter(completion, "completion");
      Function2 var3 = new <anonymous constructor>(completion);
      return var3;
   }
   public final Object invoke(Object var1, Object var2) {
      return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
   }
}), 1, (Object)null);

这个状况嵌套轮转来替代回调递归,确实是个好办法。先看它怎么区别是否需要挂起并进行return退出

public static final Object getName(final int id, @NotNull Continuation $completion) {
    // 这个$completion是外部的协程体,传给了withContext,但其结构中并没有看到第三个参数
   Object var10000 = BuildersKt.withContext((CoroutineContext)Dispatchers.getDefault(), (Function2)(new Function2((Continuation)null) {
      int label;
      @Nullable
      public final Object invokeSuspend(@NotNull Object var1) {
         Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
         switch(this.label) {
         case 0:
            ResultKt.throwOnFailure(var1);
            String var2 = "getName by id " + id;
            System.out.println(var2);
            return Unit.INSTANCE;
         default:
            throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
         }
      }
      @NotNull
      public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
         Intrinsics.checkNotNullParameter(completion, "completion");
         Function2 var3 = new <anonymous constructor>(completion);
         return var3;
      }
      public final Object invoke(Object var1, Object var2) {
         return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
      }
   }), $completion);
   // 这个解说一下,认为本身需要挂起,那就会回来本身,不然外部就持续履行,代表不需要挂起
   // 如同这么解说有点笼统,大约知道啥意思就行了
   return var10000 == IntrinsicsKt.getCOROUTINE_SUSPENDED() ? var10000 : Unit.INSTANCE;
}

最终咱们进行验证。首要,withContextinvoke办法,suspendCoroutineUninterceptedOrReturn内的闭包,创立一个DispatchedCoroutine协程,闭包block创立本身续体,履行create>>>resume>>>invokeSuspend>>>complete.resume完成后通知协程DispatchedCoroutine

public suspend fun <T> withContext(
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T
): T {
    contract {
        callsInPlace(block, InvocationKind.EXACTLY_ONCE)
    }
    return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
        val coroutine = DispatchedCoroutine(newContext, uCont)
        block.startCoroutineCancellable(coroutine, coroutine)
    }
}
// Obtains the current continuation instance inside suspend functions and either suspends currently running coroutine or returns result immediately without suspension.
public suspend inline fun <T> suspendCoroutineUninterceptedOrReturn(crossinline block: (Continuation<T>) -> Any?): T {
    contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) }
    throw NotImplementedError("Implementation of suspendCoroutineUninterceptedOrReturn is intrinsic")
}

DispatchedCoroutine的父类是ScopeCoroutine,当resumeWith呼应后会调用到uCont.resumeWith,那这个uCont究竟是什么,上面可能看的有点迷糊,咋啥都没干就抛反常了?但想一件事!withContext解码的入参里,是有外层协程体传入的(并且是插桩加进去的额定入参),不可能不用啊,假如这个便是它的话,全部就都变得入情入理了。结合源码中的英文注释,似乎咱们的猜测没有问题,那全部就此完毕

internal open class ScopeCoroutine<in T>(
    context: CoroutineContext,
    @JvmField val uCont: Continuation<T> // unintercepted continuation
) : AbstractCoroutine<T>(context, true), CoroutineStackFrame {
    final override val callerFrame: CoroutineStackFrame? get() = uCont as CoroutineStackFrame?
    final override fun getStackTraceElement(): StackTraceElement? = null
    final override val isScopedCoroutine: Boolean get() = true
    internal val parent: Job? get() = parentContext[Job]
    override fun afterCompletion(state: Any?) {
        // Resume in a cancellable way by default when resuming from another context
        uCont.intercepted().resumeCancellableWith(recoverResult(state, uCont))
    }
    override fun afterResume(state: Any?) {
        // Resume direct because scope is already in the correct context
        uCont.resumeWith(recoverResult(state, uCont))
    }
    public final override fun resumeWith(result: Result<T>) {
        val state = makeCompletingOnce(result.toState())
        if (state === COMPLETING_WAITING_CHILDREN) return
        afterResume(state)
    }
}

完毕!撒花

这块内容非常绕,容易把自己绕进去。假如看完了还没有彻底看懂建议再看一遍,这一遍牢记一个概念:协程是续体,协程体也是续体,辨明协程协程体协程包裹协程体。假如看懂了,等待点赞+收藏+重视三连