“我报名参加金石方案1期挑战——瓜分10万奖池,这是我的第2篇文章,点击查看活动概况”
一、简介
了解过协程Flow的同学知道是典型的冷数据流,而SharedFlow
与StateFlow
则是热数据流。
- 冷流:只要当订阅者发起订阅时,事情的发送者才会开始发送事情。
- 暖流:不论订阅者是否存在,只需发送了事情就会被消费,意思是不论承受方是否能够接纳到,在这一点上有点像我们Android的
LiveData
。
解说:LiveData新的订阅者不会接纳到之前发送的事情,只会收到之前发送的最后一条数据,
这个特性和SharedFlow的参数replay设置为1类似
二、运用剖析
最好的剖析是从运用时下手冷流flow
,暖流SharedFlow和StateFlow
暖流的具体的实现类分别是MutableSharedFlow和MutableStateFlow
用一个简略的例子来说明什么是冷流,什么是暖流。
冷流flow:
private fun testFlow() {
val flow = flow<Int> {
(1..5).forEach {
delay(1000)
emit(it)
}
}
mBind.btCollect.setOnClickListener {
lifecycleScope.launch {
flow.collect {
Log.d(TAG, "testFlow 第一个搜集器: 我是冷流:$it")
}
}
lifecycleScope.launch {
delay(5000)
flow.collect {
Log.d(TAG, "testFlow:第二个搜集器 我是冷流:$it")
}
}
}
}
我点击搜集按钮响应事情后,打印成果如下图:
这便是冷流
,需要去触发搜集,才能接纳到成果。
从上图时间可知flow每次从头订阅搜集都会将所有事情从头发送一次
暖流MutableSharedFlow和
private fun testSharedFlow() {
val sharedFlow = MutableSharedFlow<Int>(
replay = 0,//相当于粘性数据
extraBufferCapacity = 0,//承受的慢时分,发送的入栈
onBufferOverflow = BufferOverflow.SUSPEND
)
lifecycleScope.launch {
launch {
sharedFlow.collect {
println("collect1 received ago shared flow $it")
}
}
launch {
(1..5).forEach {
println("emit1 send ago flow $it")
sharedFlow.emit(it)
println("emit1 send after flow $it")
}
}
// wait a 100
delay(100)
launch {
sharedFlow.collect {
println("collect2 received shared flow $it")
}
}
}
}
第二个流搜集被延迟,晚了100毫秒后就收不到了,想当于不论是否订阅,流都会发送,只管发,而collect1能够搜集到是由于他在发送之前进行了订阅搜集。
三、剖析MutableSharedFlow中参数的具体意义
以上面testSharedFlow()办法中目标为例,上面的装备便是,当时目标的默许装备 源码如下图:
val sharedFlow = MutableSharedFlow<Int>(
replay = 0,//相当于粘性数据
extraBufferCapacity = 0,//承受的慢时分,发送的入栈
onBufferOverflow = BufferOverflow.SUSPEND //发生背压现象后的,履行战略
)
3.1、 reply:事情粘滞数
reply:事情粘滞数以testSharedFlow
办法为例如果设置了数目的话,那么其他订阅者不论什么时分订阅都能够收到replay数目的最新的事情,reply=1的话
有点类似Android中运用的livedata。
eg:和testSharedFlow
办法差异在于 replay = 2
private fun testSharedFlowReplay() {
val sharedFlow = MutableSharedFlow<Int>(
replay = 2,//相当于粘性数据
extraBufferCapacity = 0,//承受的慢时分,发送的入栈
onBufferOverflow = BufferOverflow.SUSPEND
)
lifecycleScope.launch {
launch {
sharedFlow.collect {
println("collect1 received ago shared flow $it")
}
}
launch {
(1..5).forEach {
println("emit1 send ago flow $it")
sharedFlow.emit(it)
println("emit1 send after flow $it")
}
}
// wait a minute
delay(100)
launch {
sharedFlow.collect {
println("collect2 received shared flow $it")
}
}
}
}
依照上面的解说collect2会搜集到最新的4,5两个事情如下图:
3.2 extraBufferCapacity:缓存容量
extraBufferCapacity
:缓存容量,便是先发送几个事情,不论现已订阅的顾客是否接纳,这种只管发不论顾客消费才能的情况就会呈现背压,参数onBufferOverflow
便是用于处理背压问题
eg:和testSharedFlow
办法差异在于 extraBufferCapacity = 2
private fun testSharedFlowCapacity() {
val sharedFlow = MutableSharedFlow<Int>(
replay = 0,//相当于粘性数据
extraBufferCapacity = 2,//承受的慢时分,发送的入栈
onBufferOverflow = BufferOverflow.SUSPEND
)
lifecycleScope.launch {
launch {
sharedFlow.collect {
println("collect1 received ago shared flow $it")
}
}
launch {
(1..5).forEach {
println("emit1 send ago flow $it")
sharedFlow.emit(it)
println("emit1 send after flow $it")
}
}
// wait a minute
delay(100)
launch {
sharedFlow.collect {
println("collect2 received shared flow $it")
}
}
}
}
成果如下图:
优先发送将其缓存起来,testSharedFlow
测验中发送与接纳在没有干扰(延时之类的干扰)的情况下 是一条顺序链,而设置了extraBufferCapacity
优先发送两条,不论消费情况,不设置的话(extraBufferCapacity = 0)
这时如果在collect1
里面设置延时delay(100)
,send会被堵塞(由于默许是 onBufferOverflow = BufferOverflow.SUSPEND的战略)
3.3、onBufferOverflow
onBufferOverflow:由背压就有处理战略,sharedflow默许为BufferOverflow.SUSPEND ,也便是如果当事情数量超过缓存,发送就会被挂起,上面提到了一句,DROP_OLDEST毁掉最旧的值,DROP_LATEST毁掉最新的值
三种参数意义
public enum class BufferOverflow {
/**
* 在缓冲区溢出时挂起。
*/
SUSPEND,
/**
* 在缓冲区溢出时删除** *旧的**值,添加新的值到缓冲区,不挂起。
*/
DROP_OLDEST,
/**
* 在缓冲区溢出时,删除当时添加到缓冲区的最新的**值\
*(使缓冲区内容保持不变),不要挂起。
*/
DROP_LATEST
}
eg:和testSharedFlowCapacity
办法差异在于 多了个delay(100)
- SUSPEND形式
private fun testSharedFlow2() {
val sharedFlow = MutableSharedFlow<Int>(
replay = 0,//相当于粘性数据
extraBufferCapacity = 2,//承受的慢时分,发送的入栈
onBufferOverflow = BufferOverflow.SUSPEND
)
lifecycleScope.launch {
launch {
sharedFlow.collect {
println("collect1 received ago shared flow $it")
delay(100)
}
}
launch {
(1..5).forEach {
println("emit1 send ago flow $it")
sharedFlow.emit(it)
println("emit1 send after flow $it")
}
}
// wait a minute
delay(100)
launch {
sharedFlow.collect {
println("collect2 received shared flow $it")
}
}
}
}
SUSPEND
情况下从第一张图知道collect1
都搜集了,第二张图发现collect2
也打印了两次
,为什么只要两次呢?
由于 extraBufferCapacity = 2,
等于2,错过了两次的事情发送的接纳,不信的话能够试一下extraBufferCapacity = 0
,这时分肯定打印了4次
,可能有人问为什么是4次
呢,由于collect2
的订阅者延时了100毫秒
才开始订阅,
- DROP_LATEST形式
private fun testSharedFlow2() {
val sharedFlow = MutableSharedFlow<Int>(
replay = 0,//相当于粘性数据
extraBufferCapacity = 2,//承受的慢时分,发送的入栈
onBufferOverflow = BufferOverflow.DROP_LATEST
)
lifecycleScope.launch {
launch {
sharedFlow.collect {
println("collect1 received ago shared flow $it")
delay(100)
}
}
launch {
(1..5).forEach {
println("emit1 send ago flow $it")
sharedFlow.emit(it)
println("emit1 send after flow $it")
}
}
// wait a minute
delay(100)
launch {
sharedFlow.collect {
println("collect2 received shared flow $it")
}
}
}
}
发送过快的话,毁掉最新的,只保存最老的两条事情,我们能够知道1,2,肯定保存其他丢掉
要想不丢
是怎么办呢,很简略不要发生背压现象
就行,在emit中延时delay(200)
,比搜集耗时长就行。
-
DROP_OLDEST形式
该形式同理DROP_LATEST形式,
保存最新的extraBufferCapacity = 2(多少)的数据就行
。
四、StateFlow
val stateFlow = MutableStateFlow<Int>(value = -1)
由上图的承继联系可知stateFlow其实便是一种特别的SharedFlow
,它多了个初始值value
由上图可知:每次更新数据都会和旧数据做一次比较,只要不一起分才会更新数值。
SharedFlow和StateFlow的侧重点
- StateFlow便是一个replaySize=1的sharedFlow,一起它必须有一个初始值,此外,每次更新数据都会和旧数据做一次比较,只要不一起分才会更新数值。
- StateFlow重点在状况,ui永远有状况,所以StateFlow必须有初始值,一起对ui而言,过期的状况毫无意义,所以stateFLow永远更新最新的数据(和liveData类似),所以必须有粘滞度=1的粘滞事情,让ui状况保持到最新。
另外在一个时间内发送多个事情,不会管中心事情有没有消费完结都会履行最新的一条.(中心值会丢掉)
- SharedFlow侧重在事情,当某个事情触发,发送到行列之中,依照挂起或者非挂起、缓存战略等将事情发送到承受方,在具体运用时,SharedFlow更适合告诉ui界面的一些事情,比如toast等,也适合作为viewModel和repository之间的桥梁用作数据的传输。
eg测验如下中心值丢掉:
private fun testSharedFlow2() {
val stateFlow = MutableStateFlow<Int>(value = -1)
lifecycleScope.launch {
launch {
stateFlow.collect {
println("collect1 received ago shared flow $it")
}
}
launch {
(1..5).forEach {
println("emit1 send ago flow $it")
stateFlow.emit(it)
println("emit1 send after flow $it")
}
}
// wait a minute
delay(100)
launch {
stateFlow.collect {
println("collect2 received shared flow $it")
}
}
}
}
由下图可知,中心值丢掉,collect2成果
可知永远有状况
好了到这儿文章就完毕了,源码剖析后续再写。
参考:/post/705417…