继续谈谈从Rxjava迁移到Flow的背压策略

前语

关于背压问题不久前就讨论过了,这儿就不过多介绍了,总归它是一个非常复杂的话题,本文的首要意图是分析咱们怎么从Rxjava迁移到Flow而且运用其背压计划,由于本身技能的限制以及协程内部的复杂性,不会做过多的深入讨论,仅仅经过相似黑盒测验的方法,给出一些示例比较它们之前存在的差异以及怎么去运用不同的背压处理计划。鉴于 RxJava 和协程的完结差异,每个示例的实际输出根本都不会相同,这些示例的意图是阐明它们之间处理背压的不同战略

本文会侧重于从Rxjava的角度出发去比照Flow背压的差异和相关背压战略的运用计划,关于Flow处理背压的简略运用,之前有专门共享过一篇文章,感兴趣的小伙伴能够移步 —–> Flow是怎么处理背压问题的

关于Rxjava的背压

首要不得不说到Rxjava中最常用的Observable,有一个咱们称之为无限缓冲区,由于Observable 没有供给高雅的方法来处理背压,一切发送/接纳的数据都存储在内存中并保证订阅者接纳到它。假如发送的数据量非常非常大,那么终究或许会导致 OOM内存溢出,程序会发生crash

Rxjava中只有Flowable流类型才有对背压进行处理,它默许具有128巨细的缓冲区,是经过Subscriber接口支持的,咱们看下它的内部代码结构

public interface Subscriber<T> {
    public void onSubscribe(Subscription s); 
    public void onNext(T t); 
    public void onError(Throwable t) 
    public void onComplete(); 
}

而供给背压管理内容的功用正是其内部的Subscripion接口

public interface Subscription {
    public void request(long n);
    public void cancel();
}

特别request方法,每当下流能够运用更多事情时,它都会向上游发送恳求,供给它能够运用的事情数量。

因此,根本通讯如下:

  • 在订阅下流恳求一些事情(比方 1)

  • 上游收到该恳求并发生下一个事情

  • 当下流接纳到事情时,它能够向上游恳求更多事情

    这便是支持背压的方法:假如下流没有恳求事情,生产者应该中止生产事情。假如运用了某种背压战略而且生产者能够宣布新项目而顾客无法消费它,那么生产者或许会丢掉掉一些值(Drop)或缓冲(Buffer)它们。这或许被视为从链的底部到顶部的通讯(简称链式通讯),以告知上游是否宣布值,而且(根据背压战略)对没有准备好发送给顾客的值运用一些规则。

关于Flow背压

Flow中一切都会更加复杂,由于本文不深入研究,所以也不做过多讨论,只需求知道它没有像Rxjava那样从下流到上游的直接链式通讯,一切的一切都是根据suspend,当流的搜集器不堪重负时,它能够简略地暂停发射器,然后在准备好接受更多元素时恢复它;当下流暂停(或做一些其他作业)时,上游或许会识别而且不发射元素。

Flow&Rxjava的简略比较

简略来说,为了比较RxjavaFlow背压战略的不同,咱们经过一些测验用例进行阐明,例如模仿从网络中拿取一些数据等,这儿也不用那些看起来就非常复杂的官方术语解释了,总归就一句话,Rxjava会将它进行堵塞,而Flow则将它挂起/暂停

Flow&Rxjava

话不多说,接下来咱们将创建一些上游,然后定义一些推迟事情,一起来看下下面这个比如吧

关于Rxjava,咱们能够这么定义:

private fun flowable(delay: Long, mode: BackpressureStrategy, limit: Int): Flowable<Int> =
        Flowable.create<Int>({ emitter ->
            for (i in 1..limit) {
                Thread.sleep(delay)
                emitter.onNext(i)
            }
        }, mode)

关于Flow,咱们等效战略代码如下:

private fun flow(timeout: Long, limit: Int): Flow<Int> = flow {
        for (i in 1..limit) {
            delay(timeout)
            emit(i)
        }
    }

简略来说,以上便是完结在每个事情之间推迟timeout时长,宣布limit + 1项意图流

接下来再具体验证下,所以我弥补了一些代码,如下所示,将让咱们的上游每 100毫秒生产一次项目,并顺次用 200 毫秒和 300 毫秒处理它们。经过这样的设置,估计(关于顾客来说,它需求大约 500 毫秒来处理成果)并不是一切的事情都会被消费。

//by Rxjava 
fun testFlowable(mode: BackpressureStrategy, limit: Int = 10) {
    val latch = CountDownLatch(1)
    val stringBuffer = StringBuffer()
    val time = System.currentTimeMillis()
​
    flowable(delay = 100, mode = mode, limit = limit)
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.computation(), false, 1)
        .map { doWorkBlocking(i = it, delay = 200) }
        .map { doWorkBlocking(i = it, delay = 300) }
        .doOnComplete {
            latch.countDown()
        }
        .subscribe {
            stringBuffer.append("$it")
        }
​
    latch.await()
​
    println(System.currentTimeMillis() - time)
    println(stringBuffer.toString())
}
​
//by flow
fun testFlow(limit: Int = 10, onBackpressure: Flow<Int>.() -> Flow<Int>) {
​
    val latch = CountDownLatch(1)
    val time = System.currentTimeMillis()
​
    val stringBuffer = StringBuffer()
​
    CoroutineScope(Job() + Dispatchers.Default).launch {
​
        flow(timeout = 100, limit = limit)
            .flowOn(Dispatchers.IO)
            .onBackpressure()
            .map { doWorkDelay(i = it, timeout = 200) }
            .map { doWorkDelay(i = it, timeout = 300) }
            .onCompletion { latch.countDown() }
            .collect {
                stringBuffer.append("$it")
            }
    }
​
    latch.await()
    println((System.currentTimeMillis() - time))
    println(stringBuffer.toString())
}

Rxjava的背压战略

Rxjava Flowable供给了一些背压战略

  • Drop:假如超出缓冲区巨细,则丢掉未恳求的项目
  • Buffer:缓冲生产者的一切项目,注意OOM内存泄漏
  • Latest:只保留最新的项目
缓冲Observable的过度生成

一般情况下,当数据生成速度快于数据耗费速度时,就会出现背压,这时候缓冲Observable会过度生成,以至于顾客来不及处理;所以咱们来讨论下Rxjava最初处理背压的方法,处理过度生成Observable的最初的方法是为 Observer无法处理的元素定义某种缓冲区

咱们能够经过调用buffer() 方法来做到这一点:

val source = PublishSubject.create<Int>()
          source.buffer(1024)
              .observeOn(Schedulers.computation())
              .subscribe(ComputeFunction::compute, Throwable::printStackTrace);

定义一个巨细为 1024 的缓冲区会给观察者一些时刻来赶上生产过剩的来源,缓冲区将存储没有处理的元素。咱们能够添加缓冲区巨细,以便为生成的值留出满足的空间,这个其实和下文即将说到的BackpressureBuffer()处理战略的性质大致相同,能够说是它的初级版本,下文会对该战略计划进行实践弥补。

请注意,一般来说,这或许仅仅一个临时修复,由于假如生产者过度生成元素超出了预测的缓冲区巨细,溢出依然或许发生。

这样说或许会有点抽象,咱们做一个类比,想像一下,有一个这样的漏斗,咱们不断地去灌某种液体去填充这个漏斗,假如它发生的速度快于它流出的速度,锥体部分将作为缓冲器作业一段时刻,假如它装满就会溢出

漏斗.jpg

为了让它不要溢出来,对此咱们用不同的方法去对待:

  • 假如液体是那种贵金属的话,能够运用锥形部分更大的漏斗来添加缓冲液(BUFFER
  • 不介意液体浪费的话,就让它倒出来吧 (DROPLATEST
  • 假如或许的话,调整发射器以更慢地发生

接下来咱们对这些背压战略分别进行实践阐明

.onBackpressureBuffer()

关于这种情况,咱们只能画出相似之处并为咱们的用例完结等效处理计划。也便是说——咱们期望接纳宣布的事情,即使顾客跟不上。看下官方的示例图:

BackpressureBuffer.webp

RxJavaFlow将以不同的方法处理这个问题。

RxJava 能够缓冲项目,直到顾客准备好处理它们(这或许导致OutOfMemoryException),而Flow能够暂停发射器。运用上文的测验代码,咱们分别传入对应的参数进行测验

   //buffer
    BackpressureTest.testFlowable(BackpressureStrategy.BUFFER)
    BackpressureTest.testFlow { buffer() }

成果大致如下所示

5114
1 2 3 4 5 6 7 8 9 10

好的,下面再写个示例进行测验,发射10个数据进行模仿,而且运用onBackPressureBuffer这种背压战略,RxJava中的示例如下所示:

fun rxBuffer() {
        Flowable.range(1, 10)
            .onBackpressureBuffer()
            .observeOn(Schedulers.single())
            .subscribe { value ->
                Thread.sleep(100)
                println("Get value: $value")
            }
        Thread.sleep(1000)
    }

它会将一切成果进行输出,如下图所示

42b62edcdca0586244f9c399e3e0d85.png

然后咱们看下Flow中对应的等效战略是怎么处理的

fun main() = runBlocking {
    (1..1_000_000).asFlow()
        .buffer(capacity = 0, onBufferOverflow = BufferOverflow.SUSPEND)
        .collect { value ->
            delay(100)
            println("Got value: $value")
        }
}

它的输出成果和上图是相同的

42b62edcdca0586244f9c399e3e0d85.png

你或许会问,这个比如是不是和没有任何缓冲区相同的,由于Flow默许情况下会暂停发射器。这是一个很好的观察成果,但请记住,当发射器和搜集器在单独的协程中运转时,假如它们都需求一些时刻才干完结的时候,它们能够并发履行,这样速度会更快。

在这个比如中,咱们现已简略研究了其间一种战略,现在让咱们快速看一下其他战略。

.onBackpressureDrop()

假如顾客无法跟上而且缓冲区已满,此战略将丢掉一切宣布的项目。在这种情况下(相似于 Latest)生产者将丢掉顾客无法消费的一切项数据,但不会保留最新数据,看下官方的示例图:

BackpressureDrop.webp

好的,举个栗子,仍是延用之前的示例,咱们仍旧发射10个数据进行模仿,这儿咱们运用onBackpressureDrop背压战略,这是来自 RxJava 的示例:

fun rxDrop() {
        Flowable.range(1, 10)
            .onBackpressureDrop()
            //将默许的缓冲区size改为1
            .observeOn(Schedulers.single(), false)
            .subscribe { value ->
                Thread.sleep(100)
                println("Get Value: $value")
            }
        Thread.sleep(1000)
    }

能够看到它的输出成果只拿到了一个1,之后的一切值都被丢掉了,由于顾客无法处理它们。

8487b17da104714187a679ad46943f6.png

这儿有几件事要注意下:

  • 首要,假如咱们没有更改Scheduler调度器,咱们将获得一切项目,由于这段代码将同步运转,而且顾客会堵塞生产者。相似地,默许情况下 Flow的话,假如在同一个协程中,它会顺次在发射和搜集之间交替,直到Flow完结。
  • 另外,请记住,observeOn() 中运用默许的缓冲区巨细是128,这儿咱们指定了size为1。假如运用默许缓冲区巨细,只要咱们的运转时刻满足长,就会和之前相同获得一切发射出来的元素

运用的等效战略Flow如下所示:

fun flowDrop() = runBlocking {
    (1..10).asFlow()
        .buffer(capacity = 1, onBufferOverflow = BufferOverflow.DROP_LATEST)
        .collect { value ->
            delay(100)
            println("Get value: $value")
        }
}

Rxjava不相同的是,鉴于协程的性质,搜集器处理前两个元素,但主意是相同的——一切未处理的元素都被丢掉。之前文章现已讨论过了,当Flow调用.collect() 的时候,默许情况下,发射和搜集将在同一个协程中次序运转。换句话说,搜集器将暂停发射器,直到它准备好接纳更多数据。

f28f67125adcbf66f5902dc4c7ea71e.png

那么这儿发生了什么呢?为了使缓冲和背压处理正常作业,咱们需求在单独的协程中运转搜集器。这便是.buffer()操作员进来的当地。它将一切宣布的项目经过 Flow 发送Channel到在单独的协程中运转的搜集器

它还为咱们供给了缓冲功用:

public fun <T> Flow<T>.buffer(
    capacity: Int = BUFFERED, 
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): Flow<T>

咱们能够指定咱们的缓冲区capacity和处理战略onBufferOverflow

在这个比如中,咱们现已简略研究了其间一种战略,接下来让咱们快速看一下本文叙述的最后一种背压战略。

.onBackpressureLatest()

这个战略的主意是在背压的情况下只宣布最新的项目。但是处理计划在 Rxjava 和协程Flow中的作业方法又有所不同。下面是官方给出的图示:

BackpressureLatest.webp

咱们能够看到,这种战略它在顾客来不及接纳消息的时候,丢掉掉之前的数据,只保留生产者给的最新的数据

进入正题,仍是经过一个栗子来进行阐明,仍旧是和之前相同发射10个数据进行模仿

首要,让咱们看一下 Rxjava 示例:

  fun rxLatest() {
        Flowable.range(1, 10)
            .onBackpressureLatest()
            .observeOn(Schedulers.single(), false, 2)
            .subscribe { value ->
                Thread.sleep(100)
                println("Get value:$value ")
            }
        Thread.sleep(1000L)
    }

为了阐明一个要点,咱们将缓冲区巨细添加到2。能够看下输出成果,在这种情况下,缓冲区将填充最旧的值,并删除一切后续值,最后一个值在外。

5697e549c512100fcee1199d970bee4.png

但是在Flow中,等效处理计划如下所示,buffer中现已供给了这样的战略DROP_OLDEST

 fun flowLatest() = runBlocking {
        (1..10).asFlow()
            .buffer(capacity = 2, onBufferOverflow = BufferOverflow.DROP_OLDEST)
            .collect { value ->
                delay(100)
                println("Get Value: $value")
            }
    }

这种战略它的新数据会直接覆盖掉旧数据,不设缓冲区,也便是缓冲区巨细为 0,丢掉旧数据,Flow中对此也供给了相应的操作符conflate, 看下输出成果

5b916a83c5616a4324a62bffd7af2b6.png

关于缓冲和背压的终究主意

RxJava 库还有更多处理背压的功用,笔者就不再共享了。它在灵活性和挑选方面或许很好,但在学习曲线方面也很糟糕,正确运用这些操作对大多数开发人员来说并不是一项微乎其微的任务,需求花上不小的精力,达到运用通晓的门槛相对较高。

总的来说,RxJava FlowableKotlin Flow 都支持背压,但依然存在差异。这些差异首要是根据 RxJava 内置的背压支持,它从下到上作业(下流能够在需求更多值时告知上游),而 Kotlin Flow背压首要根据suspend挂起函数,当咱们从 RxJava 迁移到 Kotlin Flow 时,请特别注意运用 Flowable 和运用到背压战略的当地,所以在我看来,Kotlin Flow初学者更友爱且易于运用,一起为您或许遇到的大多数问题供给了处理计划。

好咯,关于背压的相关讨论就到此划上句号吧,还有许多当地值得继续深入学习,由于背压是一个非常复杂的问题,我仅仅简略的分析下Rxjava&Flow之间背压战略的区别以及完结方法,还有许多东西要学,共勉!!!!!

参考资料

  • Rxjava背压实践
  • RxJava 背压,你为什么要关心?
  • 运用 RxJava 处理背压
  • Kotlin Flows 中的背压