概述
关于协程的创立,以及挂起和康复,之前有写过一篇文章 Kotlin协程之深入了解协程作业原理 收拾这个流程,最近再看这篇文章的时分,感觉看起来比较费劲,不是说写得有问题,仅仅看起来比较臃肿。假如想再复习这块的常识,或许需求看几遍后才干懂,所以想别的再收拾一篇文章写写协程发动,挂起和康复的原理,适合在读完上篇文章后再看看,这篇文章的目的在于希望读完后能够明晰明了地了解 Kotlin 这部分的原理,进步效率。Kotlin 协程系列:
- Kotlin协程之根底运用
- Kotlin协程之深入了解协程作业原理
- Kotlin协程之协程撤销与反常处理
Kotlin 因为本身灵活的语法和特性,导致有些时分盯梢它的源码时,容易跟着跟着就迷路了,记得我刚开端尝试阅览协程源码的时分,也是头大了一圈,后边 Kotlin 用的看的多了,现在再阅览就显得轻松了不少。
前置常识
在阅览 Kotlin 源码之前,能够先了解一些前置常识。
Function
Function 是 Kotlin 对函数类型的封装,关于函数类型,它会被编译成 FunctionX 系列的类:
// 0 个参数
public interface Function0<out R> : Function<R> {
public operator fun invoke(): R
}
// 1 个参数
public interface Function1<in P1, out R> : Function<R> {
public operator fun invoke(p1: P1): R
}
// X 个参数
Kotlin 供给了从 Function0 到 Function22 之间的接口,这意味着咱们的 lambda 函数最多能够支撑 22 个参数,别的 Function 接口有一个 invoke 操作符重载,因此咱们能够直接经过 ()
调用 lambda 函数:
val sum = { a: Int, b: Int ->
a + b
}
sum(10, 12)
sum.invoke(10, 12)
编译成 Java 代码后:
Function2 sum = (Function2)null.INSTANCE;
sum.invoke(10, 12);
sum.invoke(10, 12);
// lambda 编译后的类
final class KotlinTest$main$sum$1 extends Lambda implements Function2<Integer, Integer, Integer> {
public static final KotlinTest$main$sum$1 INSTANCE = new KotlinTest$main$sum$1();
KotlinTest$main$sum$1() {
super(2);
}
@Override // kotlin.jvm.functions.Function2
public /* bridge */ /* synthetic */ Integer invoke(Integer num, Integer num2) {
return invoke(num.intValue(), num2.intValue());
}
public final Integer invoke(int a, int b) {
return Integer.valueOf(a + b);
}
}
能够看到关于 lambda 函数,在编译后会生成一个完成 Function 接口的类,并在运用 lambda 函数时创立一个单例目标来调用,创立目标的进程是编译器主动生成的代码。
而关于协程里的 lambda 代码块,也会为其创立一个目标,它完成 FunctionX 接口,并承继 SuspendLambda 类,不相同的地方在于它会主动增加一个 Continuation 类型的参数。
Continuation Passing Style(CPS)
Continuation Passing Style(续体传递风格): 约定一种编程标准,函数不直接回来成果值,而是在函数终究一个参数位置传入一个 callback 函数参数,并在函数履行完结时经过 callback 来处理成果。回调函数 callback 被称为续体(Continuation),它决议了程序接下来的行为,整个程序的逻辑经过一个个 Continuation 拼接在一起。
Kotlin 协程实质便是运用 CPS 来完成对进程的操控,并处理了 CPS 会产生的问题(如回调阴间,栈空间占用)
- Kotlin suspend 挂起函数写法与一般函数相同,但编译器会对 suspend 关键字的函数做 CPS 改换,这便是咱们常说的用看起来同步的办法写出异步的代码,消除回调阴间(callback hell)。
- 别的为了防止栈空间过大的问题, Kotlin 编译器并没有把代码转换成函数回调的形式,而是运用状况机模型。每两个挂起点之间能够看为一个状况,每次进入状况机时都有一个当时的状况,然后履行该状况对应的代码;假如程序履行结束则回来成果值,否则回来一个特殊值,表明从这个状况退出并等待下次进入。相当于创立了一个可复用的回调,每次都运用这同一个回调,依据不同状况来履行不同的代码。
Continuation
Kotlin 续体有两个接口: Continuation 和 CancellableContinuation, 望文生义 CancellableContinuation 是一个能够撤销的 Continuation。
Continuation 成员:
-
val context: CoroutineContext
: 当时协程的 CoroutineContext 上下文 -
fun resumeWith(result: Result<T>)
: 传递 result 康复协程
CancellableContinuation 成员:
-
isActive, isCompleted, isCancelled
: 表明当时 Continuation 的状况 -
fun cancel(cause: Throwable? = null)
: 可选经过一个反常 cause 来撤销当时 Continuation 的履行
能够将 Continuation 看成是在挂起点康复后需求履行的代码封装(经过之前的文章能够知道是经过状况机完成的),比方说对如下逻辑:
suspend fun request() = suspendCoroutine<Response> {
val response = doRequest()
it.resume(response)
}
fun test() = runBlocking {
val response = request()
handle(response)
}
用下面的伪代码简略描述 Continuation 的作业:
// 假装是 Continuation 接口
interface Continuation<T> {
fun resume(t: T)
}
fun request(continuation: Continuation<Response>) {
val response = doRequest()
continuation.resume(response)
}
fun test() {
request(object :Continuation<Response>{
override fun resume(response: Response) {
handle(response)
}
})
}
关于 suspend 关键词润饰的挂起函数,编译器会为其增加一个 Continuation 续体类型的参数(相当于 CPS 中的回调),能够经过这个 Continuation 续体目标的 resume 办法回来成果值来康复协程的履行。
协程创立与发动
SuspendLambda
Kotlin 编译时会将 lambda 协程代码块编译成 SuspendLambda 的子类:
fun main() {
GlobalScope.launch {
val id = getId()
val avatar = getAvatar(id)
println("${Thread.currentThread().name} - $id - $avatar")
}
}
对应的字节码能够看到:
final class Main$main$1 extends kotlin/coroutines/jvm/internal/SuspendLambda implements kotlin/jvm/functions/Function2
SuspendLambda 完成了 Continuation 续体接口,其 resume 办法能够康复协程的履行;别的它将协程体封装成 SuspendLambda 目标,其内以状况机的形式消除回调阴间,并完成逻辑的次序履行。
承继联系
- Continuation: 续体,康复协程的履行
- BaseContinuationImpl: 完成 resumeWith(Result) 办法,操控状况机的履行,定义了 invokeSuspend 抽象办法
- ContinuationImpl: 增加 intercepted 阻拦器,完成线程调度等
- SuspendLambda: 封装协程体代码块
- 协程体代码块生成的子类: 完成 invokeSuspend 办法,其内完成状况机流通逻辑
这下子,是不是就明晰了许多?那咱们接下来看协程是怎样开端发动的。
协程发动流程
CoroutineScope.launch
从 CoroutineScope.launch
开端盯梢协程发动流程:
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
// newContext = scope效果域上下文 + context参数上下文 + Dispatchers.Default(未指定则增加)
val newContext = newCoroutineContext(context)
// 创立协程目标
val coroutine = if (start.isLazy) {
LazyStandaloneCoroutine(newContext, block)
} else {
StandaloneCoroutine(newContext, active = true)
}
// 发动协程
coroutine.start(start, coroutine, block)
return coroutine
}
// 发动协程
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
start(block, receiver, this)
}
上面 coroutine.start
的调用涉及到运算符重载,实践上会调到 CoroutineStart.invoke()
办法:
public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
when (this) {
DEFAULT -> block.startCoroutineCancellable(receiver, completion)
ATOMIC -> block.startCoroutine(receiver, completion)
UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
LAZY -> Unit // will start lazily
}
咱们能够注意下 completion 参数,它是一个续体 Continuation 类型,此刻传入的实参为 StandaloneCoroutine/LazyStandaloneCoroutine 目标,在协程体的逻辑履行完后会调用到其 resume 办法(CPS),做一些收尾作业,比方说修改状况等。
此刻 receiver 和 completion 都是 launch() 中创立的 StandaloneCoroutine 协程目标。接着往下看:
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
receiver: R, completion: Continuation<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
) = runSafely(completion) {
// 重新创立 SuspendLambda 子类目标
createCoroutineUnintercepted(receiver, completion)
// 调用阻拦器逻辑,进行线程调度等
.intercepted()
// 真实履行协程逻辑
.resumeCancellableWith(Result.success(Unit), onCancellation)
}
创立SuspendLambda
看看上面 createCoroutineUnintercepted 中的代码:
public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
receiver: R,
completion: Continuation<T>
): Continuation<Unit> {
val probeCompletion = probeCoroutineCreated(completion)
return if (this is BaseContinuationImpl)
create(receiver, probeCompletion)
else {
createCoroutineFromSuspendFunction(probeCompletion) {
(this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it)
}
}
}
咱们在前面说过,这个协程体会被编译成 SuspendLambda 的子类,其也是 BaseContinuationImpl 的子类目标,因此会走上面的 create() 办法,经过 completion 续体参数创立一个新的 SuspendLambda 目标,这是之前说的 协程的三层包装 里的第二层包装,它持有的 completion 目标是第一层封装(AbstractCoroutine)。
所以在协程发动进程中针对一个协程体会创立两个 SuspendLambda 的子类目标:
- 调用
launch()
时创立第一个,传入 null 作为参数,作为一个一般的 Function 目标运用 - 调用
create()
时创立第二个,传入 completion 续体作为参数
BuildersKt.launch$default(/*...*/ (Function2)(new Function2((Continuation)null))
线程调度
接着调用
SuspendLambda.intercepted()
办法履行阻拦器逻辑,从上下文中获取阻拦器(Dispatcher调度器)阻拦当时 continuation 目标,将其包装成 DispatchedContinuation 类型,这便是协程的第三层包装,封装了线程调度等逻辑,其 continuation 参数便是第二层包装(SuspendLambda)实例。
关于线程调度的详细逻辑,后边再单独写篇文章收拾,此处略过。
发动协程
在经过 SuspendLambda 目标创立了 DispatchedContinuation 续体后,接着履行其 resumeCancellableWith() 办法,详细履行代码不贴出了,终究会调用到 continuation.resumeWith(result)
办法,而这个 continuation 便是之前传入的第二层封装 SuspendLambda 目标,其 resumeWith() 办法在父类 BaseContinuationImpl 中:
// BaseContinuationImpl
public final override fun resumeWith(result: Result<Any?>) {
// ...
val outcome = invokeSuspend(param)
// ...
}
上面的 invokeSuspend() 是一个抽象办法,它的完成在编译器生成的 SuspendLambda 子类中,详细逻辑是经过状况机来履行协程体中的逻辑,详细见下章解析。
到这里咱们 launch() 里的协程体逻辑就开端真实履行了。
协程挂起与康复
协程的发动,挂起和康复有两个关键办法: invokeSuspend()
和 resumeWith(Result)
。invokeSuspend() 办法是对协程代码块的封装,内部参加状况机机制将整个逻辑分为多块,分隔点便是每个挂起点。协程发动时会先调用一次 invokeSuspend() 函数触发协程体的开端履行,后边每逢调用到一个挂起函数时,挂起函数会回来 COROUTINE_SUSPENDED 标识,然后 return 停掉 invokeSuspend() 函数的履行,即非堵塞挂起。编译器会为挂起函数主动增加一个 continuation 续体目标参数,表明调用它的那个协程代码块,在该挂起函数履行完结后,就会调用到续体 continuation.resumeWith() 办法来回来成果(或反常),而在 resumeWith() 中又调用了 invokeSuspend() 办法,其内依据状况机的状况来康复协程的履行。这便是整个协程的挂起和康复进程。
接下来看详细解析。
协程的状况机
在之前 协程的状况机 一文里曾经剖析过协程的状况机,并且贴出了对应的 Java 代码,剖析其状况的流通进程,这次换个思路来看看,对如下代码:
fun main() = CoroutineScope(Dispatchers.Main).launch {
println("label 0")
val isLogin = checkLogin() // suspend
println("label 1")
println(isLogin)
val login = login() // suspend
println("label 2")
println(login)
val id = getId() // suspend
println("label 3")
println(id)
}
关于协程体中的代码,首个挂起点前的代码可看为初始状况, 其后每两个挂起点之间都是一个新的状况,终究一个挂起点到结束是终究的状况。其对应的状况机伪代码如下,协程体被编译成 SuspendLambda 子类,它完成父类中的 invokeSuspend() 办法,是协程的真实履行逻辑:
final class KotlinTest$main$1 extends SuspendLambda implements Function2 {
int label = 0; // 状况码
public final Object invokeSuspend(Object result) {
switch(this.label) {
case 0:
println("label 0");
label = 1;
result = checkLogin(this); // this 是编译器增加的续体参数
if (result == COROUTINE_SUSPENDED) {
return COROUTINE_SUSPENDED;
}
break;
case 1:
// 此刻传入的 result 是 checkLogin() 的成果
println("label 1")
val isLogin = result;
println(isLogin)
label = 2;
result = login(this); // this 是编译器增加的续体参数
if (result == COROUTINE_SUSPENDED) {
return COROUTINE_SUSPENDED;
}
break;
case 2:
// 此刻传入的 result 是 login() 的成果
println("label 2")
val login = result;
println(login)
label = 3;
result = getId(this); // this 是编译器增加的续体参数
if (result == COROUTINE_SUSPENDED) {
return COROUTINE_SUSPENDED;
}
break;
case 3:
// 此刻传入的 result 是 getId() 的成果
println("label 3")
val id = result;
println(id)
return;
}
}
}
看上面每次调用 suspend 函数时都会传一个 this 参数(continuation),这个参数是编译器增加的续体参数,表明的是协程体自身,在 suspend 挂起函数履行结束后会调用 continuation.resumeWith() -> invokeSuspend(result)
来康复该状况机的履行。
协程挂起
上面给出了协程体 SuspendLambda.invokeSuspend() 办法的状况机伪代码,那再看下 SuspendLambda 父类 BaseContinuationImpl 中的 resumeWith() 办法:
internal abstract class BaseContinuationImpl(
public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
public final override fun resumeWith(result: Result<Any?>) {
var current = this
var param = result
while (true) {
with(current) {
val outcome: Result<Any?> = try {
// invokeSuspend() 履行续体下一个状况的逻辑
val outcome = invokeSuspend(param)
// 假如续体里调用到了挂起函数,则直接 return
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
if (completion is BaseContinuationImpl) {
current = completion
param = outcome
} else {
// top-level completion reached -- invoke and return
// 关于 launch 发动的协程体,传入的 completion 是 AbstractCoroutine 子类目标
completion.resumeWith(outcome)
return
}
}
}
}
}
咱们说过协程发动后会调用到上面这个 resumeWith() 办法,接着调用其 invokeSuspend() 办法:
- 当 invokeSuspend() 回来 COROUTINE_SUSPENDED 后,就直接 return 终止履行了,此刻协程被挂起。
- 当 invokeSuspend() 回来非 COROUTINE_SUSPENDED 后,说明协程体履行结束了,关于 launch 发动的协程体,传入的 completion 是 AbstractCoroutine 子类目标,终究会调用其 AbstractCoroutine.resumeWith() 办法做一些状况改变之类的收尾逻辑。至此协程便履行结束了。
协程康复
这里咱们接着看上面第一条:协程履行到挂起函数被挂起后,当这个挂起函数履行结束后是怎样康复协程的,以下面挂起函数为例:
private suspend fun login() = withContext(Dispatchers.IO) {
Thread.sleep(1000)
return@withContext true
}
经过反编译能够看到上面挂起函数中的函数体也被编译成了 SuspendLambda 的子类,创立其实例时也需求传入 Continuation 续体参数(调用该挂起函数的协程所在续体)。贴下 withContext 的源码:
public suspend fun <T> withContext(
context: CoroutineContext,
block: suspend CoroutineScope.() -> T
): T {
return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
// compute new context
val oldContext = uCont.context
val newContext = oldContext + context
// always check for cancellation of new context
newContext.ensureActive()
// FAST PATH #1 -- new context is the same as the old one
if (newContext === oldContext) {
val coroutine = ScopeCoroutine(newContext, uCont)
return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
}
// FAST PATH #2 -- the new dispatcher is the same as the old one (something else changed)
// `equals` is used by design (see equals implementation is wrapper context like ExecutorCoroutineDispatcher)
if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
val coroutine = UndispatchedCoroutine(newContext, uCont)
// There are changes in the context, so this thread needs to be updated
withCoroutineContext(newContext, null) {
return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
}
}
// SLOW PATH -- use new dispatcher
val coroutine = DispatchedCoroutine(newContext, uCont)
block.startCoroutineCancellable(coroutine, coroutine)
coroutine.getResult()
}
}
首要调用了 suspendCoroutineUninterceptedOrReturn 办法,看注释知道能够经过它来获取到当时的续体目标 uCont, 接着有几条分支调用,但终究都是会经过续体目标来创立挂起函数体对应的 SuspendLambda 目标,并履行其 invokeSuspend() 办法,在其履行结束后调用 uCont.resume() 来康复协程,详细逻辑大家感兴趣能够自己跟代码,与前面迥然不同。
至于其他的顶层挂起函数如 await()
, suspendCoroutine()
, suspendCancellableCoroutine()
等,其内部也是经过 suspendCoroutineUninterceptedOrReturn() 来获取到当时的续体目标,以便在挂起函数体履行结束后,能经过这个续体目标康复协程履行。
协程库没有直接供给创立续体目标的办法,一般都是经过 suspendCoroutineUninterceptedOrReturn() 函数获取的,感兴趣的同学能够看看这个办法的注释:
Obtains the current continuation instance inside suspend functions and either suspends currently running coroutine or returns result immediately without suspension...
。
总结
Kotlin 协程实质便是运用 CPS 来完成对进程的操控,并处理了 CPS 会产生的问题(如回调阴间,栈空间占用)。
Kotlin suspend 挂起函数写法与一般函数相同,但编译器会对 suspend 关键字的函数做 CPS 改换;Kotlin 编译器并没有把代码转换成函数回调的形式,而是运用状况机模型,消除 callback hell, 处理栈空间占用问题。
行将协程代码块编译成 SuspendLambda 子类,完成 invokeSuspend() 办法。
invokeSuspend() 办法是对协程代码块的封装,内部参加状况机机制将整个逻辑分为多块,分隔点便是每个挂起点。协程发动时会先调用一次 invokeSuspend() 函数触发协程体的开端履行,后边每逢调用到一个挂起函数时,挂起函数会回来 COROUTINE_SUSPENDED 标识,然后 return 停掉 invokeSuspend() 函数的履行,即非堵塞挂起。编译器会为挂起函数主动增加一个 continuation 续体目标参数,表明调用它的那个协程代码块,在该挂起函数履行完结后,就会调用到续体 continuation.resumeWith() 办法来回来成果(或反常),而在 resumeWith() 中又调用了 invokeSuspend() 办法,其内依据状况机的状况来康复协程的履行。
Kotlin 协程中存在三层包装,每层包装都持有上层包装的引证,用来履行其 resumeWith() 办法做一些处理:
- 第一层包装: launch & async 回来的 Job, Deferred 承继自 AbstractCoroutine, 里边封装了协程的状况,供给了 cancel 等接口;
- 第二层包装: 编译器生成的 SuspendLambda 子类,封装了协程的真实履行逻辑,其承继联系为 SuspendLambda -> ContinuationImpl -> BaseContinuationImpl, 它的 completion 参数便是第一层包装实例;
- 第三层包装: DispatchedContinuation, 封装了线程调度逻辑,它的 continuation 参数便是第二层包装实例。
这三层包装都完成了 Continuation 续体接口,经过代理模式将协程的各层包装组合在一起,每层担任不同的功用。
下图的 resumeWith() 或许表明 resume(), 也或许表明 resumeCancellableWith() 等系列办法:
博文链接
文中内容如有错误欢迎指出,共同进步!觉得不错的同学留个赞再走哈~