概述
What
参阅官方介绍StateFlow 和 SharedFlow | Android 开发者 | Android Developers
这两个类都是基于底层的Flow异步流 Kotlin 官方文档 中文版 (kotlincn.net)
Why
当Kotlin Coroutines异步履行的上下文中需求状况办理,StateFlow和SharedFlow便是被规划用于这种场景, 进一步能够经过这样的状况办理去完成相似LiveData
的功用
How
参阅StateFlow 和 SharedFlow | Android 开发者 | Android Developers 中的运用
本文针对SharedFlow的论述其根本架构,然后经过具体的源码剖析对根本架构进行具体的解释。因为StateFlow能够看作特殊SharedFlow,此篇暂且略过。
根本架构
上描述了SharedFlow的一个简略规划模型,看着十分简略,其实需求考虑几个问题
首要collect(订阅)这一步需求考虑如下几个问题
- 多线程订阅需求考虑到线程安全,这是一个根本前提,后续问题都需求考虑线程安全问题
- Slots怎么动态办理多个Collector,以分配空间(add)解释或许遇到的问题,如下图
暂时无法在文档外展示此内容
- 怎么保证订阅者生命周期,即只需没有撤销订阅,发布者发布消息的时分,订阅者总能接收到
- 该模型相似生产者顾客模型,肯定会遇到生产者顾客速度不一致问题,怎么处理?
其次再来看看emit(发布)需求考虑的问题
- 线程安全问题
- 线程安全的找到一切订阅者,进行
emit
- 生产者和顾客模型中的一切问题
除了上述所提出的需求处理的问题,SharedFlow还供给了不少feature, 如下
- 生产者顾客模型中因为生产者和顾客的速度不一致的三种形式,取最新,取最旧,等候值
- 事情重放,即新的订阅者敞开时会收到之前的发送的数据,重放次数在代码里用
replay
表明 - 供给
LiveData
转化 - 冷热流的转化
大体结构定了,咱们再以源码的角度具体论述一下SharedFlow的创立,订阅和发布的进程。
源码剖析
SharedFlow的创立进程
MutableSharedFlow
public fun <T> MutableSharedFlow(
// 重放
replay: Int = 0,
// 生产者和顾客模型中的缓存行列大小
extraBufferCapacity: Int = 0,
// 生产者和顾客模型中的放弃办法,取最新,取最旧,等候值
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T> {
require(replay >= 0) { "replay cannot be negative, but was $replay" }
require(extraBufferCapacity >= 0) { "extraBufferCapacity cannot be negative, but was $extraBufferCapacity" }
require(replay > 0 || extraBufferCapacity > 0 || onBufferOverflow == BufferOverflow.SUSPEND) {
"replay or extraBufferCapacity must be positive with non-default onBufferOverflow strategy $onBufferOverflow"
}
// 前面其实现已判断了replay和extraBufferCapacity有必要大于0,这儿有点多余了
val bufferCapacity0 = replay + extraBufferCapacity
val bufferCapacity = if (bufferCapacity0 < 0) Int.MAX_VALUE else bufferCapacity0 // coerce to MAX_VALUE on overflow
return SharedFlowImpl(replay, bufferCapacity, onBufferOverflow)
}
- 常用的状况replay = 0, buffercapacity = 0
SharedFlowImpl
private class SharedFlowImpl<T>(
private val replay: Int,
private val bufferCapacity: Int,
private val onBufferOverflow: BufferOverflow
) : AbstractSharedFlow<SharedFlowSlot>(), MutableSharedFlow<T>, CancellableFlow<T>, FusibleFlow<T> {
}
internal abstract class AbstractSharedFlow<S : AbstractSharedFlowSlot<*>> : SynchronizedObject() {
protected var slots: Array<S?>? = null // allocated when needed
private set
protected var nCollectors = 0 // number of allocated (!free) slots
private set
private var nextIndex = 0 // oracle for the next free slot index
private var _subscriptionCount: MutableStateFlow<Int>? = null // init on first need
val subscriptionCount: StateFlow<Int>
get() = synchronized(this) {
// allocate under lock in sync with nCollectors variable
_subscriptionCount ?: MutableStateFlow(nCollectors).also {
_subscriptionCount = it
}
}
}
能够看其承继了AbstractSharedFlow
,其中有两个重要变量后面会提到,slots
和
nCollectors
SharedFlow的订阅进程
订阅进程即把订阅者注册到发布者(AbstractSharedFlow
)中
Collect.kt
public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
collect(object : FlowCollector<T> {
override suspend fun emit(value: T) = action(value)
})
flow调用了collect就会新建一个collector
,这儿能够理解为订阅者,咱们能够想到这个collector在后续的进程肯定是会注册到发布者中。
该办法持续调用SharedFlow中的collect函数
SharedFlow -> collect
@Suppress("UNCHECKED_CAST")
override suspend fun collect(collector: FlowCollector<T>) {
// 一个collector对应着一个slot
val slot = allocateSlot()
try {
if (collector is SubscribedFlowCollector) collector.onSubscription()
val collectorJob = currentCoroutineContext()[Job]
// 第一层while用于collect不断接收值
while (true) {
var newValue: Any?
// 第二层while用于等候上游发送值
while (true) {
// 两件事
// 1. 尝试直接获得上游发送的值
// 2. slot假如处于unlocked状况,则resume上游保存的续体
newValue = tryTakeValue(slot) // attempt no-suspend fast path first
// 拿到了就退出
if (newValue !== NO_VALUE) break
// 没拿到持续等候值, 等候上游emit
awaitValue(slot) // await signal that the new value is available
}
collectorJob?.ensureActive()
// suspend 履行collect里的block {}
collector.emit(newValue as T)
}
} finally {
freeSlot(slot)
}
}
两个循环 + awaitValue
表明SharedFlow的collect
正常状况下是不会退出,全体逻辑如下
-
tryTakeValue
循环获取上游发送的值,有值时退出,履行collector.emit
,没值时交给awaitvalue
去挂起等候上游经过slot.cont
发送值。参阅tryTakeValue -
collector.emit
是一个suspend函数,当该suspend函数履行完成后,又会持续经过tryTakeValue
去取上游数据。 -
awaitvalue
意图是给slot
赋值cont
即slot.cont = awaitvalue.cont
AbstractSharedFlow -> allocateSlot
protected fun allocateSlot(): S {
// Actually create slot under lock
var subscriptionCount: MutableStateFlow<Int>? = null
val slot = synchronized(this) {
val slots = when (val curSlots = slots) {
null -> createSlotArray(2).also { slots = it }
else -> if (nCollectors >= curSlots.size) {
// 只需订阅者大于slots的大小,阐明空间不够需求分配
curSlots.copyOf(2 * curSlots.size).also { slots = it }
} else {
curSlots
}
}
var index = nextIndex
var slot: S
while (true) {
slot = slots[index] ?: createSlot().also { slots[index] = it }
index++
if (index >= slots.size) index = 0
// new size 2,但是只分配1个内存
if ((slot as AbstractSharedFlowSlot<Any>).allocateLocked(this)) break // break when found and allocated free slot
}
nextIndex = index
nCollectors++
subscriptionCount = _subscriptionCount // retrieve under lock if initialized
slot
}
// increments subscription count
subscriptionCount?.increment(1)
return slot
}
这一部分便是分配slot,能够简略理解为slot
为保存订阅者的空间。Slot的具体数据结构可参阅附录[Slot的数据结构]
SharedFlow -> tryTakeValue
private fun tryTakeValue(slot: SharedFlowSlot): Any? {
var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
val value = synchronized(this) {
val index = tryPeekLocked(slot)
// index < 0 代表slot locked
if (index < 0) {
NO_VALUE
} else {
// slot unlocked, resume上游
val oldIndex = slot.index
val newValue = getPeekedValueLockedAt(index)
slot.index = index + 1 // points to the next index after peeked one
// 得到上游能够resume的cont
resumes = updateCollectorIndexLocked(oldIndex)
newValue
}
}
// resume上游cont
for (resume in resumes) resume?.resume(Unit)
return value
}
updateCollectorIndexLocked
有点杂乱,其干了两件事
- 从slot和buffer里边找续体。先找buffer在找slot,将buffer和slot找到的一切resume添加到resumes中。buffer的续体为emit的suspend 能够简略理解为生产者, slot的续体为collect的suspend,能够简略理解为顾客。
这个当地举个比如:
生产者速度很快,顾客很慢,这样就会产生产者续体Emitter,作用是,等候顾客释放,调用resume,交给顾客。比如连续emit 1~10,顾客collector 10s中承受一个, buffer为3。
- 更新buffer及各种记录buffer数据的全局变量,将现已发送的buffer给clear掉
SharedFlow -> awaitValue
private suspend fun awaitValue(slot: SharedFlowSlot): Unit = suspendCancellableCoroutine { cont ->
synchronized(this) lock@ {
//
val index = tryPeekLocked(slot) // recheck under this lock
if (index < 0) {
// 给slot分配续体,阐明SharedFlowSlot的slot的cont是var
// 此处的cont便是在SharedFlow -> findSlotsToResumeLocked中slot.cont
slot.cont = cont // Ok -- suspending
} else {
//
cont.resume(Unit) // has value, no need to suspend
return@lock
}
slot.cont = cont // suspend, waiting
}
}
SharedFlow的emit进程
emit进程能够理解为发布/订阅中的发布进程
SharedFlow -> emit
override suspend fun emit(value: T) {
if (tryEmit(value)) return // fast-path
// 该函数在满buffer且suspend的状况下履行
emitSuspend(value)
}
SharedFlow -> TryEmit
override fun tryEmit(value: T): Boolean {
// 多少个收集者nCollectors就有多少个resumes
var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
// 多线程安全,shareFlow能够在不同线程emit
val emitted = synchronized(this) {
if (tryEmitLocked(value)) {
// 找到collector对应的续体进行回调
resumes = findSlotsToResumeLocked(resumes)
true
} else {
false
}
}
// 这儿是collect的cont,这儿履行后collect中的代码就运行了
for (cont in resumes) cont?.resume(Unit)
return emitted
}
SharedFlow -> tryEmitLocked
private fun tryEmitLocked(value: T): Boolean {
// Fast path without collectors -> no buffering
// 没有收集者,回来
if (nCollectors == 0) return tryEmitNoCollectorsLocked(value) // always returns true
// With collectors we'll have to buffer
// cannot emit now if buffer is full & blocked by slow collectors
// 当缓存容量超越最大容量时
if (bufferSize >= bufferCapacity && minCollectorIndex <= replayIndex) {
when (onBufferOverflow) {
// 假如是等候形式,回来运用emit
BufferOverflow.SUSPEND -> return false // will suspend
// 放弃最新值即取最旧值, 所以选择不发射新值
BufferOverflow.DROP_LATEST -> return true // just drop incoming
// 放弃旧值取最新值,所以选择发射新值
BufferOverflow.DROP_OLDEST -> {} // force enqueue & drop oldest instead
}
}
// 还有buffer就直接入队了
enqueueLocked(value)
bufferSize++ // value was added to buffer
// drop oldest from the buffer if it became more than bufferCapacity
// 移除最旧值,假如超越缓存max
if (bufferSize > bufferCapacity) dropOldestLocked()
// keep replaySize not larger that needed
if (replaySize > replay) { // increment replayIndex by one
updateBufferLocked(replayIndex + 1, minCollectorIndex, bufferEndIndex, queueEndIndex)
}
return true
}
从这个函数能够看出,emitSuspend
函数只在BufferOverflow.``SUSPEND
形式下才或许会调用,下面剖析下该发射函数,在不是满Buffer的状况下,会经过findSlotsToResumeLocked
去找slot
SharedFlow -> findSlotsToResumeLocked
private fun findSlotsToResumeLocked(resumesIn: Array<Continuation<Unit>?>): Array<Continuation<Unit>?> {
var resumes: Array<Continuation<Unit>?> = resumesIn
var resumeCount = resumesIn.size
forEachSlotLocked loop@ { slot ->
val cont = slot.cont ?: return@loop // only waiting slots
if (tryPeekLocked(slot) < 0) return@loop // only slots that can peek a value
if (resumeCount >= resumes.size) resumes = resumes.copyOf(maxOf(2, 2 * resumes.size))
resumes[resumeCount++] = cont
slot.cont = null // not waiting anymore
}
return resumes
}
该函数意图是找到slots中一切的数组
SharedFlow -> emitSuspend
// 全体来讲相似滑动窗口逻辑
private suspend fun emitSuspend(value: T) = suspendCancellableCoroutine<Unit> sc@ { cont ->
// 开始没有续体, 假如buffer不够了,才会去slot里边找续体
var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
val emitter = synchronized(this) lock@ {
// recheck buffer under lock again (make sure it is really full)
// double check
if (tryEmitLocked(value)) {
// 只要BufferOverflow.SUSPEND的状况才走这个分支
cont.resume(Unit)
resumes = findSlotsToResumeLocked(resumes)
return@lock null
}
// add suspended emitter to the buffer
// BufferOverflow.SUSPEND需求入队buffer
Emitter(this, head + totalSize, value, cont).also {
enqueueLocked(it)
queueSize++ // added to queue of waiting emitters
// synchronous shared flow might rendezvous with waiting emitter
// slot里边找续体
if (bufferCapacity == 0) resumes = findSlotsToResumeLocked(resumes)
}
}
// outside of the lock: register dispose on cancellation
emitter?.let { cont.disposeOnCancellation(it) }
// outside of the lock: resume slots if needed
// 这儿的cont和该suspend的cont不太相同,其是之前Emitter保存的的cont
for (cont in resumes) cont?.resume(Unit)
}
根据collect
和现在emitSuspend
能够总结出当指定为BufferOverFlow.Suspend
时,处理生产者顾客的全体逻辑相似滑动窗口(想想计算机网络里学习的滑动窗口算法便是用来处理生产者顾客问题的计划之一),假定设置了extraBufferCapacity = 1
一共就有2个buffer。所以emit 1, 2时,因为订阅者消费速度较慢,需求保存在buffer中,不用resume()。当collector delay完成后,flow的slot立马分配了,于是再emit3的时分,能够找到slot的cont, 发送给了collector。
附录
SharedFlowSlot的数据结构
private class SharedFlowSlot : AbstractSharedFlowSlot<SharedFlowImpl<*>>() {
@JvmField
var index = -1L // current "to-be-emitted" index, -1 means the slot is free now
@JvmField
var cont: Continuation<Unit>? = null // collector waiting for new value
override fun allocateLocked(flow: SharedFlowImpl<*>): Boolean {
if (index >= 0) return false // not free
index = flow.updateNewCollectorIndexLocked()
return true
}
override fun freeLocked(flow: SharedFlowImpl<*>): Array<Continuation<Unit>?> {
assert { index >= 0 }
val oldIndex = index
index = -1L
cont = null // cleanup continuation reference
return flow.updateCollectorIndexLocked(oldIndex)
}
}
- 此处的
cont
是协程中的续体,怎么熟悉协程原理的话,这个续体里边其实持有Collector引用,用于异步回调
Q&A
全体来讲,SharedFlow是怎么处理生产者与顾客问题的?
答:上游一个续体,下流多个续体;上游resume下流续体,下流resume上游续体;Buffer存储上游续体,slot存储下流续体
在collect()后能否履行代码
答:collect是一个suspend函数,所以需求在collect函数履行完成后后续代码才干履行,然后collect在协程正常时是一个无限while循环。所以后续代码是无法履行的。除非协程撤销
slots为什么要规划成数组结构
答:这个问题或许理解得不全。代码内部运用了许多index下标访问和指定数量扩容机制,需求运用数组提高遍历功能和扩容灵活性。
总结与收获
StateFlow和SharedFlow能够平替LiveData
两者都是订阅和发布模型,但是LiveData因为粘性特点,有[LiveData数据倒灌]的问题。这个问题能够用SharedFlow处理(设置replay = 0),而其他需求该粘性特点的数据状况能够用StateFlow
处理
SharedFlow能够处理生产者顾客问题(背压)
因为SharedFlow
的后两个参数,使它有了数据流的处理能力,利用滑动窗口的思想处理了生产者和顾客问题。所以当遇到生产者和顾客在不同线程中且生产速率和消费速率不一起,能够用SharedFlow
处理。