本文首要剖析了冷流 和 暖流 的相关完成原理,原理逻辑长而杂乱。特别是触及暖流 SharedFlow 相关完成原理时,逻辑更是笼统,了解比较困难。本文比较长,建议依据目录挑选分段阅览,能够先看根底概念和冷流,再分别看暖流 SharedFlow 和 StateFlow 。
阅览本文时,能够带着以下问题去思考:
- 冷流和暖流 指的是什么?
- 在事务开发中,冷流和暖流能够用来做什么或许解决什么问题?
- 冷流和暖流 的差异是什么?
- 冷流的履行原理是什么?
- 暖流 SharedFlow 对它发射的数据是怎么办理的?
- 暖流 SharedFlow 对它的订阅者是怎么办理的?
- 暖流 StateFlow 和 LiveData 有什么差异?
技术都是为事务服务的,不管是冷流仍是暖流,它们都需求解决事务开发中实践的问题,比如:
- 协程和冷流能够替换
RxJava
结构进行呼应式编程,在Kotlin 项目中,运用协程和冷流比运用 RxJava 更有优势; - 暖流 SharedFlow 能够用来做事情总线,替换
EventBus
; - 暖流 StateFlow 能够用来做事情状况更新,替换
LiveData
,并结合MVI
替换MVVM
。
假如本文中出现错误,会及时纠正。欢迎阅览。
根底概念
从上一篇文章:Kotlin Flow 探索,知道 Kotlin Flow 便是 Kotlin 数据流,而数据流需求包含供给方(出产者),中介(中心操作),运用方(顾客):
- 供给方(出产者):源数据,将数据增加到数据流中;
- 中介(中心操作):能够修正发送到数据流的值,或修正数据流本身;
- 运用方(顾客):成果数据,运用数据流中的值。
flow
.operator1()
.operator2()
.operator3()
.collect(consumer)
创立一个数据流,能够运用 Kotlin 扩展函数 flowOf
,asFlow
, flow{}
:
flowOf(1, 2, 3).map { it * it }.collect {}
(1..3).asFlow().map { it % 2 == 0 }.collect {}
flow<Int> {
emit(1)
emit(2)
emit(3)
}.map { it * 2 }.collect {}
在上面创立数据流的方式中,有必要要有运用方(顾客),也便是搜集器 collect{}
时,中心操作才会履行,这一点和 Kotlin Sequences
一样。
现在,除了上面的创立数据流方式,还能够运用 SharedFlow
和 StateFlow
:
class TestFlow {
private val _sharedFlow = MutableSharedFlow<Int>(
replay = 0,
extraBufferCapacity = 0,
onBufferOverflow = BufferOverflow.SUSPEND
)
val sharedFlow: SharedFlow<Int> = _sharedFlow
fun testSharedFlow() {
MainScope().launch {
Log.e("Flow", "sharedFlow:emit 1")
_sharedFlow.emit(1)
Log.e("Flow", "sharedFlow:emit 2")
_sharedFlow.emit(2)
}
}
private val _stateFlow = MutableStateFlow<Int>(value = 1)
val stateFlow: SharedFlow<Int> = _stateFlow
fun testStateFlow() {
MainScope().launch {
_stateFlow.value = 1
}
}
}
在运用SharedFlow
和 StateFlow
时创立数据流时,能够没有或许有多个搜集器 collect{}
,它独立于搜集器 collect{}
存在,而且不会被asList
,asSet
等顾客中止。
testFlow.testSharedFlow()
testFlow.testStateFlow()
控制台输出成果:
Flow com.wangjiang.example E sharedFlow:emit 1
Flow com.wangjiang.example E sharedFlow:emit 2
Flow com.wangjiang.example E stateFlow:value 1
能够看到,没有搜集器 collect{}
的时分,ShareFlow 和 StateFlow 仍是履行了。下面增加搜集器 collect{}
,再看一下:
lifecycleScope.launch {
testFlow.sharedFlow.collect {
Log.e("Flow", "SharedFlow Collect1: value=$it")
}
}
lifecycleScope.launch {
testFlow.sharedFlow.collect {
Log.e("Flow", "SharedFlow Collect2: value=$it")
}
}
testFlow.testSharedFlow()
lifecycleScope.launch {
testFlow.stateFlow.collect {
Log.e("Flow", "StateFlow Collect1: value=$it")
}
}
lifecycleScope.launch {
testFlow.stateFlow.collect {
Log.e("Flow", "StateFlow Collect2: value=$it")
}
}
testFlow.testStateFlow()
控制台输出成果:
Flow com.wangjiang.example E StateFlow Collect1: value=1
Flow com.wangjiang.example E StateFlow Collect2: value=1
Flow com.wangjiang.example E sharedFlow:emit 1
Flow com.wangjiang.example E SharedFlow Collect1: value=1
Flow com.wangjiang.example E SharedFlow Collect2: value=1
Flow com.wangjiang.example E sharedFlow:emit 2
Flow com.wangjiang.example E SharedFlow Collect1: value=2
Flow com.wangjiang.example E SharedFlow Collect2: value=2
关于SharedFlow
,它相似事情总线,将事情分发给事情订阅者,同享事情。关于StateFlow
,它相似 LiveData,更新事情最新状况,告知订阅者事情的更新。
现在关于冷流和暖流能够简略区分为:将运用 flowOf
,asFlow
, flow{}
等创立的数据流称为冷流,也便是运用 : Flow<T>
创立的数据流,它不能独立于搜集器 collect{}
存在,且每个数据流需求搜集器 collect{}
才干称为一个完好的数据流;将运用: SharedFlow<T>
或 : StateFlow<T>
创立的数据流称为暖流,它能独立于搜集器 collect{}
存在,能够没有或多个搜集器 collect{}
。
冷流
Flow 是一种相似于序列的冷流 — 这段 flow 构建器中的代码直到流被搜集的时分才运转。
Flow 和序列一样,需求有完毕操作符,也便是有搜集器 collect{}
或 asList
,asSet
等操作的时分,才运转:
lifecycleScope.launch {
val flow = flow {
Log.e("Flow", "emit:1")
emit(1)
Log.e("Flow", "emit:2")
emit(2)
}.map {
Log.e("Flow", "map:$it")
it * it
}
flow.collect {
Log.e("Flow", "collect:$it")
}
}
控制台输出成果:
Flow com.wangjiang.example E emit:1
Flow com.wangjiang.example E map:1
Flow com.wangjiang.example E collect:1
Flow com.wangjiang.example E emit:2
Flow com.wangjiang.example E map:2
Flow com.wangjiang.example E collect:4
当运用 collect{}
时,开端履行数据出产:发射值 emit
,再履行中心操作:map
改换,再履行数据消费:collect
。在整个过程中,数据流是按照时间顺序产生的,也便是emit:1
→map:1
→collect:1
,emit:2
→map:2
→collect:4
,而不是emit:1
→emit:2
,map:1
→map:2
,collect:1
→collect:4
。
下面从一个示例来简略看下冷流的履行原理 :
class TestFlow {
fun testColdFlow() {
MainScope().launch {
flow<Int> { emit(1) }.map { it * it }.collect {
Log.e("Flow", "testColdFlow", Throwable())
}
}
}
}
运转 testColdFlow
办法,控制台输出 collect
办法调用栈信息:
Flow com.wangjiang.example E testColdFlow
java.lang.Throwable
at com.wangjiang.example.flow.TestFlow$testColdFlow$1$3.emit(TestFlow.kt:46)
at com.wangjiang.example.flow.TestFlow$testColdFlow$1$3.emit(TestFlow.kt:45)
at com.wangjiang.example.flow.TestFlow$testColdFlow$1$invokeSuspend$$inlined$map$1$2.emit(Emitters.kt:224)
at kotlinx.coroutines.flow.internal.SafeCollectorKt$emitFun$1.invoke(SafeCollector.kt:15)
at kotlinx.coroutines.flow.internal.SafeCollectorKt$emitFun$1.invoke(SafeCollector.kt:15)
at kotlinx.coroutines.flow.internal.SafeCollector.emit(SafeCollector.kt:87)
at kotlinx.coroutines.flow.internal.SafeCollector.emit(SafeCollector.kt:66)
at com.wangjiang.example.flow.TestFlow$testColdFlow$1$1.invokeSuspend(TestFlow.kt:45)
at com.wangjiang.example.flow.TestFlow$testColdFlow$1$1.invoke(Unknown Source:14)
at com.wangjiang.example.flow.TestFlow$testColdFlow$1$1.invoke(Unknown Source:4)
at kotlinx.coroutines.flow.SafeFlow.collectSafely(Builders.kt:61)
at kotlinx.coroutines.flow.AbstractFlow.collect(Flow.kt:230)
at com.wangjiang.example.flow.TestFlow$testColdFlow$1$invokeSuspend$$inlined$map$1.collect(SafeCollector.common.kt:113)
at com.wangjiang.example.flow.TestFlow$testColdFlow$1.invokeSuspend(TestFlow.kt:45)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
at android.os.Handler.handleCallback(Handler.java:900)
at android.os.Handler.dispatchMessage(Handler.java:103)
at android.os.Looper.loop(Looper.java:219)
at android.app.ActivityThread.main(ActivityThread.java:8668)
at java.lang.reflect.Method.invoke(Native Method)
at com.android.internal.os.RuntimeInit$MethodAndArgsCaller.run(RuntimeInit.java:513)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:1109)
从上面办法调用栈信息,能够看出大致顺序是:collect
→AbstractFlow.collect
→SafeFlow.collectSafely
→map
→SafeCollector.emit
→emit
→TestFlow$testColdFlow$1$3.emit
。(上面的日志和源码对应不上,这儿能够经过 AndroidStudio 查看 kotlin 的 class 文件。)
这儿重点来了解一下中心操作 map 改换:
import kotlinx.coroutines.flow.unsafeTransform as transform
public inline fun <T, R> Flow<T>.map(crossinline transform: suspend (value: T) -> R): Flow<R> = transform { value ->
return@transform emit(transform(value))
}
internal inline fun <T, R> Flow<T>.unsafeTransform(
@BuilderInference crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R> = unsafeFlow { // Note: unsafe flow is used here, because unsafeTransform is only for internal use
collect { value ->
// kludge, without it Unit will be returned and TCE won't kick in, KT-28938
return@collect transform(value)
}
}
internal inline fun <T> unsafeFlow(@BuilderInference crossinline block: suspend FlowCollector<T>.() -> Unit): Flow<T> {
return object : Flow<T> {
override suspend fun collect(collector: FlowCollector<T>) {
collector.block()
}
}
}
从上面代码剖析,map 流程是:map
→ transform
→ unsafeTransform
→ unsafeFlow { }
→ collect {}
→ 履行上一个 flow
→ 拿到上一个 flow 的成果
→ @collect transform(value)
→ @transform emit(transform(value))
→ map改换操作成果
→ 到下一个 flow 或 顾客 collect
。
看到这个流程,没错,flow 冷流的履行流程是与 Kotin Sequence 履行流程的原理是相似的。
所以整个冷流的触发流程是能够简略概括为:顾客 collect 触发 中心操作,中心操作 filter, map 改换等 触宣布产者,然后出产者 emit 出产数据,然后将数据交给中心操作改换,最终再将改换后的数据交给顾客。这也便是冷流履行的原理。有点相似,从下往上触发,再从上往下活动的感觉。
事务场景
关于 : Flow<T>
的事务场景,与运用 RxJava
相似,用于呼应式编程,例如:
GlobalScope.launch(Dispatchers.Main) {
flowOf(bitmap).map { bmp ->
//在子线程中履行耗时操作,存储 bitmap 到本地
saveBitmap(bmp)
}.flowOn(Dispatchers.IO).collect { bitmapLocalPath ->
//在主线程中处理存储 bitmap 后的本地路地址
}
}
在 Kotlin 项目中,能够运用 协程和冷流 替换 RxJava 来做呼应式编程。
总结
冷流需求有数据出产者、0或多个中心操作、数据顾客才干一同构建成为一个完好的流。它的履行原理相似 Kotin Sequence,当有顾客 collect 或其它终端操作时,流开端从下往上触发,然后从上往下活动。
暖流
暖流分为 SharedFlow 和 StateFlow,它们都是独立于搜集器的存在而存在。
SharedFlow
SharedFlow ,顾名思义,它被称作为暖流,首要在于它能让一切搜集器同享它所宣布的值,其间同享的方式是播送,且它的实例能够独立于搜集器的存在而存在。要了解 ShredFlow,就要了解它同享和独立存在的意义。
下面将从 SharedFlow 的创立,发摄 和 搜集来剖析。
创立
创立一个 SharedFlow 运用的是 SharedFlowKt 供给的 MutableSharedFlow
结构办法:
private val _sharedFlow = MutableSharedFlow<Int>(
replay = 0,
extraBufferCapacity = 0,
onBufferOverflow = BufferOverflow.SUSPEND
)
val sharedFlow: SharedFlow<Int> = _sharedFlow
参数意义:
-
replay
:新订阅者订阅时,从头发送多少个之前已宣布的值给新订阅者(相似粘性数据); -
extraBufferCapacity
:除了 replay 外,缓存的值数量,当缓存空间还有值时,emit 不会 suspend(emit 过快,collect 过慢,emit 的数据将被缓存起来); -
onBufferOverflow
:指定缓存区中已存满要发送的数据项时的处理战略(缓存区巨细由 replay 和 extraBufferCapacity 一同决议)。默认值为 BufferOverflow.SUSPEND,还能够是 BufferOverflow.DROP_LATEST 或 BufferOverflow.DROP_OLDEST(顾名思义)。
public fun <T> MutableSharedFlow(
replay: Int = 0,
extraBufferCapacity: Int = 0,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T> {
//.....省掉
//缓存的值数量
val bufferCapacity0 = replay + extraBufferCapacity
val bufferCapacity = if (bufferCapacity0 < 0) Int.MAX_VALUE else bufferCapacity0
return SharedFlowImpl(replay, bufferCapacity, onBufferOverflow)
}
internal open class SharedFlowImpl<T>(
private val replay: Int,
private val bufferCapacity: Int,
private val onBufferOverflow: BufferOverflow
) : AbstractSharedFlow<SharedFlowSlot>(), MutableSharedFlow<T>, CancellableFlow<T>, FusibleFlow<T> {
//.....省掉
}
MutableSharedFlow 结构办法回来的是一个 SharedFlowImpl
实例,下面来看一下关于 SharedFlowImpl 类相关的类和接口的简略关系:
从上往下,各个类或接口的职责是:
-
Flow 接口
:用于流的消费操作,也便是订阅者订阅 collect,它供给public suspend fun collect(collector: FlowCollector<T>)
接口办法,办法参数 collecter 便是冷流或暖流的搜集器,所以 Flow 接口依赖于 FlowCollecter 接口; -
FlowCollecter 接口
:用于流的搜集,能够是流的完毕操作或中心操作,它供给public suspend fun emit(value: T)
办法,办法参数 value 便是 数据出产者或许中心操作 emit 发送的值; -
SharedFlow 接口
:承继 Flow 接口,并界说了public val replayCache: List<T>
特色,它表明给新订阅者的(replay个数)值缓存快照; -
MutableSharedFlow 接口
:承继 SharedFlow 和 FlowCollecter 接口,那么它就能够collect(collecter: FlowCollecter<T>)
也能够emit(value: T)
; -
CancellableFlow 接口
:承继 Flow 接口,是一个空接口,首要标记这个 Flow 是能够撤销的,也便是 SharedFlow 是能够撤销的; -
CancellableFlowImpl 类
:完成了 Flow 接口的collect(collector: FlowCollector<T>)
办法; -
FusibleFlow 接口
:与 BufferOverflow 和 flowOn 操作结合起来一同作业; -
AbstractSharedFlow<S : AbstractSharedFlowSlot<*>> 笼统类
:担任对订阅者的办理,接纳AbstractSharedFlowSlot
,承继了SynchronizedObject
类; -
SynchronizedObject 类
:协程内部担任供给加锁synchronized(lock: SynchronizedObject, block: () -> T)
办法参数 的 lock 锁目标; -
AbstractSharedFlowSlot<SharedFlowImpl<*>> 笼统类
:界说了fun allocateLocked(flow: F): Boolean
和fun freeLocked(flow: F): Array<Continuation<Unit>?>
办法,分别表明订阅者相关的 SharedFlowSlot 的申请和开释; -
SharedFlowSlot 类
:承继 AbstractSharedFlowSlot 类,完成了allocateLocked
和freeLocked
笼统办法,并界说了 特色var index = -1L
和var cont: Continuation<Unit>? = null
,其间 index 表明将要在处理的数据在缓存数组中的索引,cont 表明用来保存等候新数据发送的订阅者的续体(包装订阅者); -
BufferOverflow 枚举类
:流中的缓存区溢出处理战略,枚举值SUSPEND
表明发送或发送值的上游在缓存区已满时挂起,枚举值DROP_OLDEST
表明溢出时删除缓存区中最旧的值,将新值增加到缓存区,不要挂起,枚举值DROP_LATEST
表明在缓存区溢出时删除当时增加到缓存区的最新值(以便缓存区内容坚持不变),不要挂起; -
SharedFlowImpl 类
:真实的 SharedFlow 完成类,承继了AbstractSharedFlow<SharedFlowSlot>
笼统类 和 完成了MutableSharedFlow<T>, CancellableFlow<T>, FusibleFlow<T>
接口。
从上面的信息,创立一个 SharedFlow 后,它供给的才能,能够简略综合概括为:能够运用 emit
发射数据,发射时触及数据缓存,缓存溢出战略,是否会被挂起等问题;也能够运用 collect
订阅,订阅时触及订阅者的办理,数据获取,是否会被挂起等问题。
发射
SharedFlowImpl
类完成了 FlowCollector
接口的 emit
办法,当调用 emit 办法时:
- 假如有订阅者正在
collect
当时 SharedFlow,且 onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,此调用才可能会被挂起; - 假如没有订阅者正在
collect
当时 SharedFlow,则不会运用缓存区。假如 replay != 0,则最近宣布的值会简略地存储到 replay 缓存中,并替换 replay 缓存中旧的元素;假如 replay=0,则将最近宣布的值丢弃; - emit 办法是被
suspend
修饰的,与它相关的还有一个 非suspend
修饰的办法:tryEmit
; - 此办法是线程安全的,能够在没有外部同步的情况下从并发协程中安全地调用。
调用 emit 办法可能会被挂起
首先,看下emit
办法的完成:
override suspend fun emit(value: T) {
if (tryEmit(value)) return // fast-path
emitSuspend(value)
}
emit 办法会不会被挂起,首要取决于 tryEmit(value)
办法回来值,假如回来 true,那么就不会履行 emitSuspend(value)
,也便是不会被挂起,不然履行 emitSuspend(value)
,emit 会被挂起。
下面先看一下 emit 办法 不会挂起和会挂起的比如:
不会挂起,调用的是 tryEmit 办法
class TestFlow {
private val _sharedFlow = MutableSharedFlow<Int>(
replay = 0,
extraBufferCapacity = 1,
onBufferOverflow = BufferOverflow.SUSPEND
)
val sharedFlow: SharedFlow<Int> = _sharedFlow
fun testSharedFlow() {
MainScope().launch {
Log.e("Flow", "sharedFlow:emit 1")
_sharedFlow.emit(1)
}
}
}
lifecycleScope.launch {
testFlow.sharedFlow.collect(object : FlowCollector<Int> {
override suspend fun emit(value: Int) {
Log.e("Flow", "SharedFlow Collect: value=$value", Throwable())
}
})
}
控制台输出日志:
Flow com.wangjiang.example E sharedFlow:emit 1
Flow com.wangjiang.example E SharedFlow Collect: value=1
java.lang.Throwable
at com.wangjiang.example.fragment.TestFlowFragment$initView$5$1.emit(TestFlowFragment.kt:76)
at com.wangjiang.example.fragment.TestFlowFragment$initView$5$1.emit(TestFlowFragment.kt:174)
at kotlinx.coroutines.flow.SharedFlowImpl.collect$suspendImpl(SharedFlow.kt:383)
at kotlinx.coroutines.flow.SharedFlowImpl$collect$1.invokeSuspend(Unknown Source:15)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTaskKt.resume(DispatchedTask.kt:234)
at kotlinx.coroutines.DispatchedTaskKt.resumeUnconfined(DispatchedTask.kt:190)
at kotlinx.coroutines.DispatchedTaskKt.dispatch(DispatchedTask.kt:161)
at kotlinx.coroutines.CancellableContinuationImpl.dispatchResume(CancellableContinuationImpl.kt:397)
at kotlinx.coroutines.CancellableContinuationImpl.resumeImpl(CancellableContinuationImpl.kt:431)
at kotlinx.coroutines.CancellableContinuationImpl.resumeImpl$default(CancellableContinuationImpl.kt:420)
at kotlinx.coroutines.CancellableContinuationImpl.resumeWith(CancellableContinuationImpl.kt:328)
at kotlinx.coroutines.flow.SharedFlowImpl.tryEmit(SharedFlow.kt:400)
at kotlinx.coroutines.flow.SharedFlowImpl.emit$suspendImpl(SharedFlow.kt:405)
at kotlinx.coroutines.flow.SharedFlowImpl.emit(Unknown Source:0)
at com.wangjiang.example.flow.TestFlow$testSharedFlow$1.invokeSuspend(TestFlow.kt:20)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
at android.os.Handler.handleCallback(Handler.java:900)
at android.os.Handler.dispatchMessage(Handler.java:103)
at android.os.Looper.loop(Looper.java:219)
at android.app.ActivityThread.main(ActivityThread.java:8668)
at java.lang.reflect.Method.invoke(Native Method)
at com.android.internal.os.RuntimeInit$MethodAndArgsCaller.run(RuntimeInit.java:513)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:1109)
将上面 MutableSharedFlow 结构办法中的 extraBufferCapacity = 1
修正为 extraBufferCapacity = 0
,其它坚持不变:
会挂起,调用的是 emitSuspend 办法
控制台输出日志:
Flow com.wangjiang.example E sharedFlow:emit 1
Flow com.wangjiang.example E SharedFlow Collect: value=1
java.lang.Throwable
at com.wangjiang.example.fragment.TestFlowFragment$initView$5$1.emit(TestFlowFragment.kt:76)
at com.wangjiang.example.fragment.TestFlowFragment$initView$5$1.emit(TestFlowFragment.kt:174)
at kotlinx.coroutines.flow.SharedFlowImpl.collect$suspendImpl(SharedFlow.kt:383)
at kotlinx.coroutines.flow.SharedFlowImpl$collect$1.invokeSuspend(Unknown Source:15)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTaskKt.resume(DispatchedTask.kt:234)
at kotlinx.coroutines.DispatchedTaskKt.resumeUnconfined(DispatchedTask.kt:190)
at kotlinx.coroutines.DispatchedTaskKt.dispatch(DispatchedTask.kt:161)
at kotlinx.coroutines.CancellableContinuationImpl.dispatchResume(CancellableContinuationImpl.kt:397)
at kotlinx.coroutines.CancellableContinuationImpl.resumeImpl(CancellableContinuationImpl.kt:431)
at kotlinx.coroutines.CancellableContinuationImpl.resumeImpl$default(CancellableContinuationImpl.kt:420)
at kotlinx.coroutines.CancellableContinuationImpl.resumeWith(CancellableContinuationImpl.kt:328)
at kotlinx.coroutines.flow.SharedFlowImpl.emitSuspend(SharedFlow.kt:504)
at kotlinx.coroutines.flow.SharedFlowImpl.emit$suspendImpl(SharedFlow.kt:406)
at kotlinx.coroutines.flow.SharedFlowImpl.emit(Unknown Source:0)
at com.wangjiang.example.flow.TestFlow$testSharedFlow$1.invokeSuspend(TestFlow.kt:20)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
at android.os.Handler.handleCallback(Handler.java:900)
at android.os.Handler.dispatchMessage(Handler.java:103)
at android.os.Looper.loop(Looper.java:219)
at android.app.ActivityThread.main(ActivityThread.java:8668)
at java.lang.reflect.Method.invoke(Native Method)
at com.android.internal.os.RuntimeInit$MethodAndArgsCaller.run(RuntimeInit.java:513)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:1109)
上面两份日志差异在于:
-
emit
→suspendImpl
→tryEmit
→CancellableContinuationImpl.resumeWith
→DispatchedTaskKt.resume
→TestFlowFragment$initView$5$1.emit
-
emit
→suspendImpl
→emitSuspend
→CancellableContinuationImpl.resumeWith
→DispatchedTaskKt.resume
→TestFlowFragment$initView$5$1.emit
从输出日志对比,当 onBufferOverflow
战略为BufferOverflow.SUSPEND
时,假如缓存空间extraBufferCapacity
有值,emit 不会被挂起,不然会被挂起。所以,现在能够综合猜测 onBufferOverflow
和 extraBufferCapacity
的值会影响 tryEmit
办法的回来值。
override fun tryEmit(value: T): Boolean {
var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
val emitted = synchronized(this) {
if (tryEmitLocked(value)) {
resumes = findSlotsToResumeLocked(resumes)
true
} else {
false
}
}
for (cont in resumes) cont?.resume(Unit)
return emitted
}
private fun tryEmitLocked(value: T): Boolean {
// Fast path without collectors -> no buffering
if (nCollectors == 0) return tryEmitNoCollectorsLocked(value) // always returns true
// With collectors we'll have to buffer
// 假如缓存已满且订阅者消费慢时,不能直接给订阅者值
if (bufferSize >= bufferCapacity && minCollectorIndex <= replayIndex) {
when (onBufferOverflow) {
BufferOverflow.SUSPEND -> return false // will suspend
BufferOverflow.DROP_LATEST -> return true // just drop incoming
BufferOverflow.DROP_OLDEST -> {} // force enqueue & drop oldest instead
}
}
enqueueLocked(value)
bufferSize++ // value was added to buffer
// drop oldest from the buffer if it became more than bufferCapacity
if (bufferSize > bufferCapacity) dropOldestLocked()
// keep replaySize not larger that needed
if (replaySize > replay) { // increment replayIndex by one
updateBufferLocked(replayIndex + 1, minCollectorIndex, bufferEndIndex, queueEndIndex)
}
return true
}
tryEmit 办法的回来值取决于 emitted
, emitted 的值又取决于 tryEmitLocked
办法的回来值。tryEmitLocked 的回来值是否为 false 取决于:
if (bufferSize >= bufferCapacity && minCollectorIndex <= replayIndex) {
when (onBufferOverflow) {
BufferOverflow.SUSPEND -> return false // will suspend
BufferOverflow.DROP_LATEST -> return true // just drop incoming
BufferOverflow.DROP_OLDEST -> {} // force enqueue & drop oldest instead
}
}
字段: bufferSize
,bufferCapacity
,minCollectorIndex
,replayIndex
,它们都是 SharedFlowImp 的全局变量。
private class SharedFlowImpl<T>(
private val replay: Int, //新订阅者订阅时,从头发送多少个之前已宣布的值给新订阅者
private val bufferCapacity: Int, // replay+extraBufferCapacity,缓存容量
private val onBufferOverflow: BufferOverflow //缓存溢出战略
) : AbstractSharedFlow<SharedFlowSlot>(), MutableSharedFlow<T>, CancellableFlow<T>, FusibleFlow<T> {
/*
Logical structure of the buffer 缓存逻辑结构
buffered values
/-----------------------\
replayCache queued emitters
/----------\/----------------------\
+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
| | 1 | 2 | 3 | 4 | 5 | 6 | E | E | E | E | E | E | | | |
+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
^ ^ ^ ^
| | | |
head | head + bufferSize head + totalSize
| | |
index of the slowest | index of the fastest
possible collector | possible collector
| |
| replayIndex == new collector's index
\---------------------- /
range of possible minCollectorIndex
head == minOf(minCollectorIndex, replayIndex) // by definition
totalSize == bufferSize + queueSize // by definition
INVARIANTS:
minCollectorIndex = activeSlots.minOf { it.index } ?: (head + bufferSize)
replayIndex <= head + bufferSize
*/
// Stored state 缓存存储状况
private var buffer: Array<Any?>? = null // 缓存数组,用于保存 emit 发送的数据
private var replayIndex = 0L // 新订阅者从 replayCache 中获取数据的开端方位
private var minCollectorIndex = 0L // 当时活跃订阅者从缓存数组中获取数据时,对应的方位最小索引
private var bufferSize = 0 // 缓存数组中 buffered values 的巨细
private var queueSize = 0 // 缓存数组中 queued emitters 的巨细
// Computed state 缓存计算状况
private val head: Long get() = minOf(minCollectorIndex, replayIndex) // 缓存数组的开端方位
private val replaySize: Int get() = (head + bufferSize - replayIndex).toInt() // 缓存数组中 replay 的巨细
private val totalSize: Int get() = bufferSize + queueSize // 缓存数组中现已缓存的数据数量
private val bufferEndIndex: Long get() = head + bufferSize// 缓存数组中 buffered values 的完毕方位的后一位索引,也便是 queued emitters 的开端方位
private val queueEndIndex: Long get() = head + bufferSize + queueSize 缓存数组中 queued emitters 的完毕方位的后一位索引
从上面 SharedFlowImpl 中的缓存逻辑结构,再结合:
MutableSharedFlow<Int>(
replay = 0,
extraBufferCapacity = 1 或 0,
onBufferOverflow = BufferOverflow.SUSPEND
)
当 extraBufferCapacity = 1 ,调用 emit 办法发射数据时,此刻 bufferSize=0,bufferCapacity=1,minCollectorIndex=0,replayIndex=0,所以 bufferSize >= bufferCapacity && minCollectorIndex <= replayIndex
为 false,所以 tryEmitLocked 回来 true, tryEmit 回来 true, emit 不会被挂起。
当 extraBufferCapacity = 0 ,调用 emit 办法发射数据时,此刻 bufferSize=0,bufferCapacity=0,minCollectorIndex=0,replayIndex=0,所以 bufferSize >= bufferCapacity && minCollectorIndex <= replayIndex
为 true,又由于 onBufferOverflow=BufferOverflow.SUSPEND
,所以 tryEmitLocked 回来 false, tryEmit 回来 false, 所以会履行 emitSuspend,emit 会被挂起。
这便是调用 emit 办法可能会被挂起原因。其实,满足bufferSize >= bufferCapacity && minCollectorIndex <= replayIndex
判读条件便是缓存区溢出,这时需求挑选处理战略,是 BufferOverflow.SUSPEND,仍是 BufferOverflow.DROP_LATEST,仍是 BufferOverflow.DROP_OLDEST。
缓存区
emit 发送的值存储在缓存数组 buffer
中:
private var buffer: Array<Any?>? = null // 缓存数组,用于保存 emit 发送的数据
buffer 包含: buffered values
和 queued emitters
。
其间 buffered values
中存储的是 emit(value) 办法中的 value 值,buffered values 的巨细取决于 bufferCapacity=replay + extraBufferCapacity
,replay 和 extraBufferCapacity 便是创立 MutableSharedFlow( replay: Int = 0, extraBufferCapacity: Int = 0, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND )
传入的值,所以 buffered values 也分为两部分:replay 和 extraBufferCapacity。
其间 queued emitters
中存储的是 emit(value) 办法中的 value 被包装成的 Emitter
值:
private suspend fun emitSuspend(value: T) = suspendCancellableCoroutine<Unit> sc@{ cont ->
var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
val emitter = synchronized(this) lock@{
// recheck buffer under lock again (make sure it is really full)
if (tryEmitLocked(value)) {
cont.resume(Unit)
resumes = findSlotsToResumeLocked(resumes)
return@lock null
}
// add suspended emitter to the buffer
// 将 value 包装成 Emitter 目标存储到 buffer 中,存储方位为 queued emitters 的开端完毕方位规模
Emitter(this, head + totalSize, value, cont).also {
enqueueLocked(it)
queueSize++ // added to queue of waiting emitters
// synchronous shared flow might rendezvous with waiting emitter
if (bufferCapacity == 0) resumes = findSlotsToResumeLocked(resumes)
}
}
// outside of the lock: register dispose on cancellation
emitter?.let { cont.disposeOnCancellation(it) }
// outside of the lock: resume slots if needed
for (r in resumes) r?.resume(Unit)
}
private class Emitter(
@JvmField val flow: SharedFlowImpl<*>,
@JvmField var index: Long,
@JvmField val value: Any?,
@JvmField val cont: Continuation<Unit>
) : DisposableHandle {
override fun dispose() = flow.cancelEmitter(this)
}
在 emit 挂起时(缓存区 buffered values 已满或巨细为0),才存储 Emitter 值到 buffer
中。
现在存储值到 buffer 中,有下面几条链路:
-
emit
→tryEmit
→tryEmitLocked
→tryEmitNoCollectorsLocked
→enqueueLocked
-
emit
→tryEmit
→tryEmitLocked
→enqueueLocked
-
emit
→emitSuspend
→tryEmitLocked
→tryEmitNoCollectorsLocked
→enqueueLocked
-
emit
→emitSuspend
→tryEmit
→tryEmitLocked
→enqueueLocked
-
emit
→emitSuspend
→Emitter
→enqueueLocked
// enqueues item to buffer array, caller shall increment either bufferSize or queueSize
private fun enqueueLocked(item: Any?) {
val curSize = totalSize
val buffer = when (val curBuffer = buffer) {
// 创立一个巨细为 2 的缓存数组
null -> growBuffer(null, 0, 2)
// 假如当时缓存数据现已存满,则扩容,扩大为本来的 2 倍
else -> if (curSize >= curBuffer.size) growBuffer(curBuffer, curSize,curBuffer.size * 2) else curBuffer
}
buffer.setBufferAt(head + curSize, item)
}
没有订阅者正在 collect 当时 SharedFlow
当没有订阅者正在 collect
当时 SharedFlow,存储值到 buffer 中,会测验走链路:
-
emit
→tryEmit
→tryEmitLocked
→tryEmitNoCollectorsLocked
→enqueueLocked
-
emit
→emitSuspend
→tryEmitLocked
→tryEmitNoCollectorsLocked
→enqueueLocked
private fun tryEmitLocked(value: T): Boolean {
// Fast path without collectors -> no buffering
if (nCollectors == 0) return tryEmitNoCollectorsLocked(value) // always returns true
// ..... 省掉
return true
}
private fun tryEmitNoCollectorsLocked(value: T): Boolean {
assert { nCollectors == 0 }
if (replay == 0) return true // no need to replay, just forget it now
enqueueLocked(value) // enqueue to replayCache
bufferSize++ // value was added to buffer
// drop oldest from the buffer if it became more than replay
if (bufferSize > replay) dropOldestLocked()
minCollectorIndex = head + bufferSize // a default value (max allowed)
return true
}
此刻,假如又 replay=0,则不会运用缓存区。不然会将 emit(value) 中 value 存储到 buffer 数组中 buffered values 的 replayCache 开端完毕规模方位内。
replay 的值
当有订阅者正在 collect
当时 SharedFlow,此刻,假如 replay=0,extraBufferCapacity=0,则会测验走链路:
-
emit
→emitSuspend
→Emitter
→enqueueLocked
emit(value) 中 value值 包装成 Emitter 目标存储到 buffer 数组中的 queued emitters 开端完毕规模方位内。当没有订阅者时,emit(value) 中 value 就会被丢弃。
假如 replay!=0 或 extraBufferCapacity!=0,会测验走链路:
-
emit
→tryEmit
→tryEmitLocked
→enqueueLocked
-
emit
→emitSuspend
→tryEmit
→tryEmitLocked
→enqueueLocked
-
emit
→emitSuspend
→Emitter
→enqueueLocked
emit(value) 办法中的 value 值会被存储到 buffer 数组中 buffered values 或 queued emitters 开端完毕规模方位内,当存储到 buffered values 开端完毕规模方位内,replayCache 开端完毕规模方位内的值会更新,还会遭到缓存溢出战略 onBufferOverflow(BufferOverflow.SUSPEND,BufferOverflow.DROP_LATEST 或 BufferOverflow.DROP_OLDEST)影响。
搜集
依据上面发射的剖析,搜集便是从缓存区 buffer
中去取值,能够从 buffered values 区域直接取出 value 值,或能够从 queued emitters 取出 Emitter 目标拆解出 value 值。
SharedFlowImpl
类完成了 Flow
接口的 collect
办法:
override suspend fun collect(collector: FlowCollector<T>) {
// 分配一个 SharedFlowSlot
val slot = allocateSlot()
try {
// 假如订阅者是一个 SubscribedFlowCollector,则先告诉订阅者开端订阅
if (collector is SubscribedFlowCollector) collector.onSubscription()
// 当时订阅者地点协程
val collectorJob = currentCoroutineContext()[Job]
// 死循环
while (true) {
var newValue: Any?
// 死循环
while (true) {
// 经过分配的 slot 去从缓存区 buffer 获取值
newValue = tryTakeValue(slot) // attempt no-suspend fast path first
// 获取到值
if (newValue !== NO_VALUE) break
// 没有获取到值,订阅者地点协程会被挂起,等候 emit 发射新数据到缓存区
awaitValue(slot) // await signal that the new value is available
}
//承认订阅者地点协程是否还存活,假如不存活,会抛出 CancellationException 异常,直接到 finally
collectorJob?.ensureActive()
// 将新值给订阅者
collector.emit(newValue as T)
}
} finally {
// 订阅者不存活时,开释分配的 slot
freeSlot(slot)
}
}
订阅者订阅时首要的步骤是:
- 分配一个 SharedFlowSlot:
val slot = allocateSlot()
- 经过分配的 slot 去从缓存区 buffer 获取值:
newValue = tryTakeValue(slot)
;假如获取值成功,就直接进入下一步,不然订阅者地点协程会被挂起,等候 emit 发射新数据到缓存区:awaitValue(slot)
- 承认订阅者地点协程是否还存活,假如不存活,会抛出
CancellationException
异常,直接到 finally:collectorJob?.ensureActive()
- 将新值给订阅者:
collector.emit(newValue as T)
- 订阅者不存活时,开释分配的 slot:
freeSlot(slot)
下面剖析一下allocateSlot
, tryTakeValue(slot)
,awaitValue
和 freeSlot
。
allocateSlot
allocateSlot
办法界说在 SharedFlowImpl 承继的 AbstractSharedFlow
笼统类中:
@Suppress("UNCHECKED_CAST")
protected var slots: Array<S?>? = null // 用于办理给订阅者分配的 slot
private set
protected var nCollectors = 0 // 还存活的订阅者数量
private set
private var nextIndex = 0 // 分配下一个 slot 目标在 slots 数组中的索引
private var _subscriptionCount: MutableStateFlow<Int>? = null // 用一个 StateFlow 来记载订阅者数量
protected fun allocateSlot(): S {
// Actually create slot under lock
var subscriptionCount: MutableStateFlow<Int>? = null
// 加锁
val slot = synchronized(this) {
// 获取一个 Array<SharedFlowSlot?> 目标
val slots = when (val curSlots = slots) {
// 新创立一个巨细为 2 的 Array<SharedFlowSlot?>
null -> createSlotArray(2).also { slots = it }
// 扩容,容量扩大为本来 Array<SharedFlowSlot?> 的 2 倍
else -> if (nCollectors >= curSlots.size) {
curSlots.copyOf(2 * curSlots.size).also { slots = it }
} else {
// 直接运用当时的 Array<SharedFlowSlot?>
curSlots
}
}
// 下面为从上面的 slots 数组中获取一个 slot 目标
var index = nextIndex
var slot: S
while (true) {
slot = slots[index] ?: createSlot().also { slots[index] = it }
index++
if (index >= slots.size) index = 0
// 给 slot 的特色 index 赋值,index 的值指向的缓存区 buffer 中的 index
if ((slot as AbstractSharedFlowSlot<Any>).allocateLocked(this)) break // break when found and allocated free slot
}
nextIndex = index
// 订阅者加1
nCollectors++
subscriptionCount = _subscriptionCount // retrieve under lock if initialized
slot
}
// 订阅数量加 1
subscriptionCount?.increment(1)
return slot
}
从上面的代码逻辑,这个办法的首要效果是:为订阅者分配一个 SharedFlowSlot 目标,该目标能够用来相关从缓存区 buffer 获取值的索引,也便是能够用来确定订阅者将要收到的值,以及将订阅者地点协程挂起,等候有新值发送到缓冲区 buffer。
关于 SharedFlowSlot
类:
private class SharedFlowSlot : AbstractSharedFlowSlot<SharedFlowImpl<*>>() {
@JvmField
var index = -1L // 指向缓冲区 buffer 中的索引,值为 -1 表明当时 slot 现已被开释
//......省掉
override fun allocateLocked(flow: SharedFlowImpl<*>): Boolean {
if (index >= 0) return false // not free
index = flow.updateNewCollectorIndexLocked()
return true
}
//......省掉
}
internal fun updateNewCollectorIndexLocked(): Long {
val index = replayIndex
if (index < minCollectorIndex) minCollectorIndex = index
return index
}
为订阅者分配的 slot 目标的变量 index
,它从缓存区 buffer 获取值的初始值索引为 replayIndex(index=replayIndex),也便是新订阅者从 replayCache 中的开端方位开端获取值。
tryTakeValue
tryTakeValue
办法的效果是:经过 SharedFlowSlot
的 index 从缓存区 buffer 获取值,index 指向的可能是缓存区 buffer 中的 buffered values 或 queued emitters 开端完毕方位规模。当取值成功后,index 指向缓存区 buffer 的下一个方位slot.index = index + 1
:
private fun tryTakeValue(slot: SharedFlowSlot): Any? {
var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
// 加锁
val value = synchronized(this) {
// 经过 slot ,获取指向的缓存区 buffer 中的 index
val index = tryPeekLocked(slot)
if (index < 0) {
// 没有值
NO_VALUE
} else {
// 记载一下当时 slot 的 index
val oldIndex = slot.index
// 经过上面的 index,从缓存区 buffer 中取出对应的值
val newValue = getPeekedValueLockedAt(index)
// slot 的 index 指向缓存区 buffer 中的下一位 index+1
slot.index = index + 1 // points to the next index after peeked one
// 更新缓存数组的方位,并获取缓存数组与订阅者数组中可康复的续体
resumes = updateCollectorIndexLocked(oldIndex)
newValue
}
}
for (resume in resumes) resume?.resume(Unit)
return value
}
判别 index 是契合缓存区 buffer 中的 buffered values 仍是 queued emitters 的开端完毕方位规模,首要经过 tryPeekLocked
办法:
// returns -1 if cannot peek value without suspension
private fun tryPeekLocked(slot: SharedFlowSlot): Long {
// slot.index 刚开端的值是 replayIndex,也便是指向 buffered values(参看上面的 updateNewCollectorIndexLocked 办法 )
val index = slot.index
// 假如 index 在 buffered values 的开端完毕方位规模内,直接回来
if (index < bufferEndIndex) return index
// 下面的逻辑都是用来判别是否能在 queued emitters 取值
// index>=bufferEndIndex ,此刻假如 buffered values 的容量又大于0,找不到值
if (bufferCapacity > 0) return -1L
// 此刻缓存数组只要 queued emitters,不能取开端方位后边的 Emitter,所以找不到值
// 由于 head=minOf(minCollectorIndex, replayIndex)
if (index > head) return -1L
// 缓存数组巨细为0 ,找不到值
if (queueSize == 0) return -1L
// 从 queued emitters 开端完毕方位规模内取值
return index
}
awaitValue
当 tryTakeValue 办法回来 NO_VALUE
值,也便是 tryPeekLocked 办法回来 -1L
时,此刻在缓存区 buffer 找不到对应的 index,就会履行 awaitValue
:
// 这个办法是一个挂起办法
private suspend fun awaitValue(slot: SharedFlowSlot): Unit = suspendCancellableCoroutine { cont ->
synchronized(this) lock@{
// 再此测验获取指向的缓存区 buffer 中的 index
val index = tryPeekLocked(slot) // recheck under this lock
if (index < 0) {
// 没有找到,给 slot 目标 cont 赋值,也便是让订阅者地点协程挂起
slot.cont = cont // Ok -- suspending
} else {
// 找到,康复协程,不需求挂起
cont.resume(Unit) // has value, no need to suspend
return@lock
}
slot.cont = cont // suspend, waiting
}
}
该办法的首要效果是:将订阅者封装成 Continuation
接口完成类目标,挂起订阅者地点协程。
slot.cont 是一个 Continuation
接口完成类目标:
public interface Continuation<in T> {
/**
* 相关的协程
*/
public val context: CoroutineContext
/**
* 康复相关的协程,传递一个 successful 或 failed 成果值过去
*
*/
public fun resumeWith(result: Result<T>)
}
它的 context
相关的便是订阅者地点的协程。所以 slot.cont 的值存储的是相关订阅者的 Continuation 目标。
freeSlot
freeSlot 办法与 allocateSlot 相对应,当订阅者不再存活时,freeSlot
办法就会履行:
protected fun freeSlot(slot: S) {
// 运用 StateFlow 保存订阅数量
var subscriptionCount: MutableStateFlow<Int>? = null
// 加锁
val resumes = synchronized(this) {
// 订阅者数量减1
nCollectors--
subscriptionCount = _subscriptionCount
// 假如没有订阅者,下一次在 slots 中分配 slot 目标,从索引0开端
if (nCollectors == 0) nextIndex = 0
// slot 目标的真实开释
(slot as AbstractSharedFlowSlot<Any>).freeLocked(this)
}
/*
Resume suspended coroutines.
This can happens when the subscriber that was freed was a slow one and was holding up buffer.
When this subscriber was freed, previously queued emitted can now wake up and are resumed here.
*/
for (cont in resumes) cont?.resume(Unit)
// 订阅数量减1
subscriptionCount?.increment(-1)
}
这个办法首要的效果:记载的订阅者数量减1,以及将 slot 目标中 index 和 cont 重置,也便是 index 不再指向缓存区 buffer 的开端完毕方位规模,cout 不再相关订阅者地点协程。
private class SharedFlowSlot : AbstractSharedFlowSlot<SharedFlowImpl<*>>() {
@JvmField
var index = -1L // 指向缓冲区 buffer 中的索引,值为 -1 表明当时 slot 现已被开释
//......省掉
@JvmField
var cont: Continuation<Unit>? = null // 用来保存等候新数据发送的订阅者的续体,当订阅者等候新值时用到
//......省掉
override fun freeLocked(flow: SharedFlowImpl<*>): Array<Continuation<Unit>?> {
assert { index >= 0 }
val oldIndex = index
index = -1L
cont = null // cleanup continuation reference
return flow.updateCollectorIndexLocked(oldIndex)
}
}
在 freeLocked中调用的 flow.updateCollectorIndexLocked(oldIndex)
办法用于更新缓存数组的方位。
到这儿,SharedFlow 的创立,发送 和 搜集剖析完毕。关于它的特色:同享和独立存在有了一个大概的了解。
事务场景
知道了 SharedFlow 的创立,发送 和 搜集原理后,根据它同享和独立存在的特色,能够在事务中用来做事情总线,相似以前运用的 EventBus。下面是一个 SharedFlow 完成的 EventBus 简略比如:
界说事情总线:
object EventBus {
private val events = ConcurrentHashMap<String, MutableSharedFlow<Event>>()
private fun getOrPutEventFlow(eventName: String): MutableSharedFlow<Event> {
return events[eventName] ?: MutableSharedFlow<Event>().also { events[eventName] = it }
}
fun getEventFlow(event: Class<Event>): SharedFlow<Event> {
return getOrPutEventFlow(event.simpleName).asSharedFlow()
}
suspend fun produceEvent(event: Event) {
val eventName = event::class.java.simpleName
getOrPutEventFlow(eventName).emit(event)
}
fun postEvent(event: Event, delay: Long = 0, scope: CoroutineScope = MainScope()) {
scope.launch {
delay(delay)
produceEvent(event)
}
}
}
@Keep
open class Event(val value: Int) {
}
事情发射和订阅:
lifecycleScope.launch {
EventBus.getEventFlow(Event::class.java).collect {
Log.e("Flow", "EventBus Collect: value=${it.value}")
}
}
EventBus.postEvent(Event(1), 0, lifecycleScope)
EventBus.postEvent(Event(2), 0)
控制台输出成果:
Flow com.example.wangjaing E EventBus Collect: value=1
Flow com.example.wangjaing E EventBus Collect: value=2
运用 SharedFlow 来做事情总线,有以下优点:
- 事情能够延迟发送
- 能够界说粘性事情
- 事情能够感知 Activity 或 Fragment 的生命周期
- 事情是有序的
总结
在暖流 SharedFlow 中,当它创立以后它就存在了,它能够在出产者 emit 数据时,没有顾客 collect 数据而独立运转。当出产者 emit 数据后,这些数据会被缓存下来,新老顾客都能够收到这些数据,然后到达同享数据。
关于发射数据操作,会遭到 MutableSharedFlow 结构办法参数 replay, extraBufferCapacity,onBufferOverflow 值的影响,这些参数会决议发射操作是挂起仍是不挂起。发射的数据,将运用缓存数组进行办理,办理区域分为 buffered values 和 queued emitters。replay 和extraBufferCapacity 参数决议了buffered values 区域的巨细,当 buffered values 区域存满溢出时,会依据溢出战略 onBufferOverflow 进行区域调整。当 replay=0 和 extraBufferCapacity=0 ,或 replay!=0 和 extraBufferCapacity!=0 且 buffered values 区域存满, 发射的数据将被包装成 Emitter 存储到 queued emitters 区域。另外,订阅者数量决议了发射数据是存储到缓存区仍是丢弃。最终,缓存区存储的数据对一切订阅者同享。
关于搜集数据操作,运用 slots: Array<SharedFlowSlot?> 数组来办理订阅者,其间每一个 slot 目标对应一个订阅者,slot 目标的 slot.index 将订阅者要搜集的数据与缓存区相关起来,slot.cont 将订阅者地点协程与 SharedFlow上下文相关起来。假如经过 slot.index 能在缓存区取到值,就直接将值给订阅者。不然就将订阅者封装成 Continuation 接口完成类目标存储到 slot.cont 中,挂起订阅者地点协程,等候缓存区有值时,再康复订阅者协程并给它值。当订阅者协程不存活时,会开释订阅者相关的 slot 目标,也便是重置 slot.inext 和 slot.cont 的值,并从头调整缓存数组的方位。
StateFlow
StateFlow
也是根据 SharedFlow 完成的,所以能够把 StateFlow 了解为 SharedFlow 的一种特别存在。
public interface StateFlow<out T> : SharedFlow<T> {
/**
* The current value of this state flow.
*/
public val value: T
}
StateFlow ,它也是暖流,它也能让一切搜集器同享它所宣布的值,只不过这个值为当时最新的值,它的实例也能够独立于搜集器的存在而存在。
下面将从 StateFlow 的创立,发送 和 搜集来剖析,原理与 SharedFlow 的创立,发送 和 搜集相似,所以这儿只做简略剖析。
创立
创立一个 StateFlow 运用的是 SharedFlowKt 供给的 MutableStateFlow
结构办法:
public fun <T> MutableStateFlow(value: T): MutableStateFlow<T> = StateFlowImpl(value ?: NULL)
private class StateFlowImpl<T>(
initialState: Any // T | NULL
) : AbstractSharedFlow<StateFlowSlot>(), MutableStateFlow<T>, CancellableFlow<T>, FusibleFlow<T> {
private val _state = atomic(initialState)
//......省掉
}
StateFlow 有必要要有一个初始值,这个值被缓存在一个原子目标里边:_state = atomic(initialState)
。假如这个值没有更新,那么订阅者订阅时,将收到这个值。
发射
StateFlowImpl 类完成了发射数据 emit
和 tryEmit
办法:
private class StateFlowImpl<T>(
initialState: Any // T | NULL
) : AbstractSharedFlow<StateFlowSlot>(), MutableStateFlow<T>, CancellableFlow<T>, FusibleFlow<T> {
// 缓存当时值原子目标
private val _state = atomic(initialState)
private var sequence = 0
public override var value: T
get() = NULL.unbox(_state.value)
//发送时,更新当时值,并缓存在 _state 中
set(value) { updateState(null, value ?: NULL) }
override fun compareAndSet(expect: T, update: T): Boolean =
updateState(expect ?: NULL, update ?: NULL)
//更新原子目标 _state 的值
private fun updateState(expectedState: Any?, newState: Any): Boolean {
var curSequence = 0
// 订阅者相关的 slots
var curSlots: Array<StateFlowSlot?>? = this.slots
synchronized(this) {
val oldState = _state.value
if (expectedState != null && oldState != expectedState) return false // CAS 操作
if (oldState == newState) return true // 假如当时值和新值持平,则不必更新,也不必通知订阅者
// 将新值更新到缓存中
_state.value = newState
curSequence = sequence
if (curSequence and 1 == 0) {
curSequence++ //
sequence = curSequence
} else {
sequence = curSequence + 2 // change sequence to notify, keep it odd
return true
}
curSlots = slots // read current reference to collectors under lock
}
//......省掉
}
//......省掉
//非挂起发送
override fun tryEmit(value: T): Boolean {
this.value = value
return true
}
//挂起发送
override suspend fun emit(value: T) {
this.value = value
}
}
这两个办法都是去更新缓存目标 _state
存储的 value 值,假如当时值和新值持平,则不会更新,不然更新,并把新值给订阅者。
搜集
依据上面发射的剖析,搜集便是从缓存 _state
中去取值。
StateFlowImpl
类完成了 Flow
接口的 collect
办法:
override suspend fun collect(collector: FlowCollector<T>): Nothing {
// 分配一个 StateFlowSlot
val slot = allocateSlot()
try {
// 假如订阅者是 SubscribedFlowCollector 类型,则告诉订阅者订阅开端
if (collector is SubscribedFlowCollector) collector.onSubscription()
// 当时协程
val collectorJob = currentCoroutineContext()[Job]
// 记载一下上一个缓存值
var oldState: Any? = null
// 死循环
while (true) {
// Here the coroutine could have waited for a while to be dispatched,
// 获取当时缓存值
val newState = _state.value
// 承认订阅者地点协程是否还存活,假如不存活,会抛出 `CancellationException` 异常,直接到 finally
collectorJob?.ensureActive()
// 假如上一个缓存值为空或新值不等于上一个缓存值,则将新值给订阅者
if (oldState == null || oldState != newState) {
collector.emit(NULL.unbox(newState))
//更新记载的缓存值
oldState = newState
}
// 判别订阅者是否需求挂起
if (!slot.takePending()) {
//订阅者地点协程会被挂起,等候 emit 发射新数据到缓存
slot.awaitPending()
}
}
} finally {
// 订阅者不存活时,开释分配的 slot
freeSlot(slot)
}
}
订阅者订阅时首要的步骤是:
- 分配一个 StateFlowSlot:
val slot = allocateSlot()
- 经过分配的 slot 去从缓存
_state
中取值:val newState = _state.value
- 承认订阅者地点协程是否还存活,假如不存活,会抛出
CancellationException
异常,直接到 finally:collectorJob?.ensureActive()
- 将新值给订阅者:
collector.emit(NULL.unbox(newState))
- 订阅者不存活时,开释分配的 slot:
freeSlot(slot)
事务场景
知道了 StateFlow 的创立,发送 和 搜集的大致原理后,以及它同享最新状况的特色。在事务中能够用来做状况更新(替换 LiveData)。
比如从服务端获取一个列表数据,并把列表数据展示到 UI。下面运用 MVI (Model-View-Intent)
来做:
Data Layer:
class FlowRepository private constructor() {
companion object {
@JvmStatic
fun newInstance(): FlowRepository = FlowRepository()
}
fun requestList(): Flow<List<ItemBean>> {
val call = ServiceGenerator
.createService(FlowListApi::class.java)
.getList()
return flow {
emit(call.execute())
}.flowOn(Dispatchers.IO).filter { it.isSuccessful }
.map {
it.body()?.data
}
.filterNotNull().catch {
emit(emptyList())
}.onEmpty {
emit(emptyList())
}
}
}
ViewModel:
class ListViewModel : ViewModel() {
private val repository: FlowRepository = FlowRepository.newInstance()
private val _uiIntent: Channel<FlowViewIntent> = Channel()
private val uiIntent: Flow<FlowViewIntent> = _uiIntent.receiveAsFlow()
private val _uiState: MutableStateFlow<FlowViewState<List<ItemBean>>> =
MutableStateFlow(FlowViewState.Init())
val uiState: StateFlow<FlowViewState<List<ItemBean>>> = _uiState
fun sendUiIntent(intent: FlowViewIntent) {
viewModelScope.launch {
_uiIntent.send(intent)
}
}
init {
viewModelScope.launch {
uiIntent.collect {
handleIntent(it)
}
}
}
private fun handleIntent(intent: FlowViewIntent) {
viewModelScope.launch {
repository.requestList().collect {
if (it.isEmpty()) {
_uiState.emit(FlowViewState.Failure(0, "data is invalid"))
} else {
_uiState.emit(FlowViewState.Success(it))
}
}
}
}
}
data class FlowViewIntent()
sealed class FlowViewState<T> {
@Keep
class Init<T> : FlowViewState<T>()
@Keep
class Success<T>(val result: T) : FlowViewState<T>()
@Keep
class Failure<T>(val code: Int, val msg: String) : FlowViewState<T>()
}
UI:
private var isRequestingList = false
private lateinit var listViewModel: ListViewModel
private fun initData() {
listViewModel = ViewModelProvider(this)[ListViewModel::class.java]
lifecycleScope.launchWhenStarted {
listViewModel.uiStateFlow.collect {
when (it) {
is FlowViewState.Success -> {
showList(it.result)
}
is FlowViewState.Failure -> {
showListIfFail()
}
else -> {}
}
}
}
requestList()
}
private fun requestList() {
if (!isRequestingList) {
isRequestingList = true
listViewModel.sendUiIntent( FlowViewIntent() )
}
}
运用 StateFlow 替换 LiveData ,并用结合 MVI 替换 MVVM 后,能够有以下优点:
- 唯一可信数据源:MVVM 中 可能会存在很多 LiveData,这导致数据交互或并行更新出现逻辑不可控,增加 UIState 结合 StateFlow ,数据源只要 UIState;
- 数据单向活动:MVVM 中存在数据 UI ⇆ ViewModel 相互活动,而 MVI 中数据只能从 Data Layer → ViewModel → UI 活动,数据是单向活动的。
运用 StateFlow 替换 LiveData 来做事情状况更新,有以下差异:
- StateFlow 需求将初始状况传递给结构函数,而 LiveData 不需求。
- 当 View 进入 STOPPED 状况时,LiveData.observe() 会主动撤销注册运用方,而从 StateFlow 或任何其他数据流搜集数据的操作并不会主动中止。如需完成相同的行为,需求从 Lifecycle.repeatOnLifecycle 块搜集数据流。
总结
暖流 StateFlow,根据 SharedFlow 完成,所以它也有独立存在和同享的特色。但在 StateFlow 中发射数据,只要最新的值被缓存下来,所以当新老订阅者订阅时,只会收到它最终一次更新的值,假如发射的新值和当时值持平,订阅者也不会收到通知。