前言

协程系列文章:

  • 一个小故事讲理解进程、线程、Kotlin 协程到底啥联系?
  • 少年,你可知 Kotlin 协程最初的姿态?
  • 讲真,Kotlin 协程的挂起/康复没那么奥秘(故事篇)
  • 讲真,Kotlin 协程的挂起/康复没那么奥秘(原理篇)
  • Kotlin 协程调度切换线程是时分解开真相了
  • Kotlin 协程之线程池探索之旅(与Java线程池PK)
  • Kotlin 协程之取消与反常处理探索之旅(上)
  • Kotlin 协程之取消与反常处理探索之旅(下)
  • 来,跟我一同撸Kotlin runBlocking/launch/join/async/delay 原理&运用
  • 继续来,同我一同撸Kotlin Channel 深水区
  • Kotlin 协程 Select:看我怎样多路复用
  • Kotlin Sequence 是时分派上用场了
  • Kotlin Flow啊,你将流向何方?
  • Kotlin Flow 背压和线程切换竟然如此类似
  • Kotlin SharedFlow&StateFlow 热流到底有多热?
  • 狂飙吧,Lifecycle与协程、Flow的化学反应
  • 来吧!承受Kotlin 协程–线程池的7个灵魂拷问
  • 当,Kotlin Flow与Channel相逢
  • 这一次,让Kotlin Flow 操作符真正好用起来

之前的文章现已剖析了Flow的相关原理与简单运用,Flow之所以用起来香,Flow便捷的操作符功不行没,而想要熟练运用更杂乱的操作符,那么需求厘清Flow和Channel的联系。
本篇文章构成:

当,Kotlin Flow与Channel相逢

1. Flow与Channel 对比

1.1 Flow中心原理与运用场景

原理

先看最简单的Demo:

    fun test0() {
        runBlocking {
            //结构flow
            val flow = flow {
                //下流
                emit("hello world ${Thread.currentThread()}")
            }
            //收集flow
            flow.collect {
                //下流
                println("collect:$it ${Thread.currentThread()}")
            }
        }
    }

打印结果:

collect:hello world Thread[main,5,main] Thread[main,5,main]

阐明下流和上游运转在同一线程里。

当,Kotlin Flow与Channel相逢

一个最基本的flow包括如下几个元素:

  1. 操作符,也即是函数
  2. 上游,经过结构操作符创立
  3. 下流,经过结尾操作符构建

咱们能够类比流在管道里流动:

当,Kotlin Flow与Channel相逢

上游早就准备好了,只是下流没有发出指令,此刻上下流是没有树立起相关的,只有当下流渴了,需求水了才会告诉上游放水,这个时分上下流才相关起来,管道就建好了。
因而咱们以为Flow是冷流。

更多Flow细节请移步:Kotlin Flow啊,你将流向何方?

运用
根据Flow的特性,通常将其用在提供数据的场景,比方出产数据的模块将出产过程封装到flow的上游里,最终创立了flow目标。
而运用数据的模块就能够经过该flow目标去收集上游的数据,如下:

//提供数据的模块
class StudentInfo {
    fun getInfoFlow() : Flow<String> {
        return flow {
            //伪装结构数据
            Thread.sleep(2000)
            emit("name=fish age=18")
        }
    }
}
//消费数据的模块
    fun test1() {
        runBlocking {
            val flow = StudentInfo().getInfoFlow()
            flow.collect {
                println("studentInfo:$it")
            }
        }
    }

1.2 Channel中心原理与运用场景

原理
由上可知,Flow比较被迫,在没有收集数据之前,上下流是互不感知的,管道并没有建起来。
而现在咱们有个场景:

需求将管道提早建起来,在任何时分都能够在上游出产数据,在下流取数据,此刻上下流是能够感知的

先看最简单的Demo:

    fun test2() {
        //提早树立通道/管道
        val channel = Channel<String>()
        GlobalScope.launch {
            //上游放数据(放水)
            delay(200)
            val data = "放水啦"
            println("上游:data=$data ${Thread.currentThread()}")
            channel.send(data)
        }
        GlobalScope.launch {
            val data = channel.receive()
            println("下流收到=$data ${Thread.currentThread()}")
        }
    }

当,Kotlin Flow与Channel相逢

一个最基本的Channel包括如下几个元素:

  1. 创立Channel
  2. 往Channel里放数据(出产)
  3. 从Channel里取数据(消费)

当,Kotlin Flow与Channel相逢

运用
能够看出与Flow不同的是,出产者、顾客都能够往Channel里存放/取出数据,只是能否进行有用的存放,能否成功取出数据需求依据Channel的状况确定。
Channel最大的特色:

  1. 出产者、顾客访问Channel是线程安全的,也便是说不论出产者和顾客在哪个线程,它们都能安全的存取数据
  2. 数据只能被消费一次,上游发送了1条数据,只要有1个下流消费了数据,则其它下流将不会拿到此数据

更多Channel细节请移步:继续来,同我一同撸Kotlin Channel 深水区

2. Flow与Channel 相逢

2.1 Flow切换线程的始末

考虑一种场景:需求在flow里进行耗时操作(比方网络恳求),外界拿到flow目标后等待收集数据即可。 很简单咱们就想到如下写法:

    fun test3() {
        runBlocking {
            //结构flow
            val flow = flow {
                //下流
                //模仿耗时
                thread { 
                    Thread.sleep(3000)
                    emit("hello world ${Thread.currentThread()}")
                }
            }
        }
    }

惋惜的是编译不经过:

当,Kotlin Flow与Channel相逢

因为emit是挂起函数,需求在协程作用域里调用。

当然,添加一个协程作用域也很简单:

    fun test4() {
        runBlocking {
            //结构flow
            val flow = flow {
                //下流
                val coroutineScope = CoroutineScope(Job() + Dispatchers.IO)
                coroutineScope.launch {
                    //模仿耗时,在子线程履行
                    Thread.sleep(3000)
                    emit("hello world ${Thread.currentThread()}")
                }
            }
            flow.collect {
                println("collect:$it")
            }
        }
    }

编译没有报错,满心欢喜履行,等待3s后,事与愿违:

当,Kotlin Flow与Channel相逢

意思是”检测到了在另一个线程里发射数据,这种行为不是线程安全的因而被禁止了”。

检查源码发现:

当,Kotlin Flow与Channel相逢

在emit之前会检测emit地点的协程与collect地点协程是否共同,不共同就抛出反常。
显然在咱们上面的Demo里,collect归于runBlocking协程,而emit归于咱们新开的协程,当然不一样了。

2.2 ChannelFlow 闪亮上台

2.2.1 克己丐版ChannelFlow

既然是线程安全问题,咱们很简单想到运用Channel来处理,在此之前需求对Flow进行封装:

//参数为SendChannel扩展函数
class MyFlow(private val block: suspend SendChannel<String>.() -> Unit) : Flow<String> {
    //结构Channel
    private val channel = Channel<String>()
    override suspend fun collect(collector: FlowCollector<String>) {
        val coroutineScope = CoroutineScope(Job() + Dispatchers.IO)
        coroutineScope.launch {
            //启动协程
            //模仿耗时,在子线程履行
            Thread.sleep(3000)
            //把Channel目标传递出去
            block(channel)
        }
        //获取数据
        val data = channel.receive()
        //发射
        collector.emit(data)
    }
}

如上,重写了Flow的collect函数,当外界调用flow.collect时:

  1. 先启动一个协程
  2. 从channel里读取数据,没有数据则挂起当前协程
  3. 1里的协程履行,调用flow的闭包履行上游逻辑
  4. 拿到数据后进行发射,最终传递到collect的闭包

外界运用flow:

    fun test5() {
        runBlocking {
            //结构flow
            val myFlow = MyFlow {
                send("hello world emit 线程: ${Thread.currentThread()}")
            }
            myFlow.collect {
                println("下流收到=$it collect 线程: ${Thread.currentThread()}")
            }
        }
    }

最终打印:

下流收到=hello world emit 线程: Thread[DefaultDispatcher-worker-1,5,main] collect 线程: Thread[main,5,main]

能够看出,上游、下流在不同的协程里履行,也在不同的线程里履行。
如此一来就满足了需求。

2.2.2 ChannelFlow 中心原理

上面重写的Flow没有运用泛型,也没有对Channel进行封闭,还有其它的点没有完善。
还好官方现已提供了完善的类和操作符,得益于此咱们很简单就完结如上需求。

    fun test6() {
        runBlocking {
            //结构flow
            val channelFlow = channelFlow<String> {
                send("hello world emit 线程: ${Thread.currentThread()}")
            }
            channelFlow.collect {
                println("下流收到=$it collect 线程: ${Thread.currentThread()}")
            }
        }
    }

接着来简单剖析其原理:

#ChannelFlow.kt
private open class ChannelFlowBuilder<T>(
    //闭包目标
    private val block: suspend ProducerScope<T>.() -> Unit,
    context: CoroutineContext = EmptyCoroutineContext,
    //Channel模式
    capacity: Int = Channel.BUFFERED,
    //Buffer满之后的处理方式,此处是挂起
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
) : ChannelFlow<T>(context, capacity, onBufferOverflow) {
    //...
    override suspend fun collectTo(scope: ProducerScope<T>) =
        //调用闭包
        block(scope)
    //...
}
public abstract class ChannelFlow<T>(
    // upstream context
    @JvmField public val context: CoroutineContext,
    // buffer capacity between upstream and downstream context
    @JvmField public val capacity: Int,
    // buffer overflow strategy
    @JvmField public val onBufferOverflow: BufferOverflow
) : FusibleFlow<T> {
    //produceImpl 敞开的新协程会调用这
    internal val collectToFun: suspend (ProducerScope<T>) -> Unit
        get() = { collectTo(it) }
    public open fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> =
        //创立Channel协程,回来Channel目标
        scope.produce(context, produceCapacity, onBufferOverflow, start = CoroutineStart.ATOMIC, block = collectToFun)
    //重写collect函数
    override suspend fun collect(collector: FlowCollector<T>): Unit =
        //敞开协程
        coroutineScope {
            //发射数据
            collector.emitAll(produceImpl(this))
        }
}

produceImpl函数并不耗时,仅仅只是敞开了新的协程。
接着来看collector.emitAll:

#Channels.kt
private suspend fun <T> FlowCollector<T>.emitAllImpl(channel: ReceiveChannel<T>, consume: Boolean) {
    ensureActive()
    var cause: Throwable? = null
    try {
        //循环从Channel读取数据
        while (true) {
            //从Channel获取数据
            val result = run { channel.receiveCatching() }
            if (result.isClosed) {
                //如果Channel封闭了,也便是上游封闭了,则退出循环
                result.exceptionOrNull()?.let { throw it }
                break // returns normally when result.closeCause == null
            }
            //发射数据
            emit(result.getOrThrow())
        }
    } catch (e: Throwable) {
        cause = e
        throw e
    } finally {
        //封闭Channel
        if (consume) channel.cancelConsumed(cause)
    }
}

从源码或许无法一眼厘清其流程,老规矩上图就会清晰明晰。

当,Kotlin Flow与Channel相逢

上一小结丐版的实现便是参照channelFlow,若是了解了丐版,再来了解官方豪华版就比较简单。

2.2.3 ChannelFlow 应用场景

检查ChannelFlow衍生的子类:

当,Kotlin Flow与Channel相逢

这些子类是Flow里各种杂乱操作符的基础,如:
buffer、flowOn、flatMapLatest、flatMapMerge等。
因而把握了ChannelFlow再来看各种操作符就会豁然开朗。

2.3 callbackFlow 拯救你的回调

2.3.1 原理

运用channelFlow {},尽管能够在新的协程里履行闭包,但由于新协程的调度器是运用collect地点协程调度器不够灵敏:

    fun test6() {
        runBlocking {
            //结构flow
            val channelFlow = channelFlow<String> {
                send("hello world emit 线程: ${Thread.currentThread()}")
            }
            channelFlow.collect {
                println("下流收到=$it collect 线程: ${Thread.currentThread()}")
            }
        }
    }

collect地点的协程为runBlocking协程,而send函数尽管在新的协程里,但它的协程调度器运用的是collect协程的,因而send函数与collect函数所运转的线程是同一个线程。
尽管咱们能够更改外层的调度器使之运转在不同的线程如:

    fun test6() {
        GlobalScope.launch {
            //结构flow
            val channelFlow = channelFlow<String> {
                send("hello world emit 线程: ${Thread.currentThread()}")
            }
            channelFlow.collect {
                println("下流收到=$it collect 线程: ${Thread.currentThread()}")
            }
        }
    }

但终归不灵敏,从规划的视点来说,Flow(目标)的提供者并不关怀运用者在什么样的环境下进行collect操作。

还是以网络恳求为例:

fun getName(callback:NetResult<String>) {
    thread {
        //伪装从网络获取
        Thread.sleep(2000)
        callback.onSuc("I'm fish")
    }
}
interface NetResult<T> {
    fun onSuc(t:T)
    fun onFail(err:String)
}

如上,存在这样一个网络恳求,在子线程里进行网络恳求,并经过回调告诉外部调用者。
很典型的一个恳求回调,该怎样把它封装为Flow呢?尝试用channelFlow进行封装:

    fun test7() {
        runBlocking {
            //结构flow
            val channelFlow = channelFlow {
                getName(object : NetResult<String> {
                    override fun onSuc(t: String) {
                        println("begin send")
                        trySend("hello world emit 线程: ${Thread.currentThread()}")
                        println("stop send")
                    }
                    override fun onFail(err: String) {
                    }
                })
            }
            channelFlow.collect {
                println("下流收到=$it collect 线程: ${Thread.currentThread()}")
            }
        }
    }

看似美好,实则却收不到数据,明明”begin send”和”stop send”都打印了,为啥collect闭包里没有打印呢?
getName函数内部敞开了线程,因而它本身并不是耗时操作,由此可知channelFlow闭包很快就履行完结了。
由ChannelFlow源码可知:CoroutineScope.produce的闭包履行完毕后会封闭Channel:

当,Kotlin Flow与Channel相逢

既然channel都封闭了,当子线程里回调onSuc并履行trySend并不会再往channel发送数据,collect当然就收不到了。

要处理这个问题也很简单:不让协程封闭channel,换句话说只要协程没有完毕,那么channel就不会被封闭。而让协程不完毕,最直接的方法便是在协程里调用挂起函数。
刚好,官方也提供了相应的挂起函数:

    fun test7() {
        runBlocking {
            //结构flow
            val channelFlow = channelFlow {
                getName(object : NetResult<String> {
                    override fun onSuc(t: String) {
                        println("begin send")
                        trySend("hello world emit 线程: ${Thread.currentThread()}")
                        println("stop send")
                        //封闭channel,触发awaitClose闭包履行
                        close()
                    }
                    override fun onFail(err: String) {
                    }
                })
                //挂起函数
                awaitClose {
                    //走到此,channel封闭
                    println("awaitClose")
                }
            }
            channelFlow.collect {
                println("下流收到=$it collect 线程: ${Thread.currentThread()}")
            }
        }
    }

相较上个Demo而言,增加了2点:

  1. awaitClose 挂起协程,该协程不完毕,则channel不被封闭
  2. channel运用完结后需求开释资源,自动调用channel的close函数,该函数最终会触发awaitClose闭包履行,在闭包里做一些开释资源的操作

你或许会说以上用法不太友好,如果不知道有awaitClose这函数,都无法排查为啥没收到数据。 嗯,这官方也考虑到了,那便是callbackFlow。

当,Kotlin Flow与Channel相逢

能够看出就比channelFlow函数多了个判别:
若是履行了红框部分,阐明该协程没有被挂起,则抛出反常提示咱们在协程里调用awaitClose函数。

2.3.2 运用

和channelFlow的运用如出一辙:

    fun test8() {
        runBlocking {
            //结构flow
            val channelFlow = callbackFlow {
                getName(object : NetResult<String> {
                    override fun onSuc(t: String) {
                        println("begin send")
                        trySend("hello world emit 线程: ${Thread.currentThread()}")
                        println("stop send")
                        //封闭channel,触发awaitClose闭包履行
//                        close()
                    }
                    override fun onFail(err: String) {
                    }
                })
                //挂起函数
                awaitClose {
                    //走到此,channel封闭
                    println("awaitClose")
                }
            }
            channelFlow.collect {
                println("下流收到=$it collect 线程: ${Thread.currentThread()}")
            }
        }
    }

有了callbackFlow,咱们就能够优雅的将回调转为Flow提供给外部调用者运用。

3. Flow与Channel 互转

3.1 Channel 转 Flow

Flow和Channel相遇,磕碰出了ChannelFlow,ChannelFlow顾名思义,既是Channel也是Flow,因而能够作为中介对Flow与Channel进行转化。

    fun test9() {
        runBlocking {
            val channel = Channel<String>()
            val flow = channel.receiveAsFlow()
            GlobalScope.launch {
                flow.collect {
                    println("collect:$it")
                }
            }
            delay(200)
            channel.send("hello fish")
        }
    }

channel经过send,flow经过collect收集。

3.2 Flow 转 Channel

    fun test10() {
        runBlocking {
            val flow = flow {
                emit("hello fish")
            }
            val channel = flow.produceIn(this)
            val data = channel.receive()
            println("data:$data")
        }
    }

flow.produceIn(this) 触发collect操作,从而履行flow闭包,emit将数据放到channel里,最后经过channel.receive()取数据。

下篇将完全解析Flow各种操作符,把握了ChannelFlow再去看操作符相信你会如虎添翼。

本文根据Kotlin 1.5.3,文中完整试验Demo请点击

您若喜爱,请点赞、关注、保藏,您的鼓舞是我前进的动力

继续更新中,和我一同稳扎稳打体系、深入学习Android/Kotlin

1、Android各种Context的前世今生
2、Android DecorView 必知必会
3、Window/WindowManager 不行不知之事
4、View Measure/Layout/Draw 真理解了
5、Android事情分发全套服务
6、Android invalidate/postInvalidate/requestLayout 彻底厘清
7、Android Window 怎样确定大小/onMeasure()多次履行原因
8、Android事情驱动Handler-Message-Looper解析
9、Android 键盘一招搞定
10、Android 各种坐标彻底明晰
11、Android Activity/Window/View 的background
12、Android Activity创立到View的显示过
13、Android IPC 系列
14、Android 存储系列
15、Java 并发系列不再疑惑
16、Java 线程池系列
17、Android Jetpack 前置基础系列
18、Android Jetpack 易学易懂系列
19、Kotlin 轻松入门系列
20、Kotlin 协程系列全面解读