开篇
flow api 现已渐渐被谷歌列为数据流的首选,能够见到官网的数据流篇都渐渐偏向于flow api,尽管LiveData等数据流类型现已深化开发者观念中,可是flow api也正渐渐的崛起出自己的商场。本篇讲的StateFlow是flow api中的一个更偏向于应用层的api,功用也非常和LiveData相似,那么为什么要出一个和LiveData相似的东西的,因为LiveData天生就引入了生命周期相关的概念,从设计角度出发,其实是耦合了生命周期这部分,所以现在才重整旗鼓,呈现了StateFlow。
接口部分
public interface StateFlow<out T> : SharedFlow<T> {
/**
* The current value of this state flow.
*/
public val value: T
}
能够看到StateFlow在SharedFlow上层上添加了一个属性,便是value值,能够被以为当时是当时可观测的值,跟LiveData的value相似。StateFlow愈加推崇的是单体状况,所以差异于一般的flowapi(主要是数据流状况),它的实现制止了一个重置操作。
StateFlowImpl 中
@Suppress("UNCHECKED_CAST")
override fun resetReplayCache() {
throw UnsupportedOperationException("MutableStateFlow.resetReplayCache is not supported")
}
还有便是一般的flow属于冷流,冷流这个概念就不再赘述,而flow之所以是冷流,是因为只要在collect的时分间接通过flowcollecter的emit办法去产生数据,实质上数据的改动依靠搜集者,所以才是冷流,具体分析能够下看上一篇文章
/post/705860…
而StateFlow承继于SharedFlow,并且value数值是不依靠于collect办法改动,简略来说,便是搜集者能够改动value数值,可是StateFlow中的value改动并不是只要这一种手法【注意这儿概念】,它还能够直接对value数据改动,如下面比如
class CounterModel {
private val _counter = MutableStateFlow(0) // private mutable state flow
val counter = _counter.asStateFlow() // publicly exposed as read-only state flow
fun inc() {
_counter.value++
}
}
所以按照数据划分,它就能够属于暖流,当然冷流暖流实质是一个概念,便于区别即可。
发送者部分
为了便利分析,咱们将数据改动部分称为发送者部分,每个对value进行set操作时都会进入下面办法。
private fun updateState(expectedState: Any?, newState: Any): Boolean {
var curSequence = 0
var curSlots: Array<StateFlowSlot?>? = this.slots // benign race, we will not use it
synchronized(this) {
val oldState = _state.value
if (expectedState != null && oldState != expectedState) return false // CAS support
if (oldState == newState) return true // Don't do anything if value is not changing, but CAS -> true
_state.value = newState
curSequence = sequence
if (curSequence and 1 == 0) { // even sequence means quiescent state flow (no ongoing update)
curSequence++ // make it odd
sequence = curSequence
} else {
// update is already in process, notify it, and return
sequence = curSequence + 2 // change sequence to notify, keep it odd
return true // updated
}
curSlots = slots // read current reference to collectors under lock
}
/*
Fire value updates outside of the lock to avoid deadlocks with unconfined coroutines.
Loop until we're done firing all the changes. This is a sort of simple flat combining that
ensures sequential firing of concurrent updates and avoids the storm of collector resumes
when updates happen concurrently from many threads.
*/
while (true) {
// Benign race on element read from array
curSlots?.forEach {
it?.makePending()
}
// check if the value was updated again while we were updating the old one
synchronized(this) {
if (sequence == curSequence) { // nothing changed, we are done
sequence = curSequence + 1 // make sequence even again
return true // done, updated
}
// reread everything for the next loop under the lock
curSequence = sequence
curSlots = slots
}
}
}
其实一段代码下来,除了数据保护部分,便是做了这么一件事,更新value的数据值,并记载当时数值是否是最新数值。和livedata的更新数据部分相似,实质都是保护一个计数器sequence,用来区别当时的value是否是最新。slots用来(下文会讲)记载数据更新状况,总的来说便是当数值发生改动的时分更新数据。
订阅者部分
当咱们调用collect的时分,就会进行数据获取,也便是调用接收的那一部分。
override suspend fun collect(collector: FlowCollector<T>) {
val slot = allocateSlot()
try {
if (collector is SubscribedFlowCollector) collector.onSubscription()
val collectorJob = currentCoroutineContext()[Job]
var oldState: Any? = null // previously emitted T!! | NULL (null -- nothing emitted yet)
// The loop is arranged so that it starts delivering current value without waiting first
while (true) {
// Here the coroutine could have waited for a while to be dispatched,
// so we use the most recent state here to ensure the best possible conflation of stale values
val newState = _state.value
// always check for cancellation
collectorJob?.ensureActive()
// Conflate value emissions using equality
if (oldState == null || oldState != newState) {
collector.emit(NULL.unbox(newState))
oldState = newState
}
// Note: if awaitPending is cancelled, then it bails out of this loop and calls freeSlot
if (!slot.takePending()) { // try fast-path without suspending first
slot.awaitPending() // only suspend for new values when needed
}
}
} finally {
freeSlot(slot)
}
}
因为collect在协程傍边,所以要时刻重视协程的状况collectorJob?.ensureActive()便是之一,那么什么时分才会正在调用咱们collect里的自定义逻辑呢?换个说法来说便是什么时分像LiveData相同收到回调呢?
注意一下上面的这儿
if (oldState == null || oldState != newState) {
collector.emit(NULL.unbox(newState))
oldState = newState
}
和livedata不相同的是,livedata每次设定value都会收到回调,而flow是在新旧状况不相同时,才会进行 collector.emit(NULL.unbox(newState)),进而触发搜集。所以当value被屡次设定同一个值的时分,LiveData会回调屡次,而StateFlow只会调用一次。
关于StateFlowSlot
能够看到不管上面的发送部分或许接收部分都有StateFlowSlot类的影子,
比如
var curSlots: Array<StateFlowSlot?>? = this.slots /
里边保护着一个Array,那么这个是有什么用处呢?
咱们知道flow是运行在协程里边的,简略来说协程实质便是状况机各种回调,那么根据这种环境下,在多线程或许多子协程中,为了保护State的正确更改,里边也要设定自己的状况机进行标识状况的正确改动。
private class StateFlowSlot : AbstractSharedFlowSlot<StateFlowImpl<*>>() {
private val _state = atomic<Any?>(null)
override fun allocateLocked(flow: StateFlowImpl<*>): Boolean {
// No need for atomic check & update here, since allocated happens under StateFlow lock
if (_state.value != null) return false // not free
_state.value = NONE // allocated
return true
}
override fun freeLocked(flow: StateFlowImpl<*>): Array<Continuation<Unit>?> {
_state.value = null // free now
return EMPTY_RESUMES // nothing more to do
}
@Suppress("UNCHECKED_CAST")
fun makePending() {
_state.loop { state ->
when {
state == null -> return // this slot is free - skip it
state === PENDING -> return // already pending, nothing to do
state === NONE -> { // mark as pending
if (_state.compareAndSet(state, PENDING)) return
}
else -> { // must be a suspend continuation state
// we must still use CAS here since continuation may get cancelled and free the slot at any time
if (_state.compareAndSet(state, NONE)) {
(state as CancellableContinuationImpl<Unit>).resume(Unit)
return
}
}
}
}
}
fun takePending(): Boolean = _state.getAndSet(NONE)!!.let { state ->
assert { state !is CancellableContinuationImpl<*> }
return state === PENDING
}
@Suppress("UNCHECKED_CAST")
suspend fun awaitPending(): Unit = suspendCancellableCoroutine sc@ { cont ->
assert { _state.value !is CancellableContinuationImpl<*> } // can be NONE or PENDING
if (_state.compareAndSet(NONE, cont)) return@sc // installed continuation, waiting for pending
// CAS failed -- the only possible reason is that it is already in pending state now
assert { _state.value === PENDING }
cont.resume(Unit)
}
}
一个StateFlowSlot里边有以下状况:
一般状况下有如下状况机切换
graph TD
null --> NONE --> PENDING --> 进行新value设定
最终调用的是CancellableContinuationImpl 里边的执行,这儿没什么好讲的,便是使用cas机制保证状况机的切换
值得注意的是,第一个null 和NONE的差异,null代表的是初始状况,设置null是为了防止新new一个StateFlowSlot 和对StateFlowSlot同时进行makePending 导致状况机紊乱,所以才多加了一个初始状况null,简略表示StateFlowSlot刚刚创立,还没来的及去进入状况机更改,而NONE代表着能够进入状况机更改。
总结
到这儿StateFlow现已是根本解说完毕了,有理解过错的地方还望纠正,最好能够重视一下往期的解析:
使用flow api打造一个总线型库
/post/705860…
github地址:github.com/TestPlanB/p… 欢迎star or pr