Kotlin flow 的创立原理和流程

能够以为:Flow是协程和响应式编程的结合体。

引言

最近项目中触及到了一些硬件设备查找的需求,逻辑比较繁杂。主要内容是经过UDP/TCP识别设备后再经过HTTP获取相应数据。于是就采用了Flow的方式去完成。但有小伙伴却一不小心对Flow屡次履行了collect。由此引发了一些意想不到的问题。

在了解Flow之前,最好先了解一下Sequence序列。其简略用法如下:

fun sequenceDemo(){
    val numbers = sequence {
        repeat(3){
            yield(it)
        }
    }
    println(numbers.toList())
}

Flow的用法类似:

fun flowDemo(){
    val numbers = flow {
        repeat(3){
            emit(it)
        }
    }.flowOn(Dispatchers.IO)
    runBlocking {
        numbers.collect{
            println(it)
        }
    }
}

用法类似,可是却又有两点明显的差异:

  • sequence由于受到RestrictsSuspension的束缚,SequenceScope的扩展里(也便是sequence{}代码块)是无法运用挂起函数的。Kotlin特意增加了这个约束,用来确保sequence在整个履行过程中不会产生线程切换。这带来序列不支撑协程上下文,也无法经过调度器拟定序列创立的线程。这大概率便是Flow诞生的原因吧!
  • Flow支撑了线程调度,能够运用flowOn设定它运行时地点的线程。一起,它支撑挂起函数和协程上下文。最终经过collect消费flow的数据,collect是一个挂起函数。这意味着它必须在协程或其他挂起函数内被调用。

对Flow屡次履行collect会产生什么

现在回到正题,假如对对Flow屡次履行collect会产生什么,也便是屡次消费flow会怎样?直接经过下面代码查看输出成果:

fun flowDemo(){
    val numbers = flow {
        repeat(10){
            delay(10)
            emit(it)
        }
    }.flowOn(Dispatchers.IO)
    GlobalScope.launch(Dispatchers.IO) {
        numbers.collect{
            println("第一个:${it}")
        }
    }
    GlobalScope.launch(Dispatchers.IO) {
        numbers.collect{
            println("第二个:${it}")
        }
    }
}

输出成果如下:

第二个:0
第一个:0
第二个:1
第一个:1
第二个:2
第一个:2
....省掉
第二个:7
第一个:7
第一个:8
第二个:8
第一个:9
第二个:9

能够看到,两次collect没有任何相关。flow被消费了两次。能够发现屡次消费则屡次出产,出产和消费总是相对应的。两次消费没有认识相关。可是假如讲代码修改为下面的姿态:

fun flowDemo(){
    val numbers = flow {
        repeat(10){
            emit(getNumber())
        }
    }.flowOn(Dispatchers.IO)
    GlobalScope.launch(Dispatchers.IO) {
        numbers.collect{
            println("第一个:${it}")
        }
    }
    GlobalScope.launch(Dispatchers.IO) {
        numbers.collect{
            println("第二个:${it}")
        }
    }
    Thread.sleep(10000)
}
var i = 1
suspend fun getNumber():Int {
    delay(10)
    return i++
}

输出成果如下:

第二个:2
第一个:1
第二个:4
第一个:3
....省掉
第一个:6
第二个:9
第一个:9
第一个:10
第二个:10
....省掉
第一个:17
第二个:18

很明显,产生了线程安全问题。要想解决这个问题也很简略,只需要为getNumber加锁,修改后的代码如下:

var i = 1
val mutex = Mutex()
suspend fun getNumber(): Int {
    mutex.withLock {
        delay(10)
        return i++
    }
}

为什么不同写法成果会造成如此悬殊的成果呢?两次collect都是针对同一个flow也便是number履行的操作,可是为什么第一种写法却各自安好互不干扰呢?这一切的秘密,都和Flow的创立流程有关。

flow的创立

注:本小节需要把握基本的Kotlin常识,包括但不局限于高阶函数、扩展函数和内联函数等

不知你是否留意到如下代码中的秘密:

val numbers = flow {
	emit(1)
}.flowOn(Dispatchers.IO)fun flowDemo() {
    val numbers = flow {
        println("this is:$this and hashcode is ${this.hashCode()}")
        emit(1)
    }.flowOn(Dispatchers.IO)
    GlobalScope.launch(Dispatchers.IO) {
        numbers.collect {
            println("第一个:numbers is:$numbers and hashcode is ${numbers.hashCode()}")   
        }
    }
    GlobalScope.launch(Dispatchers.IO) {
        numbers.collect {
            println("第二个:numbers is:$numbers and hashcode is ${numbers.hashCode()}")
        }
    }
}

不难发现,numbers确实是同一个目标,可是flow代码块里的this是什么,emit办法是哪里界说的?为什么两个this不是同一个目标?

这儿为什么能够直接调用它呢?咱们直接看一下到flow{}底做了什么:

public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
		//注释①
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        collector.block()
    }
}

代码逻辑很简略:flow办法承受了一个FlowCollector的扩展函数,也便是咱们界说的代码块。然后创立了一个SafeFlow并回来。到这儿,flow代码块里的thisemit办法的问题就得的了答案:this是FlowCollectoremit是它的成员函数。之所以能够在代码块里直接调用emit,是因为扩展函数默许持有被扩展类实例目标的引证。它们在Kotlin中的界说如下:

public interface FlowCollector<in T> {
    public suspend fun emit(value: T)
}

一个简略的接口。而这个扩展函数也便是咱们所界说的代码块是在注释①,SafeFlowcollectSafely被调用的。collectSafely承受一个FlowCollector实例,并调用它的扩展办法。那么,关键问题是找到调用者和FlowCollector 创立的过程。这时,咱们就要回头看一下collect办法了:

public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
    collect(object : FlowCollector<T> {
        override suspend fun emit(value: T) = action(value)
    })

很明显,这是一个内联函数,也是一个Flow扩展函数。该扩展函数由调用了一个collect(collector: FlowCollector<T>)办法,FlowCollector 便是在这儿以匿名内部类的方式创立的。而在numbers.collect中,numbers是SafeFlow的实例。在上文中而它继承自AbstractFlow

public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {
    @InternalCoroutinesApi
    public final override suspend fun collect(collector: FlowCollector<T>) {
        val safeCollector = SafeCollector(collector, coroutineContext)
        try {
            collectSafely(safeCollector)
        } finally {
            safeCollector.releaseIntercepted()
        }
    }
    public abstract suspend fun collectSafely(collector: FlowCollector<T>)
}

能够看到,collect 最终调用到了AbstractFlow.collect 办法,并创立了SafeCollector 的实例然后调用了collectSafely 办法,而collectSafely 是一个笼统函数,它的一个完成便是上文中的注释①处。至此,咱们能够大致总结一下:

  • flow{} 承受一个FlowCollector 的扩展函数并回来SafeFlow的一个实例。
  • 一旦调用SafeFlow 的扩展函数collectcollect就会经过匿名内部类的方式创立一个FlowCollector实例。并调用Flow.collect(collector: FlowCollector<T>) 办法。
  • Flow.collect最终调用collectSafely办法 。而该办法在SafeFlow中的具体完成便是履行flow{}代码块所界说的扩展函数。最终调用emit办法开始发送数据

咱们就不深入探讨SafeCollector发送流程了。

到这儿就能够解说上面屡次collect的行为了。虽然numbers都是同一个SafeFlow的实例,可是真实履行emit生成数据的FlowCollector却不同,每次collect都会生成不同的FlowCollector

总结

经过对flow{}的解析,不难发现:SafeFlow也便是Flow便是一个”东西人“,它的意图有以下几个关键作用:

  • 供给用于出产数据的FlowCollector的扩展函数
  • 承受用于响应数据的代码块,用来消费数据
  • 在承受用于相应数据的代码块的一起,创立数据出产者FlowCollector并将出产者和顾客相关起来

SafeFlow真的就只是个东西,用来将创立出产者和顾客并将它们相关在一起。这儿也不难看出,flow是真的冷啊,collect担任创立 FlowCollector,也便是说假如没有顾客经过collect和出产者相关,出产者压根就不存在,flow{}代码块里的代码根本就不会履行,更不会有任何数据流产生!!