协程系列文章:

  • Kotlin协程之根底运用
  • Kotlin协程之深入理解协程作业原理
  • Kotlin协程之协程撤销与反常处理
  • Kotlin协程之再次读懂协程作业原理
  • Kotlin协程之Flow作业原理
  • Kotlin协程之一文看懂StateFlow和SharedFlow

个人觉得 Kotlin协程之再次读懂协程作业原理 剖析比较易懂,强行引荐,有需求的同学能够移步看看。

概述

Android 架构经过多年的开展和迭代,目前最常用的应该是 MVVM 了,至于 MVI 架构,在实践开发中我看到的比较少,但无论是哪一款,都是运用分层的架构思想,引证一张官网的引荐架构图:

Kotlin协程之一文看懂StateFlow和SharedFlow

关于 UI Layer, 咱们一般有两个组件:

  • View: 处理 UI 展示,用户事情监听,页面跳转等,比如说 Activity 和 Fragment
  • ViewModel: 向 View 供应数据

这二者通常通过呼应式编程来处理它们之间的交互,View 订阅 ViewModel 暴露的呼应式接口,接纳到通知后进行相应逻辑,而 ViewModel 不再持有任何形式的 View 的引证。常用的呼应式组件有 LiveData, RxJava, Flow 等:

  • RxJava: 个人很少运用,倒不是由于它欠好,而是…一向傻傻不太会用 RxJava 的操作符,看不太懂,又一向没花时间(懒散)去研讨它那些操作符的原理,就不怎样敢用。
  • LiveData: 在很长一段时间里,这个组件简直是我(们咱们)的宝贝,简略,上手快,又能满足大多数场景的需求,感恩。
  • Flow: 撒播它想要替代 LiveData, 网上也有一些从 LiveData 迁移到 Flow 的文章。

个人看来,LiveData 不至于真的被替换掉,由于它有很多优势,比如说运用简略,轻量级,可感知生命周期,可调查,粘性(优缺视运用场景而定)等;所以一般来说,关于 View 和 ViewModel 之间简略的呼应式开发,运用 LiveData 就足够了,毕竟适宜的才是最好的,而关于 UI Layer 之外的杂乱场景,能够考虑运用 Flow 用来处理异步数据流。

之前有剖析过根底的 Flow 作业原理,这篇文章咱们再看看 StateFlow 和 SharedFlow。

Flow的三个角色

Flow 数据流中有三个角色:

  • 出产者(Producer): 出产添加到数据流中的数据
  • 顾客(Consumer): 消费数据流中的数据
  • 中介(Intermediaries 可选): 能够批改发送到数据流的值,或批改数据流自身

参阅官方中的一段视频动画,能够很好地理解这个进程:

Kotlin协程之一文看懂StateFlow和SharedFlow

不得不说,这些新知识点的学习,官方文档或者视频往往更直观易懂,网上很多文章写得也挺好的,能够用来加深理解,但建议官方文档也能够多看看。

冷流与暖流

在前一篇文章 Flow 作业原理 中也有解析过 Flow 为什么是冷流,参阅这个流程图:

Kotlin协程之一文看懂StateFlow和SharedFlow

flow {} 办法(或flowOf, asFlow)创立的 Flow 实例是 SafeFlow 类型,当调用其 collect(FlowCollector) 办法时,首要会履行该 Flow 对象传入的 block 代码块,代码块中一般会有 emit 办法发射值,这个 emit 会调用到 collect 办法传入的 action 代码块。由于它在每次调用 collect 时才去触发发送数据的动作,所以说 Flow 是冷流

即 Flow 是在顾客消费时才会出产数据,且冷流和顾客是一对一的联系,即当有多个顾客消费时,出产者是从头出产发送数据的。

与之对应的是暖流,Flow 有供应相关完成: StateFlow 和 SharedFlow 。

StateFlow 和 SharedFlow 是暖流,出产数据不依赖顾客消费,暖流与顾客是一对多的联系,当有多个顾客时,它们之间的数据都是同一份。

StateFlow

相似 LiveData, StateFlow 也供应了 可读写只读 两个版别:

public interface StateFlow<out T> : SharedFlow<T> {
    // StateFlow 当时的数据
    public val value: T
}
public interface MutableStateFlow<T> : StateFlow<T>, MutableSharedFlow<T> {
    public override var value: T
    public fun compareAndSet(expect: T, update: T): Boolean
}

StateFlow 与 LiveData 的定位比较相似,官方或许是想用它来替代 LiveData, 其共同点:

  • 都供应了 可读写只读 两个版别。
  • 值仅有,会把最新值重现给订阅者,即粘性。
  • 可被多个订阅者订阅(同享数据流)。
  • 子线程中屡次发送数据或许会丢掉数据。

不同点:

  • StateFlow 有必要传入初始值,保证值的空安全,永久有值。
  • StateFlow 能够防抖,即屡次传入相同值,不会有呼应(Setting a value that is [equal][Any.equals] to the previous one does nothing) 。
  • 当 View 进入 STOPPED 状况时,LiveData.observe() 会中止接纳数据,而从 StateFlow 或其他 Flow collect 数据的操作并不会主动中止。如需完成相同的行为,需求从 Lifecycle.repeatOnLifecycle 块 collect 数据流。

SharedFlow

SharedFlow 也供应了 可读写只读 两个版别:

public interface SharedFlow<out T> : Flow<T> {
    public val replayCache: List<T>
}
public interface MutableSharedFlow<T> : SharedFlow<T>, FlowCollector<T> {
    override suspend fun emit(value: T)
    public fun tryEmit(value: T): Boolean
    public val subscriptionCount: StateFlow<Int>
    public fun resetReplayCache()
}

与 StateFlow 的区别:

  • SharedFlow 没有初始值,StateFlow 有必要传初始值。
  • SharedFlow 能够保留历史数据,新订阅者能够获取到之前发射过的一系列数据,StateFlow 只保留最新数据。
  • SharedFlow 能够传入一个 replay 参数,它表明能够对新订阅者从头发送 replay 个历史数据,默许值为 0, 即非粘性。
  • StateFlow 能够看成是一个 replay = 1 且没有缓冲区的 SharedFlow 。
  • SharedFlow 在子线程中屡次 emit() 不会丢掉数据。

State 和 Event

依据 Android developers 上的官方示例,能够看出 StateFlow 和 SharedFlow 分别是用来处理 状况(state)事情(event) 的,它称呼 StateFlow 是 a state-holder observable

val uiState: StateFlow<LatestNewsUiState>
val tickFlow: SharedFlow<Event<String>>
  • 对 UI 而言,state 是 UI 组件的状况,它应当一直都有一个值来表明其状况,且当有新的订阅者加入时,它也应知道当时的状况,即 StateFlow 的粘性。
  • 而 event 事情只要在产生某些动作后才会触发,不需求有初始值。SharedFlow 默许非粘性的,当新的订阅者加入时,它不会重复产生已经产生过的事情。
  • 关于状况而言,即便屡次发送,咱们只重视最新的状况;关于事情而言,如果屡次发送,咱们不想丢掉任何一个前台的事情。

SharedFlow 运用

创立 SharedFlow

看看 SharedFlow 的结构函数:

public fun <T> MutableSharedFlow(
    replay: Int = 0,
    extraBufferCapacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T>

主要有三个参数:

  • replay: 新订阅者 collect 时,发送 replay 个历史数据给它,默许新订阅者不会获取曾经的数据。
  • extraBufferCapacity: MutableSharedFlow 缓存的数据个数为 replay + extraBufferCapacity; 缓存一方面用于粘性事情的发送,另一方面也为了处理背压问题,既下流的顾客的消费速度低于上游出产者的出产速度时,数据会被放在缓存中。
  • onBufferOverflow: 背压处理策略,缓存区满后怎样处理(挂起或丢掉数据),默许挂起。留意,当没有订阅者时,只要最近 replay 个数的数据会存入缓存区,不会触发 onBufferOverflow 策略。

下面演示一下 MutableSharedFlow 的创立及其参数的效果。

在 ViewModel 中发射数据

class MainViewModel(): CoroutineScope by CoroutineScope(Dispatchers.IO + SupervisorJob()) {
    private val _data1: MutableSharedFlow<Int> = MutableSharedFlow(
        replay = 10
    )
    val data1: SharedFlow<Int> = _data1
    fun refresh() {
        var cur = 1
        launch {
            repeat(200) {
                delay(20)
                _data1.emit(cur)
                println("emit data: ${cur++}")
            }
        }
    }
}

订阅数据

class MainActivity : BaseActivity(), CoroutineScope by MainScope() {
    override fun onCreate(savedInstanceState: Bundle?) {
        job = launch { // 订阅 data1
            lifecycle.repeatOnLifecycle(Lifecycle.State.STARTED) {
                viewModel.data1.collect {
                    delay(100) // 消费速度慢于出产速度
                    println("data: $it")
                }
            }
        }
    }
    fun click(view: View) {
        launch {
            viewModel.refresh() // 触发数据出产
            delay(2000)
            job?.cancel() // 2s 后中止订阅
            delay(2000)
            lifecycle.repeatOnLifecycle(Lifecycle.State.STARTED) {
                viewModel.data1.collect { // 2s 后继续订阅
                    delay(100)
                    println("data again: $it")
                }
            }
        }
    }
}

看看输出

// 首要消费速度慢于出产速度
System.out: emit data: 1
System.out: emit data: 2
System.out: emit data: 3
System.out: emit data: 4
System.out: emit data: 5
System.out: data: 1
System.out: emit data: 6
System.out: emit data: 7
System.out: emit data: 8
System.out: emit data: 9
System.out: emit data: 10
System.out: data: 2
// 接着缓存区满了,消费了一个数据后,缓存区才够存储新的数据
System.out: emit data: 11
System.out: emit data: 12
System.out: emit data: 13
System.out: data: 3
System.out: emit data: 14
System.out: data: 4
System.out: emit data: 15
// ...
// 2s 后撤销了订阅,出产者发送数据就不受 onBufferOverflow 影响了,只会把最新的 10 条数据存入缓存区
System.out: emit data: 29
System.out: data: 19
System.out: emit data: 30
System.out: emit data: 31
System.out: emit data: 32
System.out: emit data: 33
System.out: emit data: 34
System.out: emit data: 35
System.out: emit data: 36
System.out: emit data: 37
// ...
// 接着 2s 后又继续订阅,订阅者消费的第一条数据是 113, 能够看到恰好是出产者出产的前 10 条数据
System.out: emit data: 121
System.out: emit data: 122
System.out: emit data: 123
System.out: data again: 113
System.out: emit data: 124
System.out: data again: 114
System.out: emit data: 125
System.out: data again: 115
System.out: emit data: 126
System.out: data again: 116
System.out: emit data: 127

shareIn 转化

能够运用 shareIn 办法把一般 flow 冷流通化成 SharedFlow 暖流。通过 shareIn 创立的 SharedFlow 会把数据供应 View 订阅,一同也会订阅上游的数据流:

public fun <T> Flow<T>.shareIn(
    scope: CoroutineScope,
    started: SharingStarted,
    replay: Int = 0
): SharedFlow<T>

有三个参数:

  • scope: 用于同享数据流的 CoroutineScope, 此效果域函数的生命周期应善于任何运用方,使同享数据流在足够长的时间内坚持活泼状况。
  • started: 发动策略
  • replay: 同上 replay 意义

started 有三种取值:

  • Eagerly: 当即发动,到 scope 效果域被完毕时中止
  • Lazily: 当存在首个订阅者时发动,到 scope 效果域被完毕时中止
  • WhileSubscribed: 在没有订阅者的情况下撤销订阅上游数据流,避免不必要的资源糟蹋

关于只履行一次的操作,能够运用 Lazily 或 Eagerly, 不然能够运用 WhileSubscribed 来完成一些优化。

public fun WhileSubscribed(
    stopTimeoutMillis: Long = 0,
    replayExpirationMillis: Long = Long.MAX_VALUE
): SharingStarted

它支撑两个参数:

  • stopTimeoutMillis: 最终一个订阅者完毕订阅后多久中止订阅上游流
  • replayExpirationMillis: 数据 replay 的过期时间,超出时间后,新订阅者不会收到历史数据

简略示例

Repository 层

class MainRepository {
    private val _data: MutableSharedFlow<Int> = MutableSharedFlow()
    val data: SharedFlow<Int> = _data
    suspend fun request() {
        var cur = 1
        repeat(100) {
            delay(100)
            _data.emit(cur) // 模仿出产数据
            println("emit data: ${cur++}")
        }
    }
}

ViewModel 层

class MainViewModel(
    private val repository: MainRepository
) : CoroutineScope by CoroutineScope(Dispatchers.IO + SupervisorJob()) {
    val data by lazy {
        repository.data.shareIn(
            scope = this,
            started = SharingStarted.WhileSubscribed(5000)
        ).onEach {
            println("repository data: $it")
        }.map {
            // 转化数据
            "UI Data-$it"
        }
    }
    private val _state = MutableStateFlow(MainState.INIT)
    val state: StateFlow<MainState> = _state
    fun refresh() {
        launch {
            _state.value = MainState.LOADING
            repository.request()
            _state.value = MainState.INIT
        }
    }
}

View 层

jobData = launch {
    lifecycle.repeatOnLifecycle(Lifecycle.State.STARTED) {
        viewModel.data.collect {
            println("data: $it")
        }
    }
}
jobState = launch {
    lifecycle.repeatOnLifecycle(Lifecycle.State.STARTED) {
        viewModel.state.collect {
            println("state: $it")
        }
    }
}

StateFlow 运用

创立 StateFlow

看看 StateFlow 的结构函数:

public fun <T> MutableStateFlow(value: T): MutableStateFlow<T>

结构函数只需传入一个初始值,它本质上是一个 replay = 1 且没有缓冲区的 SharedFlow, 因此在第一次订阅时会先获取到初始值。

stateIn 转化

和 SharedFlow 相似,也能够用 stateIn 将一般流通化成 StateFlow:

public fun <T> Flow<T>.stateIn(
    scope: CoroutineScope,
    started: SharingStarted,
    initialValue: T
): StateFlow<T>

跟 shareIn 不同的是需求传入一个初始值。

总结

个人看来,LiveData 不至于真的被替换掉,由于它有很多优势,比如说运用简略,轻量级,可感知生命周期,可调查等。简略的组件或许功用不够强壮,强壮的组件用起来一般都比较杂乱,二者不可兼得,但是无论是组件还是架构,适宜的才是最好的,咱们在选择某个组件或者某种架构的时候,需求结合自身项目和需求的特色,选择适宜的即可。

一般来说,关于 View 和 ViewModel 之间简略的呼应式开发,运用 LiveData 就足够了,而关于一些杂乱场景(切换线程,数据流变换等),能够考虑运用 Flow 来处理异步数据流。

我的专栏文章:

  • Android视图体系:Android 视图体系相关的底层原理解析,看完定有收获。
  • Kotlin专栏:Kotlin 学习相关的博客,包含协程, Flow 等。
  • Android架构学习之路:架构不是一蹴而就的,希望咱们有一天的时候,能够从自己写的代码中找到架构的成就感,而不是干几票就跑路。作业太忙,更新比较慢,咱们有爱好的话能够一同学习。
  • Android实战系列:记载实践开发中遇到和解决的一些问题。

文中内容如有错误欢迎指出,共同进步!更新不易,觉得不错的留个再走哈~

本文正在参与「金石方案 . 分割6万现金大奖」