敞开成长之旅!这是我参与「日新方案 12 月更文应战」的第1天,点击检查活动概况
最近马斯克收买了推特之后,马上就裁掉了 50% 的推特职工,这不由让我想起了灭霸的响指… 还有苹果、亚马逊冻住招聘,英特尔、Lyft敞开裁人方案,国内外都不好过啊,咱们都开端勒紧裤腰带了那么,咱们打工人是不是也该刷刷题了(笑Cry.jpg)
Kotlin 学习笔记艰难地来到了第五篇~ 在这一篇首要会说 Flow 的基本常识和实例。因为 Flow 内容较多,所以会分几个末节来解说,这是榜首末节,文章后边会结合一个实例介绍 Flow 在实践开发中的应用。
首先回想一下,在协程中处理某个操作,咱们只能回来单个成果;而 Flow 能够按次序回来多个成果,在官方文档中,Flow 被翻译为 数据流
,这也阐明晰 Flow 适用于多值回来的场景。
Flow 是以协程为基础构建的,所以它可经过异步的方式处理一组数据,所要处理的数据类型必须相同,比如:Flow<Int>
是处理整型数据的数据流。
Flow 一般包含三个部分:
1)提供方:担任生成数据并增加到 Flow 中,得益于协程,Flow 能够异步生成数据;
2)中介(可选):可对 Flow 中的值进行操作、修正;也可修正 Flow 自身的一些属性,如地点线程等;
3)运用方:接收并运用 Flow 中的值。
提供方:出产者,运用方:顾客,典型的出产者顾客形式。
1. Flow 概述
Flow 是一个异步数据流,它能够次序地宣布数据,经过流上的一些中心操作得出成果;若犯错可抛出反常。这些 “流上的中心操作” 包含但不限于 map
、filter
、take
、zip
等等办法。这些中心操作是链式的,能够在后边再次增加其他操作办法,而且也不是挂起函数,它们仅仅构建了一条链式的操作并实时回来成果给后边的操作步骤。
流上的终端操作符要么是挂起函数,例如 collect
、single
、toList
等等,要么是在给定效果域内开端搜集流的 launchIn
操作符。前半句好理解,后半句啥意思?这就得看一下 launchIn
这个终端操作符的效果了。它里边是这样的:
//code 1
public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job = scope.launch {
collect() // tail-call
}
原来 launchIn
办法能够传入一个 CoroutineScope
协程效果域,然后在这个效果域里边调用 collect
办法。lifecycleScope
、MainScope()
这些都是协程效果域,所以 launchIn
办法只不过是 scope.launch { flow.collect() }
的一种简写。
流的履行也被称之为搜集流,而且是以挂起的方式,不是阻塞的。流终究的履行成功与否取决于流上的操作是否悉数履行成功。collect
函数便是最常见的搜集流函数。
1.1 冷流与暖流
冷流(Cold Flow):在数据被运用方订阅后,即调用 collect
办法之后,提供刚才开端履行发送数据流的代码,通常是调用 emit
办法。即不消费,不出产,屡次消费才会屡次出产。运用方和提供方是1对1的关系。
暖流(Hot Flow):不论有无运用方,提供方都能够履行发送数据流的操作,提供方和运用方是一对多的关系。暖流便是不论有无消费,都可出产。
SharedFlow
便是暖流的一种,任何流也能够经过 stateIn
和 shareIn
操作转化为暖流,或者经过 produceIn
操作将流转化为一个热通道也能达到目的。本篇只介绍冷流相关常识,暖流会在后边末节解说~
2. Flow 构建办法
Flow 的结构办法有如下几种:
1、 flowOf()
办法。用于快速创立流,类似于 listOf()
办法,下面是它的源码:
//code 2
public fun <T> flowOf(vararg elements: T): Flow<T> = flow {
for (element in elements) {
emit(element)
}
}
所以用法也比较简单:
//code 3
val testFlow = flowOf(65,66,67)
lifecycleScope.launch {
testFlow.collect {
println("输出:$it")
}
}
//打印成果:
//输出:65
//输出:66
//输出:67
注意到 Flow 初始化的时分跟其他目标相同,效果域在哪儿都能够,但 collect
搜集的时分就需求放在协程里了,因为 collect
是个挂起函数。
2、asFlow()
办法。是调集的扩展办法,可将其他数据转换成 Flow,例如 Array
的扩展办法:
//code 4
public fun <T> Array<T>.asFlow(): Flow<T> = flow {
forEach { value ->
emit(value)
}
}
不仅 Array
扩展了此办法,各种其他数据类型的数组都扩展了此办法。所以调集能够很便利地结构一个 Flow。
3、flow {}
办法。这个办法能够在其内部次序调用 emit
办法或 emitAll
办法然后结构一个次序履行的 Flow。emit
是发射单个值;emitAll
是发射一个流,这两个办法别离类似于 list.add(item)
、list.addAll(list2)
办法。flow {}
办法的源码如下:
//code 5
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
需求额外注意的是,flow
后边的 lambda 表达式是一个挂起函数,里边不能运用不同的 CoroutineContext
来调用 emit
办法去发射值。因此,在 flow{...}
中不要经过创立新协程或运用 withContext
代码块在另外的 CoroutineContext
中调用 emit
办法,不然会报错。假如确实有这种需求,能够运用 channelFlow
操作符。
//code 6
val testFlow = flow {
emit(23)
// withContext(Dispatchers.Main) { // error
// emit(24)
// }
delay(3000)
emitAll(flowOf(25,26))
}
4、channelFlow {}
办法。这个办法就能够在内部运用不同的 CoroutineContext
来调用 send
办法去发射值,而且这种结构办法确保了线程安全也确保了上下文的一致性,源码如下:
//code 7
public fun <T> channelFlow(@BuilderInference block: suspend ProducerScope<T>.() -> Unit): Flow<T> =
ChannelFlowBuilder(block)
一个简单的运用比如:
//code 8
val testFlow1 = channelFlow {
send(20)
withContext(Dispatchers.IO) { //可切换线程
send(22)
}
}
lifecycleScope.launch {
testFlow1.collect {
println("输出 = $it")
}
}
5、MutableStateFlow
和 MutableSharedFlow
办法:都能够定义相应的结构函数去创立一个能够直接更新的暖流。因为篇幅有限,有关暖流的常识后边末节会再阐明。
3. Flow 常用的操作符
Flow 的运用依赖于很多的操作符,这些操作符能够大致地分为 中心操作符 与 结尾操作符 两大类。中心操作符是流上的中心操作,能够针对流上的数据做一些修正,是链式调用。中心操作符与结尾操作符的区别是:中心操作符是用来履行一些操作,不会当即履行,回来值仍是个 Flow;结尾操作符就会触发流的履行,回来值不是 Flow。
一个完好的 Flow 是由 Flow 构建器
、Flow 中心操作符
、Flow 结尾操作符
组成,如下示意图所示:
3.1 collect 结尾操作符
最常见的当然是 collect
操作符。它是个挂起函数,需求在协程效果域中调用;而且它是一个结尾操作符,结尾操作符便是实践启动 Flow 履行的操作符,这一点跟 RxJava 中的 Observable
目标的履行很像。
了解 RxJava 的同学知道,在 RxJava 中,Observable
目标的履行开端时机是在被一个订阅者(subscriber
) 订阅(subscribe
) 的时分,即在 subscribe
办法调用之前,Observable
目标的主体是不会履行的。
Flow 也是相同的工作原理,Flow 在调用 collect
操作符搜集流之前,Flow 构建器和中心操作符都不会履行。举个栗子:
//code 9
val testFlow2 = flow {
println("++++ 开端")
emit(40)
println("++++ 宣布了40")
emit(50)
println("++++ 宣布了50")
}
lifecycleScope.launch {
testFlow2.collect{
println("++++ 搜集 = $it")
}
}
// 输出成果:
//com.example.myapplication I/System.out: ++++ 开端
//com.example.myapplication I/System.out: ++++ 搜集 = 40
//com.example.myapplication I/System.out: ++++ 宣布了40
//com.example.myapplication I/System.out: ++++ 搜集 = 50
//com.example.myapplication I/System.out: ++++ 宣布了50
从输出成果能够看出,每次到 collect
办法调用时,才会去履行 emit
办法,而在此之前,emit
办法是不会被调用的。这种 Flow 便是冷流。
3.2 reduce 结尾操作符
reduce
也是一个结尾操作符,它的效果便是将 Flow 中的数据两两组合连续进行处理,跟 Kotlin 调集中的 reduce
操作符效果相同。举个栗子:
//code 10
private fun reduceOperator() {
val testFlow = listOf("w","i","f","i").asFlow()
CoroutineScope(Dispatchers.Default).launch {
val result = testFlow.reduce { accumulator, value ->
println("+++accumulator = $accumulator value = $value")
"$accumulator$value"
}
println("+++final result = $result")
}
}
//输出成果:
//com.example.myapplication I/System.out: +++accumulator = w value = i
//com.example.myapplication I/System.out: +++accumulator = wi value = f
//com.example.myapplication I/System.out: +++accumulator = wif value = i
//com.example.myapplication I/System.out: +++final result = wifi
看成果就知道,reduce
操作符的处理逻辑了,两个值处理后得到的新值作为下一轮中的输入值之一,这便是两两连续进行处理的意思。
图1 中呈现的 toList
操作符也是一种结尾操作符,能够将 Flow 回来的多个值放进一个 List
中回来,回来的 List
也能够自己设置,比较简单,感兴趣的同学可自行着手实验。
3.3 zip 中心操作符
zip
望文生义,便是能够将两个 Flow 汇组成一个 Flow,举个栗子就知道了:
//code 11
lateinit var testFlow1: Flow<String>
lateinit var testFlow2: Flow<String>
private fun setupTwoFlow() {
testFlow1 = flowOf("Red", "Blue", "Green")
testFlow2 = flowOf("fish", "sky", "tree", "ball")
CoroutineScope(Dispatchers.IO).launch {
testFlow1.zip(testFlow2) { firstWord, secondWord ->
"$firstWord $secondWord"
}.collect {
println("+++ $it +++")
}
}
}
// 输出成果:
//com.example.myapplication I/System.out: +++ Red fish +++
//com.example.myapplication I/System.out: +++ Blue sky +++
//com.example.myapplication I/System.out: +++ Green tree +++
//zip 办法声明:
public fun <T1, T2, R> Flow<T1>.zip(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> = zipImpl(this, other, transform)
从 zip
办法的声明中可知,zip
办法的第二个参数便是针对两个 Flow 进行各种处理的挂起函数,也可如比如中写成尾调函数的姿态,回来值是处理之后的 Flow。而且当两个 Flow 长度不相同时,终究的成果会默认剔除去先前较长的 Flow 中的元素。所以 testFlow2
中的 “ball” 就被主动剔除去了。
4. Flow 反常处理
正如 RxJava 框架中的 subscribe
办法能够经过传入 Observer
目标在其 onNext
、onComplete
、onError
回来之前处理的成果,Flow 也有比如 catch
、onCompletion
等操作符去处理履行的成果。例如下面的代码:
//code 12
private fun handleExceptionDemo() {
val testFlow = (1..5).asFlow()
CoroutineScope(Dispatchers.Default).launch {
testFlow.map {
check(it != 3) {
//it == 3 时,会走到这儿
println("+++ catch value = $it")
}
println("+++ not catch value = $it")
it * it
}.onCompletion {
println("+++ onCompletion value = $it")
}.catch { exception ->
println("+++ catch exception = $exception")
}.collect{
println("+++ collect value = $it")
}
}
}
//输出成果:
//com.example.myapplication I/System.out: +++ not catch value = 1
//com.example.myapplication I/System.out: +++ collect value = 1
//com.example.myapplication I/System.out: +++ not catch value = 2
//com.example.myapplication I/System.out: +++ collect value = 4
//com.example.myapplication I/System.out: +++ catch value = 3
//com.example.myapplication I/System.out: +++ onCompletion value = java.lang.IllegalStateException: kotlin.Unit
//com.example.myapplication I/System.out: +++ catch exception = java.lang.IllegalStateException: kotlin.Unit
顺着代码咱先来看看一些常用的 Flow 中心操作符。
1)map
:用来将 Flow 中的数据一个个拿出来做各自的处理,然后交给下一个操作符;本例中便是将 Flow 中的数据进行平方处理;
2)check()
:类似于一个检查站,满足括号内条件的数据能够经过,不满足则交给它的尾调函数处理,而且抛出反常;
3)onCompletion
:Flow 终究的兜底器。不论 Flow 终究是履行完成、被取消、抛出反常,都会走到 onCompletion
操作符中,类似于在 Flow 的 collect
函数外加了个 try
,finally
。官方给了个小栗子,仍是很清楚的:
//code 13
try {
myFlow.collect { value ->
println(value)
}
} finally {
println("Done")
}
//上述代码能够替换为下面的代码:
myFlow
.onEach { println(it) }
.onCompletion { println("Done") }
.collect()
所以,在 code 12 中的 onCompletion
操作符能够接住从 check
那儿抛出的反常;
4)catch
:不必多说,专门用于捕捉反常的,避免程序溃散。这儿假如把 catch
去掉,程序就会溃散。假如把 catch
和 onCompletion
操作符方位互换,则 onCompletion
里边就接收不到反常信息了,如图所示。
5. Flow 数据恳求实例
说了这么多,举个在实践中经常用到的数据恳求的比如吧。先来看一个最简单的比如:
5.1 单接口恳求
现在一般都是在 ViewModel 里持有 LiveData 数据,而且进行数据的恳求,所以先来看下 ViewModel 中的代码完成:
//code 14
class SingleNetworkCallViewModel: ViewModel() {
private val users = MutableLiveData<Resource<List<ApiUser>>>()
private val apiHelperImpl = ApiHelperImpl(RetrofitBuilder.apiService)
fun fetchUsers() {
viewModelScope.launch {
users.postValue(Resource.loading(null))
apiHelperImpl.getUsers()
.catch { e ->
users.postValue(Resource.error(e.toString(), null))
}
.collect {
users.postValue(Resource.success(it))
}
}
}
fun getUsersData(): LiveData<Resource<List<ApiUser>>> {
return users
}
}
从代码可看出,fetchUsers
办法便是数据恳求办法,里边的核心办法是 ApiHelperImpl
类目标的 getUsers
办法,在之前初始化 apiHelperImpl
目标时传入了一个 RetrofitBuilder.apiService
值,所以底层仍是用到了 Retrofit 框架进行的网络恳求。Retrofit 相关的代码如下:
//code 15
object RetrofitBuilder {
private const val BASE_URL = "https://xxxxxxx/"
private fun getRetrofit(): Retrofit {
return Retrofit.Builder()
.baseUrl(BASE_URL)
.addConverterFactory(GsonConverterFactory.create())
.build()
}
val apiService: ApiService = getRetrofit().create(ApiService::class.java)
}
//ApiService 中的代码也是一般常见的代码:
interface ApiService {
@GET("users")
suspend fun getUsers(): List<ApiUser>
}
再回过来看看 ViewModel 的代码,从 apiHelperImpl.getUsers
办法后边的 catch
和 collect
操作符也可看出,getUsers
办法回来的便是一个 Flow 目标,其运用的结构办法便是前文中说到的 flow{}
办法:
//code 16
class ApiHelperImpl(private val apiService: ApiService) : ApiHelper {
override fun getUsers(): Flow<List<ApiUser>> {
return flow { emit(apiService.getUsers()) }
}
}
ApiHelper
其实便是一个接口,规定了 ApiHelperImpl
中数据恳求的办法名及回来值,回来值是一个 Flow,里边是咱们终究需求的数据列表:
//code 17
interface ApiHelper {
fun getUsers(): Flow<List<ApiUser>>
}
Flow 调用 emit
宣布去的便是 Retrofit 进行数据恳求后回来的 List<ApiUser>
数据。
如何在 Activity 中运用便是之前运用 LiveData 的惯例操作了:
//code 18
private fun setupObserver() {
viewModel.getUsersData().observe(this, Observer {
when (it.status) {
Status.SUCCESS -> {
progressBar.visibility = View.GONE
it.data?.let { users -> renderList(users) }
recyclerView.visibility = View.VISIBLE
}
Status.LOADING -> {
progressBar.visibility = View.VISIBLE
recyclerView.visibility = View.GONE
}
Status.ERROR -> {
//Handle Error
progressBar.visibility = View.GONE
Toast.makeText(this, it.message, Toast.LENGTH_SHORT).show()
}
}
})
}
5.2 双接口并行恳求
上述比如是最简单的单个数据接口恳求的场景,假如是两个或是多个数据接口需求并行恳求,该如何处理呢?这就需求用到之前说的 Flow 中的 zip
操作符了。接着上面的比如,再增加一个数据恳求办法 getMoreUsers
,那么两个接口并行的比如为:
//code 18
fun fetchUsers() {
viewModelScope.launch {
users.postValue(Resource.loading(null))
apiHelper.getUsers()
.zip(apiHelper.getMoreUsers()) { usersFromApi, moreUsersFromApi ->
val allUsersFromApi = mutableListOf<ApiUser>()
allUsersFromApi.addAll(usersFromApi)
allUsersFromApi.addAll(moreUsersFromApi)
return@zip allUsersFromApi
}
.flowOn(Dispatchers.Default)
.catch { e ->
users.postValue(Resource.error(e.toString(), null))
}
.collect {
users.postValue(Resource.success(it))
}
}
}
两个数据接口恳求的快慢必定不相同,但不必担心,zip
操作符会等候两个接口的数据都回来之后才进行拼接并交给后边的操作符处理,所以这儿还需求调用 flowOn
操作符将线程切换到后台线程中去挂起等候。但后边的 collect
操作符履行的代码是在主线程中,感兴趣的同学能够打印线程信息看看,这就需求了解一下 flowOn
操作符的用法了。
flowOn
办法能够切换 Flow 处理数据的地点线程,类似于 RxJava 中的 subscribeOn
办法。例如 flowOn(Dispatchers.Default)
便是将 Flow 的操作都放到后台线程中履行。
当 flowOn
操作符之前没有设置任何的协程上下文,那么 flowOn
操作符能够为它之前的操作符设置履行地点的线程,并不会影响它之后下游的履行地点线程。下面是一个简单比如:
//code 19
private fun flowOnDemo() {
val testFlow = (1..2).asFlow()
MainScope().launch {
testFlow
.filter {
println("1+++ $it ${Thread.currentThread().name}")
it != 3
}.flowOn(Dispatchers.IO)
.map {
println("2+++ $it ${Thread.currentThread().name}")
it*it
}.flowOn(Dispatchers.Main)
.filter {
println("3+++ $it ${Thread.currentThread().name}")
it!=25
}.flowOn(Dispatchers.IO)
.collect{
println("4+++ $it ${Thread.currentThread().name}")
}
}
}
//输出成果:
//com.example.myapplication I/System.out: 1+++ 1 DefaultDispatcher-worker-1
//com.example.myapplication I/System.out: 1+++ 2 DefaultDispatcher-worker-1
//com.example.myapplication I/System.out: 2+++ 1 main
//com.example.myapplication I/System.out: 2+++ 2 main
//com.example.myapplication I/System.out: 3+++ 1 DefaultDispatcher-worker-1
//com.example.myapplication I/System.out: 3+++ 4 DefaultDispatcher-worker-1
//com.example.myapplication I/System.out: 4+++ 1 main
//com.example.myapplication I/System.out: 4+++ 4 main
发现了么?flowOn
操作符只对最近的上游操作符线程担任,它下游的线程会主动切换到之前地点的线程。假如连续有两个或多个 flowOn
操作符切换线程,则会切换到首个 flowOn
操作符切换的线程中去:
//code 20
testFlow
.filter {
println("1+++ $it ${Thread.currentThread().name}")
it != 3 //终究会在 Main 主线程中履行
}.flowOn(Dispatchers.Main).flowOn(Dispatchers.IO).flowOn(Dispatchers.Default)
.collect{
println("4+++ $it ${Thread.currentThread().name}")
}
在 filter
后边连续有两个 flowOn
操作符,但终究会在 Main 线程中履行 filter
操作符中的逻辑。
全体上看,Flow 在数据恳求时所扮演的角色是数据接收与处理后发送给 UI 层的效果,这跟 RxJava 的责任是相同的,而且两者都有丰厚的操作符来处理各种不同的情况。不同的是 Flow 是将接收到的数据放到 Flow 载体中,而 RxJava 一般将数据放到 Observable
目标中;Flow 处理数据更加便利和天然,去除了 RxJava 中繁复且功能臃肿的操作符。
总结
终究总结一下 Flow 榜首末节的内容吧:
1)Flow 数据流可异步按次序回来多个数据;
2)Flow 全体是由 构建器、中心操作符、结尾操作符 组成;
3)冷流只有在调用结尾操作符时,流的结构器和中心操作符才会开端履行;冷流的运用方和提供方是1对1的;
4)简单介绍了 collect
、reduce
结尾操作符以及 zip
、map
等中心操作符的运用;
5)Flow 反常处理所用到的 catch
、check
、onCompletion
等操作符的用法;
6)Flow 在数据恳求上的实例
所用实例来源:github.com/MindorksOpe…
更多内容,欢迎关注公众号:修之竹
赞人玫瑰,手留余香!欢迎点赞、转发~ 转发请注明出处~
参考文献
- Android 上的 Kotlin 数据流;官方文档 https://developer.android.com/kotlin/flow
- Flow Kotlin 官方文档; https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow/
- 【Kotlin Flow】 一眼看全——Flow操作符大全; 搬砖小子呈现了 https:///post/6989536876096913439
- What is Flow in Kotlin and how to use it in Android Project?; Himanshu Singh; https://blog.mindorks.com/what-is-flow-in-kotlin-and-how-to-use-it-in-android-project
- Understanding Terminal Operators in Kotlin Flow; Amit Shekhar; https://blog.mindorks.com/terminal-operators-in-kotlin-flow
- Creating Flow Using Flow Builder in Kotlin; Amit Shekhar; https://blog.mindorks.com/creating-flow-using-flow-builder-in-kotlin
- Exception Handling in Kotlin Flow; Amit Shekhar; https://blog.mindorks.com/exception-handling-in-kotlin-flow