前言
协程系列文章:
- 一个小故事讲理解进程、线程、Kotlin 协程到底啥联系?
- 少年,你可知 Kotlin 协程最初的姿态?
- 讲真,Kotlin 协程的挂起/康复没那么奥秘(故事篇)
- 讲真,Kotlin 协程的挂起/康复没那么奥秘(原理篇)
- Kotlin 协程调度切换线程是时分解开真相了
- Kotlin 协程之线程池探索之旅(与Java线程池PK)
- Kotlin 协程之取消与反常处理探索之旅(上)
- Kotlin 协程之取消与反常处理探索之旅(下)
- 来,跟我一同撸Kotlin runBlocking/launch/join/async/delay 原理&运用
- 继续来,同我一同撸Kotlin Channel 深水区
- Kotlin 协程 Select:看我怎样多路复用
- Kotlin Sequence 是时分派上用场了
- Kotlin Flow啊,你将流向何方?
- Kotlin Flow 背压和线程切换竟然如此类似
- Kotlin SharedFlow&StateFlow 热流到底有多热?
- 狂飙吧,Lifecycle与协程、Flow的化学反应
- 来吧!承受Kotlin 协程–线程池的7个灵魂拷问
- 当,Kotlin Flow与Channel相逢
- 这一次,让Kotlin Flow 操作符真正好用起来
之前的文章现已剖析了Flow的相关原理与简单运用,Flow之所以用起来香,Flow便捷的操作符功不行没,而想要熟练运用更杂乱的操作符,那么需求厘清Flow和Channel的联系。
本篇文章构成:
1. Flow与Channel 对比
1.1 Flow中心原理与运用场景
原理
先看最简单的Demo:
fun test0() {
runBlocking {
//结构flow
val flow = flow {
//下流
emit("hello world ${Thread.currentThread()}")
}
//收集flow
flow.collect {
//下流
println("collect:$it ${Thread.currentThread()}")
}
}
}
打印结果:
collect:hello world Thread[main,5,main] Thread[main,5,main]
阐明下流和上游运转在同一线程里。
一个最基本的flow包括如下几个元素:
- 操作符,也即是函数
- 上游,经过结构操作符创立
- 下流,经过结尾操作符构建
咱们能够类比流在管道里流动:
上游早就准备好了,只是下流没有发出指令,此刻上下流是没有树立起相关的,只有当下流渴了,需求水了才会告诉上游放水,这个时分上下流才相关起来,管道就建好了。
因而咱们以为Flow是冷流。
更多Flow细节请移步:Kotlin Flow啊,你将流向何方?
运用
根据Flow的特性,通常将其用在提供数据的场景,比方出产数据的模块将出产过程封装到flow的上游里,最终创立了flow目标。
而运用数据的模块就能够经过该flow目标去收集上游的数据,如下:
//提供数据的模块
class StudentInfo {
fun getInfoFlow() : Flow<String> {
return flow {
//伪装结构数据
Thread.sleep(2000)
emit("name=fish age=18")
}
}
}
//消费数据的模块
fun test1() {
runBlocking {
val flow = StudentInfo().getInfoFlow()
flow.collect {
println("studentInfo:$it")
}
}
}
1.2 Channel中心原理与运用场景
原理
由上可知,Flow比较被迫,在没有收集数据之前,上下流是互不感知的,管道并没有建起来。
而现在咱们有个场景:
需求将管道提早建起来,在任何时分都能够在上游出产数据,在下流取数据,此刻上下流是能够感知的
先看最简单的Demo:
fun test2() {
//提早树立通道/管道
val channel = Channel<String>()
GlobalScope.launch {
//上游放数据(放水)
delay(200)
val data = "放水啦"
println("上游:data=$data ${Thread.currentThread()}")
channel.send(data)
}
GlobalScope.launch {
val data = channel.receive()
println("下流收到=$data ${Thread.currentThread()}")
}
}
一个最基本的Channel包括如下几个元素:
- 创立Channel
- 往Channel里放数据(出产)
- 从Channel里取数据(消费)
运用
能够看出与Flow不同的是,出产者、顾客都能够往Channel里存放/取出数据,只是能否进行有用的存放,能否成功取出数据需求依据Channel的状况确定。
Channel最大的特色:
- 出产者、顾客访问Channel是线程安全的,也便是说不论出产者和顾客在哪个线程,它们都能安全的存取数据
- 数据只能被消费一次,上游发送了1条数据,只要有1个下流消费了数据,则其它下流将不会拿到此数据
更多Channel细节请移步:继续来,同我一同撸Kotlin Channel 深水区
2. Flow与Channel 相逢
2.1 Flow切换线程的始末
考虑一种场景:需求在flow里进行耗时操作(比方网络恳求),外界拿到flow目标后等待收集数据即可。 很简单咱们就想到如下写法:
fun test3() {
runBlocking {
//结构flow
val flow = flow {
//下流
//模仿耗时
thread {
Thread.sleep(3000)
emit("hello world ${Thread.currentThread()}")
}
}
}
}
惋惜的是编译不经过:
当然,添加一个协程作用域也很简单:
fun test4() {
runBlocking {
//结构flow
val flow = flow {
//下流
val coroutineScope = CoroutineScope(Job() + Dispatchers.IO)
coroutineScope.launch {
//模仿耗时,在子线程履行
Thread.sleep(3000)
emit("hello world ${Thread.currentThread()}")
}
}
flow.collect {
println("collect:$it")
}
}
}
编译没有报错,满心欢喜履行,等待3s后,事与愿违:
检查源码发现:
在emit之前会检测emit地点的协程与collect地点协程是否共同,不共同就抛出反常。
显然在咱们上面的Demo里,collect归于runBlocking协程,而emit归于咱们新开的协程,当然不一样了。
2.2 ChannelFlow 闪亮上台
2.2.1 克己丐版ChannelFlow
既然是线程安全问题,咱们很简单想到运用Channel来处理,在此之前需求对Flow进行封装:
//参数为SendChannel扩展函数
class MyFlow(private val block: suspend SendChannel<String>.() -> Unit) : Flow<String> {
//结构Channel
private val channel = Channel<String>()
override suspend fun collect(collector: FlowCollector<String>) {
val coroutineScope = CoroutineScope(Job() + Dispatchers.IO)
coroutineScope.launch {
//启动协程
//模仿耗时,在子线程履行
Thread.sleep(3000)
//把Channel目标传递出去
block(channel)
}
//获取数据
val data = channel.receive()
//发射
collector.emit(data)
}
}
如上,重写了Flow的collect函数,当外界调用flow.collect时:
- 先启动一个协程
- 从channel里读取数据,没有数据则挂起当前协程
- 1里的协程履行,调用flow的闭包履行上游逻辑
- 拿到数据后进行发射,最终传递到collect的闭包
外界运用flow:
fun test5() {
runBlocking {
//结构flow
val myFlow = MyFlow {
send("hello world emit 线程: ${Thread.currentThread()}")
}
myFlow.collect {
println("下流收到=$it collect 线程: ${Thread.currentThread()}")
}
}
}
最终打印:
下流收到=hello world emit 线程: Thread[DefaultDispatcher-worker-1,5,main] collect 线程: Thread[main,5,main]
能够看出,上游、下流在不同的协程里履行,也在不同的线程里履行。
如此一来就满足了需求。
2.2.2 ChannelFlow 中心原理
上面重写的Flow没有运用泛型,也没有对Channel进行封闭,还有其它的点没有完善。
还好官方现已提供了完善的类和操作符,得益于此咱们很简单就完结如上需求。
fun test6() {
runBlocking {
//结构flow
val channelFlow = channelFlow<String> {
send("hello world emit 线程: ${Thread.currentThread()}")
}
channelFlow.collect {
println("下流收到=$it collect 线程: ${Thread.currentThread()}")
}
}
}
接着来简单剖析其原理:
#ChannelFlow.kt
private open class ChannelFlowBuilder<T>(
//闭包目标
private val block: suspend ProducerScope<T>.() -> Unit,
context: CoroutineContext = EmptyCoroutineContext,
//Channel模式
capacity: Int = Channel.BUFFERED,
//Buffer满之后的处理方式,此处是挂起
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
) : ChannelFlow<T>(context, capacity, onBufferOverflow) {
//...
override suspend fun collectTo(scope: ProducerScope<T>) =
//调用闭包
block(scope)
//...
}
public abstract class ChannelFlow<T>(
// upstream context
@JvmField public val context: CoroutineContext,
// buffer capacity between upstream and downstream context
@JvmField public val capacity: Int,
// buffer overflow strategy
@JvmField public val onBufferOverflow: BufferOverflow
) : FusibleFlow<T> {
//produceImpl 敞开的新协程会调用这
internal val collectToFun: suspend (ProducerScope<T>) -> Unit
get() = { collectTo(it) }
public open fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> =
//创立Channel协程,回来Channel目标
scope.produce(context, produceCapacity, onBufferOverflow, start = CoroutineStart.ATOMIC, block = collectToFun)
//重写collect函数
override suspend fun collect(collector: FlowCollector<T>): Unit =
//敞开协程
coroutineScope {
//发射数据
collector.emitAll(produceImpl(this))
}
}
produceImpl函数并不耗时,仅仅只是敞开了新的协程。
接着来看collector.emitAll:
#Channels.kt
private suspend fun <T> FlowCollector<T>.emitAllImpl(channel: ReceiveChannel<T>, consume: Boolean) {
ensureActive()
var cause: Throwable? = null
try {
//循环从Channel读取数据
while (true) {
//从Channel获取数据
val result = run { channel.receiveCatching() }
if (result.isClosed) {
//如果Channel封闭了,也便是上游封闭了,则退出循环
result.exceptionOrNull()?.let { throw it }
break // returns normally when result.closeCause == null
}
//发射数据
emit(result.getOrThrow())
}
} catch (e: Throwable) {
cause = e
throw e
} finally {
//封闭Channel
if (consume) channel.cancelConsumed(cause)
}
}
从源码或许无法一眼厘清其流程,老规矩上图就会清晰明晰。
上一小结丐版的实现便是参照channelFlow,若是了解了丐版,再来了解官方豪华版就比较简单。
2.2.3 ChannelFlow 应用场景
检查ChannelFlow衍生的子类:
buffer、flowOn、flatMapLatest、flatMapMerge等。
因而把握了ChannelFlow再来看各种操作符就会豁然开朗。
2.3 callbackFlow 拯救你的回调
2.3.1 原理
运用channelFlow {},尽管能够在新的协程里履行闭包,但由于新协程的调度器是运用collect地点协程调度器不够灵敏:
fun test6() {
runBlocking {
//结构flow
val channelFlow = channelFlow<String> {
send("hello world emit 线程: ${Thread.currentThread()}")
}
channelFlow.collect {
println("下流收到=$it collect 线程: ${Thread.currentThread()}")
}
}
}
collect地点的协程为runBlocking协程,而send函数尽管在新的协程里,但它的协程调度器运用的是collect协程的,因而send函数与collect函数所运转的线程是同一个线程。
尽管咱们能够更改外层的调度器使之运转在不同的线程如:
fun test6() {
GlobalScope.launch {
//结构flow
val channelFlow = channelFlow<String> {
send("hello world emit 线程: ${Thread.currentThread()}")
}
channelFlow.collect {
println("下流收到=$it collect 线程: ${Thread.currentThread()}")
}
}
}
但终归不灵敏,从规划的视点来说,Flow(目标)的提供者并不关怀运用者在什么样的环境下进行collect操作。
还是以网络恳求为例:
fun getName(callback:NetResult<String>) {
thread {
//伪装从网络获取
Thread.sleep(2000)
callback.onSuc("I'm fish")
}
}
interface NetResult<T> {
fun onSuc(t:T)
fun onFail(err:String)
}
如上,存在这样一个网络恳求,在子线程里进行网络恳求,并经过回调告诉外部调用者。
很典型的一个恳求回调,该怎样把它封装为Flow呢?尝试用channelFlow进行封装:
fun test7() {
runBlocking {
//结构flow
val channelFlow = channelFlow {
getName(object : NetResult<String> {
override fun onSuc(t: String) {
println("begin send")
trySend("hello world emit 线程: ${Thread.currentThread()}")
println("stop send")
}
override fun onFail(err: String) {
}
})
}
channelFlow.collect {
println("下流收到=$it collect 线程: ${Thread.currentThread()}")
}
}
}
看似美好,实则却收不到数据,明明”begin send”和”stop send”都打印了,为啥collect闭包里没有打印呢?
getName函数内部敞开了线程,因而它本身并不是耗时操作,由此可知channelFlow闭包很快就履行完结了。
由ChannelFlow源码可知:CoroutineScope.produce的闭包履行完毕后会封闭Channel:
要处理这个问题也很简单:不让协程封闭channel,换句话说只要协程没有完毕,那么channel就不会被封闭。而让协程不完毕,最直接的方法便是在协程里调用挂起函数。
刚好,官方也提供了相应的挂起函数:
fun test7() {
runBlocking {
//结构flow
val channelFlow = channelFlow {
getName(object : NetResult<String> {
override fun onSuc(t: String) {
println("begin send")
trySend("hello world emit 线程: ${Thread.currentThread()}")
println("stop send")
//封闭channel,触发awaitClose闭包履行
close()
}
override fun onFail(err: String) {
}
})
//挂起函数
awaitClose {
//走到此,channel封闭
println("awaitClose")
}
}
channelFlow.collect {
println("下流收到=$it collect 线程: ${Thread.currentThread()}")
}
}
}
相较上个Demo而言,增加了2点:
- awaitClose 挂起协程,该协程不完毕,则channel不被封闭
- channel运用完结后需求开释资源,自动调用channel的close函数,该函数最终会触发awaitClose闭包履行,在闭包里做一些开释资源的操作
你或许会说以上用法不太友好,如果不知道有awaitClose这函数,都无法排查为啥没收到数据。 嗯,这官方也考虑到了,那便是callbackFlow。
若是履行了红框部分,阐明该协程没有被挂起,则抛出反常提示咱们在协程里调用awaitClose函数。
2.3.2 运用
和channelFlow的运用如出一辙:
fun test8() {
runBlocking {
//结构flow
val channelFlow = callbackFlow {
getName(object : NetResult<String> {
override fun onSuc(t: String) {
println("begin send")
trySend("hello world emit 线程: ${Thread.currentThread()}")
println("stop send")
//封闭channel,触发awaitClose闭包履行
// close()
}
override fun onFail(err: String) {
}
})
//挂起函数
awaitClose {
//走到此,channel封闭
println("awaitClose")
}
}
channelFlow.collect {
println("下流收到=$it collect 线程: ${Thread.currentThread()}")
}
}
}
有了callbackFlow,咱们就能够优雅的将回调转为Flow提供给外部调用者运用。
3. Flow与Channel 互转
3.1 Channel 转 Flow
Flow和Channel相遇,磕碰出了ChannelFlow,ChannelFlow顾名思义,既是Channel也是Flow,因而能够作为中介对Flow与Channel进行转化。
fun test9() {
runBlocking {
val channel = Channel<String>()
val flow = channel.receiveAsFlow()
GlobalScope.launch {
flow.collect {
println("collect:$it")
}
}
delay(200)
channel.send("hello fish")
}
}
channel经过send,flow经过collect收集。
3.2 Flow 转 Channel
fun test10() {
runBlocking {
val flow = flow {
emit("hello fish")
}
val channel = flow.produceIn(this)
val data = channel.receive()
println("data:$data")
}
}
flow.produceIn(this) 触发collect操作,从而履行flow闭包,emit将数据放到channel里,最后经过channel.receive()取数据。
下篇将完全解析Flow各种操作符,把握了ChannelFlow再去看操作符相信你会如虎添翼。
本文根据Kotlin 1.5.3,文中完整试验Demo请点击
您若喜爱,请点赞、关注、保藏,您的鼓舞是我前进的动力
继续更新中,和我一同稳扎稳打体系、深入学习Android/Kotlin
1、Android各种Context的前世今生
2、Android DecorView 必知必会
3、Window/WindowManager 不行不知之事
4、View Measure/Layout/Draw 真理解了
5、Android事情分发全套服务
6、Android invalidate/postInvalidate/requestLayout 彻底厘清
7、Android Window 怎样确定大小/onMeasure()多次履行原因
8、Android事情驱动Handler-Message-Looper解析
9、Android 键盘一招搞定
10、Android 各种坐标彻底明晰
11、Android Activity/Window/View 的background
12、Android Activity创立到View的显示过
13、Android IPC 系列
14、Android 存储系列
15、Java 并发系列不再疑惑
16、Java 线程池系列
17、Android Jetpack 前置基础系列
18、Android Jetpack 易学易懂系列
19、Kotlin 轻松入门系列
20、Kotlin 协程系列全面解读