前语:只要在那崎岖的小路上不畏艰险英勇攀爬的人,才有期望达到光芒的顶点。 ——马克思
前语
经过前面两篇协程的学习,我相信咱们对协程的运用现已非常了解了。本着知其然更要知其之所以然的心态,很想知道它里边是怎样可以让异步代码同步化的?协程它是如何完结线程的调度的?协程的挂起和康复实质是什么?今日在这儿逐个为咱们解答。
整个Kotlin 协程学习分为三部曲,本文是第三篇:
Kotlin 协程实战进阶(一、筑基篇)
Kotlin 协程实战进阶(二、进阶篇)
Kotlin 协程实战进阶(三、原理篇)
(本文需求前面两篇文章协程的知识点作为基础)
本文大纲
协程最中心的便是挂起与康复,可是这两个名称在一定程度上面迷惑了咱们,由于这两个名词并不可以让咱们在源码上面和它的完结原理有清晰的认知。
协程的挂起实质上是办法的挂起,而办法的挂起实质上是 return
,协程的康复实质上办法的康复,而康复的实质是 callback
回调。
可是咱们在Kotlin协程源码里边看不到 return
和 callback
回调的,其实这些都是kotlin编译器帮咱们做了,单单看kotlin的源码是看不出所以然的,需求反编译成Java文件,才干看到实质之处。
经过AS的工具栏中 Tools
->kotlin
->show kotlin ByteCode
,得到的是java字节码,需求再点击Decompile
按钮反编译成java源码:
一、协程首要结构
1.suspend fun
再来复习一下挂起函数:
-
suspend
是 Kotlin 协程最中心的关键字; - 运用
suspend
关键字润饰的函数叫作挂起函数
,挂起函数
只能在协程体内或许在其他挂起函数
内调用; - 被关键字
suspend
润饰的办法在编译阶段,编译器会修正办法的签名,包含回来值,润饰符,入参,办法体完结。
@GET("users/{login}")
suspend fun getUserSuspend(@Path("login") login: String): User
将上面的挂起函数反编译:
@GET("users/{login}")
@Nullable
Object getUserSuspend(@Path("login") @NotNull String var1, @NotNull Continuation var2);
- 反编译后你会发现多了一个
Continuation
参数(它便是callback
),也便是说调用挂起函数的时分需求传递一个Continuation
,仅仅传递这个参数是由编译器悄然传,而不是咱们传递的。这便是挂起函数为什么只能在协程或许其他挂起函数中履行,由于只要挂起函数或许协程中才有Continuation
。 - 可是编译器怎样判别哪些办法需求
callback
呢?便是经过suspend
关键字来区其他。suspend
润饰的办法会在编译期间被Kotlin编译器做特殊处理,编译器会认为一旦一个办法增加suspend
关键字,有可能会导致协程暂停往下履行,所以此刻会给办法传递要给Continuation
。等办法履行完结后,经过Continuation
回调回去,然后让协程康复,持续往下履行。 - 它还把回来值
User
改成了Object
。
2.Continuation
Continuation
是 Kotlin 协程中非常重要的一个概念,它表明一个挂起点之后的连续操作
。
//Continuation接口表明挂起点之后的连续,该挂起点回来类型为“T”的值。
public interface Continuation<in T> {
//对应这个Continuation的协程上下文
public val context: CoroutineContext
//康复相应协程的履行,传递一个成功或失败的成果作为最终一个挂起点的回来值。
public fun resumeWith(result: Result<T>)
}
//将[value]作为最终一个挂起点的回来值,康复相应协程的履行。
fun <T> Continuation<T>.resume(value: T): Unit =
resumeWith(Result.success(value))
//康复相应协程的履行,以便在最终一个挂起点之后从头抛出[反常]。
fun <T> Continuation<T>.resumeWithException(exception: Throwable): Unit =
resumeWith(Result.failure(exception))
Continuation
有一个 resumeWith
函数可以接纳 Result 类型的参数。在成果成功获取时,调用resumeWith(Result.success(value))
或许调用拓宽函数resume(value)
;出现反常时,调用resumeWith(Result.failure(exception))
或许调用拓宽函数resumeWithException(exception)
,这便是 Continuation
的康复调用。
Continuation
相似于网络恳求回调Callback
,也是一个恳求成功或失败的回调:
public interface Callback {
//恳求失败回调
void onFailure(Call call, IOException e);
//恳求成功回调
void onResponse(Call call, Response response) throws IOException;
}
3.SuspendLambda
suspend{}
其实便是协程的本体,它是协程实在履行的逻辑,会创立一个SuspendLambda
类,它是Continuation
的完结类。
二、协程的创立流程
1.协程的创立
规范库给咱们供给的创立协程最原始的api:
public fun <T> (suspend () -> T).createCoroutine(
completion: Continuation<T>
): Continuation<Unit>
public fun <R, T> (suspend R.() -> T).createCoroutine(
receiver: R,
completion: Continuation<T>
): Continuation<Unit>
协程创立后会有两个 Contiunation
,需求分清楚:
-
completion
: 表明协程本体,协程履行完需求一个Continuation
实例在康复时调用; -
Contiunation<Unit>
: 它是创立出来的协程的载体,(suspend () -> T)
函数会被传给该实例作为协程的实践履行体。
这两个Contiunation
是不同的东西。传进来的 completion
实践上是协程的本体,协程履行完需求一个 Contiunation
回调履行的,所以它叫 completion
;还有一个回来的 Contiunation
,它便是协程创立出来的载体,当它里边一切的resume都履行完结之后就会调用上面的 completion
的resumeWith()
办法康复协程。
2.协程的效果域
在Androidx的Activity中模仿创立网络恳求,经过这个例子来深挖协程的原理:
lifecycleScope.launch(Dispatchers.IO) {
val result = requestUserInfo()
tvName.text = result
}
/**
* 模仿恳求,2秒后回来数据
*/
suspend fun requestUserInfo(): String {
delay(2000)
return "result form userInfo"
}
跟进lifecycleScope
源码:
val LifecycleOwner.lifecycleScope: LifecycleCoroutineScope
get() = lifecycle.coroutineScope
val Lifecycle.coroutineScope: LifecycleCoroutineScope
get() {
while (true) {
······
//相关声明周期的效果域完结类
val newScope = LifecycleCoroutineScopeImpl(
this,
SupervisorJob() + Dispatchers.Main.immediate
)
//注册生命周期
newScope.register()
return newScope
}
}
效果域也是其实便是为协程界说的效果范围,为了保证一切的协程都会被追踪,Kotlin 不允许在没有运用CoroutineScope
的情况下发动新的协程。
lifecycleScope经过lifecycle
,SupervisorJob()
,Dispatchers.Main
创立一个LifecycleCoroutineScopeImpl
,它是一个相关宿主生命周期的效果域。CoroutineScope
绑定到这个LifecycleOwner
的Lifecycle
。当宿主被销毁时,这个效果域也被撤销。
3.协程的发动
进入launch()
:
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
//创立新的上下文
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
//推迟履行的协程
LazyStandaloneCoroutine(newContext, block) else
//独立的协程
StandaloneCoroutine(newContext, active = true)
//发动协程
coroutine.start(start, coroutine, block)
//回来coroutine,coroutine中完结了job接口
return coroutine
}
参数 block: suspend CoroutineSope.() -> Unit
表明协程代码,实践上便是闭包代码块。
这儿边做了三件事:
- 依据context参数创立一个新的协程上下文
CoroutineContext
; - 创立
Coroutine
,假如发动形式为Lazy
则创立LazyStandaloneCoroutine
,不然创立StandaloneCoroutine
; -
coroutine.start()
发动协程。
newCoroutineContext
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
//将效果域的上下文与传入的参数合并为新的上下文
val combined = coroutineContext + context
val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
debug + Dispatchers.Default else debug
}
为新协程创立上下文。经过 +
将效果域的上下文 coroutineContext
与传入的上下文 context
合并为新的上下文。它在没有指定其他调度器或 ContinuationInterceptor
时则默许运用 Dispatchers.Default
。
StandaloneCoroutine
private open class StandaloneCoroutine(
parentContext: CoroutineContext,
active: Boolean
) : AbstractCoroutine<Unit>(parentContext, active) {
override fun handleJobException(exception: Throwable): Boolean {
//处理反常
handleCoroutineException(context, exception)
return true
}
}
public abstract class AbstractCoroutine<in T>(
protected val parentContext: CoroutineContext,
active: Boolean = true
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
······
//发动协程
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
initParentJob()
//依据[start]参数发动协程
start(block, receiver, this)
}
}
假如不指定发动形式,则默许运用 CoroutineStart.DEFAULT
,创立一个独立协程 StandaloneCoroutine
,而
StandaloneCoroutine
承继了 AbstractCoroutine
类,并重写了父类的 handleJobException()
办法。AbstractCoroutine
用于在协程构建器中完结协程的笼统基类,
完结了 Continuation
、 Job
和 CoroutineScope
等接口。所以 AbstractCoroutine
自身也是一个 Continuation
。
coroutine.start()
start(block, receiver, this)
从上面的源码中,协程发动 coroutine.start()
办法是 AbstractCoroutine
类中完结的,这儿涉及到运算符重载,然后该办法实践上会调用 CoroutineStart#invoke()
办法 ,并把代码块和接纳者、completion
等参数传到 CoroutineStart
中。
public enum class CoroutineStart {
//···
fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
when (this) {
DEFAULT -> block.startCoroutineCancellable(receiver, completion)
ATOMIC -> block.startCoroutine(receiver, completion)
UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
LAZY -> Unit
}
}
运用此协程战略将相应的块 [block]
作为协程发动。这儿的 [block]
便是协程里边履行的代码块。
- block: 协程里边履行的代码块;
- receiver: 接纳者;
-
completion: 协程的本体,协程履行完需求一个
Continuation
实例在康复时调用。
在上面 AbstractCoroutine
咱们看到 completion
传递的是 this
,也便是 AbstractCoroutine
自己,也便是 Coroutine
协程自身。所以这个 completion
便是协程本体。(这是Continuation三层包装的第一层包装)
接着进入 startCoroutineCancellable()
,可以以可撤销的办法发动协程,以便在等候调度时撤销协程:
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
runSafely(completion) {
// 1.创立一个没有被阻拦的 Continuation
createCoroutineUnintercepted(receiver, completion)
// 2.增加阻拦器
.intercepted()
// 3.履行协程,也是调用continuation.resumeWith(result)
.resumeCancellableWith(Result.success(Unit))
}
这儿首要做了三件事:
- 创立一个新的
Continuation
。 - 给
Continuation
加上ContinuationInterceptor
阻拦器,也是线程调度的关键。 -
resumeCancellableWith
终究调用continuation.resumeWith(result)
履行协程。
4.创立Continuation<Unit>
createCoroutineUnintercepted()
每次调用此函数时,都会创立一个新的可暂停核算实例。
经过回来的 Continuation<Unit>
实例上调用 resumeWith(Unit)
开端履行创立的协程。
#IntrinsicsJvm.kt
public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
receiver: R,
completion: Continuation<T>
): Continuation<Unit> {//回来Continuation<Unit>,它便是协程的载体
val probeCompletion = probeCoroutineCreated(completion)
return if (this is BaseContinuationImpl)
//假如调用者是 `BaseContinuationImpl` 或许其子类
create(receiver, probeCompletion)
else {
createCoroutineFromSuspendFunction(probeCompletion) {
(this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it)
}
}
}
要点:创立并回来一个未阻拦的Continuation,它便是协程的载体。(这是Continuation三层包装的第二层包装)(suspend () -> T)
函数会被传给该实例作为协程的实践履行体。这个 Continuation
封装了协程的代码运转逻辑和康复接口,下面会讲到。
由于this便是(suspend () -> T)
,SuspendLambda
又是 BaseContinuationImpl
的完结类,则履行 create()
办法创立协程载体:
abstract class BaseContinuationImpl {
//···
public open fun create(completion: Continuation<*>): Continuation<Unit> {
throw UnsupportedOperationException("create(Continuation) has not been overridden")
}
public open fun create(value: Any?, completion: Continuation<*>): Continuation<Unit> {
throw UnsupportedOperationException("create(Any?;Continuation) has not been overridden")
}
//···
}
create()
是 BaseContinuationImpl
类中的一个公开办法。那么是谁完结了这个办法呢? 看看 SuspendLambda
与 BaseContinuationImpl
与 Continuation
之间的联系讲解。
5.SuspendLambda及其父类
上面说到 suspend{}
便是 (suspend R.() -> T)
,它是协程实在需求履行的逻辑,传入的lambda表达式被编译成了承继 SuspendLambda
的子类,SuspendLambda
是 Continuation
的完结类。
internal abstract class SuspendLambda(
public override val arity: Int,
completion: Continuation<Any?>?
) : ContinuationImpl(completion), FunctionBase<Any?>, SuspendFunction {
constructor(arity: Int) : this(arity, null)
public override fun toString(): String =
if (completion == null)
Reflection.renderLambdaToString(this) //这是 lambda
else
super.toString() //这是 continuation
}
而SuspendLambda
承继自 ContinuationImpl
:
internal abstract class ContinuationImpl(
completion: Continuation<Any?>?,
) : BaseContinuationImpl(completion) {
private var intercepted: Continuation<Any?>? = null
public fun intercepted(): Continuation<Any?> =
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
protected override fun releaseIntercepted() { ··· }
}
ContinuationImpl
又承继自BaseContinuationImpl
,SuspendLambda
的 resume()
办法的具体完结为 BaseContinuationImpl
的 resumeWith()
办法:
internal abstract class BaseContinuationImpl(
//每个BaseContinuationImpl实例都会引证一个完结Continuation,用来在当时状况机流通完毕时康复这个Continuation
public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
//resumeWith() 中经过循环由里到外康复Continuation
public final override fun resumeWith(result: Result<Any?>) {
var current = this
var param = result
while (true) {
probeCoroutineResumed(current)
with(current) {
val completion = completion!! //completion为null则会抛出反常
val outcome: Result<Any?> =
try {
// 1.调用 invokeSuspend 办法履行,履行协程的实在运算逻辑
val outcome = invokeSuspend(param)
// 2.假如现已挂起则提前完毕
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
//当invokeSuspend办法没有回来COROUTINE_SUSPENDED,那么当时状况机流通完毕,即当时suspend办法履行完毕,释放阻拦
releaseIntercepted()
if (completion is BaseContinuationImpl) {
//3.假如 completion 是 BaseContinuationImpl,内部还有suspend办法,则会进入循环递归
current = completion
param = outcome
} else {
//4.不然是最顶层的completion,则会调用resumeWith康复上一层而且return
// 这儿实践调用的是其父类 AbstractCoroutine 的 resumeWith 办法
completion.resumeWith(outcome)
return
}
}
}
}
//·····
}
这儿首要做了四件事:
- 调用
invokeSuspend
办法,履行协程的实在运算逻辑,并回来一个成果; - 假如
outcome
是COROUTINE_SUSPENDED
,代码块里边履行了挂起办法,则持续挂起; - 假如
completion
是BaseContinuationImpl
,内部还有suspend办法,则会进入循环递归,持续履行挂起; - 假如
completion
不是BaseContinuationImpl
,则实践调用父类AbstractCoroutine
的resumeWith
办法。
接下来再来看 AbstractCoroutine
的 resumeWith
完结:
public abstract class AbstractCoroutine<in T>(
protected val parentContext: CoroutineContext,
active: Boolean = true
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
//以指定的成果完结履行协程
public final override fun resumeWith(result: Result<T>) {
// 1. 获取当时协程的技术状况
val state = makeCompletingOnce(result.toState())
// 2. 假如当时还在等候完结,阐明还有子协程没有完毕
if (state === COMPLETING_WAITING_CHILDREN) return
// 3. 履行完毕康复的办法,默许为空
afterResume(state)
}
//···
}
其间一类 completion
是 BaseContinuationImpl
,每个实例就代表一个suspend办法状况机。resumeWith()
封装了协程的运算逻辑,用以协程的发动和康复;而另一类 completion
是 AbstractCoroutine
,首要是担任保护协程的状况和办理,它的resumeWith
则是完结协程,康复调用者协程。
其承继联系为: SuspendLambda -> ContinuationImpl -> BaseContinuationImpl -> Continuation
。
因此 create()
办法创立的 Continuation
是一个 SuspendLambda
目标。
回到上面的 createCoroutineUnintercepted()
办法:
//IntrinsicsJvm.kt
public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
receiver: R,
completion: Continuation<T>
): Continuation<Unit> {//回来Continuation<Unit>,它便是协程的载体
val probeCompletion = probeCoroutineCreated(completion)
return if (this is BaseContinuationImpl)
//假如调用者是 `BaseContinuationImpl` 或许其子类
create(receiver, probeCompletion)
else {
createCoroutineFromSuspendFunction(probeCompletion) {
(this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it)
}
}
}
其实这段代码是在JVM平台中找到的,在 IntrinsicsJvm.kt
类中,可是在 Android 源码中是这姿态的:
fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
receiver: R,
completion: Continuation<T>
): Continuation<kotlin.Unit> { /* compiled code */ }
compiled code
就现已提示里边的代码是编译后的代码。
前面也说到了,在协程源码里边是看不到完整的协程原理的,有一部分代码是kotlin编译器处理的,所以在研讨协程的运转流程时,单单看kotlin的源码是看不出实质的,需求反编译成Java文件,看看反编译之后的代码被修正成什么姿态了。
6.Function
的创立
将协程模仿网络恳求的代码反编译:
fun getData() {
lifecycleScope.launch {
val result = requestUserInfo()
tvName.text = result
}
}
反编译后的代码如下(代码有删减),你会发现发生了巨大的改变,而这些作业都是kotlin编译器帮咱们完结的:
public final void getData() {
BuildersKt.launch$default((CoroutineScope)LifecycleOwnerKt.getLifecycleScope(this), (CoroutineContext)null, (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
int label; // 初始值为0
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
//···
Object var10000 = requestUserInfo(this); //履行挂起函数
//···
String result = (String)var10000;
CoroutinesActivity.this.getTvName().setText((CharSequence)result);
return Unit.INSTANCE;
}
@NotNull
public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
Intrinsics.checkParameterIsNotNull(completion, "completion");
Function2 var3 = new <anonymous constructor>(completion);
return var3;
}
public final Object invoke(Object var1, Object var2) {
return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
}
}), 3, (Object)null);
}
lifecycleScope.launch {}
在反编译后增加了 CoroutineScope
,CoroutineContext
,CoroutineStart
,Function2
,3,Object
等参数,这些都是kotlin编译器帮咱们做了。这儿便是最顶层的completion处理协程挂起与康复的当地。 这儿一旦康复了,那么阐明整个协程康复了。
这儿创立了一个Function2
,里边有三个重要的办法:
-
invokeSuspend(result): 里边履行
luanch{}
里边的代码,所以它履行了协程实在的运转逻辑; -
create(value, completion):经过传递的
completion
参数创立一个Function2
并回来,实践是一个Continuation
; -
invoke(var1, var2): 复写了
Funtion.invoke()
办法,经过传递过来的参数链式调用create().invokeSuspend()
。
-
从上面知道
(suspend R.() -> T)
是BaseContinuationImpl
的完结类,所以会走onCreate()
办法创立Continuation
, 经过completion
参数创立一个新的Function2
,作为Continuation
回来,这便是创立出来的协程载体; -
然后调用
resumeWith()
发动协程,那么就会履行BaseContinuationImpl
的resumeWith()
办法,此刻就会履行invokeSuspend()
办法,履行协程实在的运转逻辑。
协程的创立流程如下:
三、 协程的挂起与康复
协程作业的中心便是它内部的状况机,invokeSuspend()
函数。 requestUserInfo()
办法是一个挂起函数,这儿经过反编译它来论述协程状况机的原理,逆向分析协程的挂起和康复。
1.办法的挂起
//延时2000毫秒,回来一个String成果
suspend fun requestUserInfo(): String {
delay(2000)
return "result form userInfo"
}
反编译后的代码如下(代码有删减),相同发现发生了巨大的改变,而这些作业都是kotlin编译器帮咱们完结的:
//1.函数回来值由String变成Object,入参也增加了Continuation参数
public final Object requestUserInfo(@NotNull Continuation completion) {
//2.经过completion创立一个ContinuationImpl,而且复写了invokeSuspend()
Object continuation = new ContinuationImpl(completion) {
Object result;
int label; //初始值为0
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
this.result = $result;
this.label |= Integer.MIN_VALUE;
return requestUserInfo(this);//又调用了requestUserInfo()办法
}
};
Object $result = (continuation).result;
Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
//状况机
//3.办法被康复的时分又会走到这儿,第一次进入case 0分支,label的值从0变为1,第二次进入就会走case 1分支
switch(continuation.label) {
case 0:
ResultKt.throwOnFailure($result);
continuation.label = 1;
//4.delay()办法被suspend润饰,传入一个continuation回调,回来一个object成果。这个成果要么是`COROUTINE_SUSPENDED`,不然便是实在成果。
Object delay = DelayKt.delay(2000L, continuation)
if (delay == var4) {//假如是COROUTINE_SUSPENDED则直接return,就不会往下履行了,requestUserInfo()被暂停了。
return var4;
}
break;
case 1:
ResultKt.throwOnFailure($result);
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
return "result form userInfo";
}
上面首要过程为:
- 函数回来值由
String
变成Object
,函数没有入参的编译后也增加了Continuation
参数。本来需求咱们做的callback
,现在编译器帮咱们完结了。 - 依据
completion
创立了一个ContinuationImpl
,复写了invokeSuspend()
办法,在这个办法里边它又调用了requestUserInfo()
办法,这儿又调用了一次自己(是不是很奇特),而且把continuation
传递进去。 - 在 switch 句子中,
label
的默许初始值为 0,第一次会进入case 0
分支,delay()
是一个挂起函数,传入上面的continuation
参数,会有一个Object
类型的回来值。这个成果要么是COROUTINE_SUSPENDED
,不然便是实在成果。 -
DelayKt.delay(2000, continuation)
的回来成果假如是COROUTINE_SUSPENDED
, 则直接 return ,那么办法履行就被完毕了,办法就被挂起了。
这便是挂起的实在原理。所以函数即使被 suspend
润饰了,可是也未必会挂起。需求里边的代码编译后有回来值为 COROUTINE_SUSPENDED
这样的符号位才干够,所以程序履行到 case 0
的时分就 return 了。那就意味着办法被暂停了,那么协程也被暂停了。所以说协成的挂起实践上是办法的挂起,办法的挂起实质是 return。
2.COROUTINE_SUSPENDED
Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
//履行已暂停,而且不会当即回来任何成果
public val COROUTINE_SUSPENDED: Any get() = CoroutineSingletons.COROUTINE_SUSPENDED
internal enum class CoroutineSingletons { COROUTINE_SUSPENDED, UNDECIDED, RESUMED }
在 var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED()
中, COROUTINE_SUSPENDED
便是一个枚举常量,表明协程现已挂起,而且不会当即回来任何成果。那么 DelayKt.delay()
回来值是 COROUTINE_SUSPENDED
就被 return 了。
跟进 DelayKT
看看 COROUTINE_SUSPENDED
是如何被获取的:
找到 DelayKT
类(注意:不是Delay.kt
哈,别搞错了),Decomplie to java
反编译成java源码:
public final class DelayKt {
//增加了Object回来值,而且追加了一个Continuation参数,
@Nullable
public static final Object delay(long timeMillis, @NotNull Continuation $completion) {
if (timeMillis <= 0L) {
return Unit.INSTANCE;
} else {
//···
getDelay(cont.getContext()).scheduleResumeAfterDelay(timeMillis, cont);
Object var10000 = cancellable$iv.getResult();
//···
return var10000;
}
}
}
可以看到 DelayKt.delay()
增加了 Object
回来值,而且追加了一个 completion
参数,这个回来值是 var10000
,它是在 cancellable$iv.getResult()
得到的:
@PublishedApi
internal fun getResult(): Any? {
setupCancellation()
//测验挂起,假如回来TRUE则回来COROUTINE_SUSPENDED
if (trySuspend()) return COROUTINE_SUSPENDED
//···
return getSuccessfulResult(state)
}
trySuspend()
测验把办法挂起,假如回来 true 则回来 COROUTINE_SUSPENDED
:
private val _decision = atomic(UNDECIDED)
private fun trySuspend(): Boolean {
//循环遍历里边的值,而_decision初始值为UNDECIDED,那么第一次肯定回来true
_decision.loop { decision ->
when (decision) {
//回来true,而且把当时状况更改为SUSPENDED,代表它以经被挂起了
UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, SUSPENDED)) return true
RESUMED -> return false
else -> error("Already suspended")
}
}
}
trySuspend()
里边循环遍历了 _decision
的值, _decision
初始值为 UNDECIDED
,那么第一次会进入UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, SUSPENDED)) return true
分支,这儿就回来true了,而且把当时状况更改为 SUSPENDED
,代表着它现已被挂起了。
//办法的状况只要在调用tryResume时才会把状况更改为RESUMED
private fun tryResume(): Boolean {
_decision.loop { decision ->
when (decision) {
//回来true,而且把状况改为RESUMED
UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, RESUMED)) return true
SUSPENDED -> return false
else -> error("Already resumed")
}
}
}
这个办法的状况会在它康复的时分调用 tryResume()
把状况更改为 RESUMED
。这便是决议计划状况机:
那么 trySuspend()
回来 true
则 getResult()
回来了 COROUTINE_SUSPENDED
枚举常量,那么 DelayKt.delay()
就回来了 COROUTINE_SUSPENDED
,所以下面的判别条件就会满足,会直接return。delay()
办法是一个实在在正的挂起函数,可以导致协程被暂停。
所以 requestUserInfo()
办法在 delay(2000)
被暂停了,在协程中调用,那么协程也就暂停了,后边的成果 result form userInfo
也没有被回来。所以这便是被 suspend
润饰的函数不一定能导致协程被挂起,还需求里边的完结经过编译之后有回来值而且为 COROUTINE_SUSPENDED
才干够。
3.办法的康复
持续回到 requestUserInfo()
分析康复的原理:
@Nullable
public final Object requestUserInfo(@NotNull Continuation completion) {
//···
Object continuation = new ContinuationImpl(completion) {
Object result;
int label; //初始值为0
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
this.result = $result;
this.label |= Integer.MIN_VALUE;
return requestUserInfo(this);//又调用了requestUserInfo()办法
}
};
Object $result = (continuation).result;
Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
//办法被康复的时分又会走到这儿,第一次进入case 0,第二次 ontinuation.label = 1,所以能持续履行下去回来成果
switch(continuation.label) {
case 0:
ResultKt.throwOnFailure($result);
continuation.label = 1;
//delay()办法被suspend润饰,传入一个continuation回调,回来一个object成果。这个成果要么是`COROUTINE_SUSPENDED`,不然便是实在成果。
Object delay = DelayKt.delay(2000L, continuation)
if (delay == var4) {//假如是COROUTINE_SUSPENDED则直接return,就不会往下履行了。
return var4;
}
break;
case 1:
ResultKt.throwOnFailure($result);
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
return "result form userInfo";
}
- 由于
delay()
是io
操作,在2000毫米后就会经过传递给它的continuation
回调回来。 - 回调到
ContinuationImpl
这个类里边的resumeWith()
办法,会再次调用invokeSuspend()
办法,从而再次调用requestUserInfo()
办法。 - 它又会进入switch句子,由于第一次在
case 0
时把label = 1
赋值为1,所以这次会进入case 1
分支,而且回来了成果result form userInfo
。 - 而且
requestUserInfo()
的回来值作为invokeSuspend()
的回来值回来。从头被履行的时分就代表着办法被康复了。
那么 invokeSuspend()
办法是怎样被触发回调的呢?它拿到回来值有什么用呢?
上面说到 ContinuationImpl
承继自 BaseContinuationImpl
,而它又完结了 continuation
接口而且复写了 resumeWith()
办法,里边就调用了 val outcome = invokeSuspend(param)
办法。(源码有删减)
internal abstract class BaseContinuationImpl(
public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
//这个完结是终究的,用于展开 resumeWith 递归。
public final override fun resumeWith(result: Result<Any?>) {
var current = this
var param = result
while (true) {
with(current) {
val completion = completion!!
val outcome: Result<Any?> =
try {
// 1.调用 invokeSuspend()办法履行,履行协程的实在运算逻辑,拿到回来值
val outcome = invokeSuspend(param)
// 2.假如回来的还是COROUTINE_SUSPENDED则提前完毕
if (outcome == COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
if (completion is BaseContinuationImpl) {
//3.假如 completion 是 BaseContinuationImpl,内部还有suspend办法,则会进入循环递归,持续履行和康复
current = completion
param = outcome
} else {
//4.不然是最顶层的completion,则会调用resumeWith康复上一层而且return
// 这儿实践调用的是其父类 AbstractCoroutine 的 resumeWith 办法
completion.resumeWith(outcome)
return
}
}
}
}
实践上任何一个挂起函数它在康复的时分都会调到 BaseContinuationImpl
的 resumeWith()
办法里边。
-
一但
invokeSuspend()
办法被履行,那么requestUserInfo()
又会再次被调用,invokeSuspend()
就会拿到requestUserInfo()
的回来值,在ContinuationImpl
里边依据val outcome = invokeSuspend()
的回来值来判别咱们的requestUserInfo()
办法康复了之后的操作。 -
假如
outcome
是COROUTINE_SUSPENDED
常量,阐明你即使被康复了,履行了一下,if (outcome == COROUTINE_SUSPENDED) return
可是立马又被挂起了,所以又 return 了。 -
假如本次康复
outcome
是一个正常的成果,就会走到completion.resumeWith(outcome)
,当时被挂起的办法现已被履行完了,实践调用的是其父类AbstractCoroutine
的resumeWith
办法,那么协程就康复了。
咱们知道 requestUserInfo()
肯定是会被协程调用的(从上面反编译代码知道会传递一个Continuation completion
参数),requestUserInfo()
办法康复完了就会让协程completion.resumeWith()
去康复,所以说协程的康复实质上是办法的康复。
这是在android studio傍边经过反编译kotlin源码来分析协程挂起与康复的流程。流程图如下:
4.在协程中运转的挂起与康复
那么 requestUserInfo()
办法在协程里边履行的整个挂起和康复流程是怎样样的呢?
fun getData() {
lifecycleScope.launch {
val result = requestUserInfo()
tvName.text = result
}
}
//模仿恳求,2秒后回来数据
suspend fun requestUserInfo(): String {
delay(2000)
return "result form userInfo"
}
反编译代码:
public final void getData() {
BuildersKt.launch$default((CoroutineScope)LifecycleOwnerKt.getLifecycleScope(this), (CoroutineContext)null, (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
int label; // 初始值为0
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED(); //挂起状况
Object var10000;
//状况机
switch(this.label) {
case 0:
ResultKt.throwOnFailure($result);
this.label = 1; //修正label
var10000 = requestUserInfo(this); //履行挂起函数
if (var10000 == var3) {
return var3; //假如var10000是COROUTINE_SUSPENDED则直接挂起协程
}
break;
case 1:
ResultKt.throwOnFailure($result);
var10000 = $result; //回来实践成果
break;
//···
}
String result = (String)var10000;
CoroutinesActivity.this.getTvName().setText((CharSequence)result);
return Unit.INSTANCE;
}
@NotNull
public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
Intrinsics.checkParameterIsNotNull(completion, "completion");
Function2 var3 = new <anonymous constructor>(completion);
return var3;
}
public final Object invoke(Object var1, Object var2) {
return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
}
}), 3, (Object)null);
}
@Nullable
public final Object requestUserInfo(@NotNull Continuation completion) {
//···
Object continuation = new ContinuationImpl(completion) {
Object result;
int label; //初始值为0
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
this.result = $result;
this.label |= Integer.MIN_VALUE;
return requestUserInfo(this); //又调用了requestUserInfo()办法
}
};
Object $result = (continuation).result;
Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
//状况机
//办法被康复的时分又会走到这儿,第一次进入case 0,第二次 ontinuation.label = 1,所以就打印了日志输出
switch(label) {
case 0:
ResultKt.throwOnFailure($result);
continuation.label = 1;
//delay()办法被suspend润饰,传入一个continuation回调,回来一个object成果。这个成果要么是`COROUTINE_SUSPENDED`,不然便是实在成果。
Object delay = DelayKt.delay(2000L, continuation)
if (delay == var4) {//假如是COROUTINE_SUSPENDED则直接return,就不会往下履行了,requestUserInfo()被暂停了。
return var4;
}
break;
case 1:
ResultKt.throwOnFailure($result);
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
return "result form userInfo";
}
- 可以看到协程里边反编译后的代码和
requestUserInfo()
办法反编译后的代码相似,Function2
里边也复写了invokeSuspend()
办法,状况机也相似, - 在
case 0
处判别requestUserInfo()
回来值是否为COROUTINE_SUSPENDED
, 假如是则挂起协程。咱们在上面分析知道,requestUserInfo()
第一次回来的值是COROUTINE_SUSPENDED
,所以requestUserInfo()
被挂起了,协程也被挂起了。所以说协程的挂起实践上是办法的挂起。 - 协程康复的原理也和
requestUserInfo()
康复的原理大致相同。在调用requestUserInfo(this)
的时分把Continuation
传递了进去。 - 那么
requestUserInfo()
函数2000毫秒后在康复时将成果经过invokeSuspend()
回调给上一层completion
的resumeWith()
里边,那么协程的invokeSuspend(result)
便是被回调。 - 经过状况机流通履行之前挂起逻辑之后的代码。此刻
lable = 1
进入case 1
赋值给var10000
,然后履行剩余的代码。 所以requestUserInfo()
办法康复后,调用它的协程也跟着康复了,所以说协程的康复实质上是办法的康复。
四、协程的调度
1.协程阻拦
协程的线程调度是经过阻拦器完结的,回到前面的 startCoroutineCancellable
:
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
receiver: R,
completion: Continuation<T>) =
runSafely(completion) {
// 创立一个没有被阻拦的 Coroutine
createCoroutineUnintercepted(receiver, completion)
// 增加阻拦器
.intercepted()
// 履行协程
.resumeCancellableWith(Result.success(Unit))
}
看看 intercepted()
的具体完结:
//运用[ContinuationInterceptor]阻拦Continuation
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
(this as? ContinuationImpl)?.intercepted() ?: this
阻拦器在每次(康复)履行协程体的时分都会阻拦协程本体SuspendLambda
。interceptContinuation()
办法中阻拦了一个Continuation<T>
而且再回来一个Continuation<T>
,阻拦到Continuation
后就可以做一些作业,比如线程切换等。
internal abstract class ContinuationImpl(
completion: Continuation<Any?>?,
private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
@Transient
private var intercepted: Continuation<Any?>? = null // 阻拦到的Continuation
public fun intercepted(): Continuation<Any?> =
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
//······
}
而 interceptContinuation()
办法的完结是在 CoroutineDispatcher
中,它是一切协程调度程序完结扩展的基类:
public abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
//是否需求调度
public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true
//将可运转块的履行分派到给定上下文中的另一个线程上
public abstract fun dispatch(context: CoroutineContext, block: Runnable)
//经过DispatchedContinuation回来包装原始continuation的 continuation
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatchedContinuation(this, continuation)
}
注意:[block]
是一个 Runnable
类型。
假如传递了协程调度器,那么协程中的闭包代码块就决议了所运转的线程环境,CoroutineDispatcher
有三个重要的办法:
- isDispatchNeeded():协程的发动需不需求分发到其他线程上面去。
- dispatch(): 将可运转块的履行分派到给定上下文中的另一个线程上,由子类去完结具体的调度。
-
interceptContinuation:阻拦协程本体,包装成一个
DispatchedContinuation
。
阻拦协程本体,包装成一个 DispatchedContinuation
,它在履行使命的时分会经过 needDispatch()
来判别本次协程发动需不需求分发到其他线程上面,假如回来了true,那么就会调用子类的 dispatch(runnable)
办法,来完结协程的本次发动作业,假如回来false,就会由 CoroutineDispatcher
在当时线程马上履行。
2.协程分发
在截获的 Continuation
上调用resume(Unit)
保证协程和完结的履行都发生在由 ContinuationInterceptor
建立的调用上下文中。而阻拦后的 continuation
被 DispatchedContinuation
包装了一层:(这是Continuation三层包装的第三层包装)
internal class DispatchedContinuation<in T>(
@JvmField val dispatcher: CoroutineDispatcher,
@JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {
//·····
override val delegate: Continuation<T>
get() = this
inline fun resumeCancellable(value: T) {
// 是否需求线程调度
if (dispatcher.isDispatchNeeded(context)) {
// 将协程的运算分发到另一个线程
dispatcher.dispatch(context, this)
} else {
// 假如不需求调度,当即康复在在当时线程履行协程运算
withCoroutineContext(this.context, countOrElement) {
continuation.resumeWith(result)
}
}
}
override fun resumeWith(result: Result<T>) {
// 是否需求线程调度
if (dispatcher.isDispatchNeeded(context)) {
// 将协程的运算分发到另一个线程
dispatcher.dispatch(context, this)
} else {
// 假如不需求调度,当即康复在当时线程履行协程运算
continuation.resumeWith(result)
}
}
//·····
}
DispatchedContinuation
阻拦了协程的发动和康复,分别是resumeCancellable(Unit)
和重写的resumeWith(Result)
。
当需求分发时,就调用 dispatcher
的 dispatch(context, this)
办法,this是一个 DispatchedTask
:
internal abstract class DispatchedTask<in T>(
@JvmField public var resumeMode: Int
) : SchedulerTask() {
public final override fun run() {
//·····
withContinuationContext(continuation, delegate.countOrElement) {
//康复协程履行 终究调用resumeWith
continuation.resume(getSuccessfulResult(state))
}
//·····
}
}
DispatchedTask
实践上是一个Runnable
。
- 当需求线程调度时,则在调度后会调用
DispatchedContinuation.continuation.resumeWith()
来发动协程,其间continuation
是SuspendLambda
实例; - 当不需求线程调度时,则直接调用
continuation.resumeWith()
来直接发动协程。
也便是说对创立的 Continuation
的 resumeWith()
增加阻拦操作,阻拦协程的运转操作:
分别分析一下四种调度形式的具体完结:
3.Dispatchers.Unconfined
public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined
internal object Unconfined : CoroutineDispatcher() {
//回来false, 不阻拦协程
override fun isDispatchNeeded(context: CoroutineContext): Boolean = false
override fun dispatch(context: CoroutineContext, block: Runnable) {
//····
}
}
Dispatchers.Unconfined
:对应的是Unconfined
,它里边的isDispatchNeeded()
回来的是false,不限于任何特定线程的协程调度程序。那么它的父类ContinuationInterceptor
就不会把本次使命的调度交给子类来履行,而是由父类在当时线程马上履行。
4.Dispatchers.Main
Dispatchers.Main
承继自 MainCoroutineDispatcher
经过 MainDispatcherLoader.dispatcher
完结调度器:
public actual object Dispatchers {
@JvmStatic
public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
}
MainDispatcherLoader
经过工厂形式创立 MainCoroutineDispatcher
:
internal object MainDispatcherLoader {
@JvmField
val dispatcher: MainCoroutineDispatcher = loadMainDispatcher()
private fun loadMainDispatcher(): MainCoroutineDispatcher {
//···
val factories = FastServiceLoader.loadMainDispatcherFactory()
//···
return factories.maxByOrNull { it.loadPriority }?.tryCreateDispatcher(factories)
}
}
public fun tryCreateDispatcher(factories: List<MainDispatcherFactory>): MainCoroutineDispatcher =
try {
createDispatcher(factories)
} catch (cause: Throwable) {
//假如出现反常则创立一个MissingDispatcher
createMissingDispatcher(cause, hintOnError())
}
MainDispatcherFactory
是一个接口,经过完结类来创立Dispatcher
:
public interface MainDispatcherFactory {
//子类完结
public fun createDispatcher(allFactories: List<MainDispatcherFactory>): MainCoroutineDispatcher
}
internal class AndroidDispatcherFactory : MainDispatcherFactory {
//由AndroidDispatcherFactory创立HandlerContext,可以看到它是一个主线程调度器
override fun createDispatcher(allFactories: List<MainDispatcherFactory>) =
HandlerContext(Looper.getMainLooper().asHandler(async = true), "Main")
//···
}
咱们看到了AndroidDispatcherFactory
, Looper.getMainLooper()
,Main
等关键字,毫无疑问这便是主线程调度器:
internal class HandlerContext private constructor(
private val handler: Handler,
private val name: String?,
private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
//···
override val immediate: HandlerContext = _immediate ?:
HandlerContext(handler, name, true).also { _immediate = it }
//invokeImmediately默许为false, Looper.myLooper() != handler.looper判别当时线程looper
override fun isDispatchNeeded(context: CoroutineContext): Boolean {
return !invokeImmediately || Looper.myLooper() != handler.looper
}
override fun dispatch(context: CoroutineContext, block: Runnable) {
handler.post(block)
}
//···
}
它们三者的承继联系:
HandlerContext
->HandlerDispatcher
->MainCoroutineDispatcher()
->CoroutineDispatcher
。
它里边的isDispatchNeeded()
回来的是true,当协程发动的时分则由HandlerContext
来分发,而它里边的分发作业是经过 handler.post(runnable)
分发给主线程来完结的。在康复的时分也是经过Dispatchers.Main
这个调度器来康复。当完结使命之后就会经过HandlerDispatcher
把协程中的代码再次切换到主线程。
5.Dispatchers.IO
public actual object Dispatchers {
@JvmStatic
public val IO: CoroutineDispatcher = DefaultScheduler.IO
}
DefaultScheduler
协程调度器的默许调度器,是一个线程调度器,履行堵塞使命,此调度程序与Dispatcher.Default
调度程序同享线程。
internal object DefaultScheduler : ExperimentalCoroutineDispatcher() {
val IO: CoroutineDispatcher = LimitingDispatcher(
this,
systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)),
"Dispatchers.IO",
TASK_PROBABLY_BLOCKING
)
//···
}
private class LimitingDispatcher(
private val dispatcher: ExperimentalCoroutineDispatcher,
//···
) : ExecutorCoroutineDispatcher(), TaskContext, Executor {
override val executor: Executor
get() = this
override fun execute(command: Runnable) = dispatch(command, false)
override fun dispatch(context: CoroutineContext, block: Runnable) = dispatch(block, false)
private fun dispatch(block: Runnable, tailDispatch: Boolean) {
var taskToSchedule = block
while (true) {
//没有超过限制,当即分发使命
if (inFlight <= parallelism) {
dispatcher.dispatchWithContext(taskToSchedule, this, tailDispatch)
return
}
//使命超过限制,则加入等候行列
queue.add(taskToSchedule)
//···
}
}
}
dispatcher.dispatchWithContext()
当即分发使命由ExperimentalCoroutineDispatcher
完结:
public open class ExperimentalCoroutineDispatcher(
private val corePoolSize: Int,
private val maxPoolSize: Int,
//···
) : ExecutorCoroutineDispatcher() {
override val executor: Executor
get() = coroutineScheduler
private var coroutineScheduler = createScheduler()
//分发
override fun dispatch(context: CoroutineContext, block: Runnable): Unit = coroutineScheduler.dispatch(block)
//···
internal fun dispatchWithContext(block: Runnable, context: TaskContext, tailDispatch: Boolean) {
coroutineScheduler.dispatch(block, context, tailDispatch)
}
//创立调度器
private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
//···
}
ExperimentalCoroutineDispatcher
将使命分发到coroutineScheduler.dispatch()
完结。
CoroutineScheduler
便是一个线程池Executor。
//协程调度器的首要方针是在作业线程上分配调度的协程,包含 CPU 密集型使命和堵塞使命。
internal class CoroutineScheduler(
val corePoolSize: Int,
val maxPoolSize: Int,
//···
) : Executor, Closeable {
override fun execute(command: Runnable) = dispatch(command)
//调度可运转块的履行,并提示调度程序该块是否可以履行堵塞操作(IO、体系调用、确定原语等)
fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
//1.构建Task,Task完结了Runnable接口
val task = createTask(block, taskContext)
//2.取当时线程转为Worker目标,Worker是一个承继自Thread的类,循环履行使命
val currentWorker = currentWorker()
//3.增加使命到本地行列
val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
if (notAdded != null) {
//4.notAdded不为null,则再将notAdded(Task)增加到大局行列中
if (!addToGlobalQueue(notAdded)) {
throw RejectedExecutionException("$schedulerName was terminated")
}
}
if (task.mode == TASK_NON_BLOCKING) {
if (skipUnpark) return
//5.创立Worker并开端履行该线程
signalCpuWork()
} else {
// Increment blocking tasks anyway
signalBlockingWork(skipUnpark = skipUnpark)
}
}
}
上面代码首要做了以下几件事:
- 首先是经过
Runnable
构建了一个Task
,这个Task
其实也是完结了Runnable
接口; - 将当时线程取出来转换成
Worker
,这个Worker
是承继自Thread
的一个类; - 将
task
提交到本地行列中; - 假如
task
提交到本地行列的过程中没有成功,那么会增加到大局行列中; - 创立
Worker
线程,并开端履行使命。
class Worker private constructor() : Thread() {
//woeker行列
val localQueue: WorkQueue = WorkQueue()
override fun run() = runWorker()
private fun runWorker() {
var rescanned = false
while (!isTerminated && state != WorkerState.TERMINATED) {
val task = findTask(mayHaveLocalTasks)
//找到使命,履行使命,重复循环
if (task != null) {
executeTask(task)
continue
} else {
mayHaveLocalTasks = false
}
}
}
private fun executeTask(task: Task) {
val taskMode = task.mode
idleReset(taskMode)
beforeTask(taskMode)
runSafely(task)
afterTask(taskMode)
}
}
//履行使命
fun runSafely(task: Task) {
try {
task.run()
}
//···
}
run办法直接调用的runWorker()
,在里边是一个while循环,不断从行列中取Task
来履行,调用task.run()
。
- 从本地行列或许大局行列中取出
Task
。 - 履行这个task,终究其实便是调用这个
Runnable
的run办法。
也便是说,在Worker
这个线程中,履行了这个Runnable
的run办法。还记得这个Runnable
是谁么?它便是上面咱们看过的DispatchedTask
,这儿的run办法履行的便是协程使命,那这块具体的run办法的完结逻辑,咱们应该到DispatchedTask
中去找。
internal abstract class DispatchedTask<in T>(
@JvmField public var resumeMode: Int
) : SchedulerTask() {
public final override fun run() {
//·····
withContinuationContext(continuation, delegate.countOrElement) {
//康复协程履行,终究调用resumeWith
continuation.resume(getSuccessfulResult(state))
}
//·····
}
}
run办法履行continuation.resume
康复协程履行。最终经过executor.execute()
发动线程池。
internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispatcher(), Delay {
override fun dispatch(context: CoroutineContext, block: Runnable) {
try {
executor.execute(wrapTask(block))
} catch (e: RejectedExecutionException) {
unTrackTask()
DefaultExecutor.enqueue(block)
}
}
}
6.Dispatchers.Default
假如不指定调度器,则会默许 DefaultScheduler
,它实践和Dispatchers.IO
是同一个线程调度器,这个是线程调度器:
public actual object Dispatchers {
public actual val Default: CoroutineDispatcher = createDefaultDispatcher()
}
internal actual fun createDefaultDispatcher(): CoroutineDispatcher =
if (useCoroutinesScheduler) DefaultScheduler else CommonPool
假如指定了调度器则运用 CommonPool
,表明同享线程的公共池作为核算密集型使命的协程调度程序。
internal object CommonPool : ExecutorCoroutineDispatcher() {
override val executor: Executor
get() = pool ?: getOrCreatePoolSync()
//履行使命
override fun dispatch(context: CoroutineContext, block: Runnable) {
(pool ?: getOrCreatePoolSync()).execute(wrapTask(block))
}
//创立一个固定大小的线程池
private fun createPlainPool(): ExecutorService {
val threadId = AtomicInteger()
return Executors.newFixedThreadPool(parallelism) {
Thread(it, "CommonPool-worker-${threadId.incrementAndGet()}").apply { isDaemon = true }
}
}
CommonPool
中也是创立了一个固定大小的线程池,dispatch()
经过execute()
履行协程使命。
总结如下:
类型 | 调度器完结类 | 阐明 |
---|---|---|
Dispatchers.Main | HandlerContext | 它里边的isDispatchNeeded() 回来的是true,当协程发动的时分则由HandlerDispatcher来分发,而它里边的分发作业是经过 handler.post(runnable) 来完结的。 |
Dispatchers.IO | DefaultScheduler | 它是线程调度器,它里边的isDispatchNeeded() 回来的是true,而它调度使命的时分是经过 executors.execute(runnable) 来履行runnable使命。也便是把协程中的代码块运转到IO线程。 |
Dispatchers.Default | DefaultScheduler,CommonPool | 假如不指定调度器,则会默许DefaultScheduler,它实践和Dispatchers.IO 是同一个线程调度器;假如指定调度器,则是CommonPool同享线程池。isDispatchNeeded()都是true,经过 executors.execute(runnable) 来履行runnable使命。 |
Dispatchers.Unconfined | Unconfined | 它里边的isDispatchNeeded() 回来的是false,那么它的父类ContinuationInterceptor 就不会把本次使命的调度交给子类来履行,而是由父类在当时线程马上履行。 |
五、总结
1.协程的三层包装
经过一步步的分析,慢慢发现协程其实有三层包装:
-
常用的
launch
和async
回来的Job
、Deferred
,里边封装了协程状况,供给了撤销协程接口,而它们的实例都是承继自AbstractCoroutine
,它是协程的第一层包装。 -
第二层包装是编译器生成的
SuspendLambda
的子类,封装了协程的实在运算逻辑,承继自BaseContinuationImpl
,包含了第一层包装,其间completion
便是协程的第一层包装。 -
第三层包装是协程的线程调度时的
DispatchedContinuation
,封装了线程调度逻辑,包含了协程的第二层包装。
三层包装都完结了Continuation
接口,经过署理形式将协程的各层包装组合在一同,每层担任不同的功用。
2.协程的挂起与康复原理
-
在研讨协程原理时需求反编译成Java文件,才干看到实质之处。由于有一部分代码是kotlin编译器生成的,在协程源码里是看不出来的。
-
每个挂起点对应于一个case分支(状况机),每调用一次
label
加1;label
的默许初始值为 0,第一次会进入case 0
分支,挂起函数在回来COROUTINE_SUSPENDED
时直接 return ,那么办法履行就被完毕了,办法就被挂起了。 -
协程体内的代码都是经过
continuation.resumeWith()
调用;获取到实在成果后,回调到ContinuationImpl
这个类里边的resumeWith()
办法,会再次调用invokeSuspend(result)
办法,进入状况机case分支,回来实在成果,办法康复后,接着康复协程。 -
所以说,协程的挂起实质上是办法的挂起,而办法的挂起实质上是
return
,协程的康复实质上办法的康复,而康复的实质是callback
回调。
3.协程的调度原理
- 阻拦器在每次(康复)履行协程体的时分都会阻拦协程本体
SuspendLambda
,然后会经过协程分发器的interceptContinuation()
办法阻拦了一个Continuation<T>
而且再回来一个Continuation<T>
。 - 把阻拦的代码块封装为使命
DispatchedContinuation
,会经过CoroutineDispatcher
的needDispatch()
来判别需不需求分发,由子类的dispatch(runnable)
办法来完结协程的本次调度作业。
4.协程面试常见问题
- 面试官:什么是协程?
协程是一种解决方案,是一种解决嵌套,并发,弱化线程概念的方案。能让多个使命之间更好协作,可以以同步的办法完结异步作业,将异步代码像同步代码相同直观。
- 面试官:协程与线程有什么区别?
协程就像轻量级的线程,协程是依赖于线程,一个线程中可以创立多个协程。协程挂起时不会堵塞线程。线程进程都是同步机制,而协程则是异步。
- 面试官:协程的调度原理
依据创立协程指定调度器HandlerContext
,DefaultScheduler
,UnconfinedDispatcher
来履行使命,以解决协程中的代码运转在那个线程上。HandlerContext
经过handler.post(runnable)
分发到主线程,DefaultScheduler
实质是经过excutor.excute(runnable)
分发到IO线程。
- 面试官:协程是线程结构吗?
协程的实质是编译时return+callback,只不过在调度使命时供给了可以运转在IO线程的调度器和主线程的调度器。把协程称为线程结构不行精确。
- 面试官:什么时分运用协程?
多使命并发流程操控场景,流程操控比较简单,不会涉及线程堵塞和唤醒,性能比Java并发操控手段高。
点重视,不迷路
好了各位,以上便是这篇文章的全部内容了,很感谢您阅览这篇文章。我是suming,感谢支撑和认可,您的点赞便是我创作的最大动力。山水有相逢,咱们下篇文章见!
本人水平有限,文章不免会有错误,请批评指正,不胜感激 !
Kotlin协程学习三部曲:
- 《Kotlin 协程实战进阶(一、筑基篇)》
- 《Kotlin 协程实战进阶(二、进阶篇)》
- 《Kotlin 协程实战进阶(三、原理篇)》
参阅链接:
- Kotlin官网
- 《深化了解Kotlin协程》
- 慕课网之《新版Kotlin从入门到精通》
- 慕课网之《大白话分析Kotlin协程机制》
- Kotlin Coroutines(协程) 彻底解析(二),深化了解协程的挂起、康复与调度》