本文剖析示例代码如下:
launch(Dispatchers.Main) {
flow {
emit(1)
emit(2)
}.flowOn(Dispatchers.IO).collect {
delay(1000)
withContext(Dispatchers.IO) {
Log.d("liduo", "$it")
}
Log.d("liduo", "$it")
}
}
一.flowOn办法
flowOn办法用于将上游的流切换到指定协程上下文的调度器中履行,同时不会把协程上下文露出给下流的流,即flowOn办法中协程上下文的调度器不会对下流的流生效。如下面这段代码所示:
launch(Dispatchers.Main) {
flow {
emit(2) // 履行在IO线程池
}.flowOn(Dispatchers.IO).map {
it + 1 // 履行在Default线程池
}.flowOn(Dispatchers.Default).collect {
Log.d("liduo", "$it") //履行在主线程
}
}
接下来,剖析一下flowOn办法,代码如下:
public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> {
// 查看当时协程没有履行完毕
checkFlowContext(context)
return when {
// 为空,则回来本身
context == EmptyCoroutineContext -> this
// 假如是可交融的Flow,则尝试交融操作,获取新的流
this is FusibleFlow -> fuse(context = context)
// 其他状况,包装成可交融的Flow
else -> ChannelFlowOperatorImpl(this, context = context)
}
}
// 保证Job不为空
private fun checkFlowContext(context: CoroutineContext) {
require(context[Job] == null) {
"Flow context cannot contain job in it. Had $context"
}
}
在flowOn办法中,首要会查看办法地点的协程是否履行完毕。假如没有完毕,则会履行判别句子,这儿flowOn办法传入的上下文不是空上下文,且经过flow办法构建出的Flow目标也不是FusibleFlow类型的目标,因而这儿会走到else分支,将上游flow办法创立的Flow目标和上下文包装成ChannelFlowOperatorImpl类型的目标。
1.ChannelFlowOperatorImpl类
ChannelFlowOperatorImpl类承继自ChannelFlowOperator类,用于将上游的流包装成一个ChannelFlow目标,它的承继联系如下图所示:
经过上图能够知道,ChannelFlowOperatorImpl类最终承继了ChannelFlow类,代码如下:
internal class ChannelFlowOperatorImpl<T>(
flow: Flow<T>,
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = Channel.OPTIONAL_CHANNEL,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
) : ChannelFlowOperator<T, T>(flow, context, capacity, onBufferOverflow) {
// 用于流交融时创立新的流
override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T> =
ChannelFlowOperatorImpl(flow, context, capacity, onBufferOverflow)
// 若当时的流不需求经过Channel即可实现正常作业时,会调用此办法
override fun dropChannelOperators(): Flow<T>? = flow
// 触发对下一级流进行搜集
override suspend fun flowCollect(collector: FlowCollector<T>) =
flow.collect(collector)
}
二.collect办法
在Kotlin协程:Flow根底原理中讲到,当履行collect办法时,内部会调用最终发生的Flow目标的collect办法,代码如下:
public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
collect(object : FlowCollector<T> {
override suspend fun emit(value: T) = action(value)
})
这个最终发生的Flow目标便是ChannelFlowOperatorImpl类目标。
1.ChannelFlowOperator类的collect办法
ChannelFlowOperatorImpl类没有重写collect办法,因而调用的是它的父类ChannelFlowOperator类的collect办法,代码如下:
override suspend fun collect(collector: FlowCollector<T>) {
// OPTIONAL_CHANNEL为默认值,这儿满足条件,之后会详细解说
if (capacity == Channel.OPTIONAL_CHANNEL) {
// 获取当时协程的上下文
val collectContext = coroutineContext
// 核算新的上下文
val newContext = collectContext + context
// 假如前后上下文没有发生改变
if (newContext == collectContext)
// 直接触发对下一级流的搜集
return flowCollect(collector)
// 假如上下文发生改变,但不需求切换线程
if (newContext[ContinuationInterceptor] == collectContext[ContinuationInterceptor])
// 切换协程上下文,调用flowCollect办法触发下一级流的搜集
return collectWithContextUndispatched(collector, newContext)
}
// 调用父类的collect办法
super.collect(collector)
}
// 获取当时协程的上下文,该办法会被编译器处理
@SinceKotlin("1.3")
@Suppress("WRONG_MODIFIER_TARGET")
@InlineOnly
public suspend inline val coroutineContext: CoroutineContext
get() {
throw NotImplementedError("Implemented as intrinsic")
}
ChannelFlowOperator类的collect办法在规划上与协程的withContext办法规划思路是共同的:在办法内依据上下文的不同状况进行判别,在必要时才会切换线程去履行任务。
经过flowOn办法创立的ChannelFlowOperatorImpl类目标,参数capacity为默认值OPTIONAL_CHANNEL。因而代码在履行时会进入到判别中,但由于我们指定了上下文为Dispatchers.IO,因而上下文发生了改变,同时拦截器也发生了改变,所以最终会调用ChannelFlowOperator类的父类的collect办法,也便是ChannelFlow类的collect办法。
2.ChannelFlow类的collect办法
ChannelFlow类的代码如下:
override suspend fun collect(collector: FlowCollector<T>): Unit =
coroutineScope {
collector.emitAll(produceImpl(this))
}
在ChannelFlow类的collect办法中,首要经过coroutineScope办法创立了一个作用域协程,接着调用了produceImpl办法,代码如下:
public open fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> =
scope.produce(context, produceCapacity, onBufferOverflow, start = CoroutineStart.ATOMIC, block = collectToFun)
produceImpl办法内部调用了produce办法,而且传入了待履行的任务collectToFun。
produce办法在Kotlin协程:协程的根底与运用中曾提到过,它是官方提供的发动协程的四个办法之一,别的三个办法为launch办法、async办法、actor办法。代码如下:
internal fun <E> CoroutineScope.produce(
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = 0,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
start: CoroutineStart = CoroutineStart.DEFAULT,
onCompletion: CompletionHandler? = null,
@BuilderInference block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E> {
// 依据容量与溢出策略创立Channel目标
val channel = Channel<E>(capacity, onBufferOverflow)
// 核算新的上下文
val newContext = newCoroutineContext(context)
// 创立协程
val coroutine = ProducerCoroutine(newContext, channel)
// 监听完成事件
if (onCompletion != null) coroutine.invokeOnCompletion(handler = onCompletion)
// 发动协程
coroutine.start(start, coroutine, block)
return coroutine
}
在produce办法内部,首要创立了一个Channel类型的目标,接着创立了类型为ProducerCoroutine的协程,而且传入Channel目标作为参数。最终,produce办法回来了一个ReceiveChannel接口指向的目标,当协程履行完毕后,会经过Channel目标将成果经过send办法发送出来。
至此,能够知道flowOn办法的实现实际上是利用了协程拦截器的拦截功能。
在这儿之后,代码逻辑分成了两部分,一部分是block在ProducerCoroutine协程中的履行,另一部分是经过ReceiveChannel目标获取履行的成果。
3.flow办法中代码的履行
在produceImpl办法中,调用了produce办法,而且传入了collectToFun目标,这个目标将会在produce办法创立的协程中履行,代码如下:
internal val collectToFun: suspend (ProducerScope<T>) -> Unit
get() = { collectTo(it) }
当调用collectToFun目标的invoke办法时,会触发collectTo办法的履行,该办法在ChannelFlowOperator类中被重写,代码如下:
protected override suspend fun collectTo(scope: ProducerScope<T>) =
flowCollect(SendingCollector(scope))
在collectTo办法中,首要将参数scope封装成SendingCollector类型的目标,接着调用了flowCollect办法,该办法在ChannelFlowOperatorImpl类中被重写,代码如下:
override suspend fun flowCollect(collector: FlowCollector<T>) =
flow.collect(collector)
ChannelFlowOperatorImpl类的flowCollect办法内部调用了flow目标的collect办法,这个flow目标便是开始经过flow办法构建的目标。依据Kotlin协程:Flow根底原理的剖析,这个flow目标类型为SafeFlow,最终会经过collectSafely办法,触发flow办法中的block履行。代码如下:
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
override suspend fun collectSafely(collector: FlowCollector<T>) {
// 触发履行
collector.block()
}
}
当flow办法在履行过程中需求向下流宣布值时,会调用emit办法。依据上面flowCollect办法和collectTo办法能够知道,collectSafely办法的collector目标便是collectTo办法中创立的SendingCollector类型的目标,代码如下:
@InternalCoroutinesApi
public class SendingCollector<T>(
private val channel: SendChannel<T>
) : FlowCollector<T> {
// 经过Channel类目标发送值
override suspend fun emit(value: T): Unit = channel.send(value)
}
当调用SendingCollector类型的目标的emit办法时,会经过调用类型为Channel的目标的send办法,将值发送出去。
接下来,将剖析下流如何接纳上游宣布的值。
4.接纳flow办法宣布的值
回到ChannelFlow类的collect办法,之前提到collect办法中调用produceImpl办法,敞开了一个新的协程去履行任务,而且回来了一个ReceiveChannel接口指向的目标。代码如下:
override suspend fun collect(collector: FlowCollector<T>): Unit =
coroutineScope {
collector.emitAll(produceImpl(this))
}
在调用完produceImpl办法后,接着调用了emitAll办法,将ReceiveChannel接口指向的目标作为emitAll办法的参数,代码如下:
public suspend fun <T> FlowCollector<T>.emitAll(channel: ReceiveChannel<T>): Unit =
emitAllImpl(channel, consume = true)
emitAll办法是FlowCollector接口的扩展办法,内部调用了emitAllImpl办法对参数channel进行封装,代码如下:
private suspend fun <T> FlowCollector<T>.emitAllImpl(channel: ReceiveChannel<T>, consume: Boolean) {
// 用于保存反常
var cause: Throwable? = null
try {
// 死循环
while (true) {
// 挂起,等候接纳Channel成果或Channel封闭
val result = run { channel.receiveOrClosed() }
// 假如Channel封闭了
if (result.isClosed) {
// 假如有反常,则抛出
result.closeCause?.let { throw it }
// 没有反常,则跳出循环
break
}
// 获取并发送值
emit(result.value)
}
} catch (e: Throwable) {
// 捕获到反常时抛出
cause = e
throw e
} finally {
// 履行完毕封闭Channel
if (consume) channel.cancelConsumed(cause)
}
}
emitAllImpl办法是FlowCollector接口的扩展办法,而这儿的FlowCollector接口指向的目标,便是collect办法中创立的匿名目标,代码如下:
public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
collect(object : FlowCollector<T> {
override suspend fun emit(value: T) = action(value)
})
在emitAllImpl办法中,当经过receiveOrClosed办法获取到上游宣布的值时,会调用emit办法告诉下流,这时就会触发collect办法中block的履行,最终实现值从流的上游传递到了下流。
三.flowOn办法与流的交融
假定对一个流连续调用两次flowOn办法,那么流最终会在哪个flowOn办法指定的调度器中履行呢?代码如下:
launch(Dispatchers.Main) {
flow {
emit(2)
// emit办法是在IO线程履行还是在主线程履行呢?
}.flowOn(Dispatchers.IO).flowOn(Dispatchers.Main).collect {
Log.d("liduo", "$it")
}
}
答案是在IO线程履行,为什么呢?
依据本篇上面的剖析,当第一次调用flowOn办法时,上游的流会被包裹成ChannelFlowOperatorImpl目标,代码如下:
public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> {
// 查看当时协程没有履行完毕
checkFlowContext(context)
return when {
// 为空,则回来本身
context == EmptyCoroutineContext -> this
// 假如是可交融的Flow,则尝试交融操作,获取新的流
this is FusibleFlow -> fuse(context = context)
// 其他状况,包装成可交融的Flow
else -> ChannelFlowOperatorImpl(this, context = context)
}
}
而当第2次调用flowOn办法时,由于此刻上游的流——ChannelFlowOperatorImpl类型的目标,实现了FusibleFlow接口,因而,这儿会触发流的交融,直接调用上游的流的fuse办法,并传入新的上下文。这儿容量和溢出策略均为默认值。
依据Kotlin协程:Flow的交融、Channel容量、溢出策略的剖析,这儿会调用ChannelFlow类的fuse办法。相关代码如下:
public override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): Flow<T> {
...
// 核算交融后流的上下文
// context为下流的上下文,this.context为上游的上下文
val newContext = context + this.context
...
}
再依据之前在Kotlin协程:协程上下文与上下文元素中的剖析,当两个上下文进行相加时,后一个上下文中的拦截器会掩盖前一个上下文中的拦截器。在上面的代码中,后一个上下文为上游的流的上下文,因而会优先运用上游的拦截器。代码如下:
public operator fun plus(other: CoroutineDispatcher): CoroutineDispatcher = other
四.总结
粉线为运用时代码编写次序,绿线为下流触发上游的调用次序,红线为上游向下流发送值的调用次序,蓝线为线程切换的位置。