“我报名参加金石方案1期挑战——瓜分10万奖池,这是我的第2篇文章,点击查看活动概况”

一、简介

了解过协程Flow的同学知道是典型的冷数据流,而SharedFlowStateFlow则是热数据流。

  • 冷流:只要当订阅者发起订阅时,事情的发送者才会开始发送事情。
  • 暖流:不论订阅者是否存在,只需发送了事情就会被消费,意思是不论承受方是否能够接纳到,在这一点上有点像我们AndroidLiveData

解说:LiveData新的订阅者不会接纳到之前发送的事情,只会收到之前发送的最后一条数据,这个特性和SharedFlow的参数replay设置为1类似

二、运用剖析

最好的剖析是从运用时下手冷流flow暖流SharedFlow和StateFlow暖流的具体的实现类分别是MutableSharedFlow和MutableStateFlow

用一个简略的例子来说明什么是冷流,什么是暖流。

  • 冷流flow:
private fun testFlow() {
    val flow = flow<Int> {
        (1..5).forEach {
            delay(1000)
            emit(it)
        }
    }
    mBind.btCollect.setOnClickListener {
        lifecycleScope.launch {
            flow.collect {
                Log.d(TAG, "testFlow 第一个搜集器: 我是冷流:$it")
            }
        }
        lifecycleScope.launch {
            delay(5000)
            flow.collect {
                Log.d(TAG, "testFlow:第二个搜集器 我是冷流:$it")
            }
        }
    }
}

我点击搜集按钮响应事情后,打印成果如下图:

Kotlin中 Flow、SharedFlow与StateFlow区别
这便是冷流,需要去触发搜集,才能接纳到成果。

从上图时间可知flow每次从头订阅搜集都会将所有事情从头发送一次

  • 暖流MutableSharedFlow和
private fun testSharedFlow() {
    val sharedFlow = MutableSharedFlow<Int>(
        replay = 0,//相当于粘性数据
        extraBufferCapacity = 0,//承受的慢时分,发送的入栈
        onBufferOverflow = BufferOverflow.SUSPEND
    )
    lifecycleScope.launch {
        launch {
            sharedFlow.collect {
                println("collect1 received ago shared flow $it")
            }
        }
        launch {
            (1..5).forEach {
                println("emit1  send ago  flow $it")
                sharedFlow.emit(it)
                println("emit1 send after flow $it")
            }
        }
        // wait a 100
        delay(100)
        launch {
            sharedFlow.collect {
                println("collect2 received shared flow $it")
            }
        }
    }
}

Kotlin中 Flow、SharedFlow与StateFlow区别

第二个流搜集被延迟,晚了100毫秒后就收不到了,想当于不论是否订阅,流都会发送,只管发,而collect1能够搜集到是由于他在发送之前进行了订阅搜集。

三、剖析MutableSharedFlow中参数的具体意义

以上面testSharedFlow()办法中目标为例,上面的装备便是,当时目标的默许装备 源码如下图:

Kotlin中 Flow、SharedFlow与StateFlow区别

val sharedFlow = MutableSharedFlow<Int>(
    replay = 0,//相当于粘性数据
    extraBufferCapacity = 0,//承受的慢时分,发送的入栈
    onBufferOverflow = BufferOverflow.SUSPEND //发生背压现象后的,履行战略
)

3.1、 reply:事情粘滞数

reply:事情粘滞数以testSharedFlow办法为例如果设置了数目的话,那么其他订阅者不论什么时分订阅都能够收到replay数目的最新的事情,reply=1的话有点类似Android中运用的livedata。

eg:和testSharedFlow办法差异在于 replay = 2

private fun testSharedFlowReplay() {
    val sharedFlow = MutableSharedFlow<Int>(
        replay = 2,//相当于粘性数据
        extraBufferCapacity = 0,//承受的慢时分,发送的入栈
        onBufferOverflow = BufferOverflow.SUSPEND
    )
    lifecycleScope.launch {
        launch {
            sharedFlow.collect {
                println("collect1 received ago shared flow $it")
            }
        }
        launch {
            (1..5).forEach {
                println("emit1  send ago  flow $it")
                sharedFlow.emit(it)
                println("emit1 send after flow $it")
            }
        }
        // wait a minute
        delay(100)
        launch {
            sharedFlow.collect {
                println("collect2 received shared flow $it")
            }
        }
    }
}

依照上面的解说collect2会搜集到最新的4,5两个事情如下图:

Kotlin中 Flow、SharedFlow与StateFlow区别

3.2 extraBufferCapacity:缓存容量

extraBufferCapacity:缓存容量,便是先发送几个事情,不论现已订阅的顾客是否接纳,这种只管发不论顾客消费才能的情况就会呈现背压,参数onBufferOverflow便是用于处理背压问题

eg:和testSharedFlow办法差异在于 extraBufferCapacity = 2

private fun testSharedFlowCapacity() {
    val sharedFlow = MutableSharedFlow<Int>(
        replay = 0,//相当于粘性数据
        extraBufferCapacity = 2,//承受的慢时分,发送的入栈
        onBufferOverflow = BufferOverflow.SUSPEND
    )
    lifecycleScope.launch {
        launch {
            sharedFlow.collect {
                println("collect1 received ago shared flow $it")
            }
        }
        launch {
            (1..5).forEach {
                println("emit1  send ago  flow $it")
                sharedFlow.emit(it)
                println("emit1 send after flow $it")
            }
        }
        // wait a minute
        delay(100)
        launch {
            sharedFlow.collect {
                println("collect2 received shared flow $it")
            }
        }
    }
}

成果如下图:

优先发送将其缓存起来,testSharedFlow测验中发送与接纳在没有干扰(延时之类的干扰)的情况下 是一条顺序链,而设置了extraBufferCapacity优先发送两条,不论消费情况,不设置的话(extraBufferCapacity = 0)这时如果在collect1里面设置延时delay(100),send会被堵塞(由于默许是 onBufferOverflow = BufferOverflow.SUSPEND的战略)

Kotlin中 Flow、SharedFlow与StateFlow区别

3.3、onBufferOverflow

onBufferOverflow:由背压就有处理战略,sharedflow默许为BufferOverflow.SUSPEND ,也便是如果当事情数量超过缓存,发送就会被挂起,上面提到了一句,DROP_OLDEST毁掉最旧的值,DROP_LATEST毁掉最新的值

三种参数意义

public enum class BufferOverflow {
    /**
     * 在缓冲区溢出时挂起。
     */
    SUSPEND,
    /**
     * 在缓冲区溢出时删除** *旧的**值,添加新的值到缓冲区,不挂起。
     */
    DROP_OLDEST,
    /**
     * 在缓冲区溢出时,删除当时添加到缓冲区的最新的**值\
*(使缓冲区内容保持不变),不要挂起。
     */
    DROP_LATEST
}

eg:和testSharedFlowCapacity办法差异在于 多了个delay(100)

  • SUSPEND形式
private fun testSharedFlow2() {
    val sharedFlow = MutableSharedFlow<Int>(
        replay = 0,//相当于粘性数据
        extraBufferCapacity = 2,//承受的慢时分,发送的入栈
        onBufferOverflow = BufferOverflow.SUSPEND
    )
    lifecycleScope.launch {
        launch {
            sharedFlow.collect {
                println("collect1 received ago shared flow $it")
                delay(100)
            }
        }
        launch {
            (1..5).forEach {
                println("emit1  send ago  flow $it")
                sharedFlow.emit(it)
                println("emit1 send after flow $it")
            }
        }
        // wait a minute
        delay(100)
        launch {
            sharedFlow.collect {
                println("collect2 received shared flow $it")
            }
        }
    }
}

Kotlin中 Flow、SharedFlow与StateFlow区别

Kotlin中 Flow、SharedFlow与StateFlow区别

SUSPEND情况下从第一张图知道collect1都搜集了,第二张图发现collect2也打印了两次,为什么只要两次呢?

由于 extraBufferCapacity = 2,等于2,错过了两次的事情发送的接纳,不信的话能够试一下extraBufferCapacity = 0,这时分肯定打印了4次,可能有人问为什么是4次呢,由于collect2的订阅者延时了100毫秒才开始订阅,

  • DROP_LATEST形式
private fun testSharedFlow2() {
    val sharedFlow = MutableSharedFlow<Int>(
        replay = 0,//相当于粘性数据
        extraBufferCapacity = 2,//承受的慢时分,发送的入栈
        onBufferOverflow = BufferOverflow.DROP_LATEST
    )
    lifecycleScope.launch {
        launch {
            sharedFlow.collect {
                println("collect1 received ago shared flow $it")
                delay(100)
            }
        }
        launch {
            (1..5).forEach {
                println("emit1  send ago  flow $it")
                sharedFlow.emit(it)
                println("emit1 send after flow $it")
            }
        }
        // wait a minute
        delay(100)
        launch {
            sharedFlow.collect {
                println("collect2 received shared flow $it")
            }
        }
    }
}

发送过快的话,毁掉最新的,只保存最老的两条事情,我们能够知道1,2,肯定保存其他丢掉

Kotlin中 Flow、SharedFlow与StateFlow区别

要想不丢是怎么办呢,很简略不要发生背压现象就行,在emit中延时delay(200),比搜集耗时长就行。

  • DROP_OLDEST形式 该形式同理DROP_LATEST形式,保存最新的extraBufferCapacity = 2(多少)的数据就行

四、StateFlow

初始化

val stateFlow = MutableStateFlow<Int>(value = -1)

Kotlin中 Flow、SharedFlow与StateFlow区别

Kotlin中 Flow、SharedFlow与StateFlow区别

由上图的承继联系可知stateFlow其实便是一种特别的SharedFlow,它多了个初始值value

Kotlin中 Flow、SharedFlow与StateFlow区别
由上图可知:每次更新数据都会和旧数据做一次比较,只要不一起分才会更新数值。

SharedFlow和StateFlow的侧重点

  • StateFlow便是一个replaySize=1的sharedFlow,一起它必须有一个初始值,此外,每次更新数据都会和旧数据做一次比较,只要不一起分才会更新数值。
  • StateFlow重点在状况,ui永远有状况,所以StateFlow必须有初始值,一起对ui而言,过期的状况毫无意义,所以stateFLow永远更新最新的数据(和liveData类似),所以必须有粘滞度=1的粘滞事情,让ui状况保持到最新。另外在一个时间内发送多个事情,不会管中心事情有没有消费完结都会履行最新的一条.(中心值会丢掉)
  • SharedFlow侧重在事情,当某个事情触发,发送到行列之中,依照挂起或者非挂起、缓存战略等将事情发送到承受方,在具体运用时,SharedFlow更适合告诉ui界面的一些事情,比如toast等,也适合作为viewModel和repository之间的桥梁用作数据的传输。

eg测验如下中心值丢掉:

    private fun testSharedFlow2() {
        val stateFlow = MutableStateFlow<Int>(value = -1)
        lifecycleScope.launch {
            launch {
                stateFlow.collect {
                    println("collect1 received ago shared flow $it")
                }
            }
            launch {
                (1..5).forEach {
                    println("emit1  send ago  flow $it")
                    stateFlow.emit(it)
                    println("emit1 send after flow $it")
                }
            }
            // wait a minute
            delay(100)
            launch {
                stateFlow.collect {
                    println("collect2 received shared flow $it")
                }
            }
        }
    }

由下图可知,中心值丢掉,collect2成果可知永远有状况

Kotlin中 Flow、SharedFlow与StateFlow区别
好了到这儿文章就完毕了,源码剖析后续再写。

参考:/post/705417…