一、Flow的创立办法

常见的创立Flow的办法有flow{}flowof{}asFlow等,下面是示例代码:

fun main() {
    runBlocking {
        //办法一
        flow<Int> {
            emit(0)
        }.collect {
            printMsg(it.toString())
        }
        //办法二
        flowOf(1,2,3).collect{
            printMsg(it.toString())
        }
        //办法三
        listOf(4,5,6).asFlow().collect{
            printMsg(it.toString())
        }
    }
}
//日志
main @coroutine#1 0
main @coroutine#1 1
main @coroutine#1 2
main @coroutine#1 3
main @coroutine#1 4
main @coroutine#1 5
main @coroutine#1 6

二、Flow的常见操作符

1、filter,map,take

比较常用也比较简单,代码后边的比方会表现,此处略。

2、onStart,onCompletion

onStartonCompletion是协程创立和完结时的回调,与在事情处理的上下游方位无关

flowOf(1, 2, 3, 4)
    .filter { it > 1 }   //过滤
    .onStart { printMsg("Flow onStart") }
    .onCompletion { printMsg("Flow onCompletion $it") }
    .map { it * 2 }    //改换
    .take(2)   //截取
    .collect {
        printMsg("Flow result:$it")
    }
//日志
main @coroutine#1 Flow onStart
main @coroutine#1 Flow result:4
main @coroutine#1 Flow result:6
main @coroutine#1 Flow onCompletion kotlinx.coroutines.flow.internal.AbortFlowException: Flow was aborted, no more elements needed

测验在半途抛出一个反常

flowOf(1, 2, 3, 4)
    .filter { it > 1 }   //过滤
    .onStart { printMsg("Flow onStart") }
    .onCompletion { printMsg("Flow onCompletion $it") }
    .map { it * 2 }    //改换
    .take(2)   //截取
    .collect {
        printMsg("Flow result:$it")
        //收到第一个数据就抛出反常
        throw IllegalStateException()        <----------------------变化在这儿
    }
//日志,程序报错
main @coroutine#1 Flow onStart
main @coroutine#1 Flow result:4
main @coroutine#1 Flow onCompletion java.lang.IllegalStateException      <------打印反常信息
Exception in thread "main" java.lang.IllegalStateException
...略...

3、catch反常处理

假如flow呈现反常应该怎么捕获反常? 看二段代码:

第一段

val flow = flow {
    emit(1)
    throw IllegalStateException()    <-------------------发送数据抛反常
    emit(2)
}
flow.catch {      <----------------------这儿catch
    printMsg("Flow catch $it")
}.onCompletion {
    printMsg("Flow onCompletion $it")
}.collect{        <-------------完毕操作符
    printMsg("Flow collect $it")
}
//日志
main @coroutine#1 Flow collect 1
main @coroutine#1 Flow catch java.lang.IllegalStateException
main @coroutine#1 Flow onCompletion null

第二段

flowOf(1, 2)
    .catch { printMsg("Flow catch $it") }        <------------------这儿catch
    .onCompletion { printMsg("Flow onCompletion $it") }
    .collect {                    <-------------完毕操作符
        printMsg("Flow collect $it")
        //收到第一个数据就抛出反常
        throw IllegalStateException()       <---------------处理数据抛反常
    }
//日志,程序报错
main @coroutine#1 Flow collect 1
main @coroutine#1 Flow onCompletion java.lang.IllegalStateException
Exception in thread "main" java.lang.IllegalStateException
...略...

所以要留意发送时分的反常是能够捕获的,可是处理数据时的反常是没有办法捕获的。catch 的效果域,仅限于 catch 的上游。

4、flowOn、launchIn

flowOnlaunchIn首要效果是切换线程。可是它们的运用有一些需求留意的当地。

(1)、flowOn

flow {
    emit(1)
    emit(2)
}.filter {
    printMsg("filter $it")
    it > 1
}.flowOn(Dispatchers.IO)     <----------切换线程
.map {
    printMsg("map $it")
    it * 2
}.collect {
    printMsg("collect $it")
}
//日志
DefaultDispatcher-worker-1 @coroutine#2 filter 1    <------filter在子线程
DefaultDispatcher-worker-1 @coroutine#2 filter 2    <------filter在子线程
main @coroutine#1 map 2                             <------map在主线程
main @coroutine#1 collect 4                         <------collect在主线程

能够看到flowOn切换线程只在它的上游起效果。

(2)、launchIn

假如想让flowOn上游的代码在子线程履行,而flowOn下面的代码在主线程履行怎么办?下面是一个比方:

lifecycleScope.launch(Dispatchers.Main) {   <------lifecycleScope上下文地点的线程是主线程main
    flow {                                 _______
        log("flow emit 1")                    |
        emit(1)                               |
        log("flow emit 2")                    |
        emit(2)                           IO线程履行
    }.map {                                   |
        log("flow map $it")                   |
        it * 2                                |
    }.flowOn(Dispatchers.IO)               ___|____
        .onEach {                             |
            log("flow onEach $it")            |
        }.onCompletion {                      |
            log("flow onCompletion $it")  Main线程履行
        }.catch {                             |
            log("flow catch $it")             |
        }.launchIn(lifecycleScope)            |
}                                         ____|___
//日志
flow emit 1 线程:DefaultDispatcher-worker-1 @coroutine#4
flow map 1 线程:DefaultDispatcher-worker-1 @coroutine#4
flow emit 2 线程:DefaultDispatcher-worker-1 @coroutine#4
flow map 2 线程:DefaultDispatcher-worker-1 @coroutine#4
flow onEach 2 线程:main @coroutine#3
flow onEach 4 线程:main @coroutine#3
flow onCompletion null 线程:main @coroutine#3

launchIn将它上游的代码切换到给定上下文lifecycleScope的线程履行,这样就能够让上面的业务代码在IO线程履行,而成果回调到UI线程履行。这儿没有collect作为完毕符,而是运用的onEach,但追踪onEach的源码,调用的还是collect

5、retry,takeWhile,buffer

retry反常时重试次数,示例如下:

fun main() = runBlocking {
    flow {
        printMsg("emit 1")
        emit(1)
        printMsg("emit 2")
        emit(2)
    }.onEach {
        printMsg("onEach $it")
        if (it == 2) {
            throw RuntimeException("Exception on $it") // 抛出反常
        }
    }.retry(1) // 重试2次
        .catch { printMsg("catch $it") }
        .collect { printMsg("collect $it") }
}
//日志
main @coroutine#1 emit 1
main @coroutine#1 onEach 1
main @coroutine#1 collect 1
main @coroutine#1 emit 2
main @coroutine#1 onEach 2
main @coroutine#1 emit 1
main @coroutine#1 onEach 1
main @coroutine#1 collect 1
main @coroutine#1 emit 2
main @coroutine#1 onEach 2
main @coroutine#1 catch java.lang.RuntimeException: Exception on 2

留意retry重试的是整个代码块,代码中的1其实是没有问题的也重试了。

takeWhile – 依照给定的条件从流中获取元素,直到条件不成立

fun main() = runBlocking<Unit> {
    // 输出小于5的数字
    (1..10)
        .asFlow()
        .takeWhile { it < 5 }
        .collect { println(it) } // 输出 1 2 3 4
}

buffer-在中心处理和搜集操作之间缓冲流

.buffer() // 缓冲流

6、zip,combine

zipcombine组合不同flow的元素

fun main() = runBlocking {
    val nums = (1..4).asFlow()
    val strs = flowOf("one", "two", "three")
    nums.zip(strs) { a, b -> "$a -> $b" }
        .collect { printMsg(it) }
}
//日志
main @coroutine#1 1 -> one
main @coroutine#1 2 -> two
main @coroutine#1 3 -> three

替换上面的zipcombine,输出日志为:

main @coroutine#1 1 -> one
main @coroutine#1 2 -> two
main @coroutine#1 3 -> three
main @coroutine#1 4 -> three       <------差异首要表现在这儿

zipcombine操作符的差异在于它们组合Flow的办法。zip操作符会按次序一一对应地组合两个Flow中的项,而combine操作符则会将每个Flow中的最新项作为参数应用于给定的lambda函数。因而,zip操作符只要当两个Flow的项数相一起才会发生输出,而combine操作符则能够组合项数不同的Flow并能当即输出组合成果。

7、flatMapConcat,flatMapMerge

这二个操作符都归于展平流,怎么理解和差异呢? 看下二个操作符的代码

fun main() = runBlocking {
    (1..2).asFlow()
        .flatMapConcat {
            //将元素转化成了一个新的flow
            zipElement(it)
        }.collect { printMsg(it) }
}
//将元素转化成了一个新的flow
fun zipElement(int: Int): Flow<String> =
    flow {
        emit("$int -> Hello")
        delay(1000)
        emit("$int -> Kotlin")
    }
//日志
main @coroutine#1 1 -> Hello
main @coroutine#1 1 -> Kotlin
main @coroutine#1 2 -> Hello
main @coroutine#1 2 -> Kotlin

替换上面的flatMapConcatflatMapMerge,输出日志为:

main @coroutine#1 1 -> Hello
main @coroutine#1 2 -> Hello
main @coroutine#1 1 -> Kotlin
main @coroutine#1 2 -> Kotlin

flatMapConcatflatMapMerge操作符都是用于将一个Flow的项转化为其他Flow的操作符,但运用的组合策略不同。

flatMapConcat操作符会次序地组合每个转化后的Flow,即它会等候前一个转化后的Flow完结后再开始下一个转化。因而,它会按次序发出一切转化后的Flow的项。常用于串行的数据传递和转化,比方串行的网络恳求等。

flatMapMerge操作符会并发地组合每个转化后的Flow,即它会当即开始下一个转化而不等候前一个转化完结。因而,它会当即发出一切转化后的Flow的项,并经过任意次序发射它们。

8、sample,debounce

sample操作符会在必定的时刻距离内,发射最新的元素。例如,假如你设置了一个500ms的距离,那么该操作符会每隔500ms发射最近的元素。因而,sample操作符一般用于操控发射元素的数量和频率。

另一方面,debounce操作符需求等候在必定时刻内没有新的元素,才会将最终一个元素发射出去。例如,假如你设置了500ms的时刻距离,那么假如500ms内有新元素抵达,那么之前的元素都会被疏忽,只要最新的元素会发射出去。因而,debounce操作符一般用于制止在时刻距离内接连发射相同的元素。

综上所述,sample操作符会在必定的时刻距离内,发射最新的元素,而debounce操作符则需求等候在必定时刻内没有新的元素,才会将最终一个元素发射出去。这两个操作符有着不同的用途,需求根据详细状况来挑选运用。

fun main() = runBlocking {
    //sample
    flowOf("A", "B", "C", "D", "E", "F")
        .onEach { delay(100) }
        .sample(250)
        .collect { printMsg(it) }
    // 运用 debounce 操作符来从一个 Flow 中去重接连发送的相同音讯
    flowOf(1, 2, 2, 3, 3, 3, 4, 4, 4, 4)
        .onEach { delay(100) }
        .debounce(150) // 用于制止在 150ms 内发射相同值的操作符
        .collect { printMsg("Distinct value: $it") }
}
fun printMsg(msg: String) {
    println("${Thread.currentThread().name} $msg")
}
//日志
main @coroutine#1 B
main @coroutine#1 D
main @coroutine#1 Distinct value: 4

9、distinctUntilChanged

过滤重复的值

val flow = flowOf(1, 2, 2, 3, 3, 3, 4, 5, 5)
// 运用 distinctUntilChanged 过滤接连重复的元素
flow.distinctUntilChanged().collect {
    printMsg("flow $it")
}
//日志
main @coroutine#1 flow 1
main @coroutine#1 flow 2
main @coroutine#1 flow 3
main @coroutine#1 flow 4
main @coroutine#1 flow 5

三、StateFlow

1、StateFlow,MutableStateFlow

MutableStateFlowStateFlow 的子类,与其具有相同的行为和功用,一起还具有一些不同之处。与 StateFlow 只能在创立时设置其初始值不同,MutableStateFlow 能够在任何时分更改其值。这使得 MutableStateFlow 更适合于表明可变状况。所以咱们在运用时经常能够看到如下的代码:

class MyViewModel : ViewModel() {
    //可变的设置为私有的不对外露出
    private val _counter = MutableStateFlow(0)
    //不可变的对外露出
    val counter: StateFlow<Int> = _counter或_counter.asStateFlow()
    fun incrementCounter() {
        //修正值
        _counter.value++
    }
}
class MainActivity : AppCompatActivity() {
    private val viewModel: MyViewModel by viewModels()
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        viewModel.counter.collect { count ->
            // Display the current count
        }
        button.setOnClickListener {
            viewModel.incrementCounter()
        }
    }
}

2、简易案例

下面持续看一个查找防抖的雏形案例,简易代码如下:

class MainActivity : AppCompatActivity() {
    private val etFlow = MutableStateFlow("et_Hint")
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        //打印协程称号
        System.setProperty("kotlinx.coroutines.debug", "on")
        val et = findViewById<AppCompatEditText>(R.id.et)
        et.doAfterTextChanged {
            //假如不为空或许null就发射流
            if (!TextUtils.isEmpty(it)) {
                etFlow.value = it.toString()
            }
        }
        lifecycleScope.launch {
            //在1秒采样周期内只发射最新的值
            etFlow.sample(1000).collect {
                //获取值去查找,这儿就打印日志
                log(it)
            }
        }
    }
    /**
     * 打印日志
     */
    private fun log(msg: String) {
        Log.d("LOG_PRINT", "内容:$msg 线程:${Thread.currentThread().name}")
    }
}
//日志
内容:et_Hint 线程:main @coroutine#2
内容:123 线程:main @coroutine#2

能够看到创立MutableStateFlow需求给默许参数,上面代码中是et_Hint,而且这个默许值会首要发射出来,经过日志能够看到后边在EditText中输入123也打印出来了。

上面的代码其实有一个问题,当咱们先输入1这个时分发射1,然后咱们删除1输入框内容为空不发射任何数据,当咱们再次输入1,这次的1是不会被发送的,这是因为给MutableStateFlow赋值,假如二次的数据的哈希值一样,后边的数据不会被发送(能够看下源码),这明显不契合查找的需求,解决办法就是包一下并重写hashCode办法回来随机数让它每次的值都不相同:

data class MutableStateFlowBean(var content:String) {
    override fun equals(other: Any?): Boolean=false
    override fun hashCode(): Int {
        return Random.nextInt()
    }
}

3、运用场景

(1)在ViewModel中运用StateFlow

这是最常用的办法,能够参阅上面三、1中的一段代码

(2)怎么将StateFlow的值安全露出给UI层,不受生命周期困扰?

能够经过asLiveData()StateFlow转化为LiveData
先导包implementation "androidx.lifecycle:lifecycle-livedata-ktx:2.2.0"

看如下的代码,界说一个按钮,发送5个数据,经过StateFlowasLiveData()和一般的LiveData接纳:

class MainViewModel : ViewModel() {
    //flow
    private val mutableStateFlow = MutableStateFlow(0)
    val flowLiveData = mutableStateFlow.asLiveData()      <------这儿asLiveData()
    //liveData
    val liveData = MutableLiveData<Int>()
    fun sendData(value: Int) {
        //flow发送值
        mutableStateFlow.value = value
        //livedata发送值
        liveData.value = value
    }
}
class MainActivity : AppCompatActivity() {
    private val mainViewModel: MainViewModel by viewModels()
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        //打印协程称号
        System.setProperty("kotlinx.coroutines.debug", "on")
        //点击按钮发送5个数据
        val bt = findViewById<AppCompatButton>(R.id.bt)
        bt.setOnClickListener {
            lifecycleScope.launch {
                (1..5).forEach {
                    delay(1500)
                    log("sendData:$it")
                    mainViewModel.sendData(it)
                }
            }
        }
        //flow转的livedata
        mainViewModel.flowLiveData.observe(this) {
            log("flowLiveData:$it")
        }
        //livedata
        mainViewModel.liveData.observe(this) {
            log("liveData:$it")
        }
    }
}

半途息屏一段时刻再亮屏,日志输出如下:

内容:flowLiveData:0 线程:main @coroutine#1   <------asLiveData后flowLiveData收到了默许值0
内容:sendData:1 线程:main @coroutine#4       <------点击按钮开始发射数据
内容:flowLiveData:1 线程:main @coroutine#1   
内容:liveData:1 线程:main @coroutine#4
内容:sendData:2 线程:main @coroutine#4
内容:flowLiveData:2 线程:main @coroutine#1
内容:liveData:2 线程:main @coroutine#4
内容:sendData:3 线程:main @coroutine#4       <------息屏,flowLiveData和liveData都没接纳数据
内容:sendData:4 线程:main @coroutine#4
内容:sendData:5 线程:main @coroutine#4
内容:flowLiveData:5 线程:main @coroutine#6   <------亮屏,flowLiveData和liveData都接纳到数据
内容:liveData:5 线程:main

除了先会收到默许值,其他的契合预期。

(3) 一处接纳多处订阅

比方经过MQTT接纳机器的多种状况,然后许多页面都需求订阅机器的实时状况,那么能够测验这样做。

界说一个单例的类,类里面有设置Flow值的办法,也有二个获取Flow值的办法,其中一个关怀页面的生命周期,另一个不关怀:

class SimpleFlowClient {
    private val mutableStateFlow = MutableStateFlow("offline")
    private val stateFlow: StateFlow<String> = mutableStateFlow
    /**
     * 单例模式
     */
    companion object {
        private var instance: SimpleFlowClient? = null
        @Synchronized
        fun getInstance(): SimpleFlowClient {
            return instance ?: SimpleFlowClient().also { instance = it }
        }
    }
    /**
     * 设置Flow的值
     */
    fun setFlowValue(value: String) {
        mutableStateFlow.value = value
    }
    /**
     * 获取Flow的值,不关怀页面的生命周期,
     * 运用LifecycleCoroutineScope是页面封闭时一起封闭协程
     * @param scope lifecycleScope
     */
    fun getFlowValue(
        scope: LifecycleCoroutineScope,
        result: (value: String) -> Unit = { _ -> }
    ) {
        scope.launch {
            stateFlow.collect {
                result.invoke(it)
            }
        }
    }
    /**
     * 获取Flow的值,转化成LiveData,关怀页面的生命周期
     * @param owner lifecycleScope
     */
    fun getFlowValueAsLiveData(
        owner: LifecycleOwner,
        observer: (value: String) -> Unit = { _ -> }
    ) {
        stateFlow.asLiveData().observe(owner) {
            observer(it)
        }
    }
}

分别在MainActivityTwoActivity中做测验:

class MainActivity : AppCompatActivity() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        //打印协程称号
        System.setProperty("kotlinx.coroutines.debug", "on")
        //点击按钮跳转
        val bt = findViewById<AppCompatButton>(R.id.bt)
        bt.setOnClickListener {
            //跳转到TwoActivity
            Intent(this, TwoActivity::class.java).also { startActivity(it) }
            //2秒后修正了StateFlow的值
            Handler(Looper.myLooper()!!).postDelayed({
                SimpleFlowClient.getInstance().setFlowValue("online")
            }, 2000)
        }
        //盯梢Flow的数据
        SimpleFlowClient.getInstance().getFlowValue(lifecycleScope) {
            log("MainActivity:flow:$it")
        }
        //盯梢Flow的数据
        SimpleFlowClient.getInstance().getFlowValueAsLiveData(this) {
            log("MainActivity:asLiveData:$it")
        }
    }
}
class TwoActivity : AppCompatActivity() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_two)
        //盯梢Flow的数据
        SimpleFlowClient.getInstance().getFlowValue(lifecycleScope) {
            log("TwoActivity:flow:$it")
        }
        //盯梢Flow的数据
        SimpleFlowClient.getInstance().getFlowValueAsLiveData(this) {
            log("TwoActivity:asLiveData:$it")
        }
    }
}

日志如下:

------------------------进入MainActivity收到的值----------------------
内容:MainActivity:flow:offline 线程:main @coroutine#2
内容:MainActivity:asLiveData:offline 线程:main @coroutine#3
------------------------点击按钮进入TwoActivity收到的值------------------------
内容:TwoActivity:flow:offline 线程:main @coroutine#5
内容:TwoActivity:asLiveData:offline 线程:main @coroutine#6
------------------------在TwoActivity页面2秒后收到的值-------------------
内容:MainActivity:flow:online 线程:main @coroutine#2
内容:TwoActivity:flow:online 线程:main @coroutine#5
内容:TwoActivity:asLiveData:online 线程:main @coroutine#6
------------------------回来MainActivity页面后收到的值-------------------
内容:MainActivity:asLiveData:online 线程:main @coroutine#8

全体契合预期。

4、需求留意的问题

(1) StateFlow是SharedFlow高度封装的

跟下StateFlow的源码,能够看到有如下的代码:

/**
 * [StateFlow] that represents the number of subscriptions.
 *                                                                   ------看这儿↓-------
 * It is exposed as a regular [StateFlow] in our public API, but it is implemented as [SharedFlow] undercover to
 * avoid conflations of consecutive updates because the subscription count is very sensitive to it.
 *
 * The importance of non-conflating can be demonstrated with the following example:
 * ```
 * val shared = flowOf(239).stateIn(this, SharingStarted.Lazily, 42) // stateIn for the sake of the initial value
 * println(shared.first())
 * yield()
 * println(shared.first())
 * ```
 * If the flow is shared within the same dispatcher (e.g. Main) or with a slow/throttled one,
 * the `SharingStarted.Lazily` will never be able to start the source: `first` sees the initial value and immediately
 * unsubscribes, leaving the asynchronous `SharingStarted` with conflated zero.
 *
 * To avoid that (especially in a more complex scenarios), we do not conflate subscription updates.
 */
private class SubscriptionCountStateFlow(initialValue: Int) : StateFlow<Int>,
    SharedFlowImpl<Int>(1, Int.MAX_VALUE, BufferOverflow.DROP_OLDEST)        <------留意这儿
{
    init { tryEmit(initialValue) }
    override val value: Int
        get() = synchronized(this) { lastReplayedLocked }
    fun increment(delta: Int) = synchronized(this) {
        tryEmit(lastReplayedLocked + delta)
    }
}
public enum class BufferOverflow {
    /**
     * Suspend on buffer overflow.
     */
    SUSPEND,
    /**
     * Drop **the oldest** value in the buffer on overflow, add the new value to the buffer, do not suspend.
     */
    DROP_OLDEST,                                                            ------看这儿↑-------
    /**
     * Drop **the latest** value that is being added to the buffer right now on buffer overflow
     * (so that buffer contents stay the same), do not suspend.
     */
    DROP_LATEST
}

需求留意一些小细节:
① StateFlow是根据SharedFlow完成的,它是高度封装的SharedFlow。
② StateFlow的缓存容量为Int.MAX_VALUE
③ StateFlow的缓存策略为丢掉最旧的值且不挂起

(2) StateFlow中怎么界说什么是新的值?

根据以上的源码,看下面的代码会输出什么?

fun main() {
    runBlocking {
        val mutableStateFlow = MutableStateFlow(0)
        //提早订阅的协程
        launch {
            mutableStateFlow.collect {
                printMsg("collect before $it")
            }
        }
        //修正StateFlow的值,分别修正为0-100
        launch {
            for (i in 0..100) {
                mutableStateFlow.value = i
                printMsg(" send  $i")
            }
        }
        //稍后订阅的协程
        launch {
            mutableStateFlow.collect {
                printMsg("collect after $it")
            }
        }
    }
}
//日志
main @coroutine#2 collect before 0
main @coroutine#3  send  0
main @coroutine#3  send  1
...省掉send 2-98的输出...
main @coroutine#3  send  99
main @coroutine#3  send  100
main @coroutine#4 collect after 100
main @coroutine#2 collect before 100

能够看到提早订阅的协程的只收到了默许值0和最终的100,而稍后订阅的协程只收到了最终的100,中心的值都没有收到,这是为什么?

经过源码打断点,和官方的注释能够看到

kotlin-协程(七)关于Flow

kotlin-协程(七)关于Flow

只呈现了0100二个值,其他的值都没有呈现在断点中……官方注释(上图红框内)的解说是:
Here the coroutine could have waited for a while to be dispatched, so we use the most recent state here to ensure the best possible conflation of stale values 这儿协程可能会等候一段时刻才能被调度,所以咱们在这儿运用最近的状况来保证陈旧值的最佳可能合并

深层次的原因不得而知。假如改形成发送一个数据就挂起一次会怎么样呢? 把上面的代码做一个小小的修正:

//修正StateFlow的值,分别修正为0-100
launch {
    for (i in 0..100) {
        mutableStateFlow.value = i
        printMsg(" send  $i")
        delay(1)      <----------------改动在这儿:加了一个挂起函数
    }
}

成果发现一切的中心值都收到了。根据现象能够得出结论:协程挂起或许履行完才会被判定有新的更新,就会更新StateFlow的值。

假如运用StateFlow发送了多个值,且中心的每个值都需求处理,那就需求留意上面的问题。假如多个值建议换成SharedFlow

(3) 为什么引荐直接给value赋值,而不是经过emit和tryEmit发射新的值?

首要,需求清晰的是,StateFlow的状况值是不可变的。也就是说,一旦状况值被赋值,就不能再次修正。因而,假如咱们运用emittryEmit办法来发射新的状况值,就需求创立一个新的目标来代表新的状况值。这样做会导致内存分配和废物收回的开支,然后影响程序的功能。

比较之下,直接给value特点赋值能够防止这种开支。因为value特点是一个可变的变量,咱们能够直接修正它的值,而不需求创立新的目标。这样做不仅能够进步程序的功能,还能够防止因为频频创立目标而导致的内存泄漏等问题。

别的,需求留意的是,emittryEmit办法是异步的,它们会将新的状况值放入一个队列中,等候下一次事情循环时再进行处理。而直接给value特点赋值是同步的,它会当即更新状况值,并通知一切的观察者。因而,在某些状况下,直接给value特点赋值可能愈加方便和牢靠。

综上所述,虽然StateFlow提供了多种办法来更新状况值,可是引荐运用直接给value特点赋值的办法。这样做能够防止内存分配和废物收回的开支,进步程序的功能,一起也愈加方便和牢靠。

(4) distinctUntilChanged无效

关于一般的flow来说distinctUntilChanged是收效的(见上面二、9、distinctUntilChanged操作符),可是关于StateFlow来说因为自身就会比照前后的值,所以不建议在StateFlow中运用这个操作符。

(5) 假如不想处理默许值能够怎么办

提早订阅(会收到默许值)的前提下,运用drop操作符

val mutableStateFlow = MutableStateFlow(0)
//提早订阅
launch {
    mutableStateFlow.drop(1).collect {
        printMsg("collect $it")
    }
}

(6) emit和tryEmit的差异?

emit 函数是一个挂起函数,它能够在流中发射一个元素。它将暂停当时协程的履行,直到一切已订阅该流的搜集器都成功接纳到这个元素,然后该协程才会持续履行。假如在发射元素时发生反常,则该反常会向上抛出,而且该流的一切搜集器都将失败。

tryEmit 函数也是用于发射元素的函数,但与 emit 不同,它不是一个挂起函数。相反,它会当即回来一个布尔值,指示元素是否已成功发射。假如已成功发射元素,则 tryEmit 函数回来 true;假如已有搜集器撤销了订阅,或许流现已被中止,则 tryEmit 函数回来 false。

因而,emit 和 tryEmit 的首要差异在于,emit 函数是一个挂起函数,有必要等候一切订阅者接纳到元素后才会回来,而 tryEmit 函数是一个非挂起函数,会当即回来一个布尔值来指示元素是否已成功发射。在运用时,需求根据详细的状况来挑选运用哪个函数。

四、SharedFlow

1、SharedFlow,MutableSharedFlow

看一下MutableSharedFlow办法的源码

@Suppress("FunctionName", "UNCHECKED_CAST")
public fun <T> MutableSharedFlow(
    replay: Int = 0,
    extraBufferCapacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T> {
  ...略...
}

MutableSharedFlow的结构函数包含三个参数,分别是replayextraBufferCapacityonBufferOverflow

(1) replay参数表明是否需求缓存最新的元素值,并在新的观察者加入时自动发送。假如该参数为0,则不会缓存最新的元素值;假如该参数为正整数n,则会缓存最新的n个元素值,并在新的观察者加入时自动发送这些元素值。该参数的默许值为0。

(2)extraBufferCapacity参数表明缓冲区的额定容量。假如该参数为0,则缓冲区的容量与replay参数指定的容量相同;假如该参数为正整数n,则缓冲区的容量为replay+n。该参数的默许值为0。

(3)onBufferOverflow参数表明当缓冲区溢出时的处理办法。假如缓冲区已满,而又有新的元素要发射时,就会发生缓冲区溢出。该参数能够是以下三种值之一:

  • BufferOverflow.SUSPEND:挂起当时协程,直到缓冲区有空间能够存储新的元素。
  • BufferOverflow.DROP_OLDEST:丢掉最旧的元素,以腾出空间存储新的元素。
  • BufferOverflow.DROP_LATEST:丢掉最新的元素,不将其存储到缓冲区中。

假如不指定该参数,则默许为BufferOverflow.SUSPEND

2、shareIn

ShareIn办法的源码为:

public fun <T> Flow<T>.shareIn(
    scope: CoroutineScope,
    started: SharingStarted,
    replay: Int = 0
): SharedFlow<T> {
    ...略...
}

三个参数的意义分别为:

(1) 参数一:CoroutineScope

指定了同享流的效果域,即同享流将在哪个协程范围内履行。这个参数的值应该是一个CoroutineScope类型的目标。

在Kotlin协程中,CoroutineScope代表了一个协程的上下文和生命周期,它能够用来发动协程,或许用来撤销一个协程以及其子协程。

在运用shareIn创立同享流时,你能够传递一个协程效果域,表明同享流将在该效果域内履行。这样,当协程效果域被撤销时,同享流也将被撤销,然后防止资源泄漏。

(2) 参数二:SharingStarted

SharingStarted 具有以下几个选项:

  • WhileSubscribed:默许选项,表明当至少有一个订阅者时才发动同享暖流。也就是说,在第一个订阅者订阅同享暖流之前,同享暖流是不会发动的,而在最终一个订阅者撤销订阅后,同享暖流会自动中止。

    传入时刻(毫秒值)的意义是什么?

     public fun WhileSubscribed(
        stopTimeoutMillis: Long = 0,   //毫秒值
        replayExpirationMillis: Long = Long.MAX_VALUE
     ): SharingStarted = StartedWhileSubscribed(stopTimeoutMillis, replayExpirationMillis)
    

    同享流在没有订阅者时最长的存活时刻,假如在此期间没有订阅者,同享流将自动封闭。

  • Eagerly:表明在调用 shareIn 时当即发动同享暖流,无论是否有订阅者。这意味着即使没有订阅者,同享暖流也会一直运转,直到手动撤销。

  • Lazily:表明在第一个订阅者订阅同享暖流时才发动同享暖流,而在最终一个订阅者撤销订阅后,同享暖流会自动中止。与 WhileSubscribed 比较,Lazily 选项会在第一个订阅者订阅之前等候一段时刻,因而能够防止在没有订阅者的状况下糟蹋资源。

  • Started:表明在调用 shareIn 时当即发动同享暖流,而且不会自动中止,直到手动撤销。

(3) 参数三:replay

假如有新的订阅者订阅,回来缓存数据的最新数据的个数。

(4) shareIn的优势在哪里?

  • 节约内存和核算资源:当多个观察者订阅同一数据流时,运用 shareIn 能够防止为每个观察者创立新的数据流。相反,一切观察者都能够同享同一数据流。这能够大大削减内存和核算资源的运用,特别是在订阅者数量很大的状况下。

  • 进步功能和呼应性:当多个观察者同享同一数据流时,数据只需求被核算和分发一次,而不需求为每个观察者核算和分发一次。这能够削减核算时刻和延迟,并进步应用程序的呼应性。

  • 简化代码:运用 shareIn 能够使代码更简单和更易于维护。它能够防止重复代码,使数据同享和订阅愈加直观和易于理解。

总之,shareIn 函数能够进步应用程序的功能和呼应性,削减内存和核算资源的运用,并简化代码。这使得它在 Android 开发中非常有用,尤其是在需求同享数据的状况下。

(5) shareIn运用场景

操作同一份数据源,需求运用数据时将冷流转化为暖流

//同一份数据源
val flow = flow {
    for (i in 1..3) {
        delay(1000)
        emit(i)
        printMsg("emit $i")
    }
}
//Flow是冷流不会自动发射数据,转化成暖流才会自动发射数据。这样在需求的时分再转化,能够节约资源。
val shareInFlow =
    flow.shareIn(lifecycleScope, SharingStarted.WhileSubscribed(5000), 1)
// 在协程中搜集数据。Activity封闭,lifecycleScope效果域的协程中止,暖流也会中止
lifecycleScope.launchWhenCreated {
    shareInFlow.collect {
        printMsg("collect before $it")
    }
}
lifecycleScope.launchWhenCreated {
    shareInFlow.collect {
        printMsg("collect after $it")
    }
}
//日志
内容:collect before 1 线程:main @coroutine#6
内容:collect after 1 线程:main @coroutine#7
内容:emit 1 线程:main @coroutine#9
内容:collect before 2 线程:main @coroutine#6
内容:collect after 2 线程:main @coroutine#7
内容:emit 2 线程:main @coroutine#9
内容:collect before 3 线程:main @coroutine#6
内容:collect after 3 线程:main @coroutine#7
内容:emit 3 线程:main @coroutine#9

更有用一点的场景: 比方做蓝牙开发,许多时分,咱们会将蓝牙设备给App的数据处理后直接发射出去,不管页面需不需求,就一直发,这会糟蹋必定的资源,咱们能够把蓝牙的数据包装成冷流,需求的时分转化为暖流,下面是一个粗陋的比方:

class MainActivity : AppCompatActivity() {
    private var mBleData = ""
    private var flow: Flow<String>? = null
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        //打印协程称号
        System.setProperty("kotlinx.coroutines.debug", "on")
        //发射蓝牙的数据
        emitBleData()
        //需求运用数据时将冷流转化成暖流
        val shareInFlow =
            flow!!.shareIn(lifecycleScope, SharingStarted.WhileSubscribed(5000), 1)
        //假如不想收到的数据重复,能够经过stateIn转化成StateFlow
        // flow!!.stateIn(lifecycleScope, SharingStarted.WhileSubscribed(5000), 1)
        // 需求数据时订阅
        lifecycleScope.launchWhenCreated {
            shareInFlow.collect {
                printMsg("collect: $it")
            }
        }
    }
    private fun emitBleData() {
        flow = flow {
            while (true) {
                //假设蓝牙数据3秒发射一次,这儿也延迟3秒
                delay(3000)
                //收到的数据往往不在这儿,可是咱们能够把它界说为成员变量,然后把新的成员变量发射出来
                mBleData = getBleData()
                emit(mBleData)
                printMsg("emit: $mBleData $flow")
            }
        }
    }
    //模仿蓝牙数据: 随机发生数据
    private fun getBleData(): String = "ble data ${Random.nextInt(100)}"
}

3、运用场景

  • 从一个单一数据源同享数据

  • 假如将参数界说为BufferOverflow.DROP_OLDEST(丢掉旧数据),MutableSharedFlow能够作为一个固定长度的缓冲区运用。

  • SharedFlow在同享的ViewModel中同享数据。(这儿涉及一个知识点怎么在项目中设计一个大局可用的ViewModel目标,这样都能访问同一份数据源?)

4、SharedFlow中的数据是怎么移动的?

kotlin-协程(七)关于Flow

一般状况下,当调用emit办法发射数据时,假如缓存数组的buffered values未达到最大容量,则发射的数据将保存到缓存中,并当即回来emit办法。假如缓存数组的buffered values已达到最大容量,则调用emit办法的协程会被当即挂起,而且它的续体和数据会被封装成一个Emitter类型的目标,保存到缓存数组的queued emitters中。

当buffered values中方位为0的数据被一切的订阅者都处理后,buffered values会前移动一位。这时,queued emitters中方位为7的Emitter类型的目标就会被“拆箱”,将其中保存的数据寄存到方位7,一起恢复其中保存的emit办法地点续体的履行。之后,方位7将作为buffered values的一部分。

参阅了以下内容:

为什么说Flow是冷的?

Kotlin协程:MutableSharedFlow的完成原理

学习笔记