拜读
# Kotlin协程之再次读懂协程工作原理-作者:苍耳叔叔
示例
协程中suspend
的挂机机制使得异步wait\notify变成一如同步的丝滑体验,而关于异步咱们通常的做法是监听+回调
的CPS方案,但假如嵌套得过深,会形成回调地域和栈帧溢出。那协程的的挂起究竟是怎么做的呢
Continuation Passing Style(续体传递风格): 约定一种编程标准,函数不直接回来结果值,而是在函数最终一个参数方位传入一个 callback 函数参数,并在函数履行完成时经过 callback 来处理结果。
fun getId(): Int {
println("getId")
return 1
}
suspend fun getName(id: Int) {
withContext(Dispatchers.Default) { println("getName by id $id") }
}
suspend fun getAge(id: Int) {
withContext(Dispatchers.Default) { println("getAge by id $id") }
}
fun main() {
runBlocking {
val id = getId()
delay(300L)
getName(id)
getAge(id)
/*suspendCancellableCoroutine<Unit> { continuation ->
// 这是另一个检验,可先疏忽
println("???")
continuation.resume(Unit)
}*/
println("already stop")
}
}
/* console输出
getId
getName by id 1
getAge by id 1
already stop
*/
协程的启动
从launch
办法开始,咱们以默许调用链深入,找到了该办法startCoroutineCancellable
,其他withContext
、async
也都迥然不同,可自行发散探究
// 示例
GlobalScope.launch {
}
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
// 简单看一下这个类,: JobSupport(active), Job, Continuation<T>, CoroutineScope
// 完成了Continuation接口,其主要办法resumeWith
// 前面咱们讲了CPS的界说,这儿留个心眼,也便是意味着coroutine可能会作为续体,且resumeWith极有可能是唤起的回调
StandaloneCoroutine(newContext, active = true)
// 留意这儿,它把自己传进去了
coroutine.start(start, coroutine, block)
return coroutine
}
// 这是上面 coroutine.start(start, coroutine, block)的调用
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
// 这儿调用了CoroutineStart.invoke,需要留意
// 然后方才讲了receiver是它coroutine,这回又传了个this,这儿参数类型是Continuation
start(block, receiver, this)
}
public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
when (this) {
// 得,将coroutine看作两种类型,持续递传。但这儿是协程体调用的办法
DEFAULT -> block.startCoroutineCancellable(receiver, completion)
ATOMIC -> block.startCoroutine(receiver, completion)
UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
LAZY -> Unit // will start lazily
}
现在看到了协程预备启动的地方了,先进行了create
创立、intercepted
线程调度(不打开)、resumeCancellableWith
运转。但牢记,此时调用办法的是协程体
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
receiver: R, completion: Continuation<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
) =
runSafely(completion) {
createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
}
// 留意看回来值,也便是说又有一个续体出来了
public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
receiver: R,
completion: Continuation<T>
): Continuation<Unit> {
val probeCompletion = probeCoroutineCreated(completion)
return if (this is BaseContinuationImpl)
// 会走这个,至于为什么会走,等后边剖析解码的时分就知道承继关系了
// 这儿传入了续体(协程),需留心
create(receiver, probeCompletion)
else {
createCoroutineFromSuspendFunction(probeCompletion) {
(this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it)
}
}
}
// 最终都会走resumeWith,留意是对续体接口的扩展
public fun <T> Continuation<T>.resumeCancellableWith(
result: Result<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) {
is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
else -> resumeWith(result)
}
从上咱们得知协程内部的履行依赖于Continuation续体接口,那这段代码中有个疑问create
回来的新的续体在哪呢,接着看下去
解码
以下内容源于GlobalScope.launch
解码成java文件后的内容,咱们发现了create
函数,但其应该是BaseContinuationImpl
办法,这儿分明实例化的是Function2
BuildersKt.launch$default((CoroutineScope)GlobalScope.INSTANCE, (CoroutineContext)null, (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
int label;
@Nullable
public final Object invokeSuspend(@NotNull Object var1) {
Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch(this.label) {
case 0:
ResultKt.throwOnFailure(var1);
return Unit.INSTANCE;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
}
@NotNull
public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
Intrinsics.checkNotNullParameter(completion, "completion");
Function2 var3 = new <anonymous constructor>(completion);
return var3;
}
public final Object invoke(Object var1, Object var2) {
return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
}
}), 3, (Object)null);
经过AndroidStudio自带的Kotlin Bytecode
检查字节码得知,这个多出来的东西PersonTestKt$main$2
承继于SuspendLambda
,完成了Function2
接口罢了,而追踪承继咱们得到:SuspendLambda>>>ContinuationImpl>>>BaseContinuationImpl
L1
LINENUMBER 33 L1
GETSTATIC kotlinx/coroutines/GlobalScope.INSTANCE : Lkotlinx/coroutines/GlobalScope;
CHECKCAST kotlinx/coroutines/CoroutineScope
ACONST_NULL
ACONST_NULL
NEW com/wjf/self_demo/PersonTestKt$main$2
DUP
ACONST_NULL
INVOKESPECIAL com/wjf/self_demo/PersonTestKt$main$2.<init> (Lkotlin/coroutines/Continuation;)V
CHECKCAST kotlin/jvm/functions/Function2
ICONST_3
ACONST_NULL
INVOKESTATIC kotlinx/coroutines/BuildersKt.launch$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlinx/coroutines/CoroutineStart;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/Job;
POP
L2
final class com/wjf/self_demo/PersonTestKt$main$2 extends kotlin/coroutines/jvm/internal/SuspendLambda implements kotlin/jvm/functions/Function2 {
原来协程体本身完成了续体接口(且承继于BaseContinuationImpl
),而协程本身也是个续体(但承继于AbstractCoroutine
是协程,但完成了续体接口),并且协程体在create
时,是把外部续体传入了的并持有了的,在类BaseContinuationImpl
中即成员变量completion
。从解码的内容来看,kotlin关于suspend关键词是进行了字节码增强的,均会被转化为SuspendLambda
。那么剩余resume
启动了,结合上面解码出来的.java
看,先调用invokeSuspend
完成其内部逻辑,完成后会调用parent也便是外部续体(协程)的resumeWith
进行通知
// BaseContinuationImpl
public final override fun resumeWith(result: Result<Any?>) {
// This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
var current = this
var param = result
while (true) {
// Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
// can precisely track what part of suspended callstack was already resumed
probeCoroutineResumed(current)
with(current) {
val completion = completion!! // fail fast when trying to resume continuation without completion
val outcome: Result<Any?> =
try {
val outcome = invokeSuspend(param)
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
releaseIntercepted() // this state machine instance is terminating
if (completion is BaseContinuationImpl) {
// 这块需要搞清楚协程体和协程对象的承继关系,仅协程体,也便是Block闭包内的完成是BaseContinuationImpl,而其协程发起本身是个AbstractCoroutine
// unrolling recursion via loop
current = completion
param = outcome
} else {
// top-level completion reached -- invoke and return
completion.resumeWith(outcome)
return
}
}
}
}
那么到这,其实协程的挂起恢复完成大约有了思路,编译时会进行一定程度的封装,以经过CPS办法进行回调
以达到同步作用,那么它是否也会存在CPS的缺点呢?接着看
状况机
还记得示例代码吗,这边重新贴一下。相同的,还是凭借Kotlin Bytecode
看看这时都生成了些什么
fun getId(): Int {
println("getId")
return 1
}
suspend fun getName(id: Int) {
withContext(Dispatchers.Default) { println("getName by id $id") }
}
suspend fun getAge(id: Int) {
withContext(Dispatchers.Default) { println("getAge by id $id") }
}
fun main() {
runBlocking {
val id = getId()
delay(300L)
getName(id)
getAge(id)
/*suspendCancellableCoroutine<Unit> { continuation ->
// 这是另一个检验,可先疏忽
println("???")
continuation.resume(Unit)
}*/
println("already stop")
}
}
/* console输出
getId
getName by id 1
getAge by id 1
already stop
*/
咱们看协程内的主体内容,留意看代码注释和label
的状况轮转
BuildersKt.runBlocking$default((CoroutineContext)null, (Function2)(new Function2((Continuation)null) {
int I$0;
int label;
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
label26: {
int id;
Object var4;
label25: {
var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch(this.label) {
case 0:
ResultKt.throwOnFailure($result);
// 第一个挂起点
/*
val id = getId()
delay(300L)
*/
id = PersonTestKt.getId();
this.I$0 = id;
// label是状况轮转的要点,即下次再进来的话就走下一个分支,会break出去
this.label = 1;
// 假如它是个挂起函数的话,那就先return完毕,等待其内部完毕后再唤醒,这块下面会再贴代码解说
// 留意,这儿将本身协程体作为续体传入了进去,也便是假如调用了this.resumeWith,就会再次调用invokeSuspend,状况就动起来了
if (DelayKt.delay(300L, this) == var4) {
return var4;
}
break;
case 1:
id = this.I$0;
ResultKt.throwOnFailure($result);
break;
case 2:
id = this.I$0;
ResultKt.throwOnFailure($result);
break label25;
case 3:
ResultKt.throwOnFailure($result);
break label26;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
this.I$0 = id;
this.label = 2;
// 这儿也是依样画葫芦,后边相同的就不再解说了
if (PersonTestKt.getName(id, this) == var4) {
return var4;
}
}
this.label = 3;
if (PersonTestKt.getAge(id, this) == var4) {
return var4;
}
}
String var3 = "already stop";
System.out.println(var3);
return Unit.INSTANCE;
}
@NotNull
public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
Intrinsics.checkNotNullParameter(completion, "completion");
Function2 var3 = new <anonymous constructor>(completion);
return var3;
}
public final Object invoke(Object var1, Object var2) {
return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
}
}), 1, (Object)null);
这个状况嵌套轮转来替代回调递归,确实是个好办法。先看它怎么区别是否需要挂起并进行return
退出
public static final Object getName(final int id, @NotNull Continuation $completion) {
// 这个$completion是外部的协程体,传给了withContext,但其结构中并没有看到第三个参数
Object var10000 = BuildersKt.withContext((CoroutineContext)Dispatchers.getDefault(), (Function2)(new Function2((Continuation)null) {
int label;
@Nullable
public final Object invokeSuspend(@NotNull Object var1) {
Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch(this.label) {
case 0:
ResultKt.throwOnFailure(var1);
String var2 = "getName by id " + id;
System.out.println(var2);
return Unit.INSTANCE;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
}
@NotNull
public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
Intrinsics.checkNotNullParameter(completion, "completion");
Function2 var3 = new <anonymous constructor>(completion);
return var3;
}
public final Object invoke(Object var1, Object var2) {
return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
}
}), $completion);
// 这个解说一下,认为本身需要挂起,那就会回来本身,不然外部就持续履行,代表不需要挂起
// 如同这么解说有点笼统,大约知道啥意思就行了
return var10000 == IntrinsicsKt.getCOROUTINE_SUSPENDED() ? var10000 : Unit.INSTANCE;
}
最终咱们进行验证。首要,withContext
会invoke
办法,suspendCoroutineUninterceptedOrReturn
内的闭包,创立一个DispatchedCoroutine
协程,闭包block
创立本身续体,履行create>>>resume>>>invokeSuspend>>>complete.resume
完成后通知协程DispatchedCoroutine
public suspend fun <T> withContext(
context: CoroutineContext,
block: suspend CoroutineScope.() -> T
): T {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
val coroutine = DispatchedCoroutine(newContext, uCont)
block.startCoroutineCancellable(coroutine, coroutine)
}
}
// Obtains the current continuation instance inside suspend functions and either suspends currently running coroutine or returns result immediately without suspension.
public suspend inline fun <T> suspendCoroutineUninterceptedOrReturn(crossinline block: (Continuation<T>) -> Any?): T {
contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) }
throw NotImplementedError("Implementation of suspendCoroutineUninterceptedOrReturn is intrinsic")
}
而DispatchedCoroutine的父类是ScopeCoroutine,当resumeWith呼应后会调用到uCont.resumeWith,那这个uCont
究竟是什么,上面可能看的有点迷糊,咋啥都没干就抛反常了?但想一件事!withContext
解码的入参里,是有外层协程体传入的(并且是插桩加进去的额定入参),不可能不用啊,假如这个便是它的话,全部就都变得入情入理了。结合源码中的英文注释,似乎咱们的猜测没有问题,那全部就此完毕
internal open class ScopeCoroutine<in T>(
context: CoroutineContext,
@JvmField val uCont: Continuation<T> // unintercepted continuation
) : AbstractCoroutine<T>(context, true), CoroutineStackFrame {
final override val callerFrame: CoroutineStackFrame? get() = uCont as CoroutineStackFrame?
final override fun getStackTraceElement(): StackTraceElement? = null
final override val isScopedCoroutine: Boolean get() = true
internal val parent: Job? get() = parentContext[Job]
override fun afterCompletion(state: Any?) {
// Resume in a cancellable way by default when resuming from another context
uCont.intercepted().resumeCancellableWith(recoverResult(state, uCont))
}
override fun afterResume(state: Any?) {
// Resume direct because scope is already in the correct context
uCont.resumeWith(recoverResult(state, uCont))
}
public final override fun resumeWith(result: Result<T>) {
val state = makeCompletingOnce(result.toState())
if (state === COMPLETING_WAITING_CHILDREN) return
afterResume(state)
}
}
完毕!撒花
这块内容非常绕,容易把自己绕进去。假如看完了还没有彻底看懂建议再看一遍,这一遍牢记一个概念:
协程
是续体,协程体
也是续体,辨明协程
和协程体
,协程
包裹协程体
。假如看懂了,等待点赞+收藏+重视三连