概述
接下来我会环绕一个问题讨论一下 withTimeout 的完结,这个问题便是:为什么 Thread.sleep 在 withTimeout 中无效?
1. 协程的撤销协作机制
在 Kotlin 的官方文档中,说到了协程的撤销是协作完结的,这也是为什么协程叫协程,协程的代码有必要进行协作才干被撤销,而 withTimeout 也是经过这套撤销协作机制完结的,比方下面的比如中,在打印第四个数字的时分,就在履行超时后抛出了反常。
withTimeout(1300L) {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
}
上面这段代码的履行成果如下:
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
Exception in thread "main" kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1300 ms
withTimeout
抛出的 TimeoutCancellationException
是 CancellationException
的子类,因为撤销仅仅一个反常,一切的资源都能够按照通常的方式封闭。假如需求在任何类型的超时发生时履行某些额外的操作,则能够将代码包装在 try{...} catch(){...}
块中。或许能够运用 withTimeoutOrNull
函数,该函数类似于 withTimeout
,但在超不时回来 null 而不是抛出反常:
val result = withTimeoutOrNull(1300L) {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
"Done" // will get cancelled before it produces this result
}
println("Result is $result")
测验代码 1 :withTimeout() 与 delay()
class ExampleUnitTest {
@Test
fun testWithTimeoutAndDelay() {
val latch = CountDownLatch(1)
GlobalScope.launch {
kotlin.runCatching {
// 20 秒内没有履行完的话就抛出超时撤销反常
withTimeout(20 * 1000) {
// 推迟 30 秒
delay(30 * 1000)
println("after delay")
latch.countDown()
}
}.onFailure {
println("failed: $it")
}
}
latch.await()
}
}
上面这段代码对应的 Java 代码如下:
public final class ExampleUnitTest {
@Test
public final void testWithTimeoutAndDelay() {
final Ref.ObjectRef latch = new Ref.ObjectRef();
latch.element = new CountDownLatch(1);
// launch 代码块
BuildersKt.launch$default((CoroutineScope)GlobalScope.INSTANCE, (CoroutineContext)Dispatchers.getUnconfined(), (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
int label;
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
Throwable var3;
Object var12;
Throwable var10000;
label42: {
Result.Companion var13;
label41: {
Object var8 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
boolean var2;
boolean var10001;
switch (this.label) {
case 0:
// 假如 result 是 Result.Failure ,则抛出 Failure 的反常值
ResultKt.throwOnFailure($result);
Object var14;
try {
var13 = Result.Companion;
var2 = false;
// 创立并履行 withTimeout 代码块
Function2 var15 = (Function2)(new ExampleUnitTest$testWithTimeoutAndDelay$1$invokeSuspend$$inlined$runCatching$lambda$1((Continuation)null, this));
this.label = 1;
var14 = TimeoutKt.withTimeout(20000L, var15, this);
} catch (Throwable var10) {
var10000 = var10;
var10001 = false;
break label41;
}
if (var14 == var8) {
return var8;
}
break;
case 1:
var2 = false;
try {
ResultKt.throwOnFailure($result);
break;
} catch (Throwable var11) {
var10000 = var11;
var10001 = false;
break label41;
}
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
try {
var12 = Result.constructor-impl(Unit.INSTANCE);
break label42;
} catch (Throwable var9) {
var10000 = var9;
var10001 = false;
}
}
var3 = var10000;
var13 = Result.Companion;
var12 = Result.constructor-impl(ResultKt.createFailure(var3));
}
var10000 = Result.exceptionOrNull-impl(var12);
if (var10000 != null) {
var3 = var10000;
int var6 = false;
String var7 = "failed: " + var3;
System.out.println(var7);
}
return Unit.INSTANCE;
}
@NotNull
public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
Intrinsics.checkNotNullParameter(completion, "completion");
Function2 var3 = new <anonymous constructor>(completion);
return var3;
}
public final Object invoke(Object var1, Object var2) {
return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
}
}), 2, (Object)null);
((CountDownLatch)latch.element).await();
}
}
// ExampleUnitTest$testWithTimeoutAndDelay$1$invokeSuspend$$inlined$runCatching$lambda$1.java
// ...
@DebugMetadata(
f = "ExampleUnitTest.kt",
l = {24},
i = {},
s = {},
n = {},
m = "invokeSuspend",
c = "com.example.myapplication.ExampleUnitTest$testWithTimeoutAndDelay$1$1$1"
)
// withTimeout 代码块
final class ExampleUnitTest$testWithTimeoutAndDelay$1$invokeSuspend$$inlined$runCatching$lambda$1 extends SuspendLambda implements Function2 {
// 状况机
int label;
// $FF: synthetic field
final <undefinedtype> this$0;
ExampleUnitTest$testWithTimeoutAndDelay$1$invokeSuspend$$inlined$runCatching$lambda$1(Continuation var1, Object var2) {
super(2, var1);
this.this$0 = var2;
}
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch (this.label) {
case 0:
// 假如 result 是 Result.Failure ,则抛出 Failure 的反常值
ResultKt.throwOnFailure($result);
// 迁移到下一个状况,下一次履行走 case 1
this.label = 1;
if (DelayKt.delay(30000L, this) == var2) {
return var2;
}
break;
case 1:
// 假如 result 是 Result.Failure ,则抛出 Failure 的反常值
ResultKt.throwOnFailure($result);
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
String var4 = "after delay";
System.out.println(var4);
((CountDownLatch)this.this$0.$latch.element).countDown();
return Unit.INSTANCE;
}
@NotNull
public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
Intrinsics.checkNotNullParameter(completion, "completion");
ExampleUnitTest$testWithTimeoutAndDelay$1$invokeSuspend$$inlined$runCatching$lambda$1 var3 = new ExampleUnitTest$testWithTimeoutAndDelay$1$invokeSuspend$$inlined$runCatching$lambda$1(completion, this.this$0);
return var3;
}
public final Object invoke(Object var1, Object var2) {
return ((ExampleUnitTest$testWithTimeoutAndDelay$1$invokeSuspend$$inlined$runCatching$lambda$1)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
}
}
在上面这段代码中,详细会抛出超时撤销反常的当地就在 ...$inlined$runCatching$lambda$1
匿名内部类的 invokeSuspend
办法的第二个 case 句子,能够看到,每一个续体(SuspendLambda)的状况机的第一行都是 ResultKt.throwOnFailure($result)
,这行代码会查看接收到的上一个续体的履行成果是否为 Failure
,假如是的话,则抛出反常,下面再来看下 Thread.sleep()
的 Java 代码的差异。
测验代码 2 :withTimeout() 与 Thread.sleep()
class ExampleUnitTe {
@Test
fun testWithTimeoutAndThreadSleep() {
val latch = CountDownLatch(1)
GlobalScope.launch(Dispatchers.Unconfined) {
kotlin.runCatching {
// 两秒钟后没有履行完结则抛出反常
withTimeout(2 * 1000) {
// 让当时线程进入堵塞状况
Thread.sleep(4 * 1000)
println("after sleep")
}
}.onFailure {
println("failed: $it")
latch.countDown()
}
}
latch.await()
println("end")
}
}
上面这段代码对应的 Java 代码如下。
public final class ExampleUnitTest2 {
@Test
public final void testWithTimeoutAndThreadSleep() {
final CountDownLatch latch = new CountDownLatch(1);
BuildersKt.launch$default((CoroutineScope)GlobalScope.INSTANCE, (CoroutineContext)Dispatchers.getUnconfined(), (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
int label;
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
Throwable var3;
Object var12;
Throwable var10000;
label42: {
Result.Companion var13;
label41: {
Object var8 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
boolean var2;
boolean var10001;
switch (this.label) {
case 0:
ResultKt.throwOnFailure($result);
Object var14;
try {
var13 = Result.Companion;
var2 = false;
// 创立并履行 withTimeout 代码块
Function2 var15 = (Function2)(new ExampleUnitTest2$testWithTimeoutAndThreadSleep$1$1$1((Continuation)null));
this.label = 1;
var14 = TimeoutKt.withTimeout(2000L, var15, this);
} catch (Throwable var10) {
var10000 = var10;
var10001 = false;
break label41;
}
if (var14 == var8) {
return var8;
}
break;
case 1:
var2 = false;
try {
ResultKt.throwOnFailure($result);
break;
} catch (Throwable var11) {
var10000 = var11;
var10001 = false;
break label41;
}
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
try {
var12 = Result.constructor-impl(Unit.INSTANCE);
break label42;
} catch (Throwable var9) {
var10000 = var9;
var10001 = false;
}
}
var3 = var10000;
var13 = Result.Companion;
var12 = Result.constructor-impl(ResultKt.createFailure(var3));
}
var10000 = Result.exceptionOrNull-impl(var12);
if (var10000 != null) {
var3 = var10000;
int var6 = false;
String var7 = "failed: " + var3;
System.out.println(var7);
latch.countDown();
}
return Unit.INSTANCE;
}
@NotNull
public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
Intrinsics.checkNotNullParameter(completion, "completion");
Function2 var3 = new <anonymous constructor>(completion);
return var3;
}
public final Object invoke(Object var1, Object var2) {
return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
}
}), 2, (Object)null);
latch.await();
String var2 = "end";
System.out.println(var2);
}
}
// ExampleUnitTest2$testWithTimeoutAndThreadSleep$1$1$1.java
// ...
@Metadata(
mv = {1, 8, 0},
k = 3,
d1 = {"\u0000\u000e\n\u0000\n\u0002\u0010\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\u0010\u0000\u001a\u00020\u0001*\u00020\u0002H\u008a@¢\u0006\u0004\b\u0003\u0010\u0004"},
d2 = {"<anonymous>", "", "Lkotlinx/coroutines/CoroutineScope;", "invoke", "(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;"}
)
final class ExampleUnitTest2$testWithTimeoutAndThreadSleep$1$1$1 extends SuspendLambda implements Function2 {
int label;
@Nullable
public final Object invokeSuspend(@NotNull Object var1) {
Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch (this.label) {
case 0:
ResultKt.throwOnFailure(var1);
Thread.sleep(4000L);
String var2 = "after sleep";
System.out.println(var2);
return Unit.INSTANCE;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
}
ExampleUnitTest2$testWithTimeoutAndThreadSleep$1$1$1(Continuation var1) {
super(2, var1);
}
@NotNull
public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
Intrinsics.checkNotNullParameter(completion, "completion");
ExampleUnitTest2$testWithTimeoutAndThreadSleep$1$1$1 var3 = new ExampleUnitTest2$testWithTimeoutAndThreadSleep$1$1$1(completion);
return var3;
}
public final Object invoke(Object var1, Object var2) {
return ((ExampleUnitTest2$testWithTimeoutAndThreadSleep$1$1$1)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
}
}
上面这段代码,因为没有调用 delay ,匿名内部类ExampleUnitTest2$testWithTimeoutAndThreadSleep$1$1$1
的状况机只要一种状况,而第一次履行第一个 case 句子的时分,还没有超时,后边也没有下一个状况持续查看续体(SuspendLambda)接收到的上一个续体(Continuation)的履行成果了。
代码对比:
使命提交与反常抛出流程
超时撤销反常抛出的流程大致如上,launch
代码块会创立独立协程 StandaloneCoroutine
,再由独立协程履行 withTimeout
代码块。withTimeout
会创立超时撤销协程 TimeoutCoroutine
,并经过事情轮询器 DefaultExecutor
创立并提交延时履行使命 DelayedRunanbleTask
,最终经过 delay
办法创立可撤销续体 CancellableContinuationImpl
和延时康复使命 DelayedResumeTask
,并把延时康复使命提交到事情轮询器。在 CancellableCoroutineImpl 初始化的时分,会把自己设为父使命(超时协程)的作业节点,这个点很要害,因为只要设为作业节点后,父使命被撤销或许完结的时分,它才干收到告诉,也便是 withTimeout 是给其他的续体兜底的,假如没有其他的协程或续体在履行的话,那它的超时就不会起效果。
因为协程的这套协作机制,我们在运用协程的过程中,就尽量不要混着线程池、 RxJava 或其他异步结构和或响应式结构来运用协程,而是尽量运用协程 + Flow ,这样才干让协程之间更好地协作。
在事情轮询器中,有一个专门用来轮询使命的线程,而且维护了一个延时使命行列和一个一般使命行列,当到了推迟的时刻后,没有使命的时分,担任轮询使命的线程就会被堵塞,直到下一个延时使命的时刻到了后,就会被唤醒,这时履行 延时使命,因为在前面的第一个单测比如中,delay
的时刻比 withTimeout
要长,在还没有履行完 delay
的时分,withTimeout
创立的延时运转使命
会先履行,它履行时会调用超时撤销协程的 run() 办法,超时撤销协程就会经过 cancelCoroutine()
办法把 TimeoutCancellationException
作为成果告诉给作业节点,也便是 delay 创立的可撤销续体
,然后可撤销续体就会康复 withTimeout
对应的 SuspendLambda 的履行,也便是走到了反编译后的匿名内部类的第二个状况,查看发现有反常后,就会抛出反常。
而前面提到的第二个调用了 Thread.sleep
的单测中,因为没有履行 delay
,也就没有创立可撤销续体,超时撤销协程运转时,并没有其他的作业节点需求告诉,所以 withTimeout 代码块不会再次履行,也没有第二个状况,所以在 withTimeout 中履行 Thread.sleep 就不会抛出超时撤销反常。
如上图所示,在只调用了 Thread.sleep
时,只要 launch
和 withTimeout
两个续体(一个协程一起是一个完结了 Continuation
接口的类),有 delay
时,假如没有超时,则由 delay 来告诉事情轮询器
,让事情轮询器康复 withTimeout 代码块的履行,事情轮询器能够看成是协程结构自定义的 Looper 。假如超时了还没有履行完 withTimeout 代码块的代码,则由事情轮询器来告诉 DelayedRunnableTask
,让它经过超时协程来康复 withTimeout 的第二个状况的履行,一起履行成果为 Failure
。
(续体是一个协程的康复点,当协程代码块因为调用了挂起函数被分为多个状况后,就需求续体来康复一个个状况的履行)。
超时协程的作业节点
更详细一点来说,前面提到的两个单测比如创立的作业节点主要是 DisposeOnCompletion
和 CancellableContinuatonImpl
。
第二个单测中,只调用了 Thread.sleep
,没有调用 delay
,那超时协程就只要一个作业节点 DisposeOnCompletion
,这个作业结点是 withTimeout 办法中创立的。DisposeOnCompletion
持有了延时运转使命 DelayedRunnableTask
作为句柄,当它收到完结协程履行完结回调的时分,就会把延时运转使命
的状况改为 REMOVED_TASK
,这样延时使命就无法再被调度。
假如调用了 delay
,delay
会经过 suspendCancellableCoroutine
办法创立可撤销续体,可撤销续体在初始化可撤销性的时分,会创立子续体 ChildContinuation
,并把子续体
作为超时撤销协程
的作业节点,这样超时协程在超时撤销的时分,就会告诉给可撤销续体 ,把反常传给 withTimeout 代码块
对应的 SuspendLambda
续体,然后抛出反常。
下面来看下部分源码的解析,因为有些源码在我上一篇文章中有解析过,所以这里就不再重复说明了。
2. withTimeout
因为 launch
办法的完结在我的上一篇文章中有讲过,所以这里就不细说了,从第一个单元测验反编译后的 Java 代码来看,履行 launch 代码会实例化一个 SuspendLambda
目标,SuspendLambda
会被封装为 DispatchedContinuation
,DispatchedContinuation
是 DispatchedTask
的子类,会被提交到协程分发器中,然后履行挂起 Lambda 表达式(SuspendLambda)中的代码。
在 launch 创立了独立协程 StandaloneCoroutine
后,就会经过 withTimeout
办法创立超时协程 TimeoutCorotuine
。
2.1 withTimeout
withTimeout
办法用于在指定的超不时刻内,在一个协程中运转给定的挂起代码,并在超不时抛出 TimeoutCancellationException
超时撤销反常,履行在 withTimeout 中的代码在超不时将被撤销。假如不想在超时的时分抛出反常,能够运用 withTimeoutOrNull
办法。
/**
* @param timeMillis 超不时刻,单位为毫秒。
*/
public suspend fun <T> withTimeout(timeMillis: Long, block: suspend CoroutineScope.() -> T):
T {
// ...
// 假如时刻小于等于 0 ,则立刻跑出反常
if (timeMillis <= 0L) throw TimeoutCancellationException("Timed out immediately")
// 经过挂起协程运转代码块,假如超时了就抛出 TimeoutCancellationException 反常。
return suspendCoroutineUninterceptedOrReturn { uCont ->
setupTimeout(TimeoutCoroutine(timeMillis, uCont), block)
}
}
TimeoutCoroutine
private class TimeoutCoroutine<U, in T: U>(
@JvmField val time: Long,
uCont: Continuation<U> // 未阻拦的续体
) : ScopeCoroutine<T>(uCont.context, uCont), Runnable {
override fun run() {
// 撤销协程
cancelCoroutine(TimeoutCancellationException(time, this))
}
// ...
}
TimeoutCoroutine 是 ScopeCoroutine 的子类,ScopeCoroutine 是经过 coroutineScope 协程构建器创立的协程实例。
效果域协程 ScopeCoroutine
ScopeCoroutine
除了 TimeoutCoroutine
还有其他的子类,比方 withContext 办法中会判别协程上下文(协程分发器)是否发生了改变,假如没有改变的话,则运用 UndispatchedCoroutine 直接在当时线程履行协程代码,假如发生了改变,则运用 DispatchedCoroutine 把协程代码块切换到另一个分发器履行。
别的经过 Flow.shareIn
扩展函数,也能够创立 FlowCoroutine
,用 supervisorScope()
函数则能够创立 SupervisorCoroutine
,SupervisorCoroutine
会把协程上下文中本来的 Job 替换为 SupervisorJob
,然后让代码块内的某个子协程在失利时不影响其他子协程。
TimeoutCoroutine
还完结了 Runnable
接口,它的 run 办法是经过协程的事情轮询机制(EventLoop)来触发的,在 run 办法中,创立了一个 TimeoutCancellationException ,并传给了 cancelCoroutine()
办法,这个反常会被封装为表明协程履行失利的 Result.Failure
,前面有讲到,只要上一个续体的履行成果是 Failure ,SuspendLambda 的第二个 case 句子就会查看到反常并抛出。
setupTimeout
private fun <U, T: U> setupTimeout(
coroutine: TimeoutCoroutine<U, T>,
block: suspend CoroutineScope.() -> T
): Any? {
// 协程的续体
val cont = coroutine.uCont
val context = cont.context
// 创立延时履行使命,并设置超时撤销协程的完结回调
coroutine.disposeOnCompletion(context.delay.invokeOnTimeout(coroutine.time, coroutine, coroutine.context))
// 在当时线程履行代码块
return coroutine.startUndispatchedOrReturnIgnoreTimeout(coroutine, block)
}
在 setupTimeout
办法中,会调用协程上下文的 delay
目标的 invokeOnTimout
办法,delay 目标的默许完结是 DefaultExecutor
, 完结了 Delay
接口的有下面几个类。
-
BlockingEventLoop :用于在
runBlocking
中处理使命,包含延时使命 -
DefaultExecutor :用于在 IO 线程和默许的子线程中履行使命,包含延时使命
-
Dispatcher in TestCoroutineContext :用于在担任履行测验的线程中处理使命,包含延时使命
-
EventLoopImplBase :
BlockingEventLoop
和DefaultExecutor
的父类 -
ExecutorCoroutineDispatcherImpl :
用于在自定义的协程分发器(经过
ExecutorService.asCoroutineDispatcher
创立)中处理使命,包含延时使命 -
HandlerContext:
便是平常运用的
Dispatchers.Main
的完结类,是HandlerDispatcher
的子类,用于在主线程中履行使命,包含延时使命,经过主线程 Looper 完结,也能够经过Handler.asCoroutineDispatcher
把自定义的 Handler 转换为HandlerContext
Delay#invokeOnTimeout
/**
* [CoroutineDispatcher]完结以原生支持使命的守时履行功用, 完结此接口会影响[delay]和[withTimeout]函数的操作。
*
* @suppress **这是一个内部 API,不该从一般代码中运用。**
*/
@InternalCoroutinesApi
public interface Delay {
/**
* 不堵塞线程并在指定的时刻后持续暂停协程。
*
* 这个挂起函数能够被撤销。假如当时协程的[Job]在该挂起函数等候期间被撤销或完结,
* 此函数将当即运用[CancellationException]持续履行。有**即时撤销确保**,假如该作业在该函数挂起时被撤销,则它将无法成功康复。
* 有关详细信息,请拜见[suspendCancellableCoroutine]文档。
*/
public suspend fun delay(time: Long) {
if (time <= 0) return // 不需求推迟
return suspendCancellableCoroutine { scheduleResumeAfterDelay(time, it) }
}
// ...
/**
* 在指定的推迟[timeMillis]后调度[Runnable]的[block]调用。假如不再需求这个调用恳求,那么发生的[DisposableHandle]能够被用于[dispose][DisposableHandle.dispose].
*
* 这个完结运用一个内置的单线程计划的 executor 服务。
*/
public fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle =
DefaultDelay.invokeOnTimeout(timeMillis, block, context)
}
invokeOnTimeout()
办法会调用 DefaultDelay
的 invokeOnTimeout
的办法。
DefaultExecutor#invokeOnTimeout
internal actual val DefaultDelay: Delay = DefaultExecutor
@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN")
internal actual object DefaultExecutor : EventLoopImplBase(), Runnable {
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle =
scheduleInvokeOnTimeout(timeMillis, block)
}
DefaultExecutor 的 invokeOnTimeout 办法会调用父类 EventLoopImplBase
的 scheduleInvokeOnTimeout
办法,经过该办法把使命参加到推迟使命行列。
scheduleInvokeOnTimeout
internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
// 调度即将到来的使命
protected fun scheduleInvokeOnTimeout(
timeMillis: Long,
block: Runnable
): DisposableHandle {
// 将毫秒数转换为纳秒数
val timeNanos = delayToNanos(timeMillis)
// 假如时刻不超过最大延时,则创立一个 DelayedRunnableTask 使命,并调度该使命
return if (timeNanos < MAX_DELAY_NS) {
val now = nanoTime()
// 创立延时可运转使命
DelayedRunnableTask(now + timeNanos, block).also { task ->
schedule(now, task) // 调度传入的使命
}
} else {
// 假如超过了最大延时,则回来 NonDisposableHandle
NonDisposableHandle
}
}
// 把延时使命增加到延时使命行列
public fun schedule(now: Long, delayedTask: DelayedTask) {
when (scheduleImpl(now, delayedTask)) {
SCHEDULE_OK -> if (shouldUnpark(delayedTask)) unpark()
SCHEDULE_COMPLETED -> reschedule(now, delayedTask)
SCHEDULE_DISPOSED -> {} // do nothing -- task was already disposed
else -> error("unexpected result")
}
}
private fun scheduleImpl(now: Long, delayedTask: DelayedTask): Int {
if (isCompleted) return SCHEDULE_COMPLETED
// 假如延时行列为空,则创立一个新的延时使命行列
val delayedQueue = _delayed.value ?: run {
_delayed.compareAndSet(null, DelayedTaskQueue(now))
_delayed.value!!
}
// 把使命增加到行列
return delayedTask.scheduleTask(now, delayedQueue, this)
}
// 延时运转使命
private class DelayedRunnableTask(
nanoTime: Long, // 使命的履行时刻
private val block: Runnable // 代码块(使命的详细作业)
) : DelayedTask(nanoTime) {
override fun run() { block.run() } // 运转使命中的 Runnable
}
}
scheduleInvokeOnTimeout
办法会创立一个 DelayedRunnableTask
,并把该 Task 经过 schedule 办法参加到延时使命行列。DelayedRunnableTask 是 DelayedTask 的子类,DelayedTask 的另一个子类是 DelayedReumeTask ,DelayedResumeTask 便是调用 delay() 办法时需求用到的使命类型。
schedule 的详细完结是 scheduleImpl
,scheduleImpl 办法中会判别 _delayed 延时使命行列是否为空,为空的话则创立一个延时使命行列 DelayedTaskQueue ,再把延时使命(超时回调)经过 DelayedTask 的 scheduleTask 办法放到延时使命行列中,scheduleImpl 回来调度成功后,就会唤醒事情轮询。
DelayedTask
// 延时使命
internal abstract class DelayedTask(
/** 使命的履行时刻 */
@JvmField var nanoTime: Long
) : Runnable, Comparable<DelayedTask>, DisposableHandle, ThreadSafeHeapNode {
/**
* _heap 变量能够是三种类型:null、ThreadSafeHeap 或 DISPOSED_TASK。
*/
private var _heap: Any? = null
/**
* 判别是不是要履行使命了,当被调用时,将给守时刻 now 与当时目标的 nanoTime 特点进行比较。
* 假如 now >= nanoTime,则回来 true;不然回来 false。
*/
fun timeToExecute(now: Long): Boolean = now - nanoTime >= 0L
/**
* scheduleTask 办法用于将当时 DelayedTask 目标参加到 DelayedTaskQueue 堆中,以便在给定的时刻点履行使命。
* now 表明当不时刻,delayed 表明 DelayedTaskQueue 堆,eventLoop 表明 EventLoopImplBase 实例。
* 该办法运用 @Synchronized 注解表明线程同步,确保对 _heap 和 delayed 的修正是原子的。
* 假如 _heap 现已被设置为 DISPOSED_TASK,则回来 SCHEDULE_DISPOSED。
*/
@Synchronized
fun scheduleTask(now: Long, delayed: DelayedTaskQueue, eventLoop: EventLoopImplBase): Int {
if (_heap === DISPOSED_TASK) return SCHEDULE_DISPOSED // don't add -- was already disposed
delayed.addLastIf(this) { firstTask ->
// ...
}
return SCHEDULE_OK
}
/**
* dispose 办法用于移除当时 DelayedTask 目标从 _heap 中,使之不再参加后续堆操作。
* 在 @Synchronized 保护下,首要读取 _heap 的值,假如现已被设置为 DISPOSED_TASK 则直接回来,
* 不然测验将当时 DelayedTask 从堆中移除。最终将 _heap 的值设置为 DISPOSED_TASK,
* 表明该目标现已被注销,而且不会再增加到任何堆中。
*/
@Synchronized
final override fun dispose() {
val heap = _heap
if (heap === DISPOSED_TASK) return // 已移除
@Suppress("UNCHECKED_CAST")
(heap as? DelayedTaskQueue)?.remove(this) // 从使命堆中移除当时使命
_heap = DISPOSED_TASK // 不再增加到行列中
}
}
DefaultSchedule#run
@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN")
internal actual object DefaultExecutor : EventLoopImplBase(), Runnable {
@Suppress("ObjectPropertyName")
@Volatile
private var _thread: Thread? = null
// 事情轮询线程,假如不存在则创立一个新的轮询线程
override val thread: Thread
get() = _thread ?: createThreadSync()
override fun run() {
ThreadLocalEventLoop.setEventLoop(this)
registerTimeLoopThread()
try {
var shutdownNanos = Long.MAX_VALUE
if (!notifyStartup()) return
while (true) {
Thread.interrupted() // 重置中断标志
// 处理下一个事情
var parkNanos = processNextEvent()
// 没有下一个事情
if (parkNanos == Long.MAX_VALUE) {
// 初始化封闭时刻
val now = nanoTime()
// 假如封闭时刻是长整型的最大值,则将轮询封闭时刻改为一秒后
if (shutdownNanos == Long.MAX_VALUE) shutdownNanos = now + KEEP_ALIVE_NANOS
// 核算剩余需求等候的时刻
val tillShutdown = shutdownNanos - now
if (tillShutdown <= 0) return // 时刻已过,不再轮询
// 用当时运转的最小值和剩余等候时刻中较小的一个来更新等候时刻
parkNanos = parkNanos.coerceAtMost(tillShutdown)
} else
shutdownNanos = Long.MAX_VALUE
if (parkNanos > 0) {
// 查看是否恳求了封闭线程,是的话退出轮询
if (isShutdownRequested) return
// 不中止运转线程,等候一段时刻后唤醒线程
parkNanos(this, parkNanos)
}
}
} finally {
// ...
}
}
}
DefaultExecutor 被唤醒后,就会开端履行 run() 办法,并调用 processNextEvent()
处理下一个事情,在前面提到的第一个单元测验的比如中,下一个事情便是履行延时操作(delay)。
EventLoopImplBase
internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
// 一般使命行列,或许的值:null | CLOSED_EMPTY | task | Queue<Runnable>
private val _queue = atomic<Any?>(null)
// 延时使命行列
private val _delayed = atomic<DelayedTaskQueue?>(null)
// 处理下一个事情
override fun processNextEvent(): Long {
// 优先处理无限制(不需求调度)的使命
if (processUnconfinedEvent()) return 0
// 再找出延时使命来履行
val delayed = _delayed.value
if (delayed != null && !delayed.isEmpty) {
val now = nanoTime()
while (true) {
delayed.removeFirstIf {
// 判别是不是需求履行该使命
if (it.timeToExecute(now)) {
// 使命的时刻到了后,就把使命参加一般使命行列
enqueueImpl(it)
} else
false
} ?: break
}
}
// 取出一般使命并履行
val task = dequeue()
if (task != null) {
task.run()
return 0
}
return nextTime
}
@Suppress("UNCHECKED_CAST")
private fun enqueueImpl(task: Runnable): Boolean {
_queue.loop { queue ->
if (isCompleted) return false // 已中止轮询,不能再增加使命
when (queue) {
// 行列为空,直接把当时使命赋值给 _queue
null -> if (_queue.compareAndSet(null, task)) return true
// 行列不为空,把使命增加到行列
is Queue<*> -> {
when ((queue as Queue<Runnable>).addLast(task)) {
Queue.ADD_SUCCESS -> return true
Queue.ADD_CLOSED -> return false
Queue.ADD_FROZEN -> _queue.compareAndSet(queue, queue.next())
}
}
// _queue 是一个使命的花,则将新使命增加到队尾
else -> when {
queue === CLOSED_EMPTY -> return false
else -> {
val newQueue = Queue<Runnable>(Queue.INITIAL_CAPACITY, singleConsumer = true)
newQueue.addLast(queue as Runnable)
newQueue.addLast(task)
if (_queue.compareAndSet(queue, newQueue)) return true
}
}
}
}
}
// 从一般使命行列中取出使命
@Suppress("UNCHECKED_CAST")
private fun dequeue(): Runnable? {
_queue.loop { queue ->
when (queue) {
null -> return null
is Queue<*> -> {
val result = (queue as Queue<Runnable>).removeFirstOrNull()
if (result !== Queue.REMOVE_FROZEN) return result as Runnable?
_queue.compareAndSet(queue, queue.next())
}
else -> when {
queue === CLOSED_EMPTY -> return null
else -> if (_queue.compareAndSet(queue, null)) return queue as Runnable
}
}
}
}
}
2. delay
delay
/**
* 在不堵塞线程的情况下,推迟协程一段时刻并在指守时刻后康复它
*/
public suspend fun delay(timeMillis: Long) {
if (timeMillis <= 0) return // don't delay
return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
if (timeMillis < Long.MAX_VALUE) {
cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
}
}
}
delay 办法会经过 suspendCancellableCoroutine
办法创立一个可撤销续体,并把这个续体传给 DefaultExecutor
。
suspendCancellableCoroutine
public suspend inline fun <T> suspendCancellableCoroutine(
crossinline block: (CancellableContinuation<T>) -> Unit
): T =
suspendCoroutineUninterceptedOrReturn { uCont ->
// 创立可撤销续体
val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_CANCELLABLE)
// 初始化可撤销性(设置撤销回调)
cancellable.initCancellability()
// 履行代码块
block(cancellable)
// 获取履行成果(把状况改为挂起或直接回来成果)
cancellable.getResult()
}
suspendCancellableCoroutine
中会把上一个续体经过 intercepted()
扩展函数封装为能够分发给分发器履行的 DispatchedContinuation
,再把 DispatchedContinuation
传给续体 CancellableContinuationImpl
,再初始化 CancellableContinuationImpl
的可撤销性、履行代码块、获取可撤销续体的履行成果。
suspendCancellableCoroutine
办法的典型用法,便是在等候回调成果时挂起协程,并将成果回来给调用方,这样就不必再为回调创立一层嵌套,又或许说是对回调嵌套的封装,假如回调有屡次的话,就要用 callbackFlow。
suspend fun awaitCallback(): T = suspendCancellableCoroutine { continuation ->
val callback = object : Callback { // 完结某个回调接口
override fun onCompleted(value: T) {
// 经过回调提供的值康复协程
continuation.resume(value)
}
override fun onApiError(cause: Throwable) {
// 经过回调提供的反常康复协程
continuation.resumeWithException(cause)
}
}
// 注册回调
api.register(callback)
// 在撤销协程时移除回调
continuation.invokeOnCancellation { api.unregister(callback) }
// 此刻 suspendCancellableCoroutine 函数挂起协程,直到回调触发
}
但是要留意的是,suspendCancellableContinuation 的代码块中引用到的续体(continuation)只能康复一次,假如屡次康复的话,就会抛出反常,所以尽量不要在没有设置反常处理器的线程中康复续体的履行。
CancellableContinuationImpl#initCancellability
@PublishedApi
internal open class CancellableContinuationImpl<in T>(
final override val delegate: Continuation<T>,
resumeMode: Int
) : DispatchedTask<T>(resumeMode), CancellableContinuation<T>, CoroutineStackFrame {
public override val context: CoroutineContext = delegate.context
/*
* 完结说明
*
* CancellableContinuationImpl 是 Job 的子集,具有以下限制:
* 1)它只能有撤销监听器(没有“正在撤销”)
* 2)假如被撤销,它总是调用撤销监听器(没有“当即调用”)
* 3)它最多只能有一个撤销监听器
* 4)它的撤销监听器无法注销
* 作为成果,它具有更简单的状况机、更轻量级的机制和更少的依靠
*/
/* 决议计划状况机
+-----------+ trySuspend +-----------+
| UNDECIDED | -------------> | SUSPENDED |
+-----------+ +-----------+
|
| tryResume
V
+-----------+
| RESUMED |
+-----------+
留意:tryResume 和 trySuspend 最多能够被调用一次,哪个先被调用就进入哪个状况
*/
private val _decision = atomic(UNDECIDED)
// 初始化可撤销性
public override fun initCancellability() {
val handle = installParentHandle()
?: return // 假如没有父使命,下面的代码就不必履行了
// 假如父使命已完结,则直接开释资源
if (isCompleted) {
handle.dispose()
parentHandle = NonDisposableHandle
}
}
// 装置父使命句柄
private fun installParentHandle(): DisposableHandle? {
// 没有父使命的话直接回来
val parent = context[Job] ?: return null
// 装置句柄(设置完结回调)
val handle = parent.invokeOnCompletion(
onCancelling = true,
// 创立子续体
handler = ChildContinuation(this).asHandler
)
parentHandle = handle
return handle
}
}
CancellableContinuationImpl 的 initCancellability() 办法会调用 installParentHandle() 办法把自己设为父使命的作业节点,假如父使命已完结的话,就直接让父使命回来的 JobNode 句柄开释资源(把自己从父使命的作业节点列表中移除)。
在 installParentHandle() 办法中,会把自己封装为 ChildContinuation,把调用父使命的 invokeOnCompletion() 办法获取句柄(作业节点)。
ChildContinuation
internal class ChildContinuation(
@JvmField val child: CancellableContinuationImpl<*>
) : JobCancellingNode() {
override fun invoke(cause: Throwable?) {
child.parentCancelled(child.getContinuationCancellationCause(job))
}
}
ChildContinuation
是 JobCancellingNode
的子类,当协程被撤销的时分,就会把撤销事情和原因传给 JobCancellingNode
,ChildContinuation
收到撤销告诉,invoke
办法被调用后,就会把反常传给 CancellableContinuationImpl
。
完整时序图
参考资料
Kotlin 官方文档撤销与超时:kotlinlang.org/docs/cancel…