开启生长之旅!这是我参与「日新方案 12 月更文应战」的第29天,点击查看活动概况
1.Flow流程中为什么是【冷】的
先看一段Flow的运用代码:
fun main() = runBlocking {
flow {
emit(0)
emit(1)
emit(2)
emit(3)
}.collect{
println("it:$it")
}
println("end")
}
//输出成果:
//it:0
//it:1
//it:2
//it:3
//end
这是Flow的最简略的运用方法,经过调用collect
达到我所希望的成果,那么在弄清楚Flow为什么是冷的之前先看一下Flow的创立流程:
public fun <T> flow(
@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> =
SafeFlow(block)
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
override suspend fun collectSafely(collector: FlowCollector<T>) {
collector.block()
}
}
flow是一个高阶函数,参数类型是FlowCollector.() -> Unit
,FlowCollector是它的扩展或者成员办法,没有参数也没有返回值,flow()函数的返回值是Flow,详细到返回类型是SafeFlow()
,SafeFlow()
是AbstractFlow()
的子类。
@FlowPreview
public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {
//1
public final override suspend fun collect(collector: FlowCollector<T>) {
//2
val safeCollector = SafeCollector(collector, coroutineContext)
try {
//3
collectSafely(safeCollector)
} finally {
safeCollector.releaseIntercepted()
}
}
public abstract suspend fun collectSafely(collector: FlowCollector<T>)
}
从AbstractFlow
源码中能够知道它完成了Flow
接口,这儿进一步知道了SafeFlow
间接的完成了Flow
接口。这儿深化了解一下AbstractFlow
做了哪些工作,经过上面三个注释进行解说:
- 注释1:这个
collect
便是demo中的collect
的调用。这儿我做一个猜想:collect
的调用会触发上游Lambda中的emit()
函数的履行,然后将数据传递给collect
; - 注释2:这儿主要便是进行了安全查看,
SafeCollector
只是把collect
中的参数FlowCollector<T>
重新进行了封装,详细的安全查看是怎么样的稍后进行剖析; - 注释3:这儿调用了
collectSafely
这个抽象办法,而这儿的详细完成是在SafeFlow
中的collectSafely
,然后调用了collector.block()
,这儿其实便是调用了flow()
中的emit
办法,或者说collector.block()
调用了4次emit()
函数。
到这儿Flow
的创立就完毕了,那么现在来总结一下为什么Flow
是冷的:FLow之所以是冷的是由于它的结构进程,在它的结构进程中结构了一个SafeFlow
目标但并不会触发Lambda表达式的履行,只有当collect()
被调用后Lambda表达式才开始履行所以它是冷的。
-
SafeCollector
是怎么进行安全查看的
@Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER", "INVISIBLE_MEMBER", "INVISIBLE_REFERENCE", "UNCHECKED_CAST")
internal actual class SafeCollector<T> actual constructor(
//1
@JvmField internal actual val collector: FlowCollector<T>,
@JvmField internal actual val collectContext: CoroutineContext
) : FlowCollector<T>, ContinuationImpl(NoOpContinuation, EmptyCoroutineContext), CoroutineStackFrame {
override val callerFrame: CoroutineStackFrame? get() = completion as? CoroutineStackFrame
override fun getStackTraceElement(): StackTraceElement? = null
@JvmField // Note, it is non-capturing lambda, so no extra allocation during init of SafeCollector
internal actual val collectContextSize = collectContext.fold(0) { count, _ -> count + 1 }
private var lastEmissionContext: CoroutineContext? = null
private var completion: Continuation<Unit>? = null
// ContinuationImpl
override val context: CoroutineContext
get() = completion?.context ?: EmptyCoroutineContext
override fun invokeSuspend(result: Result<Any?>): Any {
result.onFailure { lastEmissionContext = DownstreamExceptionElement(it) }
completion?.resumeWith(result as Result<Unit>)
return COROUTINE_SUSPENDED
}
// Escalate visibility to manually release intercepted continuation
public actual override fun releaseIntercepted() {
super.releaseIntercepted()
}
/**
* 这是状态机重用的巧妙完成。
* 首先它查看它没有被并发运用(这儿是明确制止的),
* 然后只缓存一个完成实例以避免在每次宣布时进行额定分配,使其在热路径上消除垃圾。
*/
//2
override suspend fun emit(value: T) {
return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->
try {
//3
emit(uCont, value)
} catch (e: Throwable) {
//保存已抛出emit反常的事实,便于被上游
lastEmissionContext = DownstreamExceptionElement(e)
throw e
}
}
}
private fun emit(uCont: Continuation<Unit>, value: T): Any? {
val currentContext = uCont.context
currentContext.ensureActive()
//4
val previousContext = lastEmissionContext
if (previousContext !== currentContext) {
checkContext(currentContext, previousContext, value)
}
completion = uCont
//5
return emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
}
private fun checkContext(
currentContext: CoroutineContext,
previousContext: CoroutineContext?,
value: T
) {
if (previousContext is DownstreamExceptionElement) {
exceptionTransparencyViolated(previousContext, value)
}
checkContext(currentContext)
lastEmissionContext = currentContext
}
private fun exceptionTransparencyViolated(exception: DownstreamExceptionElement, value: Any?) {
error("""
Flow exception transparency is violated:
Previous 'emit' call has thrown exception ${exception.e}, but then emission attempt of value '$value' has been detected.
Emissions from 'catch' blocks are prohibited in order to avoid unspecified behaviour, 'Flow.catch' operator can be used instead.
For a more detailed explanation, please refer to Flow documentation.
""".trimIndent())
}
}
//6
private val emitFun =
FlowCollector<Any?>::emit as Function3<FlowCollector<Any?>, Any?, Continuation<Unit>, Any?>
- 注释1:这儿的
collect
参数对应的便是Flow在结构时的匿名内部类FlowCollector
,在AS中看到的this:FlowCollector<Int>
便是它; - 注释2:这个
emit()
函数其实便是demo中用来发送数据的emit(0)...emit(3)
,这儿能够了解为Flow上游发送的数据终究会传递到这个emit()
中; - 注释3:这儿的
emit(uCont, value)
函数中多了两个参数,其间第一个是suspendCoroutineUninterceptedOrReturn
,它是一个高阶函数,是把挂起函数的Continuation
暴露出来并作为参数进行传递,第二个则是上游发送过来的数据。这儿还有一个反常捕获,反常被捕获后存储在lastEmissionContext
,作用是:在下流发送反常今后能够让上游感知到。下面会有一个比照; - 注释4:这儿对当时协程上下文与之前的协程上下文进行比照,如果两者不一致就会履行
checkContext()
,在它里面做出进一步的判别和提示; - 注释5:这儿调用的便是下流的
emit()
,也便是FlowCollector
中的emit
函数,这儿接纳的那个value
便是上游的emit(0)...emit(3)
传递过来的,这儿可能不太好了解,我将demo中的代码换个写法就理解了:
fun main() = runBlocking {
flow {
emit(0)
emit(1)
emit(2)
emit(3)
}.collect{
println("it:$it")
}
println("换个写法")
flow{
emit(0)
emit(1)
emit(2)
emit(3)
}.collect(object :FlowCollector<Int>{
//注释5调用的便是这个emit办法。
override suspend fun emit(value: Int) {
println("value:$value")
}
})
println("end")
}
两种写法的到的成果是如出一辙的,第一种写法其实便是第二种写法的简写方法。
- 注释6:这是函数引用的语法,代表了它便是
FlowCollector
的emit()
办法,它的类型是Function3, Any?, Continuation, Any?>
,而这个Function3在前面了解挂起函数原理的时分将Kotlin代码反编译后有提过相似的,Function3代表的是三个参数的函数类型。
2.FlowCollector:上游与下流之间的桥梁
Flow创立的时分结构了一个SafeFlow
目标,间接完成了Flow
接口,因而能够直接调用collect()
停止操作符,然后获取到Flow
中的emit
发送出来的数据。
这儿其实就验证了我上面的那个猜想:下流collect
的调用会触发上游Lambda中的emit()
函数的履行,上游的emit
将数据传递给下流的emit()
。
那么上游和下流又是怎么进行衔接的呢?
先来看一段源码
public interface Flow<out T> {
public suspend fun collect(collector: FlowCollector<T>)
}
public fun interface FlowCollector<in T> {
public suspend fun emit(value: T)
}
在Flow的结构进程中结构了SafeFlow
目标而且间接的完成了Flow,Flow中的collect
便是停止操作,而collect
函数中的参数FlowCollector
中的emit()
函数则是下流用来接纳上游发送数据的emit()
。
这儿再来回忆一下SafeCollector
办法中所做的事情:首先collect
停止符的调用会触发上游Lambda中的emit()
函数履行,它将数据发送出去,然后进入到SafeCollector
中的emit()
函数,在这个函数中又将从上游数据发送到下流collect
中的emitFun()
函数中,这样就完成了衔接。所以说FlowCollector
是上下流之间的桥梁。
3.中心操作符
在前面剖析Kotlin协程—Flow时咱们知道在Flow的上下流之间还能够添加一些操作符(中转站)的。
这儿我用之前的代码进行剖析:
fun flowTest() = runBlocking {
flow {
emit(0)
emit(1)
emit(2)
emit(3)
emit(4)
}.filter { //中转站①
it > 2
}.map { //中转站②
it * 2
}.collect { //接纳
println("it:$it")
}
}
//输出成果:
//it:6
//it:8
由于上面咱们现已知道了上下流之间调用进程,所以这儿我先用一张图来表示有了中心操作符的进程。
从图中能够看到当Flow中呈现中心操作符的时分,上下流之间就会多出2个中转站,关于每一个中转站都会有上游和下流,而且都是被下流触发履行,也会触发自己的上游,一起还会接纳来自上游的数据并传递给自己的下流。为什么会是这样一个流程,咱们对中心操作符进行剖析:
- filter
//1
public inline fun <T> Flow<T>.filter(
crossinline predicate: suspend (T) -> Boolean
): Flow<T> = transform { value ->
//8
if (predicate(value)) return@transform emit(value)
}
//2
internal inline fun <T, R> Flow<T>.unsafeTransform(
@BuilderInference crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R> = unsafeFlow {
//6
collect { value ->
//7
return@collect transform(value)
}
}
//3
internal inline fun <T> unsafeFlow(
@BuilderInference crossinline block: suspend FlowCollector<T>.() -> Unit
): Flow<T> {
//4
return object : Flow<T> {
//5
override suspend fun collect(collector: FlowCollector<T>) {
collector.block()
}
}
}
-
- 注释1、2、3:返回值均为Flow,也便是说
Flow.filter
的返回值也是Flow
,这个进程只是一个Flow的再次被封装的进程; - 注释4:这儿会变成一个普通的Flow匿名内部类目标;
- 注释5:这儿应该比较熟悉了,完好代码应该是
flow{}.filter{}.collect{}
,依据之前的剖析很容易知道这便是触发上游Lambda代码的履行,也便是履行注释6、注释7; - 注释6:
collect{}
,这儿是在调用上游 Flow 的collect{}
,触发上游的 Lambda 履行了,也便是注释5触发的Lambda的履行,然后注释 7 就会被履行; - 注释7:这儿的
transform
中的额value
便是上游传递下来的值,至于怎么传递下来的就要看注释8了; - 注释8:首先这儿有一个条件
if (predicate(value))
,这个判别便是filter
传入的it > 2
这个条件,符合这个条件的才会持续履行,也便是经过emit()
函数向下流传递数据。
- 注释1、2、3:返回值均为Flow,也便是说
- map
public inline fun <T, R> Flow<T>.map(
crossinline transform: suspend (value: T) -> R
): Flow<R> = transform { value ->
return@transform emit(transform(value))
}
internal inline fun <T, R> Flow<T>.unsafeTransform(
@BuilderInference crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R> = unsafeFlow { // Note: unsafe flow is used here, because unsafeTransform is only for internal use
collect { value ->
// kludge, without it Unit will be returned and TCE won't kick in, KT-28938
return@collect transform(value)
}
}
internal inline fun <T> unsafeFlow(
@BuilderInference crossinline block: suspend FlowCollector<T>.() -> Unit
): Flow<T> {
return object : Flow<T> {
override suspend fun collect(collector: FlowCollector<T>) {
collector.block()
}
}
}
-
- 能够看到
map
和前面的filter
是如出一辙的流程。
- 能够看到
4.上下文维护
在前面剖析Kotlin协程—Flow时有提到withContext
,当时仅仅是提到了运用withContext
会使代码变得丑陋,其实还有另一层原因——如果调用withContext
改变协程上下文的话,Flow上游与下流的协程上下文就会变得不一致。在默许情况下Flow下流的协程上下文终究会成为上游的履行环境,也会变成中心操作符的履行环境,所以才让Flow本身就支持协程的「结构化并发」的特性,例如结构化撤销。而withContext
参加会使Flow上游与下流的协程上下文变得不一致,它们的整体结构也会被损坏,然后导致「结构化并发」的特性被损坏,所以不要容易运用withContext
,而Flow本身是带有上下文维护的。
Flow 源码中关于上下文的检测,称之为上下文维护,前面剖析SafeCollector
时没有深化checkContext
剖析,这儿持续来剖析一下,上下文维护是怎样的流程。
private fun emit(uCont: Continuation<Unit>, value: T): Any? {
val currentContext = uCont.context
currentContext.ensureActive()
val previousContext = lastEmissionContext
if (previousContext !== currentContext) {
checkContext(currentContext, previousContext, value)
}
completion = uCont
return emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
}
internal fun SafeCollector<*>.checkContext(currentContext: CoroutineContext) {
val result = currentContext.fold(0) fold@{ count, element ->
val key = element.key
val collectElement = collectContext[key]
if (key !== Job) {
return@fold if (element !== collectElement) Int.MIN_VALUE
else count + 1
}
val collectJob = collectElement as Job?
val emissionParentJob = (element as Job).transitiveCoroutineParent(collectJob)
if (emissionParentJob !== collectJob) {
error(
"Flow invariant is violated:\n" +
"\t\tEmission from another coroutine is detected.\n" +
"\t\tChild of $emissionParentJob, expected child of $collectJob.\n" +
"\t\tFlowCollector is not thread-safe and concurrent emissions are prohibited.\n" +
"\t\tTo mitigate this restriction please use 'channelFlow' builder instead of 'flow'"
)
}
if (collectJob == null) count else count + 1
}
//判别上游、下流的Context
if (result != collectContextSize) {
error(
"Flow invariant is violated:\n" +
"\t\tFlow was collected in $collectContext,\n" +
"\t\tbut emission happened in $currentContext.\n" +
"\t\tPlease refer to 'flow' documentation or use 'flowOn' instead"
)
}
}
所以,总的来说,Flow 不允许直接运用 withContext{} 的原因,是为了“结构化并发”,它并不是不允许切换线程,而是不允许随意损坏协程的上下文。Kotlin 提供的操作符 flowOn{},官方现已帮咱们处理好了上下文的问题,所以咱们能够放心地切换线程。
5.总结:
1.Flow的调用进程分为三个过程:
- 上游的Flow创立
SafeFlow
目标,下流的Flow的collect()
函数触发上游的emit()
函数履行开始发送数据; - 上游的
emit()
发送的数据进入到SafeCollector
,其实上游的emit()
函数调用的便是SafeCollector
中的emit()
函数; - 在
SafeCollector
中调用emitFun()
其实便是调用了下流的emit()
函数将数据传递给下流。
2.中心操作符:
如果有中心操作符的话,每个操作符都会有上游和下流,而且都是被下流触发履行,也会触发自己的上游,一起还会接纳来自上游的数据并传递给自己的下流;
3.上下文维护:
Flow的上游和中心操作符并不需要协程作用域,所以他们都是共用Flow下流的协程上下文,也正是由于这种规划所以Flow具有天然的『结构化并发』的特色,因而Kotlin官方也限制了开发者不能随意在上游与中转站阶段改变Flow的上下文。