RxJava来源

作为现在最常用异步库,其常常与Retrofit+OKHttp一起构建APP的网络操作等模块,再加上其呼应式编程自身特性,可运用场景更是益发增多。RxJava经过不断迭代,目前版别为3.X,可掩盖日常开发大多数异步操作场景。详细说RxJava之前,不得不先说ReactiveX。ReactiveX官方介绍是 Alibrary for composing anychronouse and event-based programs by using observables sequences” 大白话意思便是这是个运用可调查序列来编写异步和依据事情的程序的库。

而RxJava在github上的介绍则是“RxJava is a Java VM implementation of Reactive Extensions:a library for composing asynchronous and event-based programs by using observable sequences.”

能够说,RxJava实际上是ReactiveX在JVM上的一种完结。(除此之外还有RxAndroid,了解即可)

有爱好可访问:github.com/ReactiveX/R…

特色

RxJava依据ReactiveX,继承了其一切特色,依据调查者办法,支撑流水式的处理数据或许事情,并提供大量的操作符来处理数据和事情序列而使得不必过多关心底层多线程新冠问题。划重点便是,调查者办法,异步处理(呼应式编程)。

其要素即为:

  1. Observable(被调查者)

  2. Observer/Subscriber(调查者)

  3. subscribe (订阅)

其核心能够了解为两方面:异步数据流和呼应式编程:

把一切的事情(数据)看成是一条河流,它能够被调查、过滤或操作,也能够和另外一条河流汇组成一条新的河流。

一旦事情(数据)发生或发生变化,就能够告诉调查这些事情的角色(调查者/订阅者)做出呼应处理。

除此之外,其链式调用也算是显著特色。

运用

增加依赖:

implementation'io.reactivex.rxjava3:rxjava:3.0.0'
implementation'io.reactivex.rxjava3:rxandroid:3.0.0'
1、创立被调查者 (Observable)
val ohThisIsObservable = Observable.create<String>{
    it.onNext("Hello")  //发送"事情"
    it.onNext("rx")
    it.onNext("world")
    it.onComplete() //发送完结"事情"
}

这儿采用了create()创立被调查者,但并非只要create()能创立,其他操作符也能够达成此作用(后边介绍)。

2、创立调查者 (Observer)
val observer: Observer<String> = object : Observer<String> {
    override fun onSubscribe(d: Disposable) { System.out.println(" onSubscribe ") }
    override fun onNext(string: String) { System.out.println(" onNext : "+string) }
    override fun onError(e: Throwable) { System.out.println(e) }
    override fun onComplete() { System.out.println(" on Complete ") }
}

PS:这儿日志都是手动增加

可看见这儿呼应事情别离有以下:

onSubscribe():准备监听,最早调用的办法;

onNext():用来发送数据,调用一次发送一条;

onError():发送反常告诉,只发送一次,屡次调用也只会发送第一条;

onComplete():发送完结告诉,只发送一次,屡次调用也只会发送第一条。

PS:onError()和onComplete()互斥,俩办法同时只能调用一个,要么发生反常onError()不会回调onComplete(),要么正常回调onComplete(),不回调onError()。

3、订阅 (Subscribe)
ohThisIsObservable.subscribe(observer)

最终一步。

运转代码,会发现如下成果:

RxJava入门及常用操作符(Kotlin版)
日志中可发现,当被调查者(ohThisIsObservable)经过调用onNext()发射数据的时分,调查者(observer)调用onNext()接纳数据;当被调查者(ohThisIsObservable)调用onComplete()时,调查者(observer)调用onComplete(),其他事情将不会持续发送(onError同此理)。

RxJava中,调查者不只仅只要observer才能完结,下面是个简略版示例:

val consumer: Consumer<String> =
    Consumer { s ->
        //创立调查者consumer
        println(s)
    }
val stringObservable = Observable.create { emitter ->
    emitter.onNext("Hello")
    emitter.onNext("~~~rx~~~")
    emitter.onNext("world")
    emitter.onComplete()
}
//被调查者宣布一连串字符并指定consumer订阅被调查者
stringObservable.subscribe(consumer)

对应输出成果如图:

RxJava入门及常用操作符(Kotlin版)
由以上代码可见,Observer相对于Consumer在接口办法上要多onSubscribe、onNext、onError、onComplete这些接口,在一次事情中,可操作程度更精密。

Dispose

在onSubscribe()中会接纳到一个Disposable目标,该目标相当于一个开关,假如开关封闭,则调查者不会收到任何事情和数据。例如:

val observer: Observer<String> = object : Observer<String> {
    var mDisposeable: Disposable? = null
    override fun onSubscribe(d: Disposable) {
        println(" onSubscribe ")
        mDisposeable = d
    }
    override fun onNext(s: String) {
        println(" onNext : $s")
        if (s == "stop") {
            mDisposeable!!.dispose()
        }
    }
    override fun onError(e: Throwable) {
        println(" onError ")
    }
    override fun onComplete() {
        println(" onComplete ")
    }
}
Observable.just("Hello", "world", "stop", "coding").subscribe(observer)

在上述代码中咱们运用一个变量来保存Disposable目标,在onNext办法中假如传过来的字符串是“stop”,则调用dispose封闭事情的接纳,后续字符串不在发射,乃至onComplete()也不会履行了。成果如下图:

RxJava入门及常用操作符(Kotlin版)

操作符

Rxjava提供大量操作符来完结对数据处理,这些操作符也能够了解成函数。假如把Rxjava比喻成一道数据流水线,那么一个操作符便是一道工序,数据经过这些工序加工改换、组装,最终生产出咱们想要的数据。其常用操作符一览如下:

RxJava入门及常用操作符(Kotlin版)
(图片不清楚可私信我要xmind原文件)

操作符类型

接下来这段纯概念,内容较长,不喜可直接跳过看后边示例。

创立型

RxJava入门及常用操作符(Kotlin版)

转化型

RxJava入门及常用操作符(Kotlin版)

组合型

RxJava入门及常用操作符(Kotlin版)

功能型

RxJava入门及常用操作符(Kotlin版)

过滤型

RxJava入门及常用操作符(Kotlin版)

条件型

RxJava入门及常用操作符(Kotlin版)
开发过程中纷歧定会用到一切操作符,这儿只对常用操作符做解说。

just操作符

用于创立一个被调查者,并发送事情,发送的事情不能够超越10个以上(从其构造函数就能够看出,如下图):

RxJava入门及常用操作符(Kotlin版)
简略写个示例:

val justObservable = Observable.just("Hello", "rx", "world~!")
val observer: Observer<String> = object : Observer<String> {
    override fun onSubscribe(d: Disposable) { System.out.println(" onSubscribe ") }
    override fun onNext(string: String) { System.out.println(" onNext : "+string) }
    override fun onError(e: Throwable) { System.out.println(e) }
    override fun onComplete() { System.out.println(" on Complete ") }
}
justObservable.subscribe(observer)

对应输出成果为:

RxJava入门及常用操作符(Kotlin版)

链式调用

RxJava最便利的一个特征便是链式调用,上述代码能够修正为:

Observable.just("Hello", "rx", "world").subscribe(object : Observer<String> {
    override fun onSubscribe(d: Disposable) { System.out.println(" onSubscribe ") }
    override fun onNext(string: String) { System.out.println(" onNext : "+string) }
    override fun onError(e: Throwable) { System.out.println(e) }
    override fun onComplete() { System.out.println(" on Complete ") }
})

作用相同(Java代码在这儿的表现办法则是lamba表达式),但跟之前看起来给人感觉彻底纷歧样,如无特别说明,后续比方都会如此调用。

fromArray操作符

类似于just,可是能够传入无限个参数,许多量限制,此处省略运用办法,有爱好可自行研讨。

fromIterable操作符

可直接传一个List给调查者发射(Listextends Collection接口,而CollectionextendsIterable接口,所以能够直接传进去)。例如:

val arrayList = ArrayList<String>()
arrayList.add("111")
arrayList.add("222")
Observable.fromIterable(arrayList).subscribe(object : Observer<String> {
    override fun onSubscribe(d: Disposable) { System.out.println(" onSubscribe ") }
    override fun onNext(string: String) { System.out.println(" onNext : "+string) }
    override fun onError(e: Throwable) { System.out.println(e) }
    override fun onComplete() { System.out.println(" on Complete ") }
})

对应成果为:

RxJava入门及常用操作符(Kotlin版)

map操作符

map操作符能直接对发射出来的事情进行处理而且发生新的事情,然后再次发射。例如下述比方:

Observable.just("Hello").map<Any> { "get it!" }
    .subscribe(object : Observer<Any> {
        override fun onSubscribe(d: Disposable) {
            println(" onSubscribe ")
        }
        override fun onNext(o: Any) {
            println(" onNext : "+o)
        }
        override fun onError(e: Throwable) {
            println(" onError ")
        }
        override fun onComplete() {
            println(" onComplete ")
        }
    })

这儿咱们本来传入参数是“Hello”,经过map()阻拦后发射出去的参数变成了“get it!”,阻拦修正成功。

RxJava入门及常用操作符(Kotlin版)
除此之外,map()还能完结数据类型的转化,有爱好的能够自己测验。

flatMap操作符

flat,英语翻译过来的意思是“使变平”的意思,跟map()相同,都能直接对发射出来的事情进行处理而且发生新的事情。但其内部办法参数不同。二者都是传参进Function()中并在apply()中进行数据修正,但二者传入参数不同。

RxJava入门及常用操作符(Kotlin版)

RxJava入门及常用操作符(Kotlin版)
map()是两个泛型,而flatMap()第二个参数填Observable被调查者,再将这个被调查者发射出去,这一下灵活度就增大了,这也是网络恳求场景中最常用的操作符。下述简略示例:

Observable.just("注册").flatMap<Any> { s ->
    println(s + "成功")
    Observable.just("进行登陆")
}.subscribe(object : Observer<Any> {
    override fun onSubscribe(d: Disposable) {
        println(" onSubscribe ")
    }
    override fun onNext(o: Any) {
        println(" onNext :"+o)
    }
    override fun onError(e: Throwable) {
        println(" onError ")
    }
    override fun onComplete() {
        println(" onComplete ")
    }
})

对应的日志打印也是意料之中:

RxJava入门及常用操作符(Kotlin版)

concatMap操作符

concatMap()与flatMap()运用办法彻底一致,基本上是相同的。不过,concatMap()转宣布来的数据是有序的,而flatMap()是无序的。flatMap()呈现无序的状况概率不高,有爱好的能够自己测验一下。

buffer操作符

buffer()有多参数办法,这儿介绍最常用的,单参数办法,buffer(x):依据个数来缓冲,每次缓冲x个数据转化成数组,再发射出去,例如:

Observable.just("1","2","3","4","5","8","9","7","6","10")
    .buffer(3)
    .subscribe(object : Observer<Any> {
    override fun onSubscribe(d: Disposable) {
        println(" onSubscribe ")
    }
    override fun onNext(o: Any) {
        println(" onNext :"+o)
    }
    override fun onError(e: Throwable) {
        println(" onError ")
    }
    override fun onComplete() {
        println(" onComplete ")
    }
})

对应输出成果为:

RxJava入门及常用操作符(Kotlin版)

concat操作符

能够将多个调查者组合在一起,然后按照之前发送顺序发送事情。需求留意的是,concat()最多只能够发送4个事情。

RxJava入门及常用操作符(Kotlin版)
示例如下:

Observable.concat(Observable.just("111"),Observable.just("222")).subscribe ( object : Observer<Any>{
    override fun onSubscribe(d: Disposable) {
        println(" onSubscribe ")
    }
    override fun onNext(t: Any) {
        println(" onNext : "+t)
    }
    override fun onError(e: Throwable) {
        println(" onError ")
    }
    override fun onComplete() {
        println(" onComplete ")
    }
} )

对应输出成果为:

RxJava入门及常用操作符(Kotlin版)
connatArray()与concat()作用相同,不过concatArray()能够发送多于4个被调查者,这儿就不赘述了。

异步

RxJava提供了非常便利的API来完结线程的调度,内置的线程调度器有以下几个:

Schedulers.single():单线程调度器,线程可复用;
Schedulers.newThread():为每个使命创立新的线程;
Schedulers.io():处理I/O密集使命,内部线程池完结,可依据需求增长;
Schedulers.computation():处理核算使命,如事情循环和回调使命; Schedulers.immediate():默许指定的线程,也便是当前线程; AndroidSchedulers.mainThread():Android主线程调度器,属于RxAndroid。

线程调度器实际上是指派事情在什么样的线程中处理,所需运用场景就不难想象了,假如该事情是耗时操作,比方网络恳求,但相应成果会先是在UI中,这时分在主线程履行网络恳求就不适宜了,但在子线程履行,成果同样要改写UI,也不太适宜,这儿就凸显自在切换线程的好处了。Rxjava可经过调度器来拟定被调查者和调查者别离能够在什么线程中履行自己的代码,而指定调度器的API则是:subscribeOn和observeOn。

subscribeOn

首要,咱们不必线程调度器,咱们先看调查者和被调查者默许状况下在什么线程中履行自己代码,如下:

Observable.create(object : ObservableOnSubscribe<Any>{
    override fun subscribe(emitter: ObservableEmitter<Any>) {
        println(" subscribe : "+ Thread.currentThread())
        emitter.onNext(" guess wich thread ")
        emitter.onComplete()
    }
}).subscribe ( object : Observer<Any>{
    override fun onSubscribe(d: Disposable) {
        println(" onSubscribe : "+ Thread.currentThread())
    }
    override fun onNext(t: Any) {
        println(" onNext : "+t+" : "+Thread.currentThread())
    }
    override fun onError(e: Throwable) {
        println(" onError : "+ Thread.currentThread())
    }
    override fun onComplete() {
        println(" onComplete : "+ Thread.currentThread())
    }
} )

对应的成果为:

RxJava入门及常用操作符(Kotlin版)
可见默许状况下,调查者和被调查者都是在主线程中履行。假定这个时分要履行耗时操作,Android程序必定溃散,所以咱们这时要切换线程。

subscribeOn()实际上是指定被调查者的代码在哪个线程中履行。例如:

Observable.create(object : ObservableOnSubscribe<Any>{
    override fun subscribe(emitter: ObservableEmitter<Any>) {
        println(" subscribe : "+ Thread.currentThread())
        try {
            Thread.sleep(2000) //这儿sleep模拟耗时操作
        } catch (e: InterruptedException) {
            e.printStackTrace()
        }
        emitter.onNext(" guess wich thread ")
        emitter.onComplete()
    }
}).subscribeOn(Schedulers.newThread()) //决议履行subscribe办法所在的线程,也便是发生事情或发射事情所在的线程
    .subscribe ( object : Observer<Any>{
    override fun onSubscribe(d: Disposable) {
        println(" onSubscribe : "+ Thread.currentThread())
    }
    override fun onNext(t: Any) {
        println(" onNext : "+t+" : "+Thread.currentThread())
    }
    override fun onError(e: Throwable) {
        println(" onError : "+ Thread.currentThread())
    }
    override fun onComplete() {
        println(" onComplete : "+ Thread.currentThread())
    }
} )

这段代码中采用subscribeOn(Schedulers.newThread())来指定在新建线程中履行:

RxJava入门及常用操作符(Kotlin版)
这时运转得到成果:

RxJava入门及常用操作符(Kotlin版)
可见日志是不对的,onNext()、onComplete()都没有打印。原因很简略,咱们在主线程创立调查者和被调查者之后,事情发送的履行转交给调度器Schedulers.newThread(),还没等来得及新线程发送出事情,主线程就直接退出了,所以后续日志看不到,鉴于此,咱们使主线程休眠sleep2秒,在上述办法的后边调用如下代码:

try {
    Thread.sleep(2000) //这儿sleep延时主线程
} catch (e: InterruptedException) {
    e.printStackTrace()
}

输出成果为:

RxJava入门及常用操作符(Kotlin版)
(一开始只要onSubscribe和subscribe打印,顿一下后打印onNext和onComplete)

这儿咱们再切换调度器为Schedulers.io(),会发现打印作用相同。 这儿需求留意的是,subscribeOn()调用位置没有特别要求,能够放到操作符前面、中间、后边都行,当然也能够屡次调用,可是只要第一次调用有效。

observeOn

observeOn()指定后续的操作符以及调查者的代码在什么样的线程中履行。且observeOn()能够屡次被调用,每次调用都收效。

RxJava入门及常用操作符(Kotlin版)
例如下述代码:

Observable.create(object : ObservableOnSubscribe<Any> {
    override fun subscribe(emitter: ObservableEmitter<Any>) {
        Log.i("rxdemo", " subscribe : " + Thread.currentThread())
        emitter.onNext(" guess wich thread ")
        emitter.onComplete()
    }
})
    .subscribeOn(Schedulers.io()) //决议履行subscribe办法所在的线程,也便是发生事情或发射事情所在的线程
    .observeOn(AndroidSchedulers.mainThread()) //决议下流事情被处理时所在的线程
    .subscribe(object : Observer<Any> {
        override fun onSubscribe(d: Disposable) {
            Log.i("rxdemo", " onSubscribe : " + Thread.currentThread())
        }
        override fun onNext(t: Any) {
            Log.i("rxdemo", " onNext : " + t + " : " + Thread.currentThread())
        }
        override fun onError(e: Throwable) {
            Log.i("rxdemo", " onError : " + Thread.currentThread())
        }
        override fun onComplete() {
            Log.i("rxdemo", " onComplete : " + Thread.currentThread())
        }
    })

对应输出成果为:

RxJava入门及常用操作符(Kotlin版)
(上述代码中AndroidSchedulers.mainThread()出自RxAndroid,因而这段代码要丢到安卓工程里运转才不会报错)

留意observeOn()每次指定都收效,有爱好的能够多测验下在其他位置调用,不同位置调用作用或许纷歧样。

背压

这个词是从backpressure直译过来,背压即来自背部的压力,指当被调查者宣布许多的数据或事情时,调查者来不及处理,都积压在那,压的调查者喘不过气,有时分还会导致OOM。

如下述代码:

Observable.create(object : ObservableOnSubscribe<Any> {
    override fun subscribe(emitter: ObservableEmitter<Any>) {
        while (true){
            emitter.onNext(" subscribe : Hello ")
        }
    }
})
    .subscribeOn(Schedulers.io()) //被调查者在I/O线程履行
    .observeOn(Schedulers.newThread()) //调查者在新线程履行
    .subscribe {    //Consumer
        Thread.sleep(9000);
        Log.i("rxdemo"," accept ~");
    }

调查者和被调查者在不同线程中履行,被调查者是个死循环不停发射,同时调查者处理数据的速度放缓了一些,休眠9秒处理一次。这时咱们能够在Profiler中能够看到:

RxJava入门及常用操作符(Kotlin版)
内存随时间可见的上升,这种状况假如不处理,很大概率或许会呈现OOM。究其原因是因为发送数据方和接纳数据方不在一个线程内,两个线程步调纷歧致,发送数据太多处理不来就缓存起来,直到内存用完,这便是背压。针对背压,Rxjava提供了支撑背压处理的调查者和被调查者,即Flowable和Subscriber。

Flowable

Flowable是Observable(调查者)的一种新完结,但Flowable额定完结了非阻塞式背压战略。同时,用Flowable的时分调查者变为Subscriber。例如下面示例:

Flowable.create(
    { emitter ->
        Log.d("rxdemo", "send 1")
        emitter.onNext(1)
        Log.d("rxdemo", "send 2")
        emitter.onNext(2)
        Log.d("rxdemo", "finish")
        emitter.onComplete()
    },
    BackpressureStrategy.ERROR
).subscribe(object : Subscriber<Int> {
    override fun onSubscribe(s: Subscription) {
        Log.d("rxdemo", "onSubscribe")
        s.request(2)
    }
    override fun onNext(integer: Int) {
        Log.d("rxdemo", "get the $integer")
    }
    override fun onError(t: Throwable) {
        Log.w("rxdemo", "onError: ", t)
    }
    override fun onComplete() {
        Log.d("rxdemo", "onComplete")
    }
})

对应输出成果为:

RxJava入门及常用操作符(Kotlin版)
看到这儿,你会对两个地方发生疑问,一个是onSubscribe()中的s.request(2),这儿是向调查者恳求处理2条数据的意思,假如没有这行代码,则咱们不恳求处理数据,程序则会触发这儿的背压战略:BackpressureStrategy.ERROR,直接报错。当然,背压战略不只这一个,还有其他几个:

背压战略

BackpressureStrategy.ERROR:直接抛出MissingBackpressureException反常; BackpressureStratery.MISSING:不运用背压,没有缓存,仅提示:缓存区满了 BackpressureStratery.BUFFER:缓存一切数据,直到调查者处理,假如调查者处理不及时也会呈现OOM,被调查者可无限发送事情,但实际上是放在缓存区。 BackpressureStratery.DROP:丢弃超越缓存区巨细(128)的数据 BackpressureStratery.LATEST:只保存最新的最终的事情,超越缓存区巨细(128)时用新数据掩盖老数据。

到此,咱们能够总结下,背压的呈现是为了解决两个方面首要问题:
当发送数据速度>接受数据速度,数据堆叠缓存会撑满;
当缓存区巨细存满,被调查者持续发送下一个事情时(仍是相当于撑爆了缓存区)

到这儿你会发现,这仍是个缓存区问题,那么这个缓存区是否便是128呢?咱们能够经过Flowable.bufferSize()来获取缓存的巨细,例如:

Flowable.create(
    { emitter ->
        //发送128个Hello buffer
        for (i in 0 until Flowable.bufferSize()) {
            Log.d("rxdemo", "Hello buffer $i")
            emitter.onNext("Hello buffer $i")
        }
    },
    BackpressureStrategy.ERROR
).subscribeOn(Schedulers.io())
    .observeOn(Schedulers.newThread())
    .subscribe(object : Subscriber<String> {
    override fun onSubscribe(s: Subscription) {
        Log.d("rxdemo", "onSubscribe")
    }
    override fun onNext(str: String) {
        Log.d("rxdemo", "get the $str")
    }
    override fun onError(t: Throwable) {
        Log.w("rxdemo", "onError: ", t)
    }
    override fun onComplete() {
        Log.d("rxdemo", "onComplete")
    }
})

对应的日志输出为:

RxJava入门及常用操作符(Kotlin版)
由日志不难看出其发挥巨细为128,也便是默许缓存数据为128个,上述代码宣布了128个Hellobuffer。假如这个时分咱们多宣布来一个会怎样?修正下for循环条件i in 0 until Flowable.bufferSize()+1。最终会得到成果:

RxJava入门及常用操作符(Kotlin版)
毫无意外,Subscriber并没有恳求处理数据,缓存现已爆满,外加配置的背压战略为BackpressureStrategy.ERROR,所以这儿会在缓存撑爆的状况下告诉Subscriber发生过错,调用ERROR,打印MissingBackpressureException。其他几种背压战略有爱好的可自行测验。

另一种调用背压战略的办法

看到这儿你或许会想,假如不运用create办法创立Flowable,而是用range、interval这些操作符创立,那怎么配置战略?对此,Rxjava提供了对应的办法来匹配相应的背压战略:onBackpressureBuffer()、onBackpressureDrop()、onBackpressureLatest()(看姓名就知道对应的战略啦),例如:

Flowable.range(0,100)
    .onBackpressureLatest()
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.newThread())
    .subscribe(object : Subscriber<Int> {
    override fun onSubscribe(s: Subscription) {
        Log.d("rxdemo", "onSubscribe")
    }
    override fun onNext(num: Int) {
        Log.d("rxdemo", "get the $num")
    }
    override fun onError(t: Throwable) {
        Log.w("rxdemo", "onError: ", t)
    }
    override fun onComplete() {
        Log.d("rxdemo", "onComplete")
    }
})

其实,到这儿你会发现Rxjava的强壮之处,能随意切换线程,跟retrofit结合做网络恳求框架,能用timer做守时操作,用interval做周期性操作,乃至进行数组、list的遍历等。

RxBus

一种依据Rxjava完结事情总线的一种思想,可完美替代EventBus,相关代码参阅github.com/AndroidKnif…

RxBinding

首要与RxJava结合用于一些View的事情绑定,相关代码参阅github.com/JakeWharton…

内存走漏

Rxjava运用不当会造成内存走漏,在页面毁掉后,Observable仍然还有事情等待发送和处理(比方interval做周期性操作而没有停下来),这个时分会导致Activity回收失败,然后致使内存走漏。

解决办法:
运用Disposable,封闭页面时调用dispose()取消订阅;
运用CompositeDisposable,增加一组Disposable,在封闭页面时同时取消订阅。 也能够将其与Activity基类生命周期进行绑定,在毁掉时取消订阅。


本文仅作抛砖引玉之用,为后续解读RxJava源码作准备,故而没有奉上一切操作符的解说(文件太大了,不想水文),假如需求相关资源可私信我。