前言

前边一系列的协程文章铺垫了很久,终于要剖析Flow了。如果说协程是Kotlin的精华,那么Flow便是协程的精髓。
经过本篇文章,你将了解到:

  1. 什么是流?
  2. 为什么引入Flow?
  3. Fow常见的操作
  4. 为什么说Flow是冷流?

1. 什么是流

Kotlin Flow啊,你将流向何方?

自然界的流水,从高到低,从上游到下贱活动。

而关于计算机国际的流:

数据的传递进程构成了数据流,简称流

比如想要查找1~1000内的偶数,能够这么写:

    var i = 0
    var list = mutableListOf<Int>()
    while (i < 1000) {
        if (i % 2 == 0)
            list.add(i)
        i++
    }

此处对数据的处理即为找出其间的偶数。
若想要在偶数中找到>500的数,则继续筛选:

    var i = 0
    var list = mutableListOf<Int>()
    while (i < 1000) {
        if (i > 500 && i % 2 == 0)
            list.add(i)
        i++
    }

能够看出,原始数据是1~1000,咱们对它进行了一些操作:过滤偶数、过滤>500的数。当然还能够进行其它操作,如映射、改换等。
提取上述进程三要素:

  1. 原始数据
  2. 对数据的一系列操作
  3. 终究的数据

把这一系列的进程作为流:

Kotlin Flow啊,你将流向何方?

从流的方历来调查,咱们称原始数据为上流,对数据进行一系列处理后,终究的数据为下贱。
从流的特点来调查,咱们认为出产者在上流出产数据,顾客在下贱消费数据。

2. 为什么引入Flow?

由前面的文章咱们知道,Java8供给了StreamAPI,专用来操作流,而Kotlin也供给了Sequence来处理流。
那为什么还要引入Flow呢?
在Kotlin的国际里当然不会想再依赖Java的StreamAPI了,主要来对比Kotlin里的各种方案挑选。
先看使用场景的演变。

a、调集获取多个值
想要获取多个值,很显而易见的想到了调集。

    fun testList() {
        //结构调集
        fun list(): List<Int> = listOf(1, 2, 3)
        list().forEach {
            //获取多个值
            println("value = $it")
        }
    }

以上函数功用触及两个目标:出产者和顾客。
出产者:担任将1、2、3结构为调集。
顾客:担任从调集里将1、2、3取出。
若此刻想要操控出产者的速度,比如先将1放到调集里,过1秒后再讲2放进调集,在此种场景下该函数显得不那么灵敏了。

b、Sequence操控生成速度
Sequence能够出产数据,先看看它是怎样操控出产速度的。

    fun testSequence() {
        fun sequence():Sequence<Int> = sequence {
            for (i in 1..3) {
                Thread.sleep(1000)
                yield(i)
            }
        }
        sequence().forEach {
            println("value = $it")
        }
    }

经过堵塞线程操控了出产者的速度。
你可能会说:在协程体里为啥要用Thread.sleep()堵塞线程呢,用delay()不香吗?
看起来很香,咱们来看看实际效果:

Kotlin Flow啊,你将流向何方?

直接报编译错误了,提示是:受约束的挂起函数只能调用自己协程效果域内的成员和其它挂起函数。
而sequence的效果域是SequenceScope,检查其界说发现:

Kotlin Flow啊,你将流向何方?

究其原因,SequenceScope 被RestrictsSuspension 修饰约束了。

c、调集合作协程使用
sequence 由于协程效果域的约束,不能异步出产数据,而使用调集却没此约束。

    suspend fun testListDelay() {
        suspend fun list():List<Int> {
            delay(1000)
            return listOf(1, 2, 3)
        }
        list().forEach {
            println("value = $it")
        }
    }

但也暴露了一个缺点,只能一次性的回来调集元素。

综上所述:

不管是调集仍是Sequence,都不能完全掩盖流的需求,此刻Flow闪亮上台了

3. Fow常见的操作

最简略的Flow使用

    suspend fun testFlow1() {
        //出产者
        var flow = flow {
            //发射数据
            emit(5)
        }
        //顾客
        flow.collect {
            println("value=$it")
        }
    }

经过flow函数结构一个flow目标,然后经过调用flow.collect收集数据。
flow函数的闭包为出产者的出产逻辑,collect函数的闭包为顾客的消费逻辑。

当然,还有更简略的写法:

    suspend fun testFlow2() {
        //出产者
        flow {
            //发射数据
            emit(5)
        }.collect {
            //顾客
            println("value=$it")
        }
    }

履行流程:

Kotlin Flow啊,你将流向何方?

Flow操作符

上面只提到了flow数据的发送以及接纳,并没有提及对flow数据的操作。
flow供给了许多操作符便利咱们对数据进行处理(对流进行加工)。
咱们以寻觅1~1000内大于500的偶数为例:

    suspend fun testFlow3() {
        //出产者
        var flow = flow {
            for (i in 1..1000) {
                emit(i)
            }
        }.filter { it > 500 && it % 2 == 0 }
        //顾客
        flow.collect {
            println("value=$it")
        }
    }

filter函数的效果依据一定的规则过滤数据,一般称这种函数为flow的操作符。
当然还能够对flow进行映射、改换、反常处理等。

    suspend fun testFlow3() {
        //出产者
        var flow = flow {
            for (i in 1..1000) {
                emit(i)
            }
        }.filter { it > 500 && it % 2 == 0 }
            .map { it - 500 }
            .catch {
                //反常处理
            }
        //顾客
        flow.collect {
            println("value=$it")
        }
    }

中心操作符
前面说过流的三要素:原始数据、对数据的操作、终究数据,对应到Flow上也是一样的。
flow的闭包里咱们看做是原始数据,而filter、map、catch等看做是对数据的操作,collect闭包里看做是终究的数据。
filter、map等操作符归于中心操作符,它们担任对数据进行处理。

中心操作符仅仅只是预先界说一些对流的操作方法,并不会主动触发动作履行

结尾操作符
结尾操作符也叫做终端操作符,调用结尾操作符后,Flow将从上流宣布数据,经过一些列中心操作符处理后,终究流到下贱形成终究数据。
如上面的collect操作符便是其间一种结尾操作符。

怎样区别中心操作符和结尾操作符呢?
和Sequence操作符相似,能够经过回来值判别。
先看看中心操作符filter:

public inline fun <T> Flow<T>.filter(crossinline predicate: suspend (T) -> Boolean): Flow<T> = transform { value ->
    if (predicate(value)) return@transform emit(value)
}
internal inline fun <T, R> Flow<T>.unsafeTransform(
    @BuilderInference crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R> = unsafeFlow { // Note: unsafe flow is used here, because unsafeTransform is only for internal use
    collect { value ->
        // kludge, without it Unit will be returned and TCE won't kick in, KT-28938
        return@collect transform(value)
    }
}

能够看出,filter操作符仅仅只是结构了Flow目标,并重写了collect函数。

再看结尾操作符collect:

public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
    collect(object : FlowCollector<T> {
        override suspend fun emit(value: T) = action(value)
    })

回来值为Unit,而且经过调用collect终究调用了emit,触发了流。

Flow比较Sequence、Collection的优势

Sequence关于协程的支撑不行好,不能调用其效果域外的suspend函数,而Collection出产数据不行灵敏,来看看Flow是怎么解决这些问题的。

    suspend fun testFlow4() {
        //出产者
        var flow = flow {
            for (i in 1..1000) {
                delay(1000)
                emit(i)
            }
        }.flowOn(Dispatchers.IO)//切换到io线程履行
        //顾客
        flow.collect {
            delay(1000)
            println("value=$it")
        }
    }

如上,flow的出产者、顾客闭包里都支撑调用协程的suspend函数,同时也支撑切换线程履行。
再者,flow能够将调集里的值一个个宣布,可调整其流速。
当然,flow还供给了许多操作符协助咱们完成各种各样的功用,此处限于篇幅就不再深入。
万变不离其宗,知道了原理,一切迎刃而解。

4. 为什么说Flow是冷流?

flow 的活动

在sequence的剖析里有提到过sequence是冷流,那么什么是冷流呢?

没有顾客,出产者不会出产数据
没有调查者,被调查者不会发送数据

    suspend fun testFlow5() {
        //出产者
        var flow = flow {
            println("111")
            for (i in 1..1000) {
                emit(i)
            }
        }.filter {
            println("222")
            it > 500 && it % 2 == 0
        }.map {
            println("333")
            it - 500
        }.catch {
            println("444")
            //反常处理
        }

如上代码,只需出产者没有顾客,该函数运转后不会有任何打印句子输出。
这个时候将顾客加上,就会触发流的活动。

仍是以最简略的flow demo为例,看看其调用流程:

Kotlin Flow啊,你将流向何方?

图上1~6过程即为最简略的flow调用流程。
能够看出,只要调用了结尾操作符(如collect)之后才会触发flow的活动,因此flow是冷流。

flow 的原理

    suspend fun testFlow1() {
        //出产者
        var flow = flow {
            //发射数据
            emit(5)
        }
        //顾客
        flow.collect {
            println("value=$it")
        }
    }

以上代码触及到三个要害函数(flow、emit、collect),两个闭包(flow闭包、collect闭包。
从上面的调用图可知,以上五者的调用关系:

flow–>collect–>flow闭包–>emit–>collect闭包

接下来逐一剖析在代码里的关系。

先看出产者动作(flow函数)
flow函数完成:


public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)

传入的参数类型为:FlowCollector的扩展函数,而FlowCollector是接口,它有唯一的函数:emit(xx)。因此在flow函数的闭包里能够调用emit(xx)函数,flow闭包作为SafeFlow的成员变量block。
flow 函数回来SafeFlow,SafeFlow承继自AbstractFlow,并完成了collect函数:

#Flow.kt
    public final override suspend fun collect(SafeCollector: FlowCollector<T>) {
        //结构SafeCollector
        //collector 作为SafeCollector的成员变量
        val safeCollector = SafeCollector(collector, coroutineContext)
        try {
            //抽象函数,子类完成
            collectSafely(safeCollector)
        } finally {
            safeCollector.releaseIntercepted()
        }
    }

collect的闭包作为SafeCollector的成员变量collector,后边会用到。
由此可见:flow函数仅仅只是结构了flow目标并回来。

再看顾客动作(collect)
当顾客调用flow.collect函数时:

public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
    collect(object : FlowCollector<T> {
        override suspend fun emit(value: T) = action(value)
    })

此刻调用的collect即为flow里界说的collect函数,并结构了匿名目标FlowCollector,完成了emit函数,而emit函数的真实完成为action,也便是外层传入的collect的闭包。

上面剖析到的collect源码里调用了collectSafely:

    private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
        override suspend fun collectSafely(collector: FlowCollector<T>) {
            collector.block()
        }
    }

此处的block即为在结构flow目标时传入的闭包。
此刻,顾客经过collect函数现已调用到出产者的闭包里

还剩余终究一个问题:出产者的闭包是怎么流转到顾客的闭包里呢?

终究看发射动作(emit)
在出产者的闭包里调用了emit函数:

    override suspend fun emit(value: T) {
        //挂起函数
        return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->
            try {
                //uCont为当时协程续体
                emit(uCont, value)
            } catch (e: Throwable) {
                // Save the fact that exception from emit (or even check context) has been thrown
                lastEmissionContext = DownstreamExceptionElement(e)
                throw e
            }
        }
    }
    private fun emit(uCont: Continuation<Unit>, value: T): Any? {
        val currentContext = uCont.context
        currentContext.ensureActive()
        // This check is triggered once per flow on happy path.
        val previousContext = lastEmissionContext
        if (previousContext !== currentContext) {
            checkContext(currentContext, previousContext, value)
        }
        completion = uCont
        //collector.emit 终究调用collect的闭包
        return emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
    }

如此一来,出产者的闭包里调用emit函数后,将会调用到collect的闭包里,此刻数据从flow的上游流转到下贱。
总结以上过程,其实本质仍是目标调用。

中心操作符的原理
以filter为例:

    public inline fun <T> Flow<T>.filter(crossinline predicate: suspend (T) -> Boolean): Flow<T> = transform { value ->
        //判别过滤条件是否满意,若是则发送数据
        if (predicate(value)) return@transform emit(value)
    }
    internal inline fun <T, R> Flow<T>.unsafeTransform(
        @BuilderInference crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
    ): Flow<R> = unsafeFlow { // Note: unsafe flow is used here, because unsafeTransform is only for internal use
        //调用当时目标collect
        collect { value ->
            // kludge, without it Unit will be returned and TCE won't kick in, KT-28938
            return@collect transform(value)
        }
    }
    internal inline fun <T> unsafeFlow(@BuilderInference crossinline block: suspend FlowCollector<T>.() -> Unit): Flow<T> {
        //结构flow,重写collect
        return object : Flow<T> {
            override suspend fun collect(collector: FlowCollector<T>) {
                collector.block()
            }
        }
    }

filter操作符结构了新的flow目标,该目标重写了collect函数。
当调用flow.collect时,先调用到filter目标的collect,进而调用到原始flow的collect,接着调用到原始flow目标的闭包,在闭包里调用的emit即为filter的闭包,若filter闭包里条件满意则调动emit函数,终究调用到collect的闭包。

Kotlin Flow啊,你将流向何方?

了解中心操作符的要点:

  1. 中心操作符回来新的flow目标,重写了collect函数
  2. collect函数会调用当时flow(调用filter的flow目标)的collect
  3. collect函数做其它的处理

与sequence相似,使用了装修者形式。
以上以filter为例阐述了原理,其它中心操作符的原理相似,此处就不再细说。

下篇将剖析Flow的背压与线程切换,相信剖析的逻辑会让大家耳目一新,敬请期待~

本文根据Kotlin 1.5.3,文中完好Demo请点击

您若喜欢,请点赞、重视、保藏,您的鼓励是我前进的动力

继续更新中,和我一同稳扎稳打体系、深入学习Android/Kotlin

1、Android各种Context的前世今生
2、Android DecorView 必知必会
3、Window/WindowManager 不可不知之事
4、View Measure/Layout/Draw 真理解了
5、Android事情分发全套服务
6、Android invalidate/postInvalidate/requestLayout 完全厘清
7、Android Window 怎么确认大小/onMeasure()多次履行原因
8、Android事情驱动Handler-Message-Looper解析
9、Android 键盘一招搞定
10、Android 各种坐标完全明了
11、Android Activity/Window/View 的background
12、Android Activity创立到View的显示过
13、Android IPC 系列
14、Android 存储系列
15、Java 并发系列不再疑问
16、Java 线程池系列
17、Android Jetpack 前置基础系列
18、Android Jetpack 易学易懂系列
19、Kotlin 轻松入门系列
20、Kotlin 协程系列全面解读