呼应式编程
由于 Kotlin Flow 是根据 呼应式编程 的完结,所以先了解一下 呼应式编程 的概念。
首要看下百度百科解说:
呼应式编程是一种面向数据流和改变传达的编程范式。这意味着能够在编程语言中很便利地表达静态或动态的数据流,而相关的计算模型会自动将改变的值经过数据流进行传达。
这个释义很抽象,难以了解。只知道它的中心是:数据流。
怎么了解这个数据流,先看下呼应式编程 ReactiveX 下的一个结构 RxJava 。
RxJava 是根据呼应式编程的完结,它的界说:
RxJava 是 Reactive Extensions 的 Java VM 完结:一个经过运用可观察序列来组合异步和根据事情的程序的库。
它扩展了观察者模式以支撑数据/事情序列,并添加了运算符,答应您以声明办法组合序列,一起消除了对低级线程、同步、线程安全和并发数据结构等问题的忧虑。
看完这个界说,脑袋中也很模糊。下面从 RxJava 应用的一个简略比如来剖析:
Observable.just(bitmap).map { bmp->
//在子线程中履行耗时操作,存储 bitmap 到本地
saveBitmap(bmp)
}.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe { bitmapLocalPath ->
//在主线程中处理存储 bitmap 后的本地途径地址
refreshImageView(bitmapLocalPath)
}
上面比如中: 将一个 bitmap 存储到本地并回来本地途径,从源数据 bitmap → 存储 btimap 到本地操作 → 获取本地图片途径值刷新UI。其实,就能够把这整个过程中按时刻产生的事情序列了解为数据流。
数据流包括供给方(出产者),中介(中心操作),运用方(顾客):
- 供给方(出产者):源数据,将数据添加到数据流中;
- 中介(中心操作):能够修正发送到数据流的值,或修正数据流本身;
- 运用方(顾客):成果数据,运用数据流中的值。
那么,上面比如中的数据流是:
- 供给方(出产者):源数据 bitmap;
- 中介(中心操作):map 操作,存储 btimap 到本地;
- 运用方(顾客):本地图片途径。
再看下 RxJava 中的数据流解说:
RxJava 中的数据流由源、零个或多个中心过程组成,然后是数据顾客或组合器过程(其间该过程担任经过某种办法消费数据流):
source.operator1().operator2().operator3().subscribe(consumer);
source.flatMap(value -> source.operator1().operator2().operator3());
在这里,假如咱们想象自己在操作符 operator2 上,向左看 source 被称为上游。向右看 subscriber/consumer 称为下流。当每个元素都写在单独的行上时,这一点一般更为显着:
source
.operator1()
.operator2()
.operator3()
.subscribe(consumer)
这也是 RxJava 的上游、下流概念。
其实,Flow 数据流中参看 RxJava,也能够有这样相似的上游和下流概念:
flow
.operator1()
.operator2()
.operator3()
.collect(consumer)
了解了 呼应式编程 的中心 数据流 后,对 呼应式编程 有了开始印象。但是 呼应式编程 的完结远不止如此,它还涉及观察者模式,线程调度等。不论原理这些,用它来做开发有什么长处呢?其实,它首要长处是:
- 对于并发编程,线程切换,没有 callback hell,简化了异步履行的代码;
- 代码高雅,简练,易阅览和保护。
下面看两个业务比如:
Observable.just(bitmap).map { bmp ->
//在子线程中履行耗时操作,存储 bitmap 到本地
saveBitmap(bmp)
}.map { path ->
//在子线程中履行耗时操作,上传图片到服务端
uploadBitmap(path)
}.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
.subscribe { downloadUrl ->
//在主线程中处理获取图片下载地址
}
//从服务端批量下载文件
Observable.from(downloadUrls).flatMap { downloadUrl ->
//下载单个文件,回来本地文件
Observable.just(downloadUrl).map {url-> downloadResource(url) }
}.map { file ->
//对文件解压
unzipFile(file)
}.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
.subscribe { folderPath ->
//拿到文件夹途径
}
所以 呼应式编程 的完结,首要是帮咱们处理了 并发编程问题,能用高雅简练的代码做异步事情处理。
Kotlin 协程 和 Flow,它们结合在一起也完结了 呼应式编程。在 Kotlin 环境中,再结合 Android 供给 Lifecycle, ViewModel, Flow 的扩展,能让咱们在 Android 中做并发编程,异步事情办理如鱼得水。
Kotlin Flow
Kotlin Flow 便是 Kotlin 数据流,它根据 Kotlin 协程构建。上一篇 Kotlin 协程探究 剖析了 协程 的大致原理,知道协程便是 Kotlin 供给的一套线程 API 结构,便利做并发编程。那么 Kotlin 协程 和 Flow (数据流)的结合,和 RxJava 结构就有异曲同工之妙。
下面运用 Kotlin 协程 和 Flow 来完结上面 RxJava 的两个业务比如:
GlobalScope.launch(Dispatchers.Main) {
flowOf(bitmap).map { bmp ->
//在子线程中履行耗时操作,存储 bitmap 到本地
Log.d("TestFlow", "saveBitmap: ${Thread.currentThread()}")
saveBitmap(bmp)
}.flowOn(Dispatchers.IO).collect { bitmapLocalPath ->
//在主线程中处理存储 bitmap 后的本地途径地址
Log.d("TestFlow", "bitmapLocalPath=$bitmapLocalPath: ${Thread.currentThread()}")
}
}
//从服务端批量下载文件
GlobalScope.launch(Dispatchers.Main) {
downloadUrls.asFlow().flatMapConcat { downloadUrl ->
//下载单个文件,回来本地文件
flowOf(downloadUrl).map { url ->
Log.d("TestFlow", "downloadResource:url=$url: ${Thread.currentThread()}")
downloadResource(url)
}
}.map { file ->
//对文件解压
Log.d("TestFlow", "unzipFile:file=${file.path}: ${Thread.currentThread()}")
unzipFile(file)
}.flowOn(Dispatchers.IO).collect { folderPath ->
//拿到文件夹途径
Log.d("TestFlow", "folderPath=$folderPath: ${Thread.currentThread()}")
}
}
操控台成果输出:
TestFlow: saveBitmap: Thread[DefaultDispatcher-worker-1,5,main]
TestFlow: bitmapLocalPath=/mnt/sdcard/Android/data/com.wangjiang.example/files/images/flow.png: Thread[main,5,main]
TestFlow: downloadResource:url=https://www.wangjiang.example/coroutine.zip: Thread[DefaultDispatcher-worker-1,5,main]
TestFlow: unzipFile:file=/mnt/sdcard/Android/data/com.wangjiang.example/files/zips/coroutine.zip: Thread[DefaultDispatcher-worker-1,5,main]
TestFlow: downloadResource:url=https://www.wangjiang.example/flow.zip: Thread[DefaultDispatcher-worker-1,5,main]
TestFlow: unzipFile:file=/mnt/sdcard/Android/data/com.wangjiang.example/files/zips/flow.zip: Thread[DefaultDispatcher-worker-1,5,main]
TestFlow: folderPath=/mnt/sdcard/Android/data/com.wangjiang.example/files/zips/coroutine: Thread[main,5,main]
TestFlow: folderPath=/mnt/sdcard/Android/data/com.wangjiang.example/files/zips/flow: Thread[main,5,main]
能够看到,和 RxJava 完结的效果是共同的。首要,运用launch
发动一个协程,然后运用源数据创立一个 Flow
(数据出产),再经过 flatMapConcat
, map
改换(多个中心操作),最终经过collect
获取成果数据(数据消费),这其间还包括线程切换:在主线程中发动子线程履行耗时使命,并将耗时使命成果回来给主线程(flowOn 指定了中心操作在 IO 线程中履行)。所以 协程 和 Flow(数据流) 结合,便是 呼应式编程 的完结,这对咱们来说,运用它能够在 Kotlin 环境中写出高雅的异步代码来做并发编程。
下面再分别来了解一下 协程 和 Flow。
协程概念
首要来看一下协程中的一些概念和 API。
CoroutineScope: 界说协程的 scope。
CoroutineScope 会盯梢它运用 launch 或 async 创立的一切协程。您能够随时调用 scope.cancel() 以撤销正在进行的作业(即正在运转的协程)。在 Android 中,某些 KTX 库为某些生命周期类供给自己的 CoroutineScope。例如,ViewModel 有 viewModelScope,Lifecycle 有 lifecycleScope。不过,与调度程序不同,CoroutineScope 不运转协程。
Kotlin 供给了为 UI 组件运用的 MainScope
:
public fun MainScope(): CoroutineScope = ContextScope(SupervisorJob() + Dispatchers.Main)
为应用程序整个生命周期运用的 GlobalScope
:
public object GlobalScope : CoroutineScope {
/**
* Returns [EmptyCoroutineContext].
*/
override val coroutineContext: CoroutineContext
get() = EmptyCoroutineContext
}
由于是应用程序整个生命周期,所以要稳重运用。
也能够自界说 Scope:
val scope = CoroutineScope(Job() + Dispatchers.Main)
另外,Android KTX 库针对 CoroutineScope
做了扩展,所以在 Android 中一般会运用 Activity 或 Fragment 生命周期相关的 lifecycleScope
,和 ViewModel 生命周期相关的viewModelScope
。
public val Lifecycle.coroutineScope: LifecycleCoroutineScope
get() {
while (true) {
val existing = mInternalScopeRef.get() as LifecycleCoroutineScopeImpl?
if (existing != null) {
return existing
}
val newScope = LifecycleCoroutineScopeImpl(
this,
SupervisorJob() + Dispatchers.Main.immediate
)
if (mInternalScopeRef.compareAndSet(null, newScope)) {
newScope.register()
return newScope
}
}
}
public val ViewModel.viewModelScope: CoroutineScope
get() {
val scope: CoroutineScope? = this.getTag(JOB_KEY)
if (scope != null) {
return scope
}
return setTagIfAbsent(
JOB_KEY,
CloseableCoroutineScope(SupervisorJob() + Dispatchers.Main.immediate)
)
}
internal class CloseableCoroutineScope(context: CoroutineContext) : Closeable, CoroutineScope {
override val coroutineContext: CoroutineContext = context
override fun close() {
coroutineContext.cancel()
}
}
发动协程: launch 和 async
发动协程有两种办法:
-
launch
:发动一个新的协程,并回来一个Job
,这个Job
是能够撤销的Job.cancel
; -
async
:也会发动一个新的协程,并回来一个Deferred
接口完结,这个接口其实也承继了Job
接口,能够运用await
挂起函数等待回来成果。
CoroutineContext: 协程上下文
val scope = CoroutineScope(Job() + Dispatchers.Main)
在 CoroutineScope 中界说了 plus 操作:
public operator fun CoroutineScope.plus(context: CoroutineContext): CoroutineScope =
ContextScope(coroutineContext + context)
由于 Job
和 Dispatchers
顶层都承继了接口 Element
,而 Element
又承继了接口 CoroutineContext
:
public interface Element : CoroutineContext
所以 Job() 和 Dispatchers.Main 能够相加。这里 CoroutineScope 的结构办法中是必须要有 Job()
,假如没有,它自己也会创立一个 Job()
:
public fun CoroutineScope(context: CoroutineContext): CoroutineScope =
ContextScope(if (context[Job] != null) context else context + Job())
Job 和 CoroutineDispatcher 在 CoroutineContext
中的作用是:
Job:操控协程的生命周期。
CoroutineDispatcher:将作业分派到适当的线程。
CoroutineDispatcher:协程调度器与线程
- Dispatchers.Default:默许调度器,指示此协程应在为 cpu 计算操作预留的线程上履行;
- Dispatchers.Main:指示此协程应在为 UI 操作预留的主线程上履行;
- Dispatchers.IO:指示此协程应在为 I/O 操作预留的线程上履行。
GlobalScope.launch(Dispatchers.Main) {
}
withContext(Dispatchers.IO){
}
.flowOn(Dispatchers.IO)
小结
要运用协程,首要创立一个 scope: CoroutineScope
来担任办理协程,界说scope
时需求指定操控协程的生命周期的 Job
和将作业分派到适当线程的CoroutineDispatcher
。界说好 scope 后, 可经过 scope.launch
发动一个协程,也能够多次运用scope.launch
发动多个协程,发动的协程可经过 scope.cancel
撤销,但它撤销的是 scope 发动的一切协程。假如要撤销单个协程,需求运用scope.launch
回来的 Job
来撤销Job.cancel
,这个 Job 操控着单个协程的生命周期。当发动协程后,主线程中的使命依然能够继续履行,在履行launch{}
时,能够经过 withContext(Dispatchers.IO)
将协程的履行操作移至一个 I/O 子线程,子线程履行完使命,再将成果回来主线程继续履行。
简略示例:
//主线程分派使命
private val scope = CoroutineScope(Job() + Dispatchers.Main)
//办理对应的协程的生命周期
private var job1: Job? = null
fun exec() {
//发动一个协程
job1 = scope.launch {
//子线程履行耗时使命
withContext(Dispatchers.IO){
}
}
//发动一个协程
val job2 = scope.launch {
//发动一个协程
val taskResult1 = async {
//子线程履行耗时使命
withContext(Dispatchers.IO){
}
}
val taskResult2 = async {
//子线程履行耗时使命
withContext(Dispatchers.IO){
}
}
//taskResult1 和 taskResult2 都回来成果才会继续履行
taskResult1.await() + taskResult2.await()
}
}
fun cancelJob() {
//撤销 job1 对应的协程
job1?.cancel("cancel job1")
}
fun cancelScope() {
//撤销 scope 对应的一切协程
scope.cancel("cancel scope")
}
在上面的比如中:
-
scope
:界说主线程分派使命的 scope 来盯梢它运用 launch 或 async 创立的一切协程; -
job1
:办理它对应的协程的生命周期; -
withContext(Dispatchers.IO)
:切换到子线程履行耗时使命; -
cancelJob
会撤销 job1 对应的协程; -
cancelScope
会撤销 scope 发动的一切协程。
Flow 数据流
了解了 Kotlin 协程的一些根底 概念和 API 后,知道了协程的根本运用。接下来,再了解一下 Kotlin Flow 相关的概念和 API。
Kotlin 中的 Flow API 旨在异步处理按次序履行的数据流。Flow 本质上是一个 Sequence。咱们能够像对 Kotlin 中 Sequence 相同来操作Flow:改换,过滤,映射等。Kotlin Sequences 和 Flow 的首要区别在于 Flow 能够挂起。
假如有了解 Kotlin Sequence,那其实很好了解 Kotlin Flow。刚好,在前面一篇 Kotlin 慵懒调集操作-序列 Sequence文章中,有剖析 Sequence 的原理,这里也能够把 Flow 按照相似的原理进行了解。
val sequenceResult = intArrayOf(1, 2, 3).asSequence().map { it * it }.toList()
MainScope().launch{
val flowResult = intArrayOf(1, 2, 3).asFlow().map { it * it }.toList(mutableListOf())
}
上面 sequenceResult 和 flowResult 的值都是:[1, 4, 9]
。
在 Sequence 中,假如没有结尾操作,中心操作不会被履行。在 Flow 中也是相同,假如数据流没有数据消费collect
,中心操作也不会被履行。
flowOf(bitmap).map { bmp ->
//在子线程中履行耗时操作,存储 bitmap 到本地
saveBitmap(bmp)
}.flowOn(Dispatchers.Default)
上面代码中,map
操作不会被履行。
一个完好的数据流应该包括:数据出产( flowOf
, asFlow
, flow{}
)→ 中心操作(map
, filter
等)→ 数据消费(collect
,asList
,asSet
等)。下面将分别了解相关操作。
数据流:数据出产
数据出产首要是经过数据源构建数据流。能够运用 Builders.kt
中供给的 Flow 相关扩展办法,如:
intArrayOf(1, 2, 3).asFlow().map { it * it }
val downloadUrl = "https://github.com/ReactiveX/RxJava"
flowOf(downloadUrl).map { downloadZip(it) }
(1..10).asFlow().filter { it % 2 == 0 }
一般运用 flowOf
和 asFlow
办法直接构建数据流。它们创立的都是冷流:
冷流:这段 flow 构建器中的代码直到流被搜集(collect)的时分才运转。
也能够经过 flow{}
来构建数据流,运用emit
办法将数据源添加到数据流中:
flow<Int> {
emit(1)
withContext(Dispatchers.IO){
emit(2)
}
emit(3)
}.map { it * it }
不论是 flowOf
,asFlow
仍是 flow{}
,它们都会完结接口 FlowCollector
:
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
internal inline fun <T> unsafeFlow(@BuilderInference crossinline block: suspend FlowCollector<T>.() -> Unit): Flow<T> {
return object : Flow<T> {
override suspend fun collect(collector: FlowCollector<T>) {
collector.block()
}
}
}
接口 FlowCollector
供给的 emit
办法,担任将源数据添加到数据流中:
public fun interface FlowCollector<in T> {
/**
* Collects the value emitted by the upstream.
* This method is not thread-safe and should not be invoked concurrently.
*/
public suspend fun emit(value: T)
}
总结:构建数据流能够运用 Flow 相关扩展办法: flowOf
, asFlow
, flow{}
,它们都是经过接口 FlowCollector
供给的 emit
办法,将源数据添加到数据流中。
数据流:中心操作
中心操作首要修正发送到数据流的值,或修正数据流本身。如 filter
, map
, flatMapConcat
操作等:
intArrayOf(1, 2, 3).asFlow().map { it * it }.collect{ }
(1..100).asFlow().filter { it % 2 == 0 }.collect{ }
val data = hashMapOf<String, List<String>>(
"Java" to arrayListOf<String>("xiaowang", "xiaoli"),
"Kotlin" to arrayListOf<String>("xiaozhang", "xiaozhao")
)
flow<Map<String, List<String>>> {
emit(data)
}.flatMapConcat {
it.values.asFlow()
}.collect{ }
中心操作符有许多,根据运用场景大概可分为:
- 转化操作符:简略转化能够运用过滤
filter
,映射map
操作,复杂转化能够运用改换transform
操作; - 限长过渡操作符:在流触及相应限制的时分会将它的履行撤销,能够运用获取
take
操作,take(2)
表明只获取前两个值; - 丢掉操作符:丢掉流中成果值,能够运用丢掉
drop
操作,drop(2)
表明丢掉前两个值; - 展平操作符:将给定的流展平为单个流,
flatMapConcat
与flattenConcat
操作表明次序搜集传入的流操作,flatMapMerge
与flattenMerge
操作表明并发搜集一切传入的流,并将它们的值兼并到一个单独的流,以便尽快的发射值操作,flatMapLatest
操作表明以展平的办法搜集最新的流操作; - 组合操作符:将多个流组合,
zip
操作表明组合两个流的值,两个流都有值才进行组合操作,combine
操作表明组合两个流最新的值,每次组合的时分都是运用每个流最新的值; - 缓冲操作符:当数据出产比数据消费快的时分,能够运用缓冲
buffer
操作,在数据消费的时分能够缩短时刻; - 兼并操作符:兼并发射项,不对每个值进行处理,能够运用兼并
conflate
操作,越过中心值; - flowOn 操作符:更改流发射的上下文,会将
flowOn
操作前的操作切换到 flowOn 指定的上下文Dispatchers.Default
,Dispatchers.IO
,Dispatchers.Main
,也便是指定前面的操作所履行的线程;
上面介绍了首要的操作符的大致运用场景,操作符详细解说能够检查官方文档:异步流。
中心操作符代码示例:
(1..3).asFlow().take(2).collect{
//搜集到成果值 1,2
}
(1..3).asFlow().drop(2).collect{
//搜集到成果值 3
}
private fun downloadVideo(videoUrl: String): Pair<String, String> {
return Pair(videoUrl, "videoFile")
}
private fun downloadAudio(audioUrl: String): Pair<String, String> {
return Pair(audioUrl, "audioFile")
}
private fun downloadImage(imageUrl: String): Pair<String, String> {
return Pair(imageUrl, "imageFile")
}
MainScope().launch {
val imageDownloadUrls = arrayListOf<String>("image1", "image2")
val audioDownloadUrls = arrayListOf<String>("audio1", "audio2", "audio3")
val videoDownloadUrls = arrayListOf<String>("video1", "video2", "video3", "video4")
val imageFlows = imageDownloadUrls.asFlow().map {
downloadImage(it)
}
val audioFlows = audioDownloadUrls.asFlow().map {
downloadAudio(it)
}
val videoFlows = videoDownloadUrls.asFlow().map {
downloadVideo(it)
}
merge(imageFlows, audioFlows, videoFlows).flowOn(Dispatchers.IO).onEach {
Log.d("TestFlow", "result=$it")
}.collect()
}
操控台输出成果:
TestFlow: result=(image1, imageFile)
TestFlow: result=(image2, imageFile)
TestFlow: result=(audio1, audioFile)
TestFlow: result=(audio2, audioFile)
TestFlow: result=(audio3, audioFile)
TestFlow: result=(video1, videoFile)
TestFlow: result=(video2, videoFile)
TestFlow: result=(video3, videoFile)
TestFlow: result=(video4, videoFile)
merge 操作符将多个流兼并到一个流,支撑并发。相似 RxJava 的 zip 操作
(1..3).asFlow().onStart {
Log.d("TestFlow", "onStart:${Thread.currentThread()}")
}.flowOn(Dispatchers.Main).map {
Log.d("TestFlow", "map:$it,${Thread.currentThread()}")
if (it % 2 == 0)
throw IllegalArgumentException("fatal args:$it")
it * it
}.catch {
Log.d("TestFlow", "catch:${Thread.currentThread()}")
emit(-1)
}.flowOn(Dispatchers.IO)
.onCompletion { Log.d("TestFlow", "onCompletion:${Thread.currentThread()}") }
.onEach {
Log.d("TestFlow", "onEach:$it,${Thread.currentThread()}")
}.collect()
操控台输出成果:
TestFlow: onStart:Thread[main,5,main]
TestFlow: map:1,Thread[DefaultDispatcher-worker-3,5,main]
TestFlow: map:2,Thread[DefaultDispatcher-worker-3,5,main]
TestFlow: catch:Thread[DefaultDispatcher-worker-3,5,main]
TestFlow: onEach:1,Thread[main,5,main]
TestFlow: onEach:-1,Thread[main,5,main]
TestFlow: onCompletion:Thread[main,5,main]
flowOn 指定 onStart 在主线程中履行(Dispatchers.Main),指定 map 和 catch 在 IO 线程中履行(Dispatchers.IO)
总结:中心操作其实便是数据流的改换操作,与 Sequence 和 RxJava 的改换操作相似。
数据流:数据消费
数据消费便是运用数据流的成果值。结尾操作符最常运用 collect
来搜集流成果值:
(1..3).asFlow().collect{
//搜集到成果值 1,2,3
}
除了 collect
操作符外,还有一些操作符能够获取数据流成果值:
-
collectLatest
:运用数据流的最新值; -
toList
或toSet
等:将数据流成果值转化为调集; -
first
:获取数据流的第一个成果值; -
single
:保证流发射单个(single)值; -
reduce
:累积数据流中的值; -
fold
:给定一个初始值,再累积数据流中的值。
结尾操作符代码示例:
(1..3).asFlow().collectLatest {
delay(300)
//只能获取到3
}
//转化为 List 调集 [1,2,3]
val list = (1..3).asFlow().toList()
//转化为 Set 调集 [1,2,3]
val set = (1..3).asFlow().toSet()
val first = (1..3).asFlow().first()
//first 为第一个成果值 1
val single = (1..3).asFlow().single()
//流不是发射的单个值,会抛反常
val reduce = (1..3).asFlow().reduce { a, b ->
a + b
}
//reduce 的值为6=1+2+3
val fold = (1..3).asFlow().fold(10) { a, b ->
a + b
}
//fold 的值为16=10+1+2+3
除了上面这些结尾操作符,在结尾之前还相关着一些操作符:
-
onStart
:在数据流成果值搜集之前调用; -
onCompletion
:在数据流成果值搜集之后调用; -
onEmpty
:在数据流完结而不发出任何元素时调用; -
onEach
:在数据流成果值搜集时迭代流的每个值; -
catch
:在搜集数据流成果时,声明式捕获反常。
结尾相关操作符代码示例:
(1..3).asFlow().onStart {
Log.d("TestFlow", "onStart")
}.map {
if (it % 2 == 0)
throw IllegalArgumentException("fatal args:$it")
it * it
}.catch { emit(-1) }.onCompletion { Log.d("TestFlow", "onCompletion") }.onEach {
Log.d("TestFlow", "onEach:$it")
}.collect()
操控台输出成果:
TestFlow: onStart
TestFlow: onEach:1
TestFlow: onEach:-1
TestFlow: onCompletion
总结:数据流进行数据消费时,能够结合结尾操作符输出调集,累积值等,当要监听数据流搜集成果值开始或结束,能够运用 onStart
和 onCompletion
,当遇到流抛出反常,能够声明 catch
进行反常处理。
总结
呼应式编程,能够了解为一种面向数据流编程的办法,也便是运用数据源构建数据流 → 修正数据流中的值 → 处理数据流成果值,在这个过程中,一系列的事情或操作都是按次序产生的。在 Java 环境中,RxJava 结构完结了呼应式编程,它结合了数据流、观察者模式、线程结构;在 Kotlin 环境中,Kotlin 协程和 Flow 结合在一起完结了呼应式编程,其间协程便是线程结构,Flow 便是数据流。不论是 RxJava 仍是 Kotlin 协程和 Flow 的完结的呼应式编程,它们的意图都是为了:运用高雅,简练,易阅览,易保护的代码来编写并发编程,处理异步操作事情。另外,Android LifeCycle 和 ViewModel 对 Kotlin 协程和 Flow 进行了扩展支撑,这也对异步事情进行生命周期办理更便利。
参考文档:
- GitHub RxJava:RxJava
- Kotlin Flow 操作符:异步流
- Google Android developer Kotlin Flow :Android 上的 Kotlin 数据流
下一篇将探究 Kotlin Flow 冷流和热流。