Kotlin协程不是什么空中阁楼,Kotlin源代码会被编译成class字节码文件,终究会运转到虚拟机中。所以从本质上讲,Kotlin和Java是相似的,都是可以编译发生class的言语,但终究还是会遭到虚拟机的约束,它们的代码终究会在虚拟机上的某个线程上被履行。
之前咱们剖析了launch的原理,但其时咱们没有去剖析协程创立出来后是怎样与线程发生相关的,怎样被分发到详细的线程上履行的,本篇文章就带咱们剖析一下。
前置知识
要想搞懂Dispatchers,咱们先来看一下Dispatchers、CoroutineDispatcher、ContinuationInterceptor、CoroutineContext之间的关系
public actual object Dispatchers {
@JvmStatic
public actual val Default: CoroutineDispatcher = DefaultScheduler
@JvmStatic
public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
@JvmStatic
public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined
@JvmStatic
public val IO: CoroutineDispatcher = DefaultIoScheduler
}
public abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
}
public interface ContinuationInterceptor : CoroutineContext.Element {}
public interface Element : CoroutineContext {}
Dispatchers中寄存的是协程调度器(它本身是一个单例),有咱们平常常用的IO、Default、Main等。这些协程调度器都是CoroutineDispatcher的子类,这些协程调度器其实都是CoroutineContext。
demo
咱们先来看一个关于launch的demo:
fun main() {
val coroutineScope = CoroutineScope(Job())
coroutineScope.launch {
println("Thread : ${Thread.currentThread().name}")
}
Thread.sleep(5000L)
}
在生成CoroutineScope时,demo中没有传入相关的协程调度器,也便是Dispatchers。那这个launch会运转到哪个线程之上?
运转试一下:
Thread : DefaultDispatcher-worker-1
居然运转到了DefaultDispatcher-worker-1
线程上,这看起来显着是Dispatchers.Default
协程调度器里边的线程。我明明没传Dispatchers相关的context,居然会运转到子线程上。阐明运转到default线程是launch默许的。
它是怎样与default线程发生相关的?打开源码一探终究:
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
//代码1
val newContext = newCoroutineContext(context)
//代码2
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
//代码3
coroutine.start(start, coroutine, block)
return coroutine
}
- 将传入的CoroutineContext结构出新的context
- 发动形式,判别是否为懒加载,假如是懒加载则构建懒加载协程目标,不然便是规范的
- 发动协程
咱们重点关注代码1,这是与CoroutineContext相关的。
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
//从父协程那里承继过来的context+这次的context
val combined = coroutineContext.foldCopiesForChildCoroutine() + context
val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
//combined可以简单的把它看成是一个map,它是CoroutineContext类型的
//假如当时context不等于Dispatchers.Default,而且从map里边取ContinuationInterceptor(用于拦截之后分发线程的)值为空,阐明没有传入协程应该在哪个线程上运转的相关参数
return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
debug + Dispatchers.Default else debug
}
调用launch的时候,咱们没有传入context,默许参数是EmptyCoroutineContext。这儿的combined,它其实是CoroutineContext类型的,可以简单的看成是map(其实不是,仅仅相似)。经过combined[ContinuationInterceptor]可以将传入的线程调度相关的参数给取出来,这儿假如取出来为空,是给该context添加了一个Dispatchers.Default,然后把新的context回来出去了。所以launch默许状况下,会走到default线程去履行。
补充一点:CoroutineContext可以经过+
连接是因为它内部有个public operator fun plus
函数。可以经过combined[ContinuationInterceptor]这种办法访问元素是因为有个public operator fun get
函数。
public interface CoroutineContext {
/**
* Returns the element with the given [key] from this context or `null`.
*/
public operator fun <E : Element> get(key: Key<E>): E?
/**
* Returns a context containing elements from this context and elements from other [context].
* The elements from this context with the same key as in the other one are dropped.
*/
public operator fun plus(context: CoroutineContext): CoroutineContext {
......
}
}
startCoroutineCancellable
上面咱们剖析了launch默许状况下,context中会添加Dispatchers.Default的这个协程调度器,到时launch的Lambda会在default线程上履行,其间详细流程是怎样样的,咱们剖析一下。
在之前的文章 Kotlin协程之launch原理 中咱们剖析过,launch默许状况下会终究履行到startCoroutineCancellable
函数。
public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) {
//构建ContinuationImpl
createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit))
}
public actual fun <T> (suspend () -> T).createCoroutineUnintercepted(
completion: Continuation<T>
): Continuation<Unit> {
val probeCompletion = probeCoroutineCreated(completion)
return if (this is BaseContinuationImpl)
//走这儿
create(probeCompletion)
else
createCoroutineFromSuspendFunction(probeCompletion) {
(this as Function1<Continuation<T>, Any?>).invoke(it)
}
}
在Kotlin协程之launch原理 文章中,咱们剖析过create(probeCompletion)这儿创立出来的是launch的那个Lambda,编译器会发生一个匿名内部类,它承继自SuspendLambda,而SuspendLambda是承继自ContinuationImpl。所以 createCoroutineUnintercepted(completion)一开端构建出来的是一个ContinuationImpl,接下来需求去看它的intercepted()函数。
internal abstract class ContinuationImpl(
completion: Continuation<Any?>?,
private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)
public override val context: CoroutineContext
get() = _context!!
@Transient
private var intercepted: Continuation<Any?>? = null
public fun intercepted(): Continuation<Any?> =
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
}
第一次走到intercepted()函数时,intercepted肯定是为null的,还没初始化。此时会经过context[ContinuationInterceptor]取出Dispatcher目标,然后调用该Dispatcher目标的interceptContinuation()函数。这个Dispatcher目标在demo这儿其实便是Dispatchers.Default。
public actual object Dispatchers {
@JvmStatic
public actual val Default: CoroutineDispatcher = DefaultScheduler
}
可以看到,Dispatchers.Default是一个CoroutineDispatcher目标,interceptContinuation()函数就在CoroutineDispatcher中。
public abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatchedContinuation(this, continuation)
}
public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) {
createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit))
}
这个办法十分简单,便是新建而且回来了一个DispatchedContinuation目标,将this和continuation给传入进去。这儿的this是Dispatchers.Default。
所以,终究咱们发现走完startCoroutineCancellable的前2步之后,也便是走完intercepted()之后,创立的是DispatchedContinuation目标,最终是调用的DispatchedContinuation的resumeCancellableWith函数。最终这步比较要害,这是真实将协程的详细履行逻辑放到线程上履行的部分。
internal class DispatchedContinuation<in T>(
//这儿传入的dispatcher在demo中是Dispatchers.Default
@JvmField val dispatcher: CoroutineDispatcher,
@JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {
inline fun resumeCancellableWith(
result: Result<T>,
noinline onCancellation: ((cause: Throwable) -> Unit)?
) {
val state = result.toState(onCancellation)
//代码1
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_CANCELLABLE
//代码2
dispatcher.dispatch(context, this)
} else {
//代码3
executeUnconfined(state, MODE_CANCELLABLE) {
if (!resumeCancelled(state)) {
resumeUndispatchedWith(result)
}
}
}
}
}
internal abstract class DispatchedTask<in T>(
@JvmField public var resumeMode: Int
) : SchedulerTask() {
......
}
internal actual typealias SchedulerTask = Task
internal abstract class Task(
@JvmField var submissionTime: Long,
@JvmField var taskContext: TaskContext
) : Runnable {
......
}
public abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
public abstract fun dispatch(context: CoroutineContext, block: Runnable)
public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true
}
从DispatchedContinuation的承继结构来看,它既是一个Continuation(经过托付给传入的continuation参数),也是一个Runnable。
- 首先看代码1:这个dispatcher在demo中其实是Dispatchers.Default ,然后调用它的isDispatchNeeded(),这个函数界说在CoroutineDispatcher中,默许便是回来true,只有Dispatchers.Unconfined回来false
- 代码2:调用Dispatchers.Default的dispatch函数,将context和自己(DispatchedContinuation,也便是Runnable)传过去了
- 代码3:对应Dispatchers.Unconfined的状况,它的isDispatchNeeded()回来false
现在咱们要剖析代码2之后的履行逻辑,也便是将context和Runnable传入到dispatch函数之后是怎样履行的。按道理,看到Runnable,那可能这个与线程履行相关,应该离咱们想要的答案不远了。回到Dispatchers,咱们发现Dispatchers.Default是DefaultScheduler类型的,那咱们就去DefaultScheduler中或许其父类中去找dispatch函数。
public actual object Dispatchers {
@JvmStatic
public actual val Default: CoroutineDispatcher = DefaultScheduler
}
internal object DefaultScheduler : SchedulerCoroutineDispatcher(
CORE_POOL_SIZE, MAX_POOL_SIZE,
IDLE_WORKER_KEEP_ALIVE_NS, DEFAULT_SCHEDULER_NAME
) {
......
}
internal open class SchedulerCoroutineDispatcher(
private val corePoolSize: Int = CORE_POOL_SIZE,
private val maxPoolSize: Int = MAX_POOL_SIZE,
private val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
private val schedulerName: String = "CoroutineScheduler",
) : ExecutorCoroutineDispatcher() {
private var coroutineScheduler = createScheduler()
private fun createScheduler() =
CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
override fun dispatch(context: CoroutineContext, block: Runnable): Unit = coroutineScheduler.dispatch(block)
}
最终发现dispatch函数在其父类SchedulerCoroutineDispatcher中,在这儿构建了一个CoroutineScheduler,直接调用了CoroutineScheduler目标的dispatch,然后将Runnable(也便是上面的DispatchedContinuation目标)传入。
internal class CoroutineScheduler(
@JvmField val corePoolSize: Int,
@JvmField val maxPoolSize: Int,
@JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
@JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME
) : Executor, Closeable {
override fun execute(command: Runnable) = dispatch(command)
fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
trackTask() // this is needed for virtual time support
//代码1:构建Task,Task完成了Runnable接口
val task = createTask(block, taskContext)
//代码2:取当时线程转为Worker目标,Worker是一个承继自Thread的类
val currentWorker = currentWorker()
//代码3:尝试将Task提交到本地行列并依据结果履行相应的操作
val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
if (notAdded != null) {
//代码4:notAdded不为null,则再将notAdded(Task)添加到大局行列中
if (!addToGlobalQueue(notAdded)) {
throw RejectedExecutionException("$schedulerName was terminated")
}
}
val skipUnpark = tailDispatch && currentWorker != null
// Checking 'task' instead of 'notAdded' is completely okay
if (task.mode == TASK_NON_BLOCKING) {
if (skipUnpark) return
//代码5: 创立Worker并开端履行该线程
signalCpuWork()
} else {
// Increment blocking tasks anyway
signalBlockingWork(skipUnpark = skipUnpark)
}
}
private fun currentWorker(): Worker? = (Thread.currentThread() as? Worker)?.takeIf { it.scheduler == this }
internal inner class Worker private constructor() : Thread() {
.....
}
}
观察发现,本来CoroutineScheduler类完成了java.util.concurrent.Executor接口,一起完成了它的execute办法,这个办法也会调用dispatch()。
- 代码1:首先是经过Runnable构建了一个Task,这个Task其实也是完成了Runnable接口,仅仅把传入的Runnable包装了一下
- 代码2:将当时线程取出来转换成Worker,当然第一次时,这个转换不会成功,这个Worker是承继自Thread的一个类
- 代码3:将task提交到本地行列中,这个本地行列待会儿会在Worker这个线程履行时取出Task,并履行Task
- 代码4:假如task提交到本地行列的过程中没有成功,那么会添加到大局行列中,待会儿也会被Worker取出来Task并履行
- 代码5:创立Worker线程,并开端履行
开端履行Worker线程之后,咱们需求看一下这个线程的run办法履行的是啥,也便是它的详细履行逻辑。
internal inner class Worker private constructor() : Thread() {
override fun run() = runWorker()
private fun runWorker() {
var rescanned = false
while (!isTerminated && state != WorkerState.TERMINATED) {
//代码1
val task = findTask(mayHaveLocalTasks)
if (task != null) {
rescanned = false
minDelayUntilStealableTaskNs = 0L
//代码2
executeTask(task)
continue
} else {
mayHaveLocalTasks = false
}
if (minDelayUntilStealableTaskNs != 0L) {
if (!rescanned) {
rescanned = true
} else {
rescanned = false
tryReleaseCpu(WorkerState.PARKING)
interrupted()
LockSupport.parkNanos(minDelayUntilStealableTaskNs)
minDelayUntilStealableTaskNs = 0L
}
continue
}
tryPark()
}
tryReleaseCpu(WorkerState.TERMINATED)
}
fun findTask(scanLocalQueue: Boolean): Task? {
if (tryAcquireCpuPermit()) return findAnyTask(scanLocalQueue)
// If we can't acquire a CPU permit -- attempt to find blocking task
val task = if (scanLocalQueue) {
localQueue.poll() ?: globalBlockingQueue.removeFirstOrNull()
} else {
globalBlockingQueue.removeFirstOrNull()
}
return task ?: trySteal(blockingOnly = true)
}
private fun executeTask(task: Task) {
val taskMode = task.mode
idleReset(taskMode)
beforeTask(taskMode)
runSafely(task)
afterTask(taskMode)
}
fun runSafely(task: Task) {
try {
task.run()
} catch (e: Throwable) {
val thread = Thread.currentThread()
thread.uncaughtExceptionHandler.uncaughtException(thread, e)
} finally {
unTrackTask()
}
}
}
run办法直接调用的runWorker(),在里边是一个while循环,不断从行列中取Task来履行。
- 代码1:从本地行列或许大局行列中取出Task
- 代码2:履行这个task,终究其实便是调用这个Runnable的run办法。
也便是说,在Worker这个线程中,履行了这个Runnable的run办法。还记得这个Runnable是谁么?它便是上面咱们看过的DispatchedContinuation,这儿的run办法履行的便是协程使命,那这块详细的run办法的完成逻辑,咱们应该到DispatchedContinuation中去找。
internal class DispatchedContinuation<in T>(
@JvmField val dispatcher: CoroutineDispatcher,
@JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {
......
}
internal abstract class DispatchedTask<in T>(
@JvmField public var resumeMode: Int
) : SchedulerTask() {
public final override fun run() {
assert { resumeMode != MODE_UNINITIALIZED } // should have been set before dispatching
val taskContext = this.taskContext
var fatalException: Throwable? = null
try {
val delegate = delegate as DispatchedContinuation<T>
val continuation = delegate.continuation
withContinuationContext(continuation, delegate.countOrElement) {
val context = continuation.context
val state = takeState() // NOTE: Must take state in any case, even if cancelled
val exception = getExceptionalResult(state)
/*
* Check whether continuation was originally resumed with an exception.
* If so, it dominates cancellation, otherwise the original exception
* will be silently lost.
*/
val job = if (exception == null && resumeMode.isCancellableMode) context[Job] else null
//非空,且未处于active状况
if (job != null && !job.isActive) {
//开端之前,协程现已被取消,将详细的Exception传出去
val cause = job.getCancellationException()
cancelCompletedResult(state, cause)
continuation.resumeWithStackTrace(cause)
} else {
//有反常,传递反常
if (exception != null) {
continuation.resumeWithException(exception)
} else {
//代码1
continuation.resume(getSuccessfulResult(state))
}
}
}
} catch (e: Throwable) {
// This instead of runCatching to have nicer stacktrace and debug experience
fatalException = e
} finally {
val result = runCatching { taskContext.afterTask() }
handleFatalException(fatalException, result.exceptionOrNull())
}
}
}
咱们主要看一下代码1处,调用了resume敞开协程。前面没有反常,才开端发动协程,这儿才是真实的开端发动协程,开端履行launch传入的Lambda表达式。这个时候,协程的逻辑是在Worker这个线程上履行的了,切到某个线程上履行的逻辑现已完成了。
ps: rusume会走到BaseContinuationImpl的rusumeWith,然后走到launch传入的Lambda匿名内部类的invokeSuspend办法,开端履行状况机逻辑。前面的文章 Kotlin协程createCoroutine和startCoroutine原理 咱们剖析过这儿,这儿就仅仅简单提一下。
到这儿,Dispatchers的履行流程就算完了,前后都串起来了。
小结
Dispatchers是协程框架中与线程交互的要害。底层会有不同的线程池,Dispatchers.Default、IO,协程使命来了的时候会封装成一个个的Runnable,丢到线程中履行,这些Runnable的run办法中履行的其实便是continuation.resume,也便是launch的Lambda生成的SuspendLambda匿名内部类,也便是敞开协程状况机,开端协程的真实履行。