-
初步知道Flow(冷流)
1.Flow上游
a.先看一下flow的简单运用
private fun flowTest() = runBlocking { flow { emit(0) emit(1) emit(2) emit(3) emit(4) }.collect { Log.d(TAG, "it:$it") } } 输出成果: //输出成果: //it:0 //it:1 //it:2 //it:3 //it:4
Flow从字面意思了解便是流,Flow除了有发送方和接纳方之外还能够有中转站,什么是中转站呢,中转站便是咱们常用的中心操作符。比方 : .filter() .map()等等
b.中转站用法
private fun flowTest() = runBlocking { flow { //发送方 emit(0) emit(1) emit(2) emit(3) emit(4) }.filter { //中转站(1) it > 2 }.map { //中转站(2) it * 2 }.collect { //承受方 Log.d(TAG, "it:$it") } } //输出成果: //it:6 //it:8
(1) flow{}: 是一个高阶函数,作用便是创立一个新的
Flow
,创立好后就要把音讯发送出去,这儿的emit
是发射、发送的意思,那么flow{}
的作用便是创立一个数据流而且将数据发送出去;(2) filter{}、map{}: 这是中心操作符,都是高阶函数,就像中转站相同对数据进行处理后向下传递;
(3) collect{}: 中止操作符,中止
Flow
数据流并接纳从上游传递的数据。(4) 除了经过
flow{}
创立Flow
之外还有flowOf{}
,也能够创立一个Flow
private fun flowOfTest() { runBlocking { flowOf(0, 1, 2, 3, 4) .filter { it > 2 }.map { it * 2 }.collect { Log.d(TAG, "it:$it") } } } //输出成果: //it:6 //it:8
(5)collect ->是中止操作符, 作用是接纳从上游传递的数据,那要是不接纳会怎样样?
答案是:运转上面的代码会发现什么都没有做就结束了,而添加**
collect
函数后filter
和map
的日志便是正常输出的,因而得出一个定论:只要调用中止操作符collect**之后,Flow 才会开端作业。所以Flow是一个 冷流没有承受方Flow是不会作业的。
(6)上面两段代码都发送了5条数据,然后由
collect
接纳,那么是一次发送完毕仍是逐条发送呢?private fun flowOfTest() { runBlocking { flowOf(0, 1, 2, 3, 4) .filter { Log.d(TAG, "filter:$it") it > 2 }.map { Log.d(TAG, "map:$it") it * 2 }.collect { Log.d(TAG, "collect:$it") } } } 输出成果: //filter:0 //filter:1 //filter:2 //filter:3 //map:3 //collect:6 //filter:4 //map:4 //collect:8
从输出成果能够很清楚的知道
Flow
一次只会处理一条数据。(7)Kotlin还供给了Flow转List、List转Flow的API….Flow创立的几种办法
Flow创立办法 运用场景 用法 flow 未知数据集 flow{ emit() }.collect{ } flowOf 已知数据集 flowOf(T).collect{ } listOf 已知数据来历的调集 listOf(T).asFlow().collect{ } 2.Flow中转站(中心操作符)
A.中转站便是咱们常用的中心操作符,跟调集相同操作符(这边只列出了一部分。更多的自己点击查看源码)
/** * 回来只包括与给定[predicate]匹配的原始流的值的流 */ public inline fun <T> Flow<T>.filter(crossinline predicate: suspend (T) -> Boolean): Flow<T> = transform { value -> if (predicate(value)) return@transform emit(value) } /** * 回来只包括与给定[predicate]值不匹配的原始流的值的流 */ public inline fun <T> Flow<T>.filterNot(crossinline predicate: suspend (T) -> Boolean): Flow<T> = transform { value -> if (!predicate(value)) return@transform emit(value) } /** * 回来一个只包括原始流的非空值的流 */ public fun <T: Any> Flow<T?>.filterNotNull(): Flow<T> = transform<T?, T> { value -> if (value != null) return@transform emit(value) } /** * 回来一个流,其间包括对原始流的每个值运用给定[transform]函数的成果。 */ public inline fun <T, R> Flow<T>.map(crossinline transform: suspend (value: T) -> R): Flow<R> = transform { value -> return@transform emit(transform(value)) } /** * 回来一个流,将每个元素包装成[IndexedValue],包括value和它的索引(从0开端)。 */ public fun <T> Flow<T>.withIndex(): Flow<IndexedValue<T>> = flow { var index = 0 collect { value -> emit(IndexedValue(checkIndexOverflow(index++), value)) } } /** * 回来一个流,在上游流的每个值被下流宣布之前调用给定的[action]。 */ public fun <T> Flow<T>.onEach(action: suspend (T) -> Unit): Flow<T> = transform { value -> action(value) return@transform emit(value) }
B.
Flow
特有的操作符.生命周期onStart, onCompletionprivate fun flowTest() = runBlocking { flow { //发送方 emit(0) emit(1) emit(2) emit(3) emit(4) }.filter { //中转站(1) Log.d(TAG, "filter:$it") it > 2 }.map { Log.d(TAG, "map:$it") //中转站(2) it * 2 }.onStart { Log.d(TAG, "onStart:") }.onCompletion { Log.d(TAG, "onCompletion:") } .collect { //承受方 Log.d(TAG, "collect:$it") } } 输出: //onStart: //filter:0 //filter:1 //filter:2 //filter:3 //map:3 //collect:6 //filter:4 //map:4 //collect:8 //onCompletion
(1)能够看到
onStart
函数的履行数序与它在代码中界说的次序没有联系,而其他两个操作符filter
、map
的履行流程则跟它们界说的次序息息相关。(2)
onCompletion
在它的注释中也标注的比较清楚,类似于finally
,都是在终究履行。C.
Flow
特有的操作符->catch反常处理Flow
中的catch
反常处理时要遵从上下流规矩的,由于Flow
是具有上下流之分的,详细来讲便是catch
只能管理自己上游产生的反常,对于它下流的反常则无能为力,用代码来展示一下他们的差异:private fun flowOfTest() { runBlocking { flowOf(0, 1, 2, 3, 4) .filter { Log.d(TAG, "filter:$it") it > 2 }.map { Log.d(TAG, "map:$it") it * 2 }.map { it / 0 }.catch { Log.d(TAG, "catch:$it") }.collect { Log.d(TAG, "collect:$it") } } } 输出成果: //filter:0 //filter:1 //filter:2 //filter:3 //map:3 //catch:java.lang.ArithmeticException: divide by zero
上游产生 反常 ,在反常后捕获
private fun flowOfTest() { runBlocking { flowOf(0, 1, 2, 3, 4) .filter { Log.d(TAG, "filter:$it") it > 2 }.map { Log.d(TAG, "map:$it") it * 2 }.catch { Log.d(TAG, "catch:$it") }.map { it / 0 }.collect { Log.d(TAG, "collect:$it") } } } //直接奔溃了 // java.lang.ArithmeticException: divide by zero
上游捕获反常,下流产生 反常 。
(1)从两段代码能够非常清楚的总结出:上游产生反常并在反常后捕获是不会形成程序中止的,而在上游捕获反常,下流产生反常时则会形成程序中止。
(2)那么下流的反常就无法捕获了吗?并不是,对于下流的反常能够考虑选用最传统的做法
try catch
办法(3)总结:Flow中的catch操作符的作用与它地点的方位是强相关的,catch无法捕获的能够选用try catch捕获。
D.
Flow
特有的操作符——切换Context:flowOn、launchInFlow
由于它具有上游、中心操作符、下流的特性,使得它能够处理杂乱且异步履行的使命,那么异步履行的使命中大多又涉及到线程切换,Flow
也恰好供给了线程切换的API。flowOn
private fun flowTest2() { //flow切换线程 runBlocking { flow { emit(0) emit(1) emit(2) emit(3) emit(4) }.filter { Log.d(TAG, "filter:$it") Log.d(TAG, "thread = ${Thread.currentThread().name}") it > 2 }.flowOn(Dispatchers.IO).map { Log.d(TAG, "map:$it") Log.d(TAG, "thread = ${Thread.currentThread().name}") it * 2 }.collect { Log.d(TAG, "collect:$it") Log.d(TAG, "thread = ${Thread.currentThread().name}") } } } //输出成果 //filter:0 //thread = DefaultDispatcher-worker-2 //filter:1 //thread = DefaultDispatcher-worker-2 //filter:2 //thread = DefaultDispatcher-worker-2 //filter:3 //thread = DefaultDispatcher-worker-2 //map:3 //map thread = main //collect: 6 //collect thread = main //map:4 //map thread = main //collect: 8 //collect thread = main
flowOn
线程的切换范围与catch
相同仅针对上游,那么要拟定collect
中的Context
该怎样办?能够运用
withContext
,假如除了collect
之外还想让其他操作符也运转在collect
地点的线程中就会遇到问题,尽管依旧能够运用withContext
可是这样的写法就会很丑恶,就像下面这样失去了本来简洁的链式调用。那么解决这个问题的另一种计划launchIn
就派上用场了。launchIn
public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job = scope.launch { collect() }
运用了
launchIn
操作符的flow
无法再调用collect
,从launchIn
源码中可知,launchIn
调用了collect()
。private fun flowTest3() { runBlocking { flow { emit(0) emit(1) emit(2) emit(3) emit(4) }.filter { Log.d(TAG, "filter:$it") Log.d(TAG, "thread = ${Thread.currentThread().name}") it > 2 }.flowOn(Dispatchers.IO).map { Log.d(TAG, "map:$it") Log.d(TAG, "map thread = ${Thread.currentThread().name}") it * 2 }.onEach { Log.d(TAG, "onEach:$it") Log.d(TAG, "onEach thread = ${Thread.currentThread().name}") }.launchIn(CoroutineScope(SupervisorJob() + Dispatchers.Main.immediate)) //当然你能够自界说的dispatcher //lifecycleScope == CoroutineScope(SupervisorJob() + Dispatchers.Main.immediate) } } //输出成果: //map{}、onEach{}、flow{}运转在主线程 //filter{}运转在DefaultDispatcher -> DefaultDispatcher-worker-1
3.Flow下流->中止操作符
A. 在
Flow
中最常见的操作符便是collect
(),除此之外还有first()
、Last()
、single()
、fold{}
、reduce{}
。还有一个特殊的当Flow
调用toList
转化成调集后toList
后边的API都不再属于Flow
因而这也就阐明toList
也算是一种中止操作符4.Flow总结&特色
-
Flow的原理
1.Flow流程中为什么是冷流?
咱们来剖析下flow的源码
public fun <T> flow(
@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> =
SafeFlow(block)
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
override suspend fun collectSafely(collector: FlowCollector<T>) {
collector.block()
}
}
flow是一个高阶函数,参数类型是FlowCollector.() -> Unit
,FlowCollector是它的扩展或许成员办法,没有参数也没有回来值,flow()函数的回来值是Flow,详细到回来类型是SafeFlow()
,SafeFlow()
是AbstractFlow()
的子类。
public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {
//注释1
public final override suspend fun collect(collector: FlowCollector<T>) {
//注释2
val safeCollector = SafeCollector(collector, coroutineContext)
try {
//注释3
collectSafely(safeCollector)
} finally {
safeCollector.releaseIntercepted()
}
}
public abstract suspend fun collectSafely(collector: FlowCollector<T>)
}
从AbstractFlow
源码中能够知道它完成了Flow
接口,这儿进一步知道了SafeFlow
间接的完成了Flow
接口。这儿深入了解一下AbstractFlow
做了哪些作业,经过上面三个注释进行解释:
-
注释1:这个
collect
便是demo中的collect
的调用。这儿做一个猜想:collect
的调用会触发上游Lambda中的emit()
函数的履行,然后将数据传递给collect
; -
注释2:这儿首要便是进行了安全查看,
SafeCollector
仅仅把collect
中的参数FlowCollector<T>
重新进行了封装,详细的安全查看是怎样样的稍后进行剖析; -
注释3:这儿调用了
collectSafely
这个笼统办法,而这儿的详细完成是在SafeFlow
中的collectSafely
,然后调用了collector.block()
,这儿其实便是调用了flow()
中的emit
办法,或许说collector.block()
调用了4次emit()
函数。
现在来总结一下为什么Flow
是冷的:FLow之所以是冷的是由于它的结构进程,在它的结构进程中结构了一个SafeFlow
目标但并不会触发Lambda表达式的履行,只要当collect()
被调用后Lambda表达式才开端履行所以它是冷的。
2.SafeCollector剖析
@Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER", "INVISIBLE_MEMBER", "INVISIBLE_REFERENCE", "UNCHECKED_CAST")
internal actual class SafeCollector<T> actual constructor(
//注释1
@JvmField internal actual val collector: FlowCollector<T>,
@JvmField internal actual val collectContext: CoroutineContext
) : FlowCollector<T>, ContinuationImpl(NoOpContinuation, EmptyCoroutineContext), CoroutineStackFrame {
override val callerFrame: CoroutineStackFrame? get() = completion as? CoroutineStackFrame
override fun getStackTraceElement(): StackTraceElement? = null
@JvmField // Note, it is non-capturing lambda, so no extra allocation during init of SafeCollector
internal actual val collectContextSize = collectContext.fold(0) { count, _ -> count + 1 }
private var lastEmissionContext: CoroutineContext? = null
private var completion: Continuation<Unit>? = null
// ContinuationImpl
override val context: CoroutineContext
get() = completion?.context ?: EmptyCoroutineContext
override fun invokeSuspend(result: Result<Any?>): Any {
result.onFailure { lastEmissionContext = DownstreamExceptionElement(it) }
completion?.resumeWith(result as Result<Unit>)
return COROUTINE_SUSPENDED
}
// Escalate visibility to manually release intercepted continuation
public actual override fun releaseIntercepted() {
super.releaseIntercepted()
}
/**
* 这是状况机重用的奇妙完成。
* 首要它查看它没有被并发运用(这儿是清晰制止的),
* 然后只缓存一个完成实例以防止在每次宣布时进行额定分配,使其在热路径上消除垃圾。
*/
//注释2
override suspend fun emit(value: T) {
return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->
try {
//注释3
emit(uCont, value)
} catch (e: Throwable) {
//保存已抛出emit反常的现实,便于被上游
lastEmissionContext = DownstreamExceptionElement(e)
throw e
}
}
}
private fun emit(uCont: Continuation<Unit>, value: T): Any? {
val currentContext = uCont.context
currentContext.ensureActive()
//注释4
val previousContext = lastEmissionContext
if (previousContext !== currentContext) {
checkContext(currentContext, previousContext, value)
}
completion = uCont
//注释5
return emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
}
private fun checkContext(
currentContext: CoroutineContext,
previousContext: CoroutineContext?,
value: T
) {
if (previousContext is DownstreamExceptionElement) {
exceptionTransparencyViolated(previousContext, value)
}
checkContext(currentContext)
lastEmissionContext = currentContext
}
private fun exceptionTransparencyViolated(exception: DownstreamExceptionElement, value: Any?) {
error("""
Flow exception transparency is violated:
Previous 'emit' call has thrown exception ${exception.e}, but then emission attempt of value '$value' has been detected.
Emissions from 'catch' blocks are prohibited in order to avoid unspecified behaviour, 'Flow.catch' operator can be used instead.
For a more detailed explanation, please refer to Flow documentation.
""".trimIndent())
}
}
//注释6
private val emitFun =
FlowCollector<Any?>::emit as Function3<FlowCollector<Any?>, Any?, Continuation<Unit>, Any?>
注释1:这儿的collect
参数对应的便是Flow在结构时的匿名内部类FlowCollector
,在AS中看到的this:FlowCollector<Int>
便是它;
注释2:这个emit()
函数其实便是demo中用来发送数据的emit(0)...emit(3)
,这儿能够了解为Flow上游发送的数据终究会传递到这个emit()
中;
注释3:这儿的emit(uCont, value)
函数中多了两个参数,其间第一个是suspendCoroutineUninterceptedOrReturn
,它是一个高阶函数,是把挂起函数的Continuation
暴露出来并作为参数进行传递,第二个则是上游发送过来的数据。这儿还有一个反常捕获,反常被捕获后存储在lastEmissionContext
,作用是:在下流发送反常今后能够让上游感知到。下面会有一个对比;
注释4:这儿对当时协程上下文与之前的协程上下文进行对比,假如两者不一致就会履行checkContext()
,在它里边做出进一步的判别和提示;
注释5:这儿调用的便是下流的emit()
,也便是FlowCollector
中的emit
函数,这儿接纳的那个value
便是上游的emit(0)...emit(3)
传递过来的,能够了解成下面demo中的代码:
private fun flowOtherTest() {
runBlocking {
flow {
emit(0)
emit(1)
emit(2)
emit(3)
}.collect(object : FlowCollector<Int> {
//注释5调用的便是这个emit办法。
override suspend fun emit(value: Int) {
Log.d(TAG, "collect:$value")
}
})
}
}
注释6:这是函数引证的语法,代表了它便是FlowCollector
的 emit()
办法
3.FlowCollector 上游和下流之间的链接
public interface Flow<out T> {
public suspend fun collect(collector: FlowCollector<T>)
}
public fun interface FlowCollector<in T> {
public suspend fun emit(value: T)
}
Flow供给了一个办法,让下流触发一个搜集动作collect,天经地义下流作为搜集者的FlowCollector也供给了一个emit办法,让下流来接纳上游发送数据emit()
在Flow的结构进程中结构了SafeFlow
目标而且间接的完成了Flow,Flow中的collect
便是中止操作,而collect
函数中的参数FlowCollector
中的emit()
函数则是下流用来接纳上游发送数据的emit()
。
这儿再来回顾一下SafeCollector
办法中所做的工作:首要collect
中止符的调用会触发上游Lambda中的emit()
函数履行,它将数据发送出去,然后进入到SafeCollector
中的emit()
函数,在这个函数中又将从上游数据发送到下流collect
中的emitFun()
函数中,这样就完成了衔接。所以说FlowCollector
是上下流之间的桥梁。(也能够了解为回调)
4.总结
1.Flow的调用进程分为三个步骤:
- 上游的Flow创立
SafeFlow
目标,下流的Flow的collect()
函数触发上游的emit()
函数履行开端发送数据; - 上游的
emit()
发送的数据进入到SafeCollector
,其实上游的emit()
函数调用的便是SafeCollector
中的emit()
函数; - 在
SafeCollector
中调用emitFun()
其实便是调用了下流的emit()
函数将数据传递给下流。
2.中心操作符:
假如有中心操作符的话,每个操作符都会有上游和下流,而且都是被下流触发履行,也会触发自己的上游,一起还会接纳来自上游的数据并传递给自己的下流;
-
ShardFlow与StateFlow(暖流)
上面咱们介绍到Flow是冷流。那么假如咱们要运用暖流,该怎样办呢?
答案是Flow 有供给相关完成: 那便是StateFlow 和 SharedFlow 。
StateFlow 和 SharedFlow 是暖流,出产数据不依赖顾客消费,暖流与顾客是一对多的联系,当有多个顾客时,它们之间的数据都是同一份。
1.SharedFlow与StateFlow的特色
A.SharedFlow
- 对于同一个数据流,能够答应有多个订阅者同享。
- 不调用
collect
搜集数据,也会开端发送数据。 - 答应缓存前史数据(默许是非粘性,当设置缓存数据后即可完成为粘性, replay = 1)
- 发送数据函数都是线程安全的。
- 不防抖。能够发送相同的值
- 无初始化值
B.StateFlow
- 都答应多个顾客
- 都有只读与可变类型
- 永久只保存一个状况值,会把最新值重现给订阅者,即粘性。
- 防抖。设置重复的值不会重新发送给订阅者
- 有必要传入初始值,确保值的空安全,永久有值
C. StateFlow与LiveData不同的是
- 强制要求初始默许值
- 支撑CAS形式赋值
- 默许支撑防抖 过滤
- value的空安全校验
-
Flow
丰厚的异步数据流操作 - 默许没有
Lifecycle
支撑,flow
的collect
是挂起函数,会一向等候数据流传递数据 -
线程安全,
LiveData
的postValue
尽管也可在异步运用,但会导致数据丢失。
LiveData除了对于Lifecycle
的支撑,StateFlow
基本都是处于优势
2.SharedFlow创立及运用
public fun <T> MutableSharedFlow(
//1.缓存的前史数据容量
replay: Int = 0,
//2.除前史数据外的额定缓存区容量
extraBufferCapacity: Int = 0,
//3.背压战略
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T> {
...
val bufferCapacity0 = replay + extraBufferCapacity
val bufferCapacity = if (bufferCapacity0 < 0) Int.MAX_VALUE else bufferCapacity0
return SharedFlowImpl(replay, bufferCapacity, onBufferOverflow)
}
public interface MutableSharedFlow<T> : SharedFlow<T>, FlowCollector<T> {
//4.线程安全的挂机函数发送数据
override suspend fun emit(value: T)
//5.线程安全的测验发送数据
public fun tryEmit(value: T): Boolean
//6.同享数据流的订阅者数量
public val subscriptionCount: StateFlow<Int>
...
}
注释1 : replay 表明前史元素缓存区容量。
- 能够将最新的数据缓存到调集内,当时史缓存区满后,会移除最早的元素。
- 当在新顾客订阅了该数据流,会先将前史缓存区元素顺次发送给新的顾客,然后才发送新元素。
注释2:extraBufferCapacity:MutableSharedFlow 缓存的数据个数为 replay + extraBufferCapacity; 缓存一方面用于粘性事情的发送,另一方面也为了处理背压问题,既下流的顾客的消费速度低于上游出产者的出产速度时,数据会被放在缓存中。。
注释3:onBufferOverflow:背压处理战略,缓存区满后怎样处理(挂起或丢掉数据),默许挂起。留意,当没有订阅者时,只要最近 replay 个数的数据会存入缓存区,不会触发 onBufferOverflow 战略。
private fun sharedFlow(){
runBlocking {
val _sharedFlow = MutableSharedFlow<Int>()
val sharedFlow = _sharedFlow.asSharedFlow()
lifecycleScope.launch(Dispatchers.IO) {
for(i in 0..50){
Log.d(TAG, "emit:$i")
_sharedFlow.emit(i)
delay(50)
}
}
}
}
输出成果:
//emit:0
//emit:1
// ...
//emit:50
尽管此时并没有顾客订阅,但依旧会履行发送数据操作,仅仅现在没有设置前史缓存,一切数据都被”扔掉”了。
再让咱们看看设置replay = 3的状况。这时分订阅者会收到3个值.
private fun sharedFlow(){
runBlocking {
val _sharedFlow = MutableSharedFlow<Int>(replay = 3)
val sharedFlow = _sharedFlow.asSharedFlow()
lifecycleScope.launch(Dispatchers.IO) {
for(i in 0..50){
Log.d(TAG, "emit:$i")
_sharedFlow.emit(i)
delay(50)
}
}
delay(5000)
//推迟5000毫秒去订阅,其实这时分流现已发送完了.相当于新的订阅者
sharedFlow.onEach {
Log.d(TAG, "onEach:$it")
}.launchIn(lifecycleScope)
}
}
输出成果:
//emit:0
//emit:1
// ...
//emit:50
//onEach:48
//onEach:49
//onEach:50
Flow冷流转化成SharedFlow暖流
private fun shareIn(){
runBlocking {
flowOf(0,1,2,3,4).shareIn(lifecycleScope, SharingStarted.WhileSubscribed())
}
}
public fun <T> Flow<T>.shareIn(
scope: CoroutineScope,
started: SharingStarted,
replay: Int = 0
): SharedFlow<T> {
val config = configureSharing(replay)
val shared = MutableSharedFlow<T>(
replay = replay,
extraBufferCapacity = config.extraBufferCapacity,
onBufferOverflow = config.onBufferOverflow
)
@Suppress("UNCHECKED_CAST")
val job = scope.launchSharing(config.context, config.upstream, shared, started, NO_VALUE as T)
return ReadonlySharedFlow(shared, job)
}
这儿started
表明新创立的同享数据流的发动与中止战略。
- Eagerly->当即开端发送数据源。而且消费端永久搜集数据,只会收到前史缓存和后续新数据,直到地点协程撤销。
- Lazily->a.等候呈现第一个顾客订阅后,才开端发送数据源。确保第一个顾客能收到一切数据,但后续顾客只能收到前史缓存和后续数据。b.顾客会永久等候搜集数据,直到地点协程撤销
- WhileSubscribed->a.能够说是
Lazily
战略的进阶版,同样是等候第一个顾客订阅后,才开端发送数据源。
b.但其能够装备在终究一个订阅者关闭后,同享数据流上游中止的时刻(默许为当即中止),与前史数据缓存清 空时刻(默许为永久保存)。
需求留意,在运用shareIn
每次都会创立一个新 SharedFlow
实例,而且该实例会一向保存在 内存 中,直到被垃圾回收。所以最好减少转化流的履行次数,不要在函数内每次都调用这类函数。
3.SharedFlow原理
经过MutableSharedFlow
工厂函数创立的SharedFlow
,内部实践是创立了SharedFlowImpl
目标,是运用数组缓存一切数据。
internal open class SharedFlowImpl<T>(
private val replay: Int, // replayCache的最大容量
private val bufferCapacity: Int, // buffered values的最大容量
private val onBufferOverflow: BufferOverflow // 溢出战略
) : AbstractSharedFlow<SharedFlowSlot>(), MutableSharedFlow<T>, CancellableFlow<T>, FusibleFlow<T> {
// 缓存数组,用于保存emit办法发射的数据,在需求时进行初始化
private var buffer: Array<Any?>? = null
// 新的订阅者从replayCache中获取数据的开端方位
private var replayIndex = 0L
// 当时一切的订阅者从缓存数组中获取的数据中,对应方位最小的索引
// 假如没有订阅者,则minCollectorIndex的值等于replayIndex
private var minCollectorIndex = 0L
// 缓存数组中buffered values缓存数据的数量
private var bufferSize = 0
// 缓存数组中queued emitters缓存数据的数量
private var queueSize = 0
// 当时缓存数组的开端方位
private val head: Long get() = minOf(minCollectorIndex, replayIndex)
// 当时缓存数组中replayCache缓存数据的数量
private val replaySize: Int get() = (head + bufferSize - replayIndex).toInt()
// 当时缓存数组中现已缓存的数据的数量
private val totalSize: Int get() = bufferSize + queueSize
// 当时缓存数组中buffered values的最末尾方位索引的后一位
private val bufferEndIndex: Long get() = head + bufferSize
// 当时数组中queued emitters的最末尾方位索引的后一位
private val queueEndIndex: Long get() = head + bufferSize + queueSize
...
//2.tryEmit
override fun tryEmit(value: T): Boolean {
var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
val emitted = synchronized(this) {
if (tryEmitLocked(value)) {
resumes = findSlotsToResumeLocked(resumes) //寻觅需求康复的挂起函数
true
} else {
false
}
}
for (cont in resumes) cont?.resume(Unit) //康复履行挂起函数,发送数据
return emitted
}
//1.emit
override suspend fun emit(value: T) {
if (tryEmit(value)) return // fast-path 先测验发送数据
emitSuspend(value) //无法发送数据时先创立挂起函数
}
}
-
发送数据
SharedFlowImpl
这个类第一次看起来有点多,这儿先从emit
与tryEmit
看起,看看是如何完成发送数据。
在tryEmit
内部经过synchronized
加锁,是线程安全的。
注释1:emit
办法会先进行一次tryEmit
的处理,当回来false的时分再进行suspend的发送操作
注释2:经过前面对tryEmit办法的注释判别缓存战略为 BufferOverflow.SUSPEND
才有可能为true,所以再看tryEmitLocked
办法
//从这儿能够看到,当战略选用suspended一起缓存溢出的时分,回来false,否则,永久回来true,一起做一些事情上的入队处理等
@Suppress("UNCHECKED_CAST")
private fun tryEmitLocked(value: T): Boolean {
//注释1.--------没有检测到订阅者(collect搜集器)
if (nCollectors == 0) return tryEmitNoCollectorsLocked(value) // always returns true
//注释2.--------检测到订阅者(collect搜集器)假如当时有订阅者,一起buffered values已达到最大容量。
if (bufferSize >= bufferCapacity && minCollectorIndex <= replayIndex) {
when (onBufferOverflow) {
BufferOverflow.SUSPEND -> return false // will suspend,只要这儿才会回来true
BufferOverflow.DROP_LATEST -> return true // just drop incoming
BufferOverflow.DROP_OLDEST -> {} // force enqueue & drop oldest instead
}
}
//注释3.--------buffered values还能够持续添加数据, 将数据加入到缓存数组中
// 这儿由于tryEmit办法不会挂起emit办法地点的协程,
// 所以value没有被封装成Emitter类型的目标
enqueueLocked(value)
bufferSize++ // value was added to buffer
// drop oldest from the buffer if it became more than bufferCapacity
//注释4.-------- 假如buffered values的数据数量超越最大容量的约束,
//则调用dropOldestLocked办法,丢掉最旧的数据
if (bufferSize > bufferCapacity) dropOldestLocked()
// keep replaySize not larger that needed
//注释5.-------- 假如replayCache中数据的数量超越了最大容量
if (replaySize > replay) { // increment replayIndex by one
// 更新replayIndex的值,replayIndex向前移动一位
updateBufferLocked(replayIndex + 1, minCollectorIndex, bufferEndIndex, queueEndIndex)
}
return true
}
- 从
tryEmitLocked
办法代码注释1
处可知: 由于是暖流tryEmitNoCollectorsLocked
办法便是没有订阅者的状况
private fun tryEmitNoCollectorsLocked(value: T): Boolean {
assert { nCollectors == 0 }
//replay == 0直接回来,不需求去缓存
if (replay == 0) return true // no need to replay, just forget it now
//将值入队
enqueueLocked(value) // enqueue to replayCache
//缓存加1
bufferSize++ // value was added to buffer
// drop oldest from the buffer if it became more than replay
//假如缓存满了,就删除最老的那一条数据
// 假如buffered values的数据数量超越了replayCache的最大容量
// 则丢掉最旧的数据
// 由于新订阅者只会从replayCache中取数据,
// 假如没有订阅者,buffered values的数据数量超越replayCache的最大容量没有意义
if (bufferSize > replay) dropOldestLocked()
//重新设置订阅者的值方位索引
minCollectorIndex = head + bufferSize // a default value (max allowed)
return true
}
// enqueues item to buffer array, caller shall increment either bufferSize or queueSize
private fun enqueueLocked(item: Any?) {
//totalSize = 缓冲值的数目+排队发射器的数量
val curSize = totalSize
val buffer = when (val curBuffer = buffer) {
// 缓存数组为空,则进行初始化,初始化容量为2
null -> growBuffer(null, 0, 2)
// 假如超越了当时缓存数组的最大容量,则进行扩容,新的缓存数组的容量为之前的2倍
// growBuffer办法会把原来缓存数组的数据填充到新的缓存数组中
else -> if (curSize >= curBuffer.size) growBuffer(curBuffer, curSize,curBuffer.size * 2) else curBuffer
}
buffer.setBufferAt(head + curSize, item)
}
从tryEmitLocked办法代码注释2
处可知现在是有订阅者的状况:
1.bufferSize >= bufferCapacity
缓存数组中buffered values缓存数据的数量 >= buffered values的最大容量。
2.minCollectorIndex <= replayIndex
当时一切的订阅者从缓存数组中获取的数据中,对应方位最小的索引<=新的订阅者从replayCache中获取数据的开端方位
到此tryEmitLocked办法剖析完了。回到tryEmit办法
override fun tryEmit(value: T): Boolean {
var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
val emitted = synchronized(this) {
if (tryEmitLocked(value)) {
//搜集现已挂起的订阅者的续体
resumes = findSlotsToResumeLocked(resumes)
true
} else {
false
}
}
// 遍历,引发挂起的订阅者
for (cont in resumes) cont?.resume(Unit)
return emitted
}
findSlotsToResumeLocked
private fun findSlotsToResumeLocked(resumesIn: Array<Continuation<Unit>?>): Array<Continuation<Unit>?> {
// 引证参数中的续体数组
var resumes: Array<Continuation<Unit>?> = resumesIn
// 用于记载需求康复的续体的数量
var resumeCount = resumesIn.size
// 遍历订阅者数组
forEachSlotLocked loop@{ slot ->
// 获取续体,假如续体为空,阐明对应订阅者的协程没有挂起,本次循环回来
val cont = slot.cont ?: return@loop
// 判别slot中index是否符合要求
// 假如不符合要求,则本次循环回来
if (tryPeekLocked(slot) < 0) return@loop
// 假如需求康复的续体的数量超越续体数组的容量,则进行扩容
// 新的续体数组的容量是之前续体数组容量的2倍
if (resumeCount >= resumes.size) resumes = resumes.copyOf(maxOf(2, 2 * resumes.size))
// 保存续体到续体数组中
resumes[resumeCount++] = cont
// 清空slot中保存的续体
slot.cont = null
}
// 回来搜集完的续体数组
return resumes
}
到这儿非 挂起 的发送流程结束,下面看看挂起发送流程
override suspend fun emit(value: T) {
//非挂起
if (tryEmit(value)) return // fast-path
//挂起
emitSuspend(value)
}
挂起调用emitSuspend
办法,这儿也调用了上面提到的findSlotsToResumeLocked
办法
private suspend fun emitSuspend(value: T) =
// 直接挂起emit办法地点的协程,获取续体
suspendCancellableCoroutine<Unit> sc@{ cont ->
var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
// 加锁
val emitter = synchronized(this) lock@{
// 这儿再次测验以tryEmit的办法发射数据
if (tryEmitLocked(value)) {
// 假如发射成功,则康复续体的履行
cont.resume(Unit)
// 搜集现已挂起的订阅者的续体
resumes = findSlotsToResumeLocked(resumes)
// 回来
return@lock null
}
// 将续体、待发射的数据等封装成Emitter类型的目标
Emitter(this, head + totalSize, value, cont).also {
// 加入到缓存数组中
enqueueLocked(it)
// queued emitters的数据的数量加1
queueSize++
// 假如buffered values的最大容量为0,即不存在
// 则搜集现已挂起的订阅者的续体,保存到局部变量resumes中
if (bufferCapacity == 0) resumes = findSlotsToResumeLocked(resumes)
}
}
// emitter目标监听emit办法地点协程的撤销
// 产生撤销时会调用emitter目标的dispose办法
emitter?.let { cont.disposeOnCancellation(it) }
// 遍历,引发挂起的订阅者
for (cont in resumes) cont?.resume(Unit)
}
sharedFlow设置了缓存优先从replayCache
取数据.SharedFlowImpl类完成了SharedFlow接口,重写了其间的常量replayCache
,当有新订阅者呈现时,假如replayCache
存在,而且有缓存数据,则优先从replayCache中获取,代码如下:
override val replayCache: List<T>
// 只能获取,不能设置,加锁
get() = synchronized(this) {
// 获取当时replayCache中缓存数据的数量
val replaySize = this.replaySize
// 假如数量为0,则回来一个空列表
if (replaySize == 0) return emptyList()
// 若数量不为0,则依据容量创立一个列表
val result = ArrayList<T>(replaySize)
// 获取缓存数组
val buffer = buffer!!
// 遍历replayCache,将数据进行类型转化,并添加到列表中
@Suppress("UNCHECKED_CAST")
for (i in 0 until replaySize) result += buffer.getBufferAt(replayIndex + i) as T
// 回来列表
result
}
-
collect流程(承受数据)
SharedFlowImpl.kt
@Suppress("UNCHECKED_CAST")
override suspend fun collect(collector: FlowCollector<T>) {
// 为当时的订阅者分配一个SharedFlowSlot类型的目标
val slot = allocateSlot()
try {
// 假如collector类型为SubscribedFlowCollector,
// 阐明订阅者监听了订阅进程的发动,则先回调
if (collector is SubscribedFlowCollector) collector.onSubscription()
// 获取订阅者地点的协程
val collectorJob = currentCoroutineContext()[Job]
// 死循环
while (true) {
var newValue: Any?
// 死循环
while (true) {
// 从缓存数组中获取数据
newValue = tryTakeValue(slot)
// 假如获取数据成功,则跳出循环
if (newValue !== NO_VALUE) break
// 走到这儿,阐明获取数据失利,
// 挂起订阅者地点协程,等候新数据的到来
awaitValue(slot)
}
// 走到这儿,阐明现已获取到了数据
// 判别订阅者地点协程是否是存活的,假如不是则抛出反常
collectorJob?.ensureActive()
// 进行类型转化,并向下流发射数据
collector.emit(newValue as T)
}
} finally {
// 开释已分配的SharedFlowSlot类型的目标
freeSlot(slot)
}
}
@SharedImmutable
@JvmField
internal val NO_VALUE = Symbol("NO_VALUE")
在collect办法中,经过tryTakeValue办法获取数据,代码如下:
private fun tryTakeValue(slot: SharedFlowSlot): Any? {
var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
// 加锁
val value = synchronized(this) {
// 从slot中获取index
// index表明当时应该从缓存数组的index方位中获取数据
val index = tryPeekLocked(slot)
// 假如index小于0,阐明没有数据
if (index < 0) {
// 回来空数据标识
NO_VALUE
} else { // 假如有数据
// 获取当时的slot的index
val oldIndex = slot.index
// 从缓存数组的index处获取数据
val newValue = getPeekedValueLockedAt(index)
// 计算下一次获取数据的方位,并保存到slot中
slot.index = index + 1
// 更新缓存数组的方位,并获取缓存数组与订阅者数组中可康复的续体
resumes = updateCollectorIndexLocked(oldIndex)
// 回来获取的数据
newValue
}
}
// 遍历,康复续体
for (resume in resumes) resume?.resume(Unit)
// 回来获取的数据
return value
}
@JvmField
@SharedImmutable
internal val EMPTY_RESUMES = arrayOfNulls<Continuation<Unit>?>(0)
在tryTakeValue办法,获取数据之前,首要会调用tryPeekLocked办法,判别数据地点的方位是否符合要求,代码如下:
private fun tryPeekLocked(slot: SharedFlowSlot): Long {
// 从slot中获取index
val index = slot.index
// 假如是在buffered values中获取,则直接回来
if (index < bufferEndIndex) return index
// 走到这儿阐明是要在queued emitters中获取,
// 假如buffered values的最大容量大于0,则回来-1
// 在buffered values能够存在的状况下,制止发射者和订阅者接触
if (bufferCapacity > 0) return -1L
// 走到这儿阐明要在queued emitters中获取,一起buffered values的最大容量为0
// 这种状况缓存数组只能有queued emitters,
// 因而,只能处理queued emitters中的第一个Emitter类型的目标
// 假如当时订阅者想要处理下一个Emitter类型的目标,则回来-1
if (index > head) return -1L
// 走到这儿阐明要在queued emitters中获取,一起buffered values的最大容量为0
// 而且要获取当时的正在处理的Emmiter类型的目标
// 假如queued emitters为空,阐明当时没有Emmiter类型的目标,则回来-1
if (queueSize == 0) return -1L
// 满意上述要求,回来index
return index
}
假如数据地点的方位符合要求,则会调用getPeekedValueLockedAt办法获取数据,代码如下:
private fun getPeekedValueLockedAt(index: Long): Any? =
// 从缓存数组中index方位获取数据
when (val item = buffer!!.getBufferAt(index)) {
// 假如是Emitter类型的,则进行拆箱,获取数据
is Emitter -> item.value
// 直接回来
else -> item
}
Emitter类是SharedFlowImpl类的内部类,用于在挂起调用emit办法地点的协程后,对emit办法发射的数据及挂起后的续体进行封装,代码如下:
private class Emitter(
@JvmField val flow: SharedFlowImpl<*>,
@JvmField var index: Long, // 当时目标在缓存数组中的方位
@JvmField val value: Any?,// emit办法发射的数据
@JvmField val cont: Continuation<Unit> // 挂起的续体
) : DisposableHandle {
override fun dispose() = flow.cancelEmitter(this)
}
在collect办法中,当订阅者无数据可获取时,则会调用awaitValue办法,挂起订阅者地点的协程,代码如下:
private suspend fun awaitValue(slot: SharedFlowSlot): Unit =
// 直接挂起订阅者地点的协程
suspendCancellableCoroutine { cont ->
// 加锁
synchronized(this) lock@{
// 再次查看当时的index是否满意要求
val index = tryPeekLocked(slot)
// 假如确实不满意要求
if (index < 0) {
// 保存续体到slot中
slot.cont = cont
} else { // 假如再次查看发现index这时满意要求
// 则康复挂起,并回来
cont.resume(Unit)
return@lock
}
// 保存续体到slot中
slot.cont = cont
}
}
到此SharedFlow就剖析完成了。
咱们知道SharedFlow内部是依据数组+synchronized,订阅分发的逻辑则是轮询取出内部缓存数组的数据,没有数据则将订阅挂起,直到上游再次发送数据
-
StateFlow创立及运用
作为SharedFlow
的子类,StateFlow
在运用上与其父类基本相同。
public fun <T> MutableStateFlow(value: T): MutableStateFlow<T> = StateFlowImpl(value ?: NULL)
public interface MutableStateFlow<T> : StateFlow<T>, MutableSharedFlow<T> {
public override var value: T
public fun compareAndSet(expect: T, update: T): Boolean
}
StateFlow是一种单数据更新的暖流,经过emit办法更新StateFlow的数据,经过value特点能够获取当时的数据
上面同样是利用同名工厂函数的进行创立,仅仅比较SharedFlow,StateFlow有必要设置默许初始值。
private fun stateFlow(){
runBlocking {
val _stateFlow = MutableStateFlow(3)
val stateFlow = _stateFlow.asStateFlow()
//没有发送数据时当即订阅
stateFlow.onEach {
Log.d(TAG, "onEach:$it")
}.launchIn(lifecycleScope)
//模拟发送数据
lifecycleScope.launch(Dispatchers.IO) {
for(i in 3..50){
Log.d(TAG, "emit:$i")
_stateFlow.emit(i)
delay(50)
}
}
//推迟5000毫秒去订阅,其实这时分流现已发送完了.相当于新的订阅者
delay(5000)
//新的订阅者
stateFlow.onEach {
Log.d(TAG, "new onEach:$it")
}.launchIn(lifecycleScope)
}
}
//输出成果:
//onEach:3
//emit:3
//emit:4
//onEach:4
//emit:5
//onEach:5
//...
//新的订阅者直接输出
//new onEach:50
从上面的输出成果看出:
-
在没有发送数据时订阅,会先接纳默许值。
-
而新发送的数据后,由于第一个值与原有值相同,直接被过滤掉了。在StateFlow中,经过Any#equals办法来判别前后两个数据是否持平。当时后两个数据持平时,数据不会被更新,订阅者也不会处理。
-
StateFlow有必要要有一个初始值。当新订阅者呈现时,StateFlow会将最新的数据发射给订阅者。StateFlow只保存终究发射的数据,除此之外不会缓存任何其他的数据。一起,StateFlow不支撑resetReplayCache办法。
Flow 冷流 转化成StateFlow暖流
StateFlow同样也有由Flow冷流转化为暖流的操作符函数stateIn。
private fun stateIn(){
flowOf(0,1,2,3,4).stateIn(lifecycleScope, SharingStarted.WhileSubscribed(),0)
}
与shareIn
函数的差异仅仅有必要设置默许值,stateIn
转化的同享数据流只缓存一个最新值。这儿不多说
-
StateFlow的原理
1.订阅者的管理类->StateFlowSlot
上面剖析SharedFlow原理剖析中没有单独列出来说SharedFlowSlot是SharedFlowImpl的订阅者的管理类。这儿阐明一下:SharedFlowImpl类中,订阅者数组中存储的目标类型为SharedFlowSlot,而在StateFlowImpl类中,订阅者数组存储的目标类型为StateFlowSlot。
@SharedImmutable
private val NONE = Symbol("NONE") // 状况标识
@SharedImmutable
private val PENDING = Symbol("PENDING") // 状况标识
private class StateFlowSlot : AbstractSharedFlowSlot<StateFlowImpl<*>>() {
// _state中默许值为null
private val _state = atomic<Any?>(null)
...
}
依据_state保存目标的不同,能够确认StateFlowSlot类型的目标的状况。StateFlowSlot类型的目标共有四种状况:
- null:假如_state保存的目标为空,表明当时StateFlowSlot类型的目标没有被任何订阅者运用。
- NONE:假如_state保存的目标为NONE标识,表明当时StateFlowSlot类型的目标现已被对应的订阅者运用,但既没有挂起,也没有在处理当时的数据。
- PENDING:假如_state保存的目标为PENDING标识,表明当时StateFlowSlot类型的目标现已被对应的订阅者运用,而且将开端处理当时的数据。
- CancellableContinuationImpl :假如_state保存的目标为续体,表明当时StateFlowSlot类型的目标现已被对应的订阅者运用,可是订阅者已处理完当时的数据,地点的协程已被挂起,等候新的数据到来。
在StateFlowSlot类中,重写了AbstractSharedFlowSlot类的allocateLocked办法与freeLocked办法,两个办法会对订阅者的初始状况和终究状况进行改变,代码如下:
// 新订阅者申请运用当时的StateFlowSlot类型的目标
override fun allocateLocked(flow: StateFlowImpl<*>): Boolean {
// 假如_state保存的目标不为空,
// 阐明当时StateFlowSlot类型的目标现已被其他订阅者运用
// 回来false
if (_state.value != null) return false
// 走到这儿,阐明没有被其他订阅者运用,分配成功
// 修正状况值为NONE
_state.value = NONE
// 回来true
return true
}
// 订阅者开释现已运用的StateFlowSlot类型的目标
override fun freeLocked(flow: StateFlowImpl<*>): Array<Continuation<Unit>?> {
// 修正状况值为null
_state.value = null
// 回来空数组
return EMPTY_RESUMES
}
@JvmField
@SharedImmutable
internal val EMPTY_RESUMES = arrayOfNulls<Continuation<Unit>?>(0)
为了完成上述对订阅者状况的管理,在StateFlowSlot类中,还额定供给了三个办法用于完成对订阅者的状况的切换,代码如下:
// 当有状况更新成功时,会调用makePending办法,告诉订阅者能够开端处理新数据
@Suppress("UNCHECKED_CAST")
fun makePending() {
// 依据当时状况判别
_state.loop { state ->
when {
// 假如未被订阅者运用,则直接回来
state == null -> return
// 假如现已处于PENDING状况,则直接回来
state === PENDING -> return
// 假如当时状况为NONE
state === NONE -> {
// 经过CAS的办法,将状况修正为PENDPENDING,并回来
if (_state.compareAndSet(state, PENDING)) return
}
// 假如为挂起状况
else -> {
// 经过CAS的办法,将状况修正为NONE
if (_state.compareAndSet(state, NONE)) {
// 假如修正成功,则康复对应续体的履行,并回来
(state as CancellableContinuationImpl<Unit>).resume(Unit)
return
}
}
}
}
}
// 当订阅者每次处理完新数据(不一定处理成功)后,会调用takePending办法,表明完成处理
// 获取当时的状况,并修正新状况为NONE
fun takePending(): Boolean = _state.getAndSet(NONE)!!.let { state ->
assert { state !is CancellableContinuationImpl<*> }
// 假如之前的状况为PENDING,则回来true
return state === PENDING
}
// 当订阅者没有新数据需求处理时,会调用awaitPending办法挂起
@Suppress("UNCHECKED_CAST")
// 直接挂起,获取续体
suspend fun awaitPending(): Unit = suspendCancellableCoroutine sc@ { cont ->
assert { _state.value !is CancellableContinuationImpl<*> }
// 经过CAS的办法,将当时的状况修正为挂起,并回来
if (_state.compareAndSet(NONE, cont)) return@sc
// 走到这儿代表状况修正失利,阐明又发射了新数据,当时的状况被修正为PENDING
assert { _state.value === PENDING }
// 引发订阅者续体的履行
cont.resume(Unit)
}
-
发送数据
在StateFlowImpl类中,当需求发射数据时,能够调用emit办法、tryEmit办法、compareAndSet办法,代码如下:
override fun tryEmit(value: T): Boolean {
this.value = value
return true
}
override suspend fun emit(value: T) {
this.value = value
}
override fun compareAndSet(expect: T, update: T): Boolean =
updateState(expect ?: NULL, update ?: NULL)
@Suppress("UNCHECKED_CAST")
public override var value: T
// 拆箱
get() = NULL.unbox(_state.value)
// 更新数据
set(value) { updateState(null, value ?: NULL) }
// 拆箱操作
@Suppress("UNCHECKED_CAST", "NOTHING_TO_INLINE")
inline fun <T> unbox(value: Any?): T = if (value === this) null as T else value as T
能够发现,无论是经过emit办法、tryEmit办法仍是compareAndSet办法,终究都是经过updateState办法完成数据的更新,代码如下:
// sequence是一个全局变量,当新的数据更新时,sequence会产生变化
// 当sequence为奇数时,表明当时数据正在更新
private var sequence = 0
// CAS办法更新当时数据的值
private fun updateState(expectedState: Any?, newState: Any): Boolean {
var curSequence = 0
// 获取一切的订阅者
var curSlots: Array<StateFlowSlot?>? = this.slots
// 加锁
synchronized(this) {
// 获取当时数据的值
val oldState = _state.value
// 假如等待数据不为空,一起当时数据不等于等待数据,则回来false
if (expectedState != null && oldState != expectedState) return false
// 假如新数据与老数据相同,即前后数据没有产生变化,则直接回来true
if (oldState == newState) return true
// 更新当时数据
_state.value = newState
// 获取全局变量
curSequence = sequence
// 假如为偶数,阐明updateState办法没有被其他协程调用,没有并发
if (curSequence and 1 == 0) {
// 自添加1,表明当时正在更新数据
curSequence++
// 将新值保存到全局变量中
sequence = curSequence
} else { // 假如为奇数,阐明updateState办法正在被其他协程调用,处于并发中
// 加2后不改变奇偶性,仅仅表明当时数据产生了变化
sequence = curSequence + 2
// 回来true
return true
}
// 获取当时一切的订阅者
curSlots = slots
}
// 走到这儿,阐明上面不是并发调用updateState办法的状况
// 循环,告诉订阅者
while (true) {
// 遍历,修正订阅者的状况,告诉订阅者
curSlots?.forEach {
it?.makePending()
}
// 加锁,判别在告诉订阅者的进程中,数据是否又被更新了
synchronized(this) {
// 假如数据没有被更新
if (sequence == curSequence) {
// 加1,让sequence变成偶数,表明更新完毕
sequence = curSequence + 1
// 回来true
return true
}
// 假如数据有被更新,则获取sequence和订阅者
// 再次循环
curSequence = sequence
curSlots = slots
}
}
}
-
承受数据
调用StateFlow类型目标的collect办法,会触发订阅进程,接纳emit办法发送的数据,这部分在 StateFlowImpl中完成,代码如下:
override suspend fun collect(collector: FlowCollector<T>) {
// 为当时的订阅者分配一个StateFlowSlot类型的目标
val slot = allocateSlot()
try {
// 假如collector类型为SubscribedFlowCollector,
// 阐明订阅者监听了订阅进程的发动,则先回调
if (collector is SubscribedFlowCollector) collector.onSubscription()
// 获取订阅者地点的协程
val collectorJob = currentCoroutineContext()[Job]
// 局部变量,保存上一次发射的数据,初始值为null
var oldState: Any? = null
// 死循环
while (true) {
// 获取当时的数据
val newState = _state.value
// 判别订阅者地点协程是否是存活的,假如不是则抛出反常
collectorJob?.ensureActive()
// 假如订阅者是第一次处理数据或许当时数据与上一次数据不同
if (oldState == null || oldState != newState) {
// 将数据发送给下流
collector.emit(NULL.unbox(newState))
// 保存当时发射数据到局部变量
oldState = newState
}
// 修正状况,假如之前不是PENGDING状况
if (!slot.takePending()) {
// 则挂起等候新数据更新
slot.awaitPending()
}
}
} finally {
// 开释已分配的StateFlowSlot类型的目标
freeSlot(slot)
}
}
-
新订阅者怎样获取获取缓存数据呢
当新订阅者呈现时,StateFlow会将当时最新的数据发送给订阅者。能够经过调用StateFlowImpl类重写的常量replayCache获取当时最新的数据,代码如下:
override val replayCache: List<T>
get() = listOf(value)
看到这儿就能明白为什么新订阅者能拿到数据。由于每次都是获取最新的value..而且还不能清楚缓存。假如清楚就会报错
override fun resetReplayCache() {
throw UnsupportedOperationException("MutableStateFlow.resetReplayCache is not supported")
}
到此咱们常用的sharedFlow和stateFlow就剖析完了。。。。
当然我这儿剖析sharedFlow和stateFlow里边的原理仍是没有剖析很细心。仅仅依据注释和流程对它们的有所了解。后续假如咱们开发起来遇到问题就能很快去翻看源码找到答案。。用起来也能很随心应手~~~