RxJava的本质上能够了解为一个异步操作库,能用十分简略的逻辑去处理繁琐的异步操作库。所以,它能在必定程度上代替项目中的handler、AsyncTask等等。有的时分,开源结构光是会用是不行的,想做进一步扩展就还得了解其原理。本文旨在梳理RxJava3源码流程,如有不当之处,请纵情指出。如需了解使用办法等,点此直达
调查者形式概念
调查者规划形式是RxJava中的一个核心规划形式。举个简单了解的比如,前段时间最火的世界杯,决赛阿根廷VS法兰西。
很多球迷熬夜看球。这时你能够把这场决赛当作被调查者,而咱们这些熬夜看球的球迷呢便是调查者,每逢梅西或者姆巴佩进球的时分,球迷就会欢呼,这就类似调查者形式中数据的改动。当然,梅西球迷看到姆巴佩进球不必定会有反响,就像姆巴佩球迷看到梅西进球相同,反响不会很大(调查数据不同)。
在APP开发进程中也会发生类似情况,假如你需求关心一个目标的数据,同时页面上的UI状态也会跟这个目标所绑定,则咱们在这个目标发生改动的时分,咱们就需求通知一切页面去做相应改动,这便是调查者形式。
再说个不是很恰当的比如,咱们关于微信公众号,当微信公众号发生消息推送的时分,咱们会去重视去看他,假如对其内容感兴趣,咱们会去点开它、阅览它。这也是一种调查者形式,所以能够说:假如A目标对B目标的数据十分灵敏,当B目标发生改动的一会儿,A目标要做出反响,这时分A目标便是调查者,B目标便是被调查者。同时,能够多个调查者对应一个被调查者,就像一个公众号能够有很多人重视相同。
调查者形式便是咱们众多的调查者对咱们的被调查者的数据高度灵敏改动之后的反响,这是一种多对一的联系,多个调查者对应一个被调查者。
下面是调查者形式的类图:
能够说,RxJava不属于传统意义上的调查者形式,实际上是属于一种扩展的调查者形式。(本文中的RxJava使用版本是3.0)
RxJava中的调查者形式
RxJava中的调查者形式少不了以下内容:1、被调查者;2、调查者;3、订阅。
一 、 被调查者 :
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
emitter.onNext("something");
}
});
上述代码是一个最简略的被调查者,可见这一切的起始都是create(),咱们先看该办法的完结:
其间这一行代码:
Objects.requireNonNull(source, "source is null");
作用仅是非空判别,假如是空则抛出反常,
要害代码仍是在return那里:
return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
在这儿RxJavaPlugins.onAssembly()是全局 hook 办法,为什么这样说呢,查看其他被调查者目标声明的办法时,不难发现终究仍是调用的这个办法,如:
所以,在这儿咱们要点重视此办法内部完结即可: 这儿onObservableAssembly变量默以为空,所以这儿作用便是直接把source参数回来出去。(假如我没记错的话,原RxJava1.0源码此办法内直接回来传入参数,2.X修改成如此)
所以需求重视的创立进程在传入RxJavaPlugins.onAssembly()的参数中,经过ObservableCreate目标生成的自界说source:
二 、 调查者 :
public interface Observer<@NonNull T> {
/**
* Provides the {@link Observer} with the means of cancelling (disposing) the
* connection (channel) with the {@link Observable} in both
* synchronous (from within {@link #onNext(Object)}) and asynchronous manner.
* @param d the {@link Disposable} instance whose {@link Disposable#dispose()} can
* be called anytime to cancel the connection
* @since 2.0
*/
void onSubscribe(@NonNull Disposable d);
/**
* Provides the {@link Observer} with a new item to observe.
* <p>
* The {@link Observable} may call this method 0 or more times.
* <p>
* The {@code Observable} will not call this method again after it calls either {@link #onComplete} or
* {@link #onError}.
*
* @param t
* the item emitted by the Observable
*/
void onNext(@NonNull T t);
/**
* Notifies the {@link Observer} that the {@link Observable} has experienced an error condition.
* <p>
* If the {@code Observable} calls this method, it will not thereafter call {@link #onNext} or
* {@link #onComplete}.
*
* @param e
* the exception encountered by the Observable
*/
void onError(@NonNull Throwable e);
/**
* Notifies the {@link Observer} that the {@link Observable} has finished sending push-based notifications.
* <p>
* The {@code Observable} will not call this method if it calls {@link #onError}.
*/
void onComplete();
}
源码中Observer是个接口。创立调查者便是创立这个接口的实例,而接口中的办法也便是直接暴露给用户。
三、订阅
这儿要要点重视Observable与Observer订阅的进程,都在subscribe()里。调用Observable的subscribe(),其办法源码如下:
前面两个Objects.requireNonNull()都是非空判别,这儿无需多言,可见要点是在subscribeActual()中。但这儿,此办法是个笼统办法:
毫无疑问,完结办法必定在ObservableCreate中(因为subscribe()前的目标是个Observable,上面非调查者的创立流程剖析后得出终究调用的是newObservableCreate)。
在此办法中,
CreateEmitter<T> parent = new CreateEmitter<>(observer);
生成一个发射器,传入参数是咱们自界说的调查者。
调查其源码会发现,CreateEmitter承继了ObservableEmitter接口,是ObservableEmitter的完结,其onNext()、onError()、onComplete()办法被调用时会调用调查者observer的同名办法。
而在创立新发射器之后、在source.subscribe(…)之前履行了observer.onSubscribe(parent);
此办法便是订阅的调查者接口中的onSubscribe()回调:
这也便是为什么在调查者的接口的几个办法中onSubscribe()会先履行的原因。随后的source.subscribe(parent);即自界说的source订阅了CreateEmitter,而这儿的自界说source即为ObservableOnSubscribe的实例,parent为CreateEmitter的实例。则咱们的代码中的被调查者的ObservableOnSubscribe的subscribe()会被回调:
两种调查者的差异
两点不同:
在标准的调查者规划形式中,是一个“被调查者”,多个“调查者”,而且需求“被调查者”宣布改动通知后,一切的“调查者”才干调查到;
在RxJava调查者规划形式中,是多个“被调查者”,一个“调查者”,而且需求起点和结尾在“订阅”一次后,才宣布改动通知,结尾(调查者)才干调查到。
map 操作符源码流程
map操作符能直接对发射出来的事情进行处理而且发生新的事情,然后再次发射,下面是一个很简略的代码比如:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
}
}).map(new Function<String, Bitmap>() {
@Override
public Bitmap apply(String s) throws Throwable {
return null;
}
}).subscribe(new Observer<Bitmap>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Bitmap bitmap) {
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
仍是了解的链式调用,了解的配方。仍是三个切入点:被调查者、调查者、订阅。其间被调查者构建流程已在上面讲过,这儿不赘述。终究仍是来到了map():
这儿调用的全局RxJavaPlugins.onAssembly(),可见要点在办法内的参数new ObservableMap<>(this, mapper) 这儿你会发现与咱们方才说的Observable有些类似,但也有不同,一些相同的揣度这儿就不说了。这儿.subscribe()终究仍是调用ObservableMap里边的subscribeActual()。而在subscribeActual()中又是对其Observer进行了一层包装: 而在咱们的代码中(注意,这儿不是源码),.map(…)之前的Observable.create(…): 会先经过ObseravbleCreate.subscribe()再调用subscribeActual(): 在这儿终究传入的observer目标是在ObservableMap中经过subscribeActual()包装后的目标。其onSubscribe()逻辑与Observable调用时序差不多。回调回代码中经过Function接口的apply()转化目标: 再接着建议Observer.onNext(),需求重视的点是onNext()流程。终究调用CreateEmitter发射器中的onNext()时,即observer.onNext(t),代码又会调入ObservableMap中的onNext(): 在此输出对应泛型并回调回去。
RxJava的操作符流程知晓Observable和map流程即可,其他操作符流程都可从中触类旁通。
线程切换
除了调查者形式,RxJava中另一个重要的核心点便是线程切换了。在RxJava中完结线程切换的主要是subscribeOn()和observerOn()。
subscribeOn 源码解析
作用一句话归纳,给subscribeOn()上面的代码分配线程。例如下述示例:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
emitter.onNext("test");
}
})
.subscribeOn(Schedulers.io())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull String s) {
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
代码很简略,这儿将发射器发射“test”事情丢到了io线程处理。这回咱们先看下Schedulers.io()中的源码。
又是了解的字眼,RxJavaPlugins。不难猜出这又是个全局hook办法,咱们调查下其完结办法:
这儿onIoHandler默以为null,实际上仍是回来经过apply()类型转化接口转化的defaultScheduler目标。此办法差不多究竟了,这时咱们回来上一级:
这儿的IO是个静态常量:
(这儿看出不仅仅是IO,Schedulers.线程常量都用staticfinal润饰了,在这个类被调用的时分就会初始化)
而这儿的办法RxJavaPlugins.initIoScheduler()也是个hook办法。
除掉榜首行的判别非空代码,和默以为null的onInitIoHandler。可揣度出终究回来的目标仍是defaultScheduler,即IOTask目标。
IOTask是个静态内部类,完结了Supplier接口且有回来值:
仍是一个静态内部类,终究回来的是IoScheduler()。相同的,假如不是IO战略,则对应其他的Schedulers.线程常量,终究回来的是各自的XXScheduler。
持续盯梢IoScheduler()。
顺藤摸瓜,持续盯梢this():
持续盯梢至start()里:
可看到很多线程池战略设置等,持续盯梢shutdown():
到这儿你会发现,不管是用的哪一种Schedulers战略,不管终究调用到哪一种XXScheduler(),终究都会调用到线程池。到这儿就能够了,假如要讲Java线程池的话,本文篇幅就不太够了。就此,Schedulers.io()解析完毕。
持续调查原代码的subscribeOn()
能够看到除了非空判别和hook办法,咱们要要点重视ObservableSubscribeOn()。
依据之前咱们对Observable(被调查者)和Observer(调查者)的源码流程的解析可知,ObservableCreate.subscribe()的完结办法是ObservableSubscribeOn()里的subscribeActual()。而subscribeActual()内的逻辑则是对传进来的调查者进行SubscribeOnObserver包装,然后将包装好后的目标传给SubscribeTask()。
此Task完结Runnable(),不难猜出这个目标是给线程池办理。持续看scheduler.scheduleDirect()。这儿的scheduler是传进来的Scheduler.IO()战略。这时咱们调查scheduleDirect():
此办法在Scheduler类中,IoScheduler是其子类。持续盯梢scheduleDirect():
createWorker()在这是个笼统办法,分布到详细子类中完结,例如IoScheduler中:
持续回到Scheduler的scheduleDirect()中,在createWorker()后,又是个hook办法:
在这儿相当所以把传入进来的Runnable包装了一层,然后交给PeriodicDirectTask():
而PeriodicDirectTask完结了Runnablehe、Disposable等接口,不难看出其间有中断操作。
而终究这个Task交给w.schedule()处理。而w,即worker目标,在战略是Scherdulers.IO()时,则是调用IoScheduler中的worker目标的schedule():
顺藤摸瓜,可看到终究调用的仍是JDK中的ScheduledExecutorService:
终究能够说,这几个办法的传递便是一个SubscribeTask使命不停的包装然后传递给线程池控制。现在回到subscribeActual()中来:
经过上述的揣度,可了解scheduler.scheduleDirect()作用是把new SubscribeTask(parent))这个包装好的使命放入特定的线程池办理中。
终究,订阅。
observeOn源码解析
作用一句话归纳,observeOn的作用是给observeOn()下面的代码分配线程。在源代码中咱们参加observeOn(AndroidSchedulers.mainThread())的设置:
调查AndroidSchedulers.mainThread()的内部完结:
持续调查onMainThreadScheduler():
能够看到又是了解的感觉,非空判别和默以为null的onMainThreadHandler,又是一个以传入值为准的hook办法。回来上一层,咱们要点调查常量MAIN_THREAD,能够看到其界说为:
回来的MainHolder.DEFAULT界说为:
可见,这儿经过new Handler(Looper.getMainLooper())传入HandlerScheduler()中的handler必定处于主线程状态(newHandler(…)这种做法不能避免用户在子线程进行new Handler的操作…)。
HandlerScheduler承继自Scheduler,那么其间必定也有createWorker():
而这儿创立的HandlerWorker中的handler是传入HandlerScheduler()中的必定主线程状态的handler。
而这个主线程handler又是在哪里起作用的呢?这儿要看HandlerWorker调用的HandlerScheduler()中的schedule()(详细怎样调用到此处,后边说observeOn的流程会提到):
同时在此办法内,主线程handler延时发送message。而在send Mesange之前,ScheduledRunnable(handler, run)也是将run丢到主线程履行。能够说AndroidSchedulers.mainThread()的作用便是拿到主线程的handler。
原代码中,咱们给observeOn()传入的参数也便是主线程的handler,接下来看observeOn的办法:
除掉之前常常见面的非空判别和hook办法,咱们调查ObservableObserveOn:
在其详细完结subscribeActual()中有一个scheduler是否属于TrampolineScheduler的判别,作用跟Schedulers.trampoline()战略相同,便是判别当前是否改动了Scheduler战略。假如属于TrampolineScheduler便是没有改动Scheduler战略,则照旧订阅流程,履行source.subscribe(observer)。假如发生了改动,则进行createWorker()等操作。咱们持续调查ObserveOnObserver,可看见onNext()等办法声明:
其间都有一个schedule(),咱们持续调查schedule():
此shedule调用的是HandlerScheduler中的schedule(),至此完结闭环。
注意这儿的一个this,阐明此类完结了Runnable():
那么此类必定覆写了run(): 而终究HandlerScheduler的schedule()经过主线程的handler履行了此类中的run(),outputFused默认false,代码履行到drainNormal():
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = downstream;
for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
boolean d = done;
T v;
try {
... //省略
}
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {
break;
}
a.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
可看到终究代码经过a.onNext(v)将成果传递出去,而HandlerScheduler.schedule()则将成果事情切换到主线程,终究的成果在主线程中传递。
总结
1、Worker便是RxJava完结线程切换的要害,以ObserveOn为例,在履行subscribe时,会创立Woker并传入ObserveOnObserver,在终究履行onNext等回调时,会调用Worker的schedule办法来切换线程。
2、咱们知道subscribeOn只有榜首次调用起作用,而ObserveOn每一次调用都起作用。那是因为每一次调用subscribeOn其实便是在包装一次observer与observable,不管包装多少次,都会以最里边一层也便是榜首次调用subscribeOn那一层为主,所以只有榜首次起作用。而ObserveOn是在subscribe后包装了observer,在observer的onXXX()的schedule()中进行的线程转化,所以每一次调用都有作用。