先来点废话
现在公司项目虽说全体架构MVVM,但是事务杂乱导致 ViewModel 过于巨大,就学习了下android 官方最新推的架构,留意到了官方现在主推的学习appNow in Android,看到了里边关于 SharedFlow/StateFlow 的运用,发现原来学的基本只能留个大概印象了,运用到项目里需求再细抠,就又大概捋了一遍。 之后发现现在项目中,一些 Flow 的每一次订阅都对应一次数据库的查询操作… 所以这篇文章就来了…

现在网上文章痛点

  • 照搬照抄,一个比如搬来搬去,看完后仍然无法合理挑选运用场景
  • 有些文章太老,Flow 相关API 变化较快,易误导初学者

本文目的

  • 正确认识冷数据流/热数据流差异
  • 依据运用场景正确挑选 Flow/StateFlow/SharedFlow
  • 依据 Android 官方引荐,把握现在最合理的 Flow 用法
  • 检查项目中已有运用场景是否正确合理,最好整个优化,赚个OKR

ps:文末附一张思想导图 和 demo 链接。

冷数据流/热数据流

冷数据流:

  • 当履行订阅的时候,上游发布者才开端发射数据流。
  • 订阅者与发布者是一一对应的联系,即当存在多个订阅者时,每个新的订阅者都会重新收到完整的数据。
  • flow 是冷流,flow有了订阅者 Collector 之后,发射出来的值才会实实在在的存在于内存之中,跟懒加载的概念很像。

热数据流:

  • 从数据流搜集数据不会触发任何供给方代码,不管是否被订阅,上游发布者都会发送数据流到内存中。
  • 订阅者与发布者是一对多的联系,当上游发送数据时,多个订阅者都会收到音讯。
  • StateFlow/SharedFlow 是暖流。

StateFlow/SharedFlow

SharedFlow

private val _showDialogFlow = MutableSharedFlow<Boolean>()
val showDialogFlow : SharedFlow<Boolean> = _showDialogFlow
//出产数据
_showDialogFlow.emit(true)

以上为引荐写法 — MutableStateFlow 更新状况并将其发送到数据流,运用 StateFlow 获取当时状况和状况更新。运用类型MutableSharedFlow的后备特点将数据项发送给数据流。
下面看下结构函数:

public fun <T> MutableSharedFlow(
    replay: Int = 0,
    extraBufferCapacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T> {

replay:新的订阅者会收到的之前发送过的数据的个数,需 >= 0。
extraBufferCapacity:除 replay 外,额外缓存的数据个数,需 >= 0。
所以总的 buffer 容量为 bufferSize = replay + extraBufferCapacity
当出产者速度 > 顾客速度时,bufferSize 总会被填满,填满后再来的数据流就会运用到背压战略 BufferOverflow。

BufferOverflow 背压战略

  • SUSPEND,默许战略,填满后再来的数据流,发送会被挂起,若 bufferSize <= 0,此战略不行更改。
  • DROP_OLDEST,丢弃最旧的值,eg:在我司直播页面,评论区音讯就运用了这个战略。
  • DROP_LATEST,丢弃最新的值。

SharedFlow 合适行为 case or 事情告诉。

StateFlow

private val _testAFlow = MutableStateFlow(0)
val testAFlow: StateFlow<Int> = _testAFlow
//出产数据
_testAFlow.value = para

写法同上,看下结构函数。

public fun <T> MutableStateFlow(value: T): MutableStateFlow<T> = StateFlowImpl(value ?: NULL)

StateFlow 类似于下面这个写法的 SharedFlow。

MutableSharedFlow<Boolean>(1)

当新订阅者开端从数据流中搜集数据时,它将接纳信息流中的最近一个状况及任何后续状况,类似于LiveData。 StateFlow 运用 CAS 方法赋值,且默许防抖。基于此特性,合适描述 android 中的 Ui 状况

关于二者还有很多用法和 API,这儿不赘述,自行查阅。

Flow 引荐用法

搜集上文中 testAFlow 方法有两种,如下:

//示例1,即第一种方法,不引荐这种方法
lifecycleScope.launchWhenStarted {
    model.testAFlow.collect{
       binding.mainTv.text = "$it"
    }
}
//示例2,即第二种方法,也是官方现在引荐的方法
lifecycleScope.launch {
    viewLifecycleOwner.lifecycle.repeatOnLifecycle(Lifecycle.State.STARTED) {
        model.testAFlow.collect {
            binding.mainTv.text = "$it"
        }
    }
}

LifecycleCoroutineScope 的 launchWhenXXX 系列

上面的示例1代码中,假如Lifecycle未至少处于所需的最低状况 STARTED,则会挂起在这些块内运转的任何协程。留意这儿要点是挂起,不是撤销。依据源码可追踪到如下代码:

private val observer = LifecycleEventObserver { source, _ ->
    if (source.lifecycle.currentState == Lifecycle.State.DESTROYED) {
        // cancel job before resuming remaining coroutines so that they run in cancelled
        // state
        handleDestroy(parentJob)
    } else if (source.lifecycle.currentState < minState) {
    //1.这儿是挂起,暂停协程的履行
        dispatchQueue.pause()
    } else {
    //2.这儿是康复协程的履行
        dispatchQueue.resume()
    }
}

并且此 API 在最新版本中已被抛弃「androidx.lifecycle:lifecycle-common:2.6.1」。

用错了Flow?每一次订阅都对应一次数据库的查询操作?Flow/StateFlow/SharedFlow 正确使用姿势

repeatOnLifecycle 函数

上面的示例2代码中,表示相关的Lifecycle 至少处于STARTED状况时运转,并且会在 Lifecycle 处于STOPPED状况时撤销运转。留意这儿要点是撤销,不是挂起。部分源码如下:

observer = LifecycleEventObserver { _, event ->
    if (event == startWorkEvent) {
        // Launch the repeating work preserving the calling context
        launchedJob = this@coroutineScope.launch {
            //指定生命周期内,创建协程履行传递过来的代码块
            mutex.withLock {
                coroutineScope {
                    block()
                }
            }
        }
        return@LifecycleEventObserver
    }
    if (event == cancelWorkEvent) {
    //非指定生命周期内撤销履行
        launchedJob?.cancel()
        launchedJob = null
    }
    if (event == Lifecycle.Event.ON_DESTROY) {
        cont.resume(Unit)
    }
}

前述小结

  • 倾向于运用repeatOnLifecycleAPI 搜集数据流,而launchWhenXAPI 。由于 launchWhenX API 会挂起协程,上游数据流会在后台保持活跃状况,并可能会耗用资源。
  • LiveData 具备感知生命周期的才能,但 StateFlow 不具备,所以需求结合 Lifecycle.repeatOnLifecycle 运用。
  • StateFlow 运用 CAS 方法赋值,且默许防抖。
  • StateFlow 可通过 value 特点获取最新值,SharedFlow 不行。
    综上:StateFlow 合适 UiState 场景,SharedFlow 合适行为 case or 事情告诉

项目实战

case:
有个从数据库中读取的全局变量,需求整个 app 中 N 个页面去监听状况变化「eg:当时用户的会员状况 or 某等级」,怎么去合理运用 Flow?

直接运用Flow方法

发布者人物
运用Android Jetpack的 Room,结合 Flow 的写法:

object AccountManager {
    val testFlow: Flow<DaoTestBean?> by lazy {
        AppDatabase.getInstance().testDao().getTestFlow()
    }
}

订阅者人物

留意前提:项目中 N 个页面会订阅 testFlow

lifecycleScope.launch {
    AccountManager.testFlow.flowWithLifecycle(lifecycle, Lifecycle.State.STARTED).collect {
        binding.main1Tv.text = "${it?.lastUseTime}"
    }
}

问题是什么呢?
看似运用了 flowWithLifecycle API,感知生命周期,但是 testFlow 是冷流,订阅者与发布者一一对应,当 N 个页面都注册时,每次都对应一次数据库的查询操作,这是极端不合理的。

运用暖流优化

发布者人物
依据事务,自行决定是运用 StateFlow or SharedFlow

private val coroutineIO = CoroutineScope(SupervisorJob() + Dispatchers.IO)
//接纳初始状况及后续改变,类似于livedata
val testStateFlow: StateFlow<DaoTestBean?> =
    AppDatabase.getInstance().testDao().getTestFlow().stateIn(coroutineIO, SharingStarted.Lazily, null)
//只接纳后续改变,类似于eventbus
val testSharedFlow = testStateFlow.shareIn(coroutineIO, SharingStarted.Lazily)

订阅者人物

viewLifecycleOwner.lifecycleScope.launch {
    viewLifecycleOwner.repeatOnLifecycle(Lifecycle.State.STARTED) {
        launch {
            AccountManager.testStateFlow.collect {

            }
        }
        launch {
            AccountManager.testSharedFlow.collect {

            }
        }
    }
}

订阅者与发布者是一对多的联系,当上游发送数据时,多个订阅者都会收到音讯。所以即便 N 个订阅者,数据库只会对应一次查询操作,至于在实际运用中是运用 StateFlow or SharedFlow,就看事务场景如何对应了。

最终说点

本文运用的代码都在这儿了。

供给一个 repeatOnLifecycle 函数的封装:

inline fun <T> Flow<T>.launchAndCollectIn(
    owner: LifecycleOwner,
    minActiveState: Lifecycle.State = Lifecycle.State.STARTED,
    crossinline action: suspend CoroutineScope.(T) -> Unit
) = owner.lifecycleScope.launchCoroutine {
    owner.repeatOnLifecycle(minActiveState) {
        collect {
            action(it)
        }
    }
}
inline fun Fragment.launchAndRepeatWithVLF(//VLF:viewLifecycle
    minActiveState: Lifecycle.State = Lifecycle.State.STARTED,
    crossinline block: suspend CoroutineScope.() -> Unit
) {
    viewLifecycleOwner.lifecycleScope.launchCoroutine {
        viewLifecycleOwner.lifecycle.repeatOnLifecycle(minActiveState) {
            block()
        }
    }
}

文末附一张 SharedFlow/StateFlow 思想导图:

用错了Flow?每一次订阅都对应一次数据库的查询操作?Flow/StateFlow/SharedFlow 正确使用姿势

参考链接

StateFlow 和 SharedFlow
规划 repeatOnLifecycle API 背面的故事
运用 Android Jetpack 的 Room 部分将数据保存到本地数据库。