一、Flow的创立办法
常见的创立Flow
的办法有flow{}
、flowof{}
、asFlow
等,下面是示例代码:
fun main() {
runBlocking {
//办法一
flow<Int> {
emit(0)
}.collect {
printMsg(it.toString())
}
//办法二
flowOf(1,2,3).collect{
printMsg(it.toString())
}
//办法三
listOf(4,5,6).asFlow().collect{
printMsg(it.toString())
}
}
}
//日志
main @coroutine#1 0
main @coroutine#1 1
main @coroutine#1 2
main @coroutine#1 3
main @coroutine#1 4
main @coroutine#1 5
main @coroutine#1 6
二、Flow的常见操作符
1、filter,map,take
比较常用也比较简单,代码后边的比方会表现,此处略。
2、onStart,onCompletion
onStart
和onCompletion
是协程创立和完结时的回调,与在事情处理的上下游方位无关
flowOf(1, 2, 3, 4)
.filter { it > 1 } //过滤
.onStart { printMsg("Flow onStart") }
.onCompletion { printMsg("Flow onCompletion $it") }
.map { it * 2 } //改换
.take(2) //截取
.collect {
printMsg("Flow result:$it")
}
//日志
main @coroutine#1 Flow onStart
main @coroutine#1 Flow result:4
main @coroutine#1 Flow result:6
main @coroutine#1 Flow onCompletion kotlinx.coroutines.flow.internal.AbortFlowException: Flow was aborted, no more elements needed
测验在半途抛出一个反常
flowOf(1, 2, 3, 4)
.filter { it > 1 } //过滤
.onStart { printMsg("Flow onStart") }
.onCompletion { printMsg("Flow onCompletion $it") }
.map { it * 2 } //改换
.take(2) //截取
.collect {
printMsg("Flow result:$it")
//收到第一个数据就抛出反常
throw IllegalStateException() <----------------------变化在这儿
}
//日志,程序报错
main @coroutine#1 Flow onStart
main @coroutine#1 Flow result:4
main @coroutine#1 Flow onCompletion java.lang.IllegalStateException <------打印反常信息
Exception in thread "main" java.lang.IllegalStateException
...略...
3、catch反常处理
假如flow
呈现反常应该怎么捕获反常? 看二段代码:
第一段
val flow = flow {
emit(1)
throw IllegalStateException() <-------------------发送数据抛反常
emit(2)
}
flow.catch { <----------------------这儿catch
printMsg("Flow catch $it")
}.onCompletion {
printMsg("Flow onCompletion $it")
}.collect{ <-------------完毕操作符
printMsg("Flow collect $it")
}
//日志
main @coroutine#1 Flow collect 1
main @coroutine#1 Flow catch java.lang.IllegalStateException
main @coroutine#1 Flow onCompletion null
第二段
flowOf(1, 2)
.catch { printMsg("Flow catch $it") } <------------------这儿catch
.onCompletion { printMsg("Flow onCompletion $it") }
.collect { <-------------完毕操作符
printMsg("Flow collect $it")
//收到第一个数据就抛出反常
throw IllegalStateException() <---------------处理数据抛反常
}
//日志,程序报错
main @coroutine#1 Flow collect 1
main @coroutine#1 Flow onCompletion java.lang.IllegalStateException
Exception in thread "main" java.lang.IllegalStateException
...略...
所以要留意发送时分的反常是能够捕获的,可是处理数据时的反常是没有办法捕获的。catch 的效果域,仅限于 catch 的上游。
4、flowOn、launchIn
flowOn
和launchIn
首要效果是切换线程。可是它们的运用有一些需求留意的当地。
(1)、flowOn
flow {
emit(1)
emit(2)
}.filter {
printMsg("filter $it")
it > 1
}.flowOn(Dispatchers.IO) <----------切换线程
.map {
printMsg("map $it")
it * 2
}.collect {
printMsg("collect $it")
}
//日志
DefaultDispatcher-worker-1 @coroutine#2 filter 1 <------filter在子线程
DefaultDispatcher-worker-1 @coroutine#2 filter 2 <------filter在子线程
main @coroutine#1 map 2 <------map在主线程
main @coroutine#1 collect 4 <------collect在主线程
能够看到flowOn
切换线程只在它的上游起效果。
(2)、launchIn
假如想让flowOn
上游的代码在子线程履行,而flowOn
下面的代码在主线程履行怎么办?下面是一个比方:
lifecycleScope.launch(Dispatchers.Main) { <------lifecycleScope上下文地点的线程是主线程main
flow { _______
log("flow emit 1") |
emit(1) |
log("flow emit 2") |
emit(2) IO线程履行
}.map { |
log("flow map $it") |
it * 2 |
}.flowOn(Dispatchers.IO) ___|____
.onEach { |
log("flow onEach $it") |
}.onCompletion { |
log("flow onCompletion $it") Main线程履行
}.catch { |
log("flow catch $it") |
}.launchIn(lifecycleScope) |
} ____|___
//日志
flow emit 1 线程:DefaultDispatcher-worker-1 @coroutine#4
flow map 1 线程:DefaultDispatcher-worker-1 @coroutine#4
flow emit 2 线程:DefaultDispatcher-worker-1 @coroutine#4
flow map 2 线程:DefaultDispatcher-worker-1 @coroutine#4
flow onEach 2 线程:main @coroutine#3
flow onEach 4 线程:main @coroutine#3
flow onCompletion null 线程:main @coroutine#3
launchIn
将它上游的代码切换到给定上下文lifecycleScope
的线程履行,这样就能够让上面的业务代码在IO
线程履行,而成果回调到UI
线程履行。这儿没有collect
作为完毕符,而是运用的onEach
,但追踪onEach
的源码,调用的还是collect
。
5、retry,takeWhile,buffer
retry
反常时重试次数,示例如下:
fun main() = runBlocking {
flow {
printMsg("emit 1")
emit(1)
printMsg("emit 2")
emit(2)
}.onEach {
printMsg("onEach $it")
if (it == 2) {
throw RuntimeException("Exception on $it") // 抛出反常
}
}.retry(1) // 重试2次
.catch { printMsg("catch $it") }
.collect { printMsg("collect $it") }
}
//日志
main @coroutine#1 emit 1
main @coroutine#1 onEach 1
main @coroutine#1 collect 1
main @coroutine#1 emit 2
main @coroutine#1 onEach 2
main @coroutine#1 emit 1
main @coroutine#1 onEach 1
main @coroutine#1 collect 1
main @coroutine#1 emit 2
main @coroutine#1 onEach 2
main @coroutine#1 catch java.lang.RuntimeException: Exception on 2
留意retry
重试的是整个代码块,代码中的1
其实是没有问题的也重试了。
takeWhile
– 依照给定的条件从流中获取元素,直到条件不成立
fun main() = runBlocking<Unit> {
// 输出小于5的数字
(1..10)
.asFlow()
.takeWhile { it < 5 }
.collect { println(it) } // 输出 1 2 3 4
}
buffer
-在中心处理和搜集操作之间缓冲流
.buffer() // 缓冲流
6、zip,combine
zip
和combine
组合不同flow
的元素
fun main() = runBlocking {
val nums = (1..4).asFlow()
val strs = flowOf("one", "two", "three")
nums.zip(strs) { a, b -> "$a -> $b" }
.collect { printMsg(it) }
}
//日志
main @coroutine#1 1 -> one
main @coroutine#1 2 -> two
main @coroutine#1 3 -> three
替换上面的zip
为combine
,输出日志为:
main @coroutine#1 1 -> one
main @coroutine#1 2 -> two
main @coroutine#1 3 -> three
main @coroutine#1 4 -> three <------差异首要表现在这儿
zip
和combine
操作符的差异在于它们组合Flow
的办法。zip
操作符会按次序一一对应地组合两个Flow
中的项,而combine
操作符则会将每个Flow
中的最新项作为参数应用于给定的lambda
函数。因而,zip
操作符只要当两个Flow
的项数相一起才会发生输出,而combine
操作符则能够组合项数不同的Flow
并能当即输出组合成果。
7、flatMapConcat,flatMapMerge
这二个操作符都归于展平流,怎么理解和差异呢? 看下二个操作符的代码
fun main() = runBlocking {
(1..2).asFlow()
.flatMapConcat {
//将元素转化成了一个新的flow
zipElement(it)
}.collect { printMsg(it) }
}
//将元素转化成了一个新的flow
fun zipElement(int: Int): Flow<String> =
flow {
emit("$int -> Hello")
delay(1000)
emit("$int -> Kotlin")
}
//日志
main @coroutine#1 1 -> Hello
main @coroutine#1 1 -> Kotlin
main @coroutine#1 2 -> Hello
main @coroutine#1 2 -> Kotlin
替换上面的flatMapConcat
为flatMapMerge
,输出日志为:
main @coroutine#1 1 -> Hello
main @coroutine#1 2 -> Hello
main @coroutine#1 1 -> Kotlin
main @coroutine#1 2 -> Kotlin
flatMapConcat
和flatMapMerge
操作符都是用于将一个Flow
的项转化为其他Flow
的操作符,但运用的组合策略不同。
flatMapConcat
操作符会次序地组合每个转化后的Flow
,即它会等候前一个转化后的Flow
完结后再开始下一个转化。因而,它会按次序发出一切转化后的Flow
的项。常用于串行的数据传递和转化,比方串行的网络恳求等。
flatMapMerge
操作符会并发地组合每个转化后的Flow
,即它会当即开始下一个转化而不等候前一个转化完结。因而,它会当即发出一切转化后的Flow
的项,并经过任意次序发射它们。
8、sample,debounce
sample
操作符会在必定的时刻距离内,发射最新的元素。例如,假如你设置了一个500ms的距离,那么该操作符会每隔500ms发射最近的元素。因而,sample
操作符一般用于操控发射元素的数量和频率。
另一方面,debounce
操作符需求等候在必定时刻内没有新的元素,才会将最终一个元素发射出去。例如,假如你设置了500ms的时刻距离,那么假如500ms内有新元素抵达,那么之前的元素都会被疏忽,只要最新的元素会发射出去。因而,debounce
操作符一般用于制止在时刻距离内接连发射相同的元素。
综上所述,sample
操作符会在必定的时刻距离内,发射最新的元素,而debounce
操作符则需求等候在必定时刻内没有新的元素,才会将最终一个元素发射出去。这两个操作符有着不同的用途,需求根据详细状况来挑选运用。
fun main() = runBlocking {
//sample
flowOf("A", "B", "C", "D", "E", "F")
.onEach { delay(100) }
.sample(250)
.collect { printMsg(it) }
// 运用 debounce 操作符来从一个 Flow 中去重接连发送的相同音讯
flowOf(1, 2, 2, 3, 3, 3, 4, 4, 4, 4)
.onEach { delay(100) }
.debounce(150) // 用于制止在 150ms 内发射相同值的操作符
.collect { printMsg("Distinct value: $it") }
}
fun printMsg(msg: String) {
println("${Thread.currentThread().name} $msg")
}
//日志
main @coroutine#1 B
main @coroutine#1 D
main @coroutine#1 Distinct value: 4
9、distinctUntilChanged
过滤重复的值
val flow = flowOf(1, 2, 2, 3, 3, 3, 4, 5, 5)
// 运用 distinctUntilChanged 过滤接连重复的元素
flow.distinctUntilChanged().collect {
printMsg("flow $it")
}
//日志
main @coroutine#1 flow 1
main @coroutine#1 flow 2
main @coroutine#1 flow 3
main @coroutine#1 flow 4
main @coroutine#1 flow 5
三、StateFlow
1、StateFlow,MutableStateFlow
MutableStateFlow
是 StateFlow
的子类,与其具有相同的行为和功用,一起还具有一些不同之处。与 StateFlow
只能在创立时设置其初始值不同,MutableStateFlow
能够在任何时分更改其值。这使得 MutableStateFlow
更适合于表明可变状况。所以咱们在运用时经常能够看到如下的代码:
class MyViewModel : ViewModel() {
//可变的设置为私有的不对外露出
private val _counter = MutableStateFlow(0)
//不可变的对外露出
val counter: StateFlow<Int> = _counter或_counter.asStateFlow()
fun incrementCounter() {
//修正值
_counter.value++
}
}
class MainActivity : AppCompatActivity() {
private val viewModel: MyViewModel by viewModels()
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
viewModel.counter.collect { count ->
// Display the current count
}
button.setOnClickListener {
viewModel.incrementCounter()
}
}
}
2、简易案例
下面持续看一个查找防抖的雏形案例,简易代码如下:
class MainActivity : AppCompatActivity() {
private val etFlow = MutableStateFlow("et_Hint")
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
//打印协程称号
System.setProperty("kotlinx.coroutines.debug", "on")
val et = findViewById<AppCompatEditText>(R.id.et)
et.doAfterTextChanged {
//假如不为空或许null就发射流
if (!TextUtils.isEmpty(it)) {
etFlow.value = it.toString()
}
}
lifecycleScope.launch {
//在1秒采样周期内只发射最新的值
etFlow.sample(1000).collect {
//获取值去查找,这儿就打印日志
log(it)
}
}
}
/**
* 打印日志
*/
private fun log(msg: String) {
Log.d("LOG_PRINT", "内容:$msg 线程:${Thread.currentThread().name}")
}
}
//日志
内容:et_Hint 线程:main @coroutine#2
内容:123 线程:main @coroutine#2
能够看到创立MutableStateFlow
需求给默许参数,上面代码中是et_Hint
,而且这个默许值会首要发射出来,经过日志能够看到后边在EditText
中输入123
也打印出来了。
上面的代码其实有一个问题,当咱们先输入1
这个时分发射1
,然后咱们删除1
输入框内容为空不发射任何数据,当咱们再次输入1
,这次的1
是不会被发送的,这是因为给MutableStateFlow
赋值,假如二次的数据的哈希值一样,后边的数据不会被发送(能够看下源码),这明显不契合查找的需求,解决办法就是包一下并重写hashCode
办法回来随机数让它每次的值都不相同:
data class MutableStateFlowBean(var content:String) {
override fun equals(other: Any?): Boolean=false
override fun hashCode(): Int {
return Random.nextInt()
}
}
3、运用场景
(1)在ViewModel中运用StateFlow
这是最常用的办法,能够参阅上面三、1
中的一段代码
(2)怎么将StateFlow
的值安全露出给UI层,不受生命周期困扰?
能够经过asLiveData()
将StateFlow
转化为LiveData
。
先导包implementation "androidx.lifecycle:lifecycle-livedata-ktx:2.2.0"
看如下的代码,界说一个按钮,发送5个数据,经过StateFlow
的asLiveData()
和一般的LiveData
接纳:
class MainViewModel : ViewModel() {
//flow
private val mutableStateFlow = MutableStateFlow(0)
val flowLiveData = mutableStateFlow.asLiveData() <------这儿asLiveData()
//liveData
val liveData = MutableLiveData<Int>()
fun sendData(value: Int) {
//flow发送值
mutableStateFlow.value = value
//livedata发送值
liveData.value = value
}
}
class MainActivity : AppCompatActivity() {
private val mainViewModel: MainViewModel by viewModels()
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
//打印协程称号
System.setProperty("kotlinx.coroutines.debug", "on")
//点击按钮发送5个数据
val bt = findViewById<AppCompatButton>(R.id.bt)
bt.setOnClickListener {
lifecycleScope.launch {
(1..5).forEach {
delay(1500)
log("sendData:$it")
mainViewModel.sendData(it)
}
}
}
//flow转的livedata
mainViewModel.flowLiveData.observe(this) {
log("flowLiveData:$it")
}
//livedata
mainViewModel.liveData.observe(this) {
log("liveData:$it")
}
}
}
半途息屏一段时刻再亮屏,日志输出如下:
内容:flowLiveData:0 线程:main @coroutine#1 <------asLiveData后flowLiveData收到了默许值0
内容:sendData:1 线程:main @coroutine#4 <------点击按钮开始发射数据
内容:flowLiveData:1 线程:main @coroutine#1
内容:liveData:1 线程:main @coroutine#4
内容:sendData:2 线程:main @coroutine#4
内容:flowLiveData:2 线程:main @coroutine#1
内容:liveData:2 线程:main @coroutine#4
内容:sendData:3 线程:main @coroutine#4 <------息屏,flowLiveData和liveData都没接纳数据
内容:sendData:4 线程:main @coroutine#4
内容:sendData:5 线程:main @coroutine#4
内容:flowLiveData:5 线程:main @coroutine#6 <------亮屏,flowLiveData和liveData都接纳到数据
内容:liveData:5 线程:main
除了先会收到默许值,其他的契合预期。
(3) 一处接纳多处订阅
比方经过MQTT接纳机器的多种状况,然后许多页面都需求订阅机器的实时状况,那么能够测验这样做。
界说一个单例的类,类里面有设置Flow值的办法,也有二个获取Flow值的办法,其中一个关怀页面的生命周期,另一个不关怀:
class SimpleFlowClient {
private val mutableStateFlow = MutableStateFlow("offline")
private val stateFlow: StateFlow<String> = mutableStateFlow
/**
* 单例模式
*/
companion object {
private var instance: SimpleFlowClient? = null
@Synchronized
fun getInstance(): SimpleFlowClient {
return instance ?: SimpleFlowClient().also { instance = it }
}
}
/**
* 设置Flow的值
*/
fun setFlowValue(value: String) {
mutableStateFlow.value = value
}
/**
* 获取Flow的值,不关怀页面的生命周期,
* 运用LifecycleCoroutineScope是页面封闭时一起封闭协程
* @param scope lifecycleScope
*/
fun getFlowValue(
scope: LifecycleCoroutineScope,
result: (value: String) -> Unit = { _ -> }
) {
scope.launch {
stateFlow.collect {
result.invoke(it)
}
}
}
/**
* 获取Flow的值,转化成LiveData,关怀页面的生命周期
* @param owner lifecycleScope
*/
fun getFlowValueAsLiveData(
owner: LifecycleOwner,
observer: (value: String) -> Unit = { _ -> }
) {
stateFlow.asLiveData().observe(owner) {
observer(it)
}
}
}
分别在MainActivity
和TwoActivity
中做测验:
class MainActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
//打印协程称号
System.setProperty("kotlinx.coroutines.debug", "on")
//点击按钮跳转
val bt = findViewById<AppCompatButton>(R.id.bt)
bt.setOnClickListener {
//跳转到TwoActivity
Intent(this, TwoActivity::class.java).also { startActivity(it) }
//2秒后修正了StateFlow的值
Handler(Looper.myLooper()!!).postDelayed({
SimpleFlowClient.getInstance().setFlowValue("online")
}, 2000)
}
//盯梢Flow的数据
SimpleFlowClient.getInstance().getFlowValue(lifecycleScope) {
log("MainActivity:flow:$it")
}
//盯梢Flow的数据
SimpleFlowClient.getInstance().getFlowValueAsLiveData(this) {
log("MainActivity:asLiveData:$it")
}
}
}
class TwoActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_two)
//盯梢Flow的数据
SimpleFlowClient.getInstance().getFlowValue(lifecycleScope) {
log("TwoActivity:flow:$it")
}
//盯梢Flow的数据
SimpleFlowClient.getInstance().getFlowValueAsLiveData(this) {
log("TwoActivity:asLiveData:$it")
}
}
}
日志如下:
------------------------进入MainActivity收到的值----------------------
内容:MainActivity:flow:offline 线程:main @coroutine#2
内容:MainActivity:asLiveData:offline 线程:main @coroutine#3
------------------------点击按钮进入TwoActivity收到的值------------------------
内容:TwoActivity:flow:offline 线程:main @coroutine#5
内容:TwoActivity:asLiveData:offline 线程:main @coroutine#6
------------------------在TwoActivity页面2秒后收到的值-------------------
内容:MainActivity:flow:online 线程:main @coroutine#2
内容:TwoActivity:flow:online 线程:main @coroutine#5
内容:TwoActivity:asLiveData:online 线程:main @coroutine#6
------------------------回来MainActivity页面后收到的值-------------------
内容:MainActivity:asLiveData:online 线程:main @coroutine#8
全体契合预期。
4、需求留意的问题
(1) StateFlow是SharedFlow高度封装的
跟下StateFlow
的源码,能够看到有如下的代码:
/**
* [StateFlow] that represents the number of subscriptions.
* ------看这儿↓-------
* It is exposed as a regular [StateFlow] in our public API, but it is implemented as [SharedFlow] undercover to
* avoid conflations of consecutive updates because the subscription count is very sensitive to it.
*
* The importance of non-conflating can be demonstrated with the following example:
* ```
* val shared = flowOf(239).stateIn(this, SharingStarted.Lazily, 42) // stateIn for the sake of the initial value
* println(shared.first())
* yield()
* println(shared.first())
* ```
* If the flow is shared within the same dispatcher (e.g. Main) or with a slow/throttled one,
* the `SharingStarted.Lazily` will never be able to start the source: `first` sees the initial value and immediately
* unsubscribes, leaving the asynchronous `SharingStarted` with conflated zero.
*
* To avoid that (especially in a more complex scenarios), we do not conflate subscription updates.
*/
private class SubscriptionCountStateFlow(initialValue: Int) : StateFlow<Int>,
SharedFlowImpl<Int>(1, Int.MAX_VALUE, BufferOverflow.DROP_OLDEST) <------留意这儿
{
init { tryEmit(initialValue) }
override val value: Int
get() = synchronized(this) { lastReplayedLocked }
fun increment(delta: Int) = synchronized(this) {
tryEmit(lastReplayedLocked + delta)
}
}
public enum class BufferOverflow {
/**
* Suspend on buffer overflow.
*/
SUSPEND,
/**
* Drop **the oldest** value in the buffer on overflow, add the new value to the buffer, do not suspend.
*/
DROP_OLDEST, ------看这儿↑-------
/**
* Drop **the latest** value that is being added to the buffer right now on buffer overflow
* (so that buffer contents stay the same), do not suspend.
*/
DROP_LATEST
}
需求留意一些小细节:
① StateFlow是根据SharedFlow完成的,它是高度封装的SharedFlow。
② StateFlow的缓存容量为Int.MAX_VALUE
③ StateFlow的缓存策略为丢掉最旧的值且不挂起
(2) StateFlow中怎么界说什么是新的值?
根据以上的源码,看下面的代码会输出什么?
fun main() {
runBlocking {
val mutableStateFlow = MutableStateFlow(0)
//提早订阅的协程
launch {
mutableStateFlow.collect {
printMsg("collect before $it")
}
}
//修正StateFlow的值,分别修正为0-100
launch {
for (i in 0..100) {
mutableStateFlow.value = i
printMsg(" send $i")
}
}
//稍后订阅的协程
launch {
mutableStateFlow.collect {
printMsg("collect after $it")
}
}
}
}
//日志
main @coroutine#2 collect before 0
main @coroutine#3 send 0
main @coroutine#3 send 1
...省掉send 2-98的输出...
main @coroutine#3 send 99
main @coroutine#3 send 100
main @coroutine#4 collect after 100
main @coroutine#2 collect before 100
能够看到提早订阅的协程的只收到了默许值0
和最终的100
,而稍后订阅的协程只收到了最终的100
,中心的值都没有收到,这是为什么?
经过源码打断点,和官方的注释能够看到
只呈现了0
和100
二个值,其他的值都没有呈现在断点中……官方注释(上图红框内)的解说是:
Here the coroutine could have waited for a while to be dispatched, so we use the most recent state here to ensure the best possible conflation of stale values
这儿协程可能会等候一段时刻才能被调度,所以咱们在这儿运用最近的状况来保证陈旧值的最佳可能合并
深层次的原因不得而知。假如改形成发送一个数据就挂起一次会怎么样呢? 把上面的代码做一个小小的修正:
//修正StateFlow的值,分别修正为0-100
launch {
for (i in 0..100) {
mutableStateFlow.value = i
printMsg(" send $i")
delay(1) <----------------改动在这儿:加了一个挂起函数
}
}
成果发现一切的中心值都收到了。根据现象能够得出结论:协程挂起或许履行完才会被判定有新的更新,就会更新StateFlow
的值。
假如运用StateFlow
发送了多个值,且中心的每个值都需求处理,那就需求留意上面的问题。假如多个值建议换成SharedFlow
。
(3) 为什么引荐直接给value赋值,而不是经过emit和tryEmit发射新的值?
首要,需求清晰的是,StateFlow
的状况值是不可变的。也就是说,一旦状况值被赋值,就不能再次修正。因而,假如咱们运用emit
或tryEmit
办法来发射新的状况值,就需求创立一个新的目标来代表新的状况值。这样做会导致内存分配和废物收回的开支,然后影响程序的功能。
比较之下,直接给value
特点赋值能够防止这种开支。因为value
特点是一个可变的变量,咱们能够直接修正它的值,而不需求创立新的目标。这样做不仅能够进步程序的功能,还能够防止因为频频创立目标而导致的内存泄漏等问题。
别的,需求留意的是,emit
和tryEmit
办法是异步的,它们会将新的状况值放入一个队列中,等候下一次事情循环时再进行处理。而直接给value
特点赋值是同步的,它会当即更新状况值,并通知一切的观察者。因而,在某些状况下,直接给value
特点赋值可能愈加方便和牢靠。
综上所述,虽然StateFlow
提供了多种办法来更新状况值,可是引荐运用直接给value
特点赋值的办法。这样做能够防止内存分配和废物收回的开支,进步程序的功能,一起也愈加方便和牢靠。
(4) distinctUntilChanged无效
关于一般的flow
来说distinctUntilChanged
是收效的(见上面二、9、distinctUntilChanged操作符
),可是关于StateFlow
来说因为自身就会比照前后的值,所以不建议在StateFlow
中运用这个操作符。
(5) 假如不想处理默许值能够怎么办
提早订阅(会收到默许值)的前提下,运用drop
操作符
val mutableStateFlow = MutableStateFlow(0)
//提早订阅
launch {
mutableStateFlow.drop(1).collect {
printMsg("collect $it")
}
}
(6) emit和tryEmit的差异?
emit 函数是一个挂起函数,它能够在流中发射一个元素。它将暂停当时协程的履行,直到一切已订阅该流的搜集器都成功接纳到这个元素,然后该协程才会持续履行。假如在发射元素时发生反常,则该反常会向上抛出,而且该流的一切搜集器都将失败。
tryEmit 函数也是用于发射元素的函数,但与 emit 不同,它不是一个挂起函数。相反,它会当即回来一个布尔值,指示元素是否已成功发射。假如已成功发射元素,则 tryEmit 函数回来 true;假如已有搜集器撤销了订阅,或许流现已被中止,则 tryEmit 函数回来 false。
因而,emit 和 tryEmit 的首要差异在于,emit 函数是一个挂起函数,有必要等候一切订阅者接纳到元素后才会回来,而 tryEmit 函数是一个非挂起函数,会当即回来一个布尔值来指示元素是否已成功发射。在运用时,需求根据详细的状况来挑选运用哪个函数。
四、SharedFlow
1、SharedFlow,MutableSharedFlow
看一下MutableSharedFlow
办法的源码
@Suppress("FunctionName", "UNCHECKED_CAST")
public fun <T> MutableSharedFlow(
replay: Int = 0,
extraBufferCapacity: Int = 0,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T> {
...略...
}
MutableSharedFlow
的结构函数包含三个参数,分别是replay
、extraBufferCapacity
和onBufferOverflow
。
(1) replay
参数表明是否需求缓存最新的元素值,并在新的观察者加入时自动发送。假如该参数为0,则不会缓存最新的元素值;假如该参数为正整数n,则会缓存最新的n个元素值,并在新的观察者加入时自动发送这些元素值。该参数的默许值为0。
(2)extraBufferCapacity
参数表明缓冲区的额定容量。假如该参数为0,则缓冲区的容量与replay
参数指定的容量相同;假如该参数为正整数n,则缓冲区的容量为replay
+n。该参数的默许值为0。
(3)onBufferOverflow
参数表明当缓冲区溢出时的处理办法。假如缓冲区已满,而又有新的元素要发射时,就会发生缓冲区溢出。该参数能够是以下三种值之一:
-
BufferOverflow.SUSPEND
:挂起当时协程,直到缓冲区有空间能够存储新的元素。 -
BufferOverflow.DROP_OLDEST
:丢掉最旧的元素,以腾出空间存储新的元素。 -
BufferOverflow.DROP_LATEST
:丢掉最新的元素,不将其存储到缓冲区中。
假如不指定该参数,则默许为BufferOverflow.SUSPEND
。
2、shareIn
ShareIn
办法的源码为:
public fun <T> Flow<T>.shareIn(
scope: CoroutineScope,
started: SharingStarted,
replay: Int = 0
): SharedFlow<T> {
...略...
}
三个参数的意义分别为:
(1) 参数一:CoroutineScope
指定了同享流的效果域,即同享流将在哪个协程范围内履行。这个参数的值应该是一个CoroutineScope类型的目标。
在Kotlin协程中,CoroutineScope代表了一个协程的上下文和生命周期,它能够用来发动协程,或许用来撤销一个协程以及其子协程。
在运用shareIn创立同享流时,你能够传递一个协程效果域,表明同享流将在该效果域内履行。这样,当协程效果域被撤销时,同享流也将被撤销,然后防止资源泄漏。
(2) 参数二:SharingStarted
SharingStarted 具有以下几个选项:
-
WhileSubscribed:默许选项,表明当至少有一个订阅者时才发动同享暖流。也就是说,在第一个订阅者订阅同享暖流之前,同享暖流是不会发动的,而在最终一个订阅者撤销订阅后,同享暖流会自动中止。
传入时刻(毫秒值)的意义是什么?
public fun WhileSubscribed( stopTimeoutMillis: Long = 0, //毫秒值 replayExpirationMillis: Long = Long.MAX_VALUE ): SharingStarted = StartedWhileSubscribed(stopTimeoutMillis, replayExpirationMillis)
同享流在没有订阅者时最长的存活时刻,假如在此期间没有订阅者,同享流将自动封闭。
-
Eagerly:表明在调用 shareIn 时当即发动同享暖流,无论是否有订阅者。这意味着即使没有订阅者,同享暖流也会一直运转,直到手动撤销。
-
Lazily:表明在第一个订阅者订阅同享暖流时才发动同享暖流,而在最终一个订阅者撤销订阅后,同享暖流会自动中止。与 WhileSubscribed 比较,Lazily 选项会在第一个订阅者订阅之前等候一段时刻,因而能够防止在没有订阅者的状况下糟蹋资源。
-
Started:表明在调用 shareIn 时当即发动同享暖流,而且不会自动中止,直到手动撤销。
(3) 参数三:replay
假如有新的订阅者订阅,回来缓存数据的最新数据的个数。
(4) shareIn的优势在哪里?
-
节约内存和核算资源:当多个观察者订阅同一数据流时,运用
shareIn
能够防止为每个观察者创立新的数据流。相反,一切观察者都能够同享同一数据流。这能够大大削减内存和核算资源的运用,特别是在订阅者数量很大的状况下。 -
进步功能和呼应性:当多个观察者同享同一数据流时,数据只需求被核算和分发一次,而不需求为每个观察者核算和分发一次。这能够削减核算时刻和延迟,并进步应用程序的呼应性。
-
简化代码:运用
shareIn
能够使代码更简单和更易于维护。它能够防止重复代码,使数据同享和订阅愈加直观和易于理解。
总之,shareIn
函数能够进步应用程序的功能和呼应性,削减内存和核算资源的运用,并简化代码。这使得它在 Android 开发中非常有用,尤其是在需求同享数据的状况下。
(5) shareIn运用场景
操作同一份数据源,需求运用数据时将冷流转化为暖流
//同一份数据源
val flow = flow {
for (i in 1..3) {
delay(1000)
emit(i)
printMsg("emit $i")
}
}
//Flow是冷流不会自动发射数据,转化成暖流才会自动发射数据。这样在需求的时分再转化,能够节约资源。
val shareInFlow =
flow.shareIn(lifecycleScope, SharingStarted.WhileSubscribed(5000), 1)
// 在协程中搜集数据。Activity封闭,lifecycleScope效果域的协程中止,暖流也会中止
lifecycleScope.launchWhenCreated {
shareInFlow.collect {
printMsg("collect before $it")
}
}
lifecycleScope.launchWhenCreated {
shareInFlow.collect {
printMsg("collect after $it")
}
}
//日志
内容:collect before 1 线程:main @coroutine#6
内容:collect after 1 线程:main @coroutine#7
内容:emit 1 线程:main @coroutine#9
内容:collect before 2 线程:main @coroutine#6
内容:collect after 2 线程:main @coroutine#7
内容:emit 2 线程:main @coroutine#9
内容:collect before 3 线程:main @coroutine#6
内容:collect after 3 线程:main @coroutine#7
内容:emit 3 线程:main @coroutine#9
更有用一点的场景: 比方做蓝牙开发,许多时分,咱们会将蓝牙设备给App的数据处理后直接发射出去,不管页面需不需求,就一直发,这会糟蹋必定的资源,咱们能够把蓝牙的数据包装成冷流,需求的时分转化为暖流,下面是一个粗陋的比方:
class MainActivity : AppCompatActivity() {
private var mBleData = ""
private var flow: Flow<String>? = null
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
//打印协程称号
System.setProperty("kotlinx.coroutines.debug", "on")
//发射蓝牙的数据
emitBleData()
//需求运用数据时将冷流转化成暖流
val shareInFlow =
flow!!.shareIn(lifecycleScope, SharingStarted.WhileSubscribed(5000), 1)
//假如不想收到的数据重复,能够经过stateIn转化成StateFlow
// flow!!.stateIn(lifecycleScope, SharingStarted.WhileSubscribed(5000), 1)
// 需求数据时订阅
lifecycleScope.launchWhenCreated {
shareInFlow.collect {
printMsg("collect: $it")
}
}
}
private fun emitBleData() {
flow = flow {
while (true) {
//假设蓝牙数据3秒发射一次,这儿也延迟3秒
delay(3000)
//收到的数据往往不在这儿,可是咱们能够把它界说为成员变量,然后把新的成员变量发射出来
mBleData = getBleData()
emit(mBleData)
printMsg("emit: $mBleData $flow")
}
}
}
//模仿蓝牙数据: 随机发生数据
private fun getBleData(): String = "ble data ${Random.nextInt(100)}"
}
3、运用场景
-
从一个单一数据源同享数据
-
假如将参数界说为
BufferOverflow.DROP_OLDEST
(丢掉旧数据),MutableSharedFlow
能够作为一个固定长度的缓冲区运用。 -
SharedFlow在同享的ViewModel中同享数据。(这儿涉及一个知识点怎么在项目中设计一个大局可用的ViewModel目标,这样都能访问同一份数据源?)
4、SharedFlow中的数据是怎么移动的?
一般状况下,当调用emit办法发射数据时,假如缓存数组的buffered values未达到最大容量,则发射的数据将保存到缓存中,并当即回来emit办法。假如缓存数组的buffered values已达到最大容量,则调用emit办法的协程会被当即挂起,而且它的续体和数据会被封装成一个Emitter类型的目标,保存到缓存数组的queued emitters中。
当buffered values中方位为0的数据被一切的订阅者都处理后,buffered values会前移动一位。这时,queued emitters中方位为7的Emitter类型的目标就会被“拆箱”,将其中保存的数据寄存到方位7,一起恢复其中保存的emit办法地点续体的履行。之后,方位7将作为buffered values的一部分。
参阅了以下内容:
为什么说Flow是冷的?
Kotlin协程:MutableSharedFlow的完成原理