Kotlin flow 的创立原理和流程
能够以为:Flow是协程和响应式编程的结合体。
引言
最近项目中触及到了一些硬件设备查找的需求,逻辑比较繁杂。主要内容是经过UDP/TCP识别设备后再经过HTTP获取相应数据。于是就采用了Flow的方式去完成。但有小伙伴却一不小心对Flow屡次履行了collect
。由此引发了一些意想不到的问题。
在了解Flow之前,最好先了解一下Sequence序列。其简略用法如下:
fun sequenceDemo(){
val numbers = sequence {
repeat(3){
yield(it)
}
}
println(numbers.toList())
}
Flow的用法类似:
fun flowDemo(){
val numbers = flow {
repeat(3){
emit(it)
}
}.flowOn(Dispatchers.IO)
runBlocking {
numbers.collect{
println(it)
}
}
}
用法类似,可是却又有两点明显的差异:
- sequence由于受到RestrictsSuspension的束缚,SequenceScope的扩展里(也便是sequence{}代码块)是无法运用挂起函数的。Kotlin特意增加了这个约束,用来确保sequence在整个履行过程中不会产生线程切换。这带来序列不支撑协程上下文,也无法经过调度器拟定序列创立的线程。这大概率便是Flow诞生的原因吧!
- Flow支撑了线程调度,能够运用flowOn设定它运行时地点的线程。一起,它支撑挂起函数和协程上下文。最终经过collect消费flow的数据,collect是一个挂起函数。这意味着它必须在协程或其他挂起函数内被调用。
对Flow屡次履行collect会产生什么
现在回到正题,假如对对Flow屡次履行collect会产生什么,也便是屡次消费flow会怎样?直接经过下面代码查看输出成果:
fun flowDemo(){
val numbers = flow {
repeat(10){
delay(10)
emit(it)
}
}.flowOn(Dispatchers.IO)
GlobalScope.launch(Dispatchers.IO) {
numbers.collect{
println("第一个:${it}")
}
}
GlobalScope.launch(Dispatchers.IO) {
numbers.collect{
println("第二个:${it}")
}
}
}
输出成果如下:
第二个:0
第一个:0
第二个:1
第一个:1
第二个:2
第一个:2
....省掉
第二个:7
第一个:7
第一个:8
第二个:8
第一个:9
第二个:9
能够看到,两次collect没有任何相关。flow被消费了两次。能够发现屡次消费则屡次出产,出产和消费总是相对应的。两次消费没有认识相关。可是假如讲代码修改为下面的姿态:
fun flowDemo(){
val numbers = flow {
repeat(10){
emit(getNumber())
}
}.flowOn(Dispatchers.IO)
GlobalScope.launch(Dispatchers.IO) {
numbers.collect{
println("第一个:${it}")
}
}
GlobalScope.launch(Dispatchers.IO) {
numbers.collect{
println("第二个:${it}")
}
}
Thread.sleep(10000)
}
var i = 1
suspend fun getNumber():Int {
delay(10)
return i++
}
输出成果如下:
第二个:2
第一个:1
第二个:4
第一个:3
....省掉
第一个:6
第二个:9
第一个:9
第一个:10
第二个:10
....省掉
第一个:17
第二个:18
很明显,产生了线程安全问题。要想解决这个问题也很简略,只需要为getNumber加锁,修改后的代码如下:
var i = 1
val mutex = Mutex()
suspend fun getNumber(): Int {
mutex.withLock {
delay(10)
return i++
}
}
为什么不同写法成果会造成如此悬殊的成果呢?两次collect都是针对同一个flow也便是number履行的操作,可是为什么第一种写法却各自安好互不干扰呢?这一切的秘密,都和Flow的创立流程有关。
flow的创立
注:本小节需要把握基本的Kotlin常识,包括但不局限于高阶函数、扩展函数和内联函数等
不知你是否留意到如下代码中的秘密:
val numbers = flow {
emit(1)
}.flowOn(Dispatchers.IO)fun flowDemo() {
val numbers = flow {
println("this is:$this and hashcode is ${this.hashCode()}")
emit(1)
}.flowOn(Dispatchers.IO)
GlobalScope.launch(Dispatchers.IO) {
numbers.collect {
println("第一个:numbers is:$numbers and hashcode is ${numbers.hashCode()}")
}
}
GlobalScope.launch(Dispatchers.IO) {
numbers.collect {
println("第二个:numbers is:$numbers and hashcode is ${numbers.hashCode()}")
}
}
}
不难发现,numbers确实是同一个目标,可是flow代码块里的this
是什么,emit
办法是哪里界说的?为什么两个this
不是同一个目标?
这儿为什么能够直接调用它呢?咱们直接看一下到flow{}
底做了什么:
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
//注释①
override suspend fun collectSafely(collector: FlowCollector<T>) {
collector.block()
}
}
代码逻辑很简略:flow
办法承受了一个FlowCollector
的扩展函数,也便是咱们界说的代码块。然后创立了一个SafeFlow
并回来。到这儿,flow代码块里的this
和emit
办法的问题就得的了答案:this是FlowCollector
而emit
是它的成员函数。之所以能够在代码块里直接调用emit
,是因为扩展函数默许持有被扩展类实例目标的引证。它们在Kotlin中的界说如下:
public interface FlowCollector<in T> {
public suspend fun emit(value: T)
}
一个简略的接口。而这个扩展函数也便是咱们所界说的代码块是在注释①,SafeFlow
的collectSafely
被调用的。collectSafely
承受一个FlowCollector
实例,并调用它的扩展办法。那么,关键问题是找到调用者和FlowCollector
创立的过程。这时,咱们就要回头看一下collect
办法了:
public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
collect(object : FlowCollector<T> {
override suspend fun emit(value: T) = action(value)
})
很明显,这是一个内联函数,也是一个Flow
扩展函数。该扩展函数由调用了一个collect(collector: FlowCollector<T>)
办法,FlowCollector
便是在这儿以匿名内部类的方式创立的。而在numbers.collect
中,numbers是SafeFlow
的实例。在上文中而它继承自AbstractFlow
:
public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {
@InternalCoroutinesApi
public final override suspend fun collect(collector: FlowCollector<T>) {
val safeCollector = SafeCollector(collector, coroutineContext)
try {
collectSafely(safeCollector)
} finally {
safeCollector.releaseIntercepted()
}
}
public abstract suspend fun collectSafely(collector: FlowCollector<T>)
}
能够看到,collect
最终调用到了AbstractFlow.collect
办法,并创立了SafeCollector
的实例然后调用了collectSafely
办法,而collectSafely
是一个笼统函数,它的一个完成便是上文中的注释①处。至此,咱们能够大致总结一下:
-
flow{}
承受一个FlowCollector
的扩展函数并回来SafeFlow
的一个实例。 - 一旦调用
SafeFlow
的扩展函数collect
,collect
就会经过匿名内部类的方式创立一个FlowCollector
实例。并调用Flow.collect(collector: FlowCollector<T>)
办法。 -
Flow.collect
最终调用collectSafely
办法 。而该办法在SafeFlow
中的具体完成便是履行flow{}
代码块所界说的扩展函数。最终调用emit
办法开始发送数据
咱们就不深入探讨SafeCollector
发送流程了。
到这儿就能够解说上面屡次collect
的行为了。虽然numbers都是同一个SafeFlow
的实例,可是真实履行emit
生成数据的FlowCollector
却不同,每次collect
都会生成不同的FlowCollector
总结
经过对flow{}
的解析,不难发现:SafeFlow
也便是Flow便是一个”东西人“,它的意图有以下几个关键作用:
- 供给用于出产数据的
FlowCollector
的扩展函数 - 承受用于响应数据的代码块,用来消费数据
- 在承受用于相应数据的代码块的一起,创立数据出产者
FlowCollector
并将出产者和顾客相关起来
SafeFlow
真的就只是个东西,用来将创立出产者和顾客并将它们相关在一起。这儿也不难看出,flow是真的冷啊,collect
担任创立 FlowCollector
,也便是说假如没有顾客经过collect
和出产者相关,出产者压根就不存在,flow{}
代码块里的代码根本就不会履行,更不会有任何数据流产生!!