前言
上篇剖析了Kotlin Flow原理,大部分操作符完成比较简单,相较而言背压和线程切换比较复杂,遗憾的是,纵观网上大部分文章,关于Flow背压和协程切换这块的原理说得比较少,语焉不详,鉴于此,本篇要点剖析两者的原理及运用。
经过本篇文章,你将了解到:
- 什么是背压?
- 怎么处理背压?
- Flow buffer的原理
- Flow 线程切换的运用
- Flow 线程切换的原理
1. 什么是背压?
先看自然界的水流:
当上游的流速大于下流的流速,铢积寸累,终究导致大坝溢出,此种现象称为背压的呈现
而对于Kotlin里的Flow,也有上游(出产者)、下流(顾客)的概念,如:
suspend fun testBuffer1() {
var flow = flow {
//出产者
(1..3).forEach {
println("emit $it")
emit(it)
}
}
flow.collect {
//顾客
println("collect:$it")
}
}
经过collect操作符触发了流,从出产者出产数据(flow闭包),到顾客接收并处理数据(collect闭包),这就完成了流从上游到下流的一次流动进程。
2. 怎么处理背压?
模拟一个出产者顾客速度不一致的场景:
suspend fun testBuffer3() {
var flow = flow {
(1..3).forEach {
delay(1000)
println("emit $it")
emit(it)
}
}
var time = measureTimeMillis {
flow.collect {
delay(2000)
println("collect:$it")
}
}
println("use time:${time} ms")
}
核算流从出产到消费的整个时刻:
因而,整个流的耗时=出产者耗时(3 * 1000ms)+顾客耗时(3 * 2000ms)=9s。
清楚明了,顾客影响了出产者的速度,这种情况下该怎么优化呢?
最简单的解决方案:
出产者和顾客分别在不同的线程履行
如:
suspend fun testBuffer4() {
var flow = flow {
(1..3).forEach {
delay(1000)
println("emit $it in thread:${Thread.currentThread()}")
emit(it)
}
}.flowOn(Dispatchers.IO)
var time = measureTimeMillis {
flow.collect {
delay(2000)
println("collect:$it in thread:${Thread.currentThread()}")
}
}
println("use time:${time} ms")
}
添加了flowOn()函数,它的存在使得它前面的代码在指定的线程里履行,如flow闭包了的代码都在IO线程履行,也就是出产者在IO线程履行。
而顾客在当时线程履行,因而两者无需彼此等候,节省了总时刻:
确实是减少了时刻,提高了功率。但咱们知道敞开线程价值还是挺大的,已然都在协程里运行了,能否借助协程的特性:协程挂起不堵塞线程 来完成此事呢?
此刻,Buffer出场了,先看看它是怎么表演的:
suspend fun testBuffer5() {
var flow = flow {
(1..3).forEach {
delay(1000)
println("emit $it in thread:${Thread.currentThread()}")
emit(it)
}
}.buffer(5)
var time = measureTimeMillis {
flow.collect {
delay(2000)
println("collect:$it in thread:${Thread.currentThread()}")
}
}
println("use time:${time} ms")
}
这次没有运用flowOn,取而代之的是buffer。
运行成果如下:
那么它是怎么做到的呢?这就得从buffer的源码说起。
3. Flow buffer的原理
无buffer
先看看没有buffer时的耗时:
suspend fun testBuffer3() {
var flow = flow {
(1..3).forEach {
delay(1000)
println("emit $it")
emit(it)
}
}
var time = measureTimeMillis {
flow.collect {
delay(2000)
println("collect:$it")
}
}
println("use time:${time} ms")
}
从collect开端,顺次履行flow闭包,经过emit调用到collect闭包,由于flow闭包里包含了几回emit,因而整个流程会有几回发射。
如上图,从进程1到进程8,由于是在同一个线程里,因而是串行履行的,整个流的耗时即为出产者到顾客(进程1~进程8)的耗时。
有buffer
在没看源码之前,咱们先猜想一下它的流程:
每次emit都发送到buffer里,然后马上回来继续发送,如此一来出产者没有被顾客的速度拖累。
而顾客会检测Buffer里是否有数据,有则取出来。
依据之前的经历咱们知道:collect调用到emit最后到buffer是线性调用的,放入buffer后继续循环emit,那么问题来了:
是谁触发了collect闭包的调用呢?
接下来深化源码,探求答案。
buffer源码流程剖析
创建Flow
public fun <T> Flow<T>.buffer(capacity: Int = Channel.BUFFERED, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND): Flow<T> {
var capacity = capacity//buffer容量
var onBufferOverflow = onBufferOverflow//buffer满之后的处理战略
if (capacity == Channel.CONFLATED) {
capacity = 0
onBufferOverflow = BufferOverflow.DROP_OLDEST
}
// create a flow
return when (this) {
is FusibleFlow -> fuse(capacity = capacity, onBufferOverflow = onBufferOverflow)
//走else 分支,结构ChannelFlowOperatorImpl
else -> ChannelFlowOperatorImpl(this, capacity = capacity, onBufferOverflow = onBufferOverflow)
}
}
buffer 返回Flow实例,其间涉及几个重要的类和函数:
调用collect
当调用Flow.collect时:
public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
collect(object : FlowCollector<T> {
override suspend fun emit(value: T) = action(value)
})
结构了匿名内部类FlowCollector,并完成了emit办法,它的完成为collect的闭包。
调用ChannelFlowOperatorImpl.collect终究会调用ChannelFlow.collect:
override suspend fun collect(collector: FlowCollector<T>): Unit =
coroutineScope {
collector.emitAll(produceImpl(this))
}
public open fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> =
scope.produce(context, produceCapacity, onBufferOverflow, start = CoroutineStart.ATOMIC, block = collectToFun)
produceImpl 创建了Channel,内部敞开了协程,返回ReceiveChannel。
再来看emitAll函数:
private suspend fun <T> FlowCollector<T>.emitAllImpl(channel: ReceiveChannel<T>, consume: Boolean) {
ensureActive()
var cause: Throwable? = null
try {
while (true) {
//挂起等候Channel数据
val result = run { channel.receiveCatching() }
if (result.isClosed) {
//Channel封闭后才会退出循环
result.exceptionOrNull()?.let { throw it }
break // returns normally when result.closeCause == null
}
//发送数据
emit(result.getOrThrow())
}
} catch (e: Throwable) {
cause = e
throw e
} finally {
if (consume) channel.cancelConsumed(cause)
}
}
Channel此刻并没有数据,因而协程会挂起等候。
Channel发送
Channel什么时候有数据呢?当然是在调用了Channel.send()函数后。
前面提到过collect之后敞开了协程:
public open fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> =
scope.produce(context, produceCapacity, onBufferOverflow, start = CoroutineStart.ATOMIC, block = collectToFun)
internal val collectToFun: suspend (ProducerScope<T>) -> Unit
get() = { collectTo(it) }
protected override suspend fun collectTo(scope: ProducerScope<T>) =
flowCollect(SendingCollector(scope))
此刻传入的参数为:collectToFun,最后结构了:
public class SendingCollector<T>(
private val channel: SendChannel<T>
) : FlowCollector<T> {
override suspend fun emit(value: T): Unit = channel.send(value)
}
当协程得到履行时,会调用collectToFun–>collectTo(it)–>flowCollect(SendingCollector(scope)),终究调用到:
#ChannelFlowOperatorImpl
override suspend fun flowCollect(collector: FlowCollector<T>) =
flow.collect(collector)
而该flow为最开端的flow,collector为SendingCollector。
flow.collect后会调用到flow的闭包,进而调用到emit函数:
private fun emit(uCont: Continuation<Unit>, value: T): Any? {
val currentContext = uCont.context
currentContext.ensureActive()
//...
completion = uCont
return emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
}
emitFun实质上会调用collector里的emit函数,而此刻的collector即为SendingCollector,最后调用channel.send(value)
如此一来,Channel就将数据发送出去了,此刻channel.receiveCatching()被唤醒,接下来履行emit(result.getOrThrow()),这函数最后会流通到最初始的collect的闭包里。
上面的剖析即为出产者到顾客的流通进程,单看源码或许比较乱,看图解惑:
Flow buffer的实质上是利用了Channel进行数据的发送和接收
buffer为啥能提高功率
前面剖析过无buffer时出产者顾客的流程图,作为对比,咱们也将参加buffer后出产者顾客的流程图。
- 出产者挂起1s,当1s完毕后调用emit发射数据,此刻数据放入buffer里,出产者调用delay继续挂起
- 此刻顾客被唤醒,然后挂起 2s等候
- 第2s到来之时,出产者调用emit发送数据到buffer里,继续挂起
- 第2s到来之时,顾客完毕挂起,消费数据,然后继续挂起2s
- 第3s到来之时,出产者继续出产数据,而后出产者退出出产
- 第5s到来之时,顾客挂起完毕,消费数据,然后继续挂起2s
- 第7s到来之时,顾客挂起完毕,消费完毕,此刻由于channel里已经没有数据了,退出循环,终究顾客退出
由此可见,总共花费了7s。
至此,咱们找到了buffer可以提高功率的原因:
出产者、顾客运行在不同的协程,挂起操作不堵塞对方
抛出一个比较有意思的问题:以下代码加buffer之后功率会有提高吗?
suspend fun testBuffer6() {
var flow = flow {
(1..3).forEach {
println("emit $it")
emit(it)
}
}
var time = measureTimeMillis {
flow.collect {
delay(2000)
println("collect:$it")
}
}
println("use time:${time} ms")
}
在未试验之前,假如你已经有答案,恭喜你已经弄懂了buffer的实质。
4. Flow 线程切换的运用
suspend fun testBuffer4() {
var flow = flow {
(1..3).forEach {
delay(1000)
println("emit $it in thread:${Thread.currentThread()}")
emit(it)
}
}.flowOn(Dispatchers.IO)
var time = measureTimeMillis {
flow.collect {
delay(2000)
println("collect:$it in thread:${Thread.currentThread()}")
}
}
println("use time:${time} ms")
}
flowOn(Dispatchers.IO)表明其之前的操作符(函数)都在IO线程履行,如这儿的意思是flow闭包里的代码在IO线程履行。
而其之后的操作符(函数)在当时的线程履行。
通常用在子线程里获取网络数据(flow闭包),然后再collect闭包里(主线程)更新UI。
5. Flow 线程切换的原理
public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> {
checkFlowContext(context)
return when {
context == EmptyCoroutineContext -> this
this is FusibleFlow -> fuse(context = context)
else -> ChannelFlowOperatorImpl(this, context = context)
}
}
看到这你或许已经有答案了:这不就和buffer相同的方法吗?
但仔细看,此处多了个上下文:CoroutineContext。
CoroutineContext的效果就是用来决议协程运行在哪个线程。
前面剖析的buffer时,咱们的协程的效果域是runBlocking,即便出产者、顾客在不同的协程,但是它们始终在同一个线程里履行。
而运用了flowOn指定线程,此刻出产者、顾客在不同的线程运行协程。
因而,只要弄懂了buffer原理,flowOn原理自然而然就懂了。
以上为Flow背压和线程切换的全部内容,下篇将剖析Flow的热流。
本文根据Kotlin 1.5.3,文中完整Demo请点击
您若喜欢,请点赞、关注、收藏,您的鼓舞是我行进的动力
继续更新中,和我一起步步为营体系、深化学习Android/Kotlin
1、Android各种Context的宿世今生
2、Android DecorView 必知必会
3、Window/WindowManager 不行不知之事
4、View Measure/Layout/Draw 真明白了
5、Android事件分发全套服务
6、Android invalidate/postInvalidate/requestLayout 完全厘清
7、Android Window 怎么确认大小/onMeasure()屡次履行原因
8、Android事件驱动Handler-Message-Looper解析
9、Android 键盘一招搞定
10、Android 各种坐标完全明了
11、Android Activity/Window/View 的background
12、Android Activity创建到View的显示过
13、Android IPC 系列
14、Android 存储系列
15、Java 并发系列不再疑问
16、Java 线程池系列
17、Android Jetpack 前置根底系列
18、Android Jetpack 易学易懂系列
19、Kotlin 轻松入门系列
20、Kotlin 协程系列全面解读