Flow 流啊流,游啊游,数据你该流向何方?
冷流运用
冷流是指一种不自动发生数据的流,只要在被下流订阅时才开端发生数据,而且当多个流组合在一起,上游是冷流下流是暖流时,调用屡次上游冷流emit发送数据,下流暖流只会收到一次冷流发送的数据,也便是最开端那次,后续的都收不到,这点后边会具体讲。冷流运用如下
暖流运用
暖流和冷流相反,暖流会自动发生数据并把数据放入缓冲区,不管下流订不订阅暖流都存有数据,根据运用场景有SharedFlow 和 StateFlow , StateFlow承继自SharedFlow ,而且StateFlow 里边有个value ,如下
关于StateFlow 这个value 的作用后边具体剖析,咱们先来看下SharedFlow 同享暖流 ,为什么叫同享流后边看完的剖析就能理解其同享的意义了,咱们先来看看SharedFlow的运用吧。官方给咱们供给了MutableSharedFlow 办法 ,用来生成一个可读可写的同享暖流,可读是指下流可订阅collect , 可写是指上游可发送数据emit,如下
MutableSharedFlow 运用
办法中三个参数意义如下,
- replay:重读replay的数据项数量,默以为 0。当有新的订阅者参加时,会从缓冲区取最近replay个数据项重新发送给它们,所以这个也能够理解为缓冲区的根底容量。
- extraBufferCapacity:指额定缓冲区容量,默以为 0。当缓冲区超越根底容量时,能够运用额定的缓冲容量来存储数据项。
- onBufferOverflow:指定缓冲区溢出时的处理战略,默以为 BufferOverflow.SUSPEND。能够选择 BufferOverflow.DROP_OLDEST 或 BufferOverflow.DROP_LATEST,以确定要丢掉最旧的数据项仍是最新的数据项,仍是选择挂起等候有空间不丢掉任何数据。
总结一下: 最大缓冲区容量 = replay +extraBufferCapacity。 缓冲区溢出战略有三种,丢掉新的(丢掉新的指不让新的参加缓存区了,而不是指从缓冲区移除最前面现已缓存了的)和丢掉旧的的和都不丢掉挂起等候。
好了,了解清楚了之后咱们来看个比如。
咱们运用MutableSharedFlow 来生成一个可读可写的同享暖流,将参数replay =3 , extraBufferCapacity = 0 ,onBufferOverflow = BufferOverflow.DROP_LATEST ,标明指定订阅者订阅的时分从缓冲区中读取三个数据,以及额定缓冲区为 0 ,此刻最大缓冲区 = 0 + 3 =3, 缓冲区满了战略为不允许新的参加。所以不出意外的话此刻运转应该会打印1,2,3,实践出真知,咱们运转一把看看。
看了实践运转成果,仍是出了意外了,两个下流都会收到 4 ,5 ,6 ,为什么会这样呢?read fuck source code,发现由于此刻还没有订阅者,所以extraBufferCapacity , onBufferOverflow 这两个参数此刻是没作用的 ,当没有下流订阅者的时分,不管emit 多少个数据,都只会缓存replay个最近的数据,超越replay了会把前面老的已缓存的数据丢掉,源码如下 。
看源码也很简单理解,nCollectors =0 标明订阅者数量为0, 不走下面溢出逻辑, 而且当bufferSize > replay,会把前面的老的数据丢掉。
当咱们把订阅者collect 挪到 emit 之前,就会呈现咱们之前预料的答案了,两个订阅者只会打印1,2,3
而且这种状况不管你把for循环中5改成10000或许多大,下流都只会收到1,2,3,由于后边的数据不会放入缓冲区中。
MutableSharedFlow添加最大缓冲区的问题
咱们再改一下这个比如,把for循环最大改成1000,然后把溢出战略改成 BufferOverflow.DROP_OLDEST 移除旧的,然后额定容量加个4,此刻最大缓冲区容量为7,这个时分会打印什么?两个搜集者会打印1到1000,仍是打印994到1000这七个?
实践运转两个下流订阅者会打印994到1000
由此可见上游同一时间emit 1000次数据,下流并不会立即收到,而是先把值放入缓冲区,然后同等一时间段内1000次emit都完毕后再从缓冲区拿数据发送给下流。这儿由于最大缓冲区最大值为7个,所以只打印七个数据。
有没有让下流收到1000次数据呢?当然是有的,只在 for 循环中加个delay(1)就行了,有没有不加delay(1)更高雅的处理办法呢?
不加delay 把额定缓冲区改成1000或许Int.MaxValue,然后溢出战略改成挂起/移除,能够吗?按理说这种状况应该是最好的,可是实践试了之后,成果并非如此,运转了十几次,只要2-3次两个搜集器都收到1到1000的状况,大部分状况都是一个搜集器收到1000另一个呈现数据丢掉的状况,不按套路出牌啊,不是说好溢出战略为挂起就不会呈现数据丢掉吗?办法没用对?将tryEmit改成emit ,令人遗憾成果仍是相同数据丢掉。
如何在不加delay确保数据不丢掉呢?添加根底容量replay 能够吗,经测验把replay 改成2000也仍是会有数据丢掉的状况。
最后测验把最大缓冲区改成1或许0 ,即0<=replay+extraBufferCapacity<=1,然后溢出战略改成挂起,这种也是默许构建同享暖流的办法 ,用emit就不会呈现数据丢掉状况了,所以通过增大缓冲区来减少数据丢掉反而有点南辕北辙了。
总结一下
假如暖流MutableSharedFlow,添加了最大缓冲区,而且最大缓冲区大于1,当一次性发送很多数据的时分,会呈现数据丢掉的状况,这种状况也不是多线程的原因导致的,上述测验代码都跑在主线程中。关于SharedFlow就先了解到这儿,接下来看下StateFlow。
StateFlow 运用
官方给我供给了MutableStateFlow办法 ,参数能够传入恣意值。
是不是当这个值/地址改变了,下流就会收到,没改变就不会收到?下流一订阅就会收到这个传入的默许值?写个比如试试
果不其然和咱们想象相同,与SharedFlow相比 StateFlow运用仍是比较简单的,由于SharedFlow多了个缓冲区链表,而StateFlow就只要个value,StateFlow emit办法完成也是比较简单的。
接下来咱们来看下flow那些操作符的作用和运用。
Flow 操作符
combine
用于将两个流(Flow)合并为一个新的流,并根据供给的转换函数对它们进行组合处理。
看官方注释就很好理解了,combine 回来的是一个冷流,collect 时分会先从另外两个流拿到数据,再通过闭包里的函数处理再发送给下流。
flatMapLatest
先看官方注释
根据注释,咱们能够看到 flow 发射的数据会通过flatMapLatest 中的那条流,所以最终到下流的数据全取决于flatMapLatest中的那条流。
flatOn
定义上游在什么线程履行
举个栗子
下面来个实践比如来解说这些操作符
flow1 状态暖流 和 flow2 冷流并成一条新流combineFlow 冷流 ,然后combineFlow.flatMapLatest 插入了一条中流,中流之中又有中流。然后combineFlow .stateIn 转成暖流 hotFlow,stateIn中设置了协程域以及中止订阅上面这些流的超时时间和状态暖流的初始值。流都定义好了,就要运用了,下面是对这些流的运用
先给hotFlow注册下流搜集器搜集数据,然后 上游暖流flow1,冷流flow2 ,中游暖流 1,2,3 依次发送数据,猜会打印什么?直接宣布来吧
嘿嘿上游全军覆没,上游发的数据下流都收不到。看打印咱们发现只要第一次hotFlow.collect()的时分冷流flow2有作用 ,后续调用冷流flow2.collect触发emit 都没用,在主线程履行,下流中游都不会收到数据,为什么会这样,后边讲了冷流的原理咱们就知道了。
上游暖流flow1发送了数据,中游能收到,可是下流收不到,下流之所有收不到数据是由于暖流发送的 10 + 第一次冷流的数据7964 大于 10 了,再一次履行midFlow1.flatMapLatest 其实啥事没干,没触发midFlow3.emit 动作,只要触发midFlow3.emit 动作下流才会收到数据,也就解说了中游发送数据为什么下流都能收到。
冷流完成原理
最简单的flow运用比如如下
MainScope().launch() {
(1..3).asFlow().collect{}
}
在这个实例中,上游发送1,2,3 三个数据,下流会按次序收到1,2,3。asFlow办法会将IntRange转为Flow
持续跟进flow{ } 闭包,会调用到unsafeFlow(block)
@PublishedApi
internal inline fun <T> unsafeFlow(@BuilderInference crossinline block: suspend FlowCollector<T>.() -> Unit): Flow<T> {
return object : Flow<T> {
override suspend fun collect(collector: FlowCollector<T>) {
collector.block()
}
}
}
Flow是一个接口,这儿构建了一个flow的匿名内部类并回来,内部类里完成了Flow的collect办法,参数 block: suspend FlowCollector.() -> Unit 能够理解为FlowCollector类的一个参数扩展办法,或许就理解为扩展办法也能够,但这个扩展办法只能在这办法才能里调用,作用域便是此办法。
调用collect要求传一个流搜集器FlowCollector ,FlowCollector 有个挂起办法 emit ,通过完成这个办法就能够搜集上游发送过来的数据了。
public fun interface FlowCollector<in T> {
public suspend fun emit(value: T)
}
总结一下: 构建Flow会完成collect办法,collect办法里边调用了FlowCollector 的 block 扩展办法,FlowCollector的block 扩展办法中,又会调用FlowCollector的emit 办法,emit 办法发送数据给FlowCollector,至此数据就传输完了。
能够看到这种传输办法,控制权完全是在下流,只要下流调用了collect办法,Flow才会传输数据,这种办法又被称为冷流。
冷流完成原理也很简单,上游(发送端)完成了FlowCollector的扩展办法block , 并在Flow的collect办法中调用block办法,block办法内部调用了emit。下流(搜集端)完成了FlowCollector的emit办法承受数据。下流调用flow.collect 办法传入搜集器,并触发上游发送操作。
操作符flowOn切换线程源码剖析
flowOn用于指定上游在哪个线程中履行,其实这样说法其实也并不精确,当咱们指定flowOn(Dispatchers.IO) , 上游便是在IO线程中履行了,所以这种说法又是精确的。
flowOn 浅显一点说便是修改上游的协程上下文,而且回来一个Flow。
以上面比如和flowOn(Dispatchers.IO)为例
public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> {
return when {
context == EmptyCoroutineContext -> this
this is FusibleFlow -> fuse(context = context)
else -> ChannelFlowOperatorImpl(this, context = context)
}
}
调用flowOn ,最终会回来ChannelFlowOperatorImpl ,看着有点像静态署理模式的感觉,传入原始的Flow并回来一个承继自FLow的ChannelFlowOperatorImpl。承继链如下
ChannelFlowOperatorImpl <- ChannelFlowOperator <- ChannelFlow <-FusibleFlow <- Flow。
能够看到为了支撑上游切换到IO线程,足足多了4层承继关系。
ChannelFlowOperator 的collect办法, 先判断传入的coroutineContext, 假如和当时的协程上下文相同则啥事不干直接调用本来的flow.collect,假如 newContext[ContinuationInterceptor] == collectContext[ContinuationInterceptor] ,新的和旧的上下文要调度的线程相同,即不切换线程,这种状况不需要运用channel,上游会新建并敞开一个协程并在新协程中调用flow.collect 。
假如newContext[ContinuationInterceptor] != collectContext[ContinuationInterceptor] 而且 newContext[ContinuationInterceptor] != collectContext[ContinuationInterceptor] ,这种状况下流会调用suspendCoroutineUninterceptedOrReturn 挂起当时协程,并发动新协程 去履行collector.emitAll(produceImpl(this)) , produceImpl 会回来ReceiveChannel目标实例,而且以新的协程上下文发动协程 , 两个协程通过ReceiveChannel连接,而且原FlowCollector会替换成SendingCollector
@InternalCoroutinesApi
public class SendingCollector<T>(
private val channel: SendChannel<T>
) : FlowCollector<T> {
override suspend fun emit(value: T): Unit = channel.send(value)
}
能够看到本来的FlowCollector 的emit办法被channel.sned取代了。这点就能够证明为什么下流emit函数履行的线程和上游为什么不是同一个了,由于在这儿被替换了 ,本来FlowCollector的emit函数不履行了。
那么本来FlowCollector的emit函数 在哪里履行?
本来Flow的 collect(collector) ,collector被替换成SendingCollector ,collect办法会被封装成一个block , 并在新协程中履行。
新Flow的collect(collector)中仍旧运用本来的collector ,而且在本来的协程作用域中发动协程,然后通过channel获取本来Flow emit过来的值 ,最终调用本来的collector 的emit办法把值发送给下流 。
总结 :
调用flowOn 会回来一个新的flow(内部持有本来的flow), 本来的flow 的 collector被替换成了SendingCollector ,而且本来flow的collect 会被封装成一个挂起block 函数,并用flowOn传入的上下文敞开子协程,浅显一点来说便是本来的 flow的collect 会在新敞开的子协程中履行。
回来的新的flow(也便是咱们下流运用的那个flow), collect办法中也会敞开一个协程,但这个协程上下文仍是运用本来的 ,新敞开的协程中会通过Channel获取从旧flow传过来的值,然后调用本来collector的emit 办法,把值转发给下流。
暖流SharedFlow 源码剖析
先看下根底运用
println(“collect”)那里爆黄了,编译器提示Unreachable code ,提示咱们println(“collect”) 永远都不可能调用,其实就算咱们加上,KT编译器最终也会帮咱们把这行代码删了。
collect 办法中有什么黑魔法,导致后边的代码调用不到了。collect办法如下,
override suspend fun collect(collector: FlowCollector<T>): Nothing
仅仅是后边加了个Nothing回来值,
Nothing has no instances. You can use Nothing to represent "a value that never exists": for example,
if a function has the return type of Nothing, it means that it never returns (always throws an exception).
关于Nothing 官方是这样注释的,Nothing没有实例, 可是Nothing是任何类的子类 ,当Nothing作为函数的回来值,这个函数始终会抛反常。
所以当咱们调用完collect之后会抛出反常,抛出反常办法会完毕履行导致后边的println(“collect”)代码履行不到了,关于Nothing先了解到这。
SharedFlow collect办法解析
解说collect 先来看个比如
这个比如会输出什么成果?会堵塞主线程吗?
答案是: 会每隔5s 打印ddd,并堵塞不会主线程。由于suspendCancellableCoroutine 办法会挂起当时协程,虽然在主线程履行 ,可是test办法被 return了,while true 跳出了循环。
跟进SharedFlow#collect办法
override suspend fun collect(collector: FlowCollector<T>): Nothing {
val slot = allocateSlot()
try {
if (collector is SubscribedFlowCollector) collector.onSubscription()
val collectorJob = currentCoroutineContext()[Job]
while (true) {
var newValue: Any?
while (true) {
newValue = tryTakeValue(slot)
if (newValue !== NO_VALUE) break
awaitValue(slot)
}
collectorJob?.ensureActive()
collector.emit(newValue as T)
}
} finally {
freeSlot(slot)
}
}
这个办法主要有三步:
- 给每个搜集者分配一个槽位,slots 的初始大小为2,超越阈值会翻倍扩容。SharedFlowSlot里边有个续体cont,Flow调用emit的时分会调用该续体的resume办法。
- 假如是订阅类型的搜集器,调用onSubscription
- 敞开while true 循环,假如上游没有发送数据则挂起,有数据则调用resume 恢复履行,接着调用emit将数据发送给下流
emit办法里边会先调用非挂起的办法tryEmit去发送数据,假如tryEmit发送失败才调用挂起办法 emitSuspend(value) ,tryEmit如下
override fun tryEmit(value: T): Boolean {
var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
val emitted = synchronized(this) {
if (tryEmitLocked(value)) {
resumes = findSlotsToResumeLocked(resumes)
true
} else {
false
}
}
for (cont in resumes) cont?.resume(Unit)
return emitted
}
剩余的源码剖析之后有时间再写续章 。
最后,给自己打个广告!
求职求职求职!!!
个人简介:
两年半 + 两年半 经历老安卓 ,安卓原生开发 / NDK 开发/ SDK 开发/ 逆向 / flutter 都有涉猎。
可内推的大佬们费事直接在直接私聊我!
谢谢!!