前言

跟着时间的推移,越来越多的干流应用现已开始全面拥抱Kotlin,协程的引入,Flow的诞生,给予了开发很多便捷,作为协程与呼应式编程结合的流式处理结构,一方面它简略的数据转化与操作符,没有繁琐的操作符处理,广受大部分开发的喜爱,另一方面它并没有呼应式编程带来的背压问题(BackPressure)的困扰;接下来,本文将会就Flow怎么处理背压问题进行评论

关于背压(BackPressure

背压问题是什么

首要咱们要清晰背压问题是什么,它是怎么发生的?简略来说,在一般的流处理结构中,音讯的接纳处理速度跟不上音讯的发送速度,从而导致数据不匹配,形成积压。假如不及时正确处理背压问题,会导致一些严重的问题

  • 比如说,音讯拥堵了,系统运转不畅从而导致崩溃
  • 比如说,资源被耗费殆尽,甚至会发生数据丢失的情况

如下图所示,能够直观了解背压问题的发生,它在生产者的生产速率高于顾客的处理速率的情况下出现

Flow是如何解决背压问题的

界说背压战略

既然咱们现已知道背压问题是怎么发生的,就要去尝试正确地处理它,大致处理方案战略在于,假如你有一个流,你需求一个缓冲区,以防数据发生的速度快于耗费的速度,所以往往就会针对这个背压战略进行些评论

  • 界说的中心缓冲区需求多大才比较适宜?
  • 假如缓冲区数据已满了,咱们怎么样处理新的事情?

关于以上问题,经过学习Flow里的背压战略,信任能够很快就知道答案了

Flow的背压机制

由于Flow是基于协程中运用的,它不需求一些奇妙规划的处理方案来清晰处理背压,在Flow中,不同于一些传统的呼应式结构,它的背压办理是运用Kotlin挂起函数suspend实现的,看下源码你会发现,它里边一切的函数办法都是运用suspend修饰符符号,这个修饰符便是为了暂停调度者的履行不堵塞线程。因而,Flow<T>在同一个协程中发射和搜集时,假如搜集器跟不上数据流,它能够简略地暂停元素的发射,直到它准备好接纳更多。看到这,是不是觉得有点难明…….

简略举个比如,假设咱们拥有一个烤箱,能够用来烤面包,由于烤箱容量的限制,一次只能烤4个面包,假如你试着一次烤8个面包,会大大加大烤箱的承载负荷,这现已远远超过了它的内存运用量,很有可能会因而烧掉你的面包。

模仿背压问题

回忆下之前所说的,当咱们耗费的速度比生产的速度慢的时分,就会发生背压,下面用代码来模仿下这个进程

  • 首要先创立一个办法,用来每秒发送元素

    fun currentTime() = System.currentTimeMillis()
    fun threadName() = Thread.currentThread().name
    var start: Long = 0fun createEmitter(): Flow<Int> =
       (1..5)
         .asFlow()
         .onStart { start = currentTime() }
         .onEach {
          delay(1000L)
          print("Emit $it (${currentTime() - start}ms) ")
         }
    
  • 接着需求搜集元素,这儿咱们推迟3秒再接纳元素, 推迟是为了夸大缓慢的顾客并创立一个超级慢的搜集器。

    fun main() {
      runBlocking {
        val time = measureTimeMillis {
          createEmitter().collect {
            print("\nCollect $it starts ${start - currentTime()}ms")
            delay(3000L)
            println("  Collect $it ends ${currentTime() - start}ms")
           }
         }
        print("\nCollected in $time ms")
       }
    }
    

    看下输出成果,如下图所示

Flow是如何解决背压问题的

这样整个进程下来,大约需求20多秒才干结束,这儿咱们模仿了接纳元素比发送元素慢的情况,因而就需求一个背压机制,而这正是Flow实质中的,它并不需求别的的规划来处理背压

背压处理方式

运用buffer进行缓存搜集

为了使缓冲和背压处理正常作业,咱们需求在独自的协程中运转搜集器。这便是.buffer()操作符进来的地方,它是将一切宣布的项目发送Channel到在独自的协程中运转的搜集器

public fun <T> Flow<T>.buffer(
  capacity: Int = BUFFERED, 
  onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): Flow<T>

它还为咱们供给了缓冲功能,咱们能够指定capacity咱们的缓冲区和处理战略onBufferOverflow,所以当Buffer溢出的时分,它为咱们供给了三个选项

enum BufferOverflow {
  SUSPEND,
  DROP_OLDEST,
  DROP_LATEST
 }
  • 默许运用SUSPEND:会将当时协程挂起,直到缓冲区中的数据被消费了
  • DROP_OLDEST:它会丢掉最老的数据
  • DROP_LATEST: 它会丢掉最新的数据

好的,咱们回到上文所展现的模仿示例,这时分咱们能够加入缓冲搜集buffer,不指定任何参数,这样默许便是运用SUSPEND,它会将当时协程进行挂起

Flow是如何解决背压问题的

此刻当搜集器繁忙的时分,程序就开始缓冲,并在第一次搜集办法调用结束的时分,两次发射后再次开始搜集,此刻流程的耗时时长缩短到大约16秒就能够履行结束,如下图所示输出成果

Flow是如何解决背压问题的

运用conflate处理

conflate操作符于Channel中的Conflate模式是一直的,新数据会直接掩盖掉旧数据,它不设缓冲区,也便是缓冲区大小为 0,丢掉旧数据,也便是采取 DROP_OLDEST 战略,那么不就等于buffer(0,BufferOverflow.DROP_OLDEST),能够看下它的源码能够佐证咱们的判别

public fun <T> Flow<T>.conflate(): Flow<T> = buffer(CONFLATED)

在某些情况下,由于根本原因是处理生产消费速率不匹配的问题,咱们需求做一些取舍的操作,conflate将丢掉掉旧数据,只有在搜集器闲暇之前宣布的最后一个元素才被搜集,将上文的模仿实例改为conflate履行,你会发现咱们直接丢掉掉了2和4,或者说新的数据直接掩盖掉了它们,整个流程只需求10秒左右就履行完成了

Flow是如何解决背压问题的
Flow是如何解决背压问题的

运用collectLatest处理

经过官方介绍,咱们知道collectLatest效果在于当原始流宣布一个新的值的时分,前一个值的处理将被撤销,也便是不会被接纳, 和conflate的区别在于它不会用新的数据掩盖,而是每一个都会被处理,只不过假如前一个还没被处理完后一个就来了的话,处理前一个数据的逻辑就会被撤销

suspend fun <T> Flow<T>.collectLatest(action: suspend (T) -> Unit)

还是上文的模仿实例,这儿咱们运用collectLatest看下输出成果:

Flow是如何解决背压问题的

这样也是有副效果的,假如每个更新都十分重要,例如一些视图,状况改写,这个时分就不必要用collectLatest ; 当然假如有些更新能够无损失的掩盖,例如数据库改写,就能够运用到collectLatest,详细详细的运用场景,还需求靠开发者自己去衡量选择运用

小结

关于Flow能够说不需求额定供给什么奇妙的方式处理背压问题,Flow的实质,亦或者说Kotlin协程自身就现已供给了相应的处理方案;开发者只需求在不同的场景中选择正确的背压战略即可。总的来说,它们都是经过运用Kotlin挂起函数suspend,当流的搜集器不堪重负时,它能够简略地暂停发射器,然后在准备好接受更多元素时康复它。

关于挂起函数suspend这儿就不过多赘述了,只需求理解的一点是它与传统的基于线程的同步数据管道中背压办理十分类似,无非便是,缓慢的顾客经过堵塞生产者的线程自意向生产者施加背压,简略来说,suspend经过透明地办理跨线程的背压而不堵塞它们,将其超越单个线程并进入异步编程领域。