我正在参与「启航方案」
前语
Rx
是一种响应式编程的思想,现在有很多语言都支持这种思想:RxJava
,RxJs
,RxSwift...
。它是根据特别的观察者方式来完结的,能够轻松的完结异步事件响应流,避免回调阴间的发生。这种思想和咱们平常的编程思想不太相同,它以数据流为中心,处理数据的输入,处理以及输出,因而这个结构学习起来是有一定难度的,加之 RxJava
操作符很多,这又给初学者立了个下马威。
本篇博客会 剖析其原理与思想 ,从源码层次对其进行深入剖析。
✔️ 本文阅读时长约为:10min
本篇博客适合已经会简略运用
RxJava
结构并想了解其原理与思想的同学~
RxJava的观察者方式
在 规范的观察者规划方式 中,一般都是一个被观察者,多个观察者。例如,在抖音中A
,B
,C
,D
四人都重视了一个博主 甲
。那么当博主 甲
发布新视频时,甲会去告诉A
,B
,C
,D
四人,告知他们我已经发布了新的视频,你们现在能够前去观看。
而 RxJava的观察者规划方式 与前者有少许差异,我的理解是在 RxJava
中只要一个被观察者和一个观察者,被观察者便是数据源也便是 起点
,而观察者便是 结尾
。起点
到 结尾
这一段进程做什么作业由咱们自己定义,像加卡片相同往里堆,但一向只要一个 起点
和一个 结尾
,这也便是俗称的卡片式编程。
举个栗子:
咱们的起点是发起网络恳求得到一张图片,结尾是得到一张加了 水印 和 高斯含糊 的图片。那么咱们就需要在 起点
到 结尾
这一进程中往里边加两张卡片。一张卡片是加水印的功用,另一张卡片是加高斯含糊的功用,终究咱们会在 结尾
得到咱们的想要的效果,这便是 RxJava
的一个进程,咱们看是不是比 规范观察者方式 的耦合度更低呢?
现在结合代码来看一下呢:
// 示例:创立Observable
// create办法中的参数是咱们自定义的source,下文会说到
Observable<Object> observable = Observable.create(new ObservableOnSubscribe<Object>() {
//----------------------------------被观察者--------------------------------------
@Override
public void subscribe(ObservableEmitter<Object> e) throws Exception {
// 数据源(起点),假定咱们这儿得到了一张图片,开端向下传递
e.onNext();
}
//-----------------------------------卡片1----------------------------------------
}).map(new Function<Bitmap, Bitmap>() {
// 1.加水印的卡片
@Override
public Bitmap apply(Bitmap bitmap) throws Exception {
// TODO 这儿咱们将图片加上水印
}
//-----------------------------------卡片2----------------------------------------
}).map(new Function<Bitmap, Bitmap>() {
// 2.加高斯含糊的卡片
@Override
public Bitmap apply(Bitmap bitmap) throws Exception {
// TODO 这儿咱们将图片加上高斯含糊
}
//-----------------------------------观察者---------------------------------------
// 订阅被观察者
}).subscribe(new Observer<Bitmap>() {
@Override
public void onSubscribe(Disposable d) {}
@Override
public void onNext(Bitmap bitmap) {
// 终究获得加了水印和高斯含糊的图片
// do what you want...
}
@Override
public void onError(Throwable e) {}
@Override
public void onComplete() {}
});
咱们能够看到,RxJava
链式调用后在结尾能得到咱们想要的成果,咱们对 Bitmap
的两个操作以 卡片 方式加到了 起点
和 结尾
的进程中,这便是 RxJava
的魅力之一。
了解 RxJava
大体的履行流程后,现在咱们就从源码视点来解说整个结构内部逻辑是怎样工作的。
RxJava怎么将事件逐渐传递
首要咱们先来看看这些操作符内部逻辑是什么姿态的,这儿以 create
和 map
为比方展现:
1️⃣. create
// Observable.java
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
// 常规空查看
ObjectHelper.requireNonNull(source, "source is null");
// 这儿是个hook,其实直接放回的是 ObservableCreate
// 回来的其实便是咱们传进来的参数
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
2️⃣. map
// Observable.java
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
// 常规空查看
ObjectHelper.requireNonNull(mapper, "mapper is null");
// 这儿是个hook,其实直接放回的是 ObservableMap
// 回来的其实便是咱们传进来的参数
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
从上面的比方中能够看出,这两个操作符不管是结构仍是逻辑简直是如出一辙,仅仅 onAssembly
办法中传递的参数不同(其实也能够说是相同的,他们都是 Observable
的子类)。实践上,简直所有的操作符都是这样的结构,会在内部 new ObservableXXX()
。
为什么咱们的操作符都能链式调用呢?这就跟上面说到的特点有关了,由于简直所有操作符都是上述结构,回来值的都是 ObservableXXX
,而他们都是 Observable
的子类,所以咱们能一向经过 .XXXX
的方式调用 Observable
中的操作符,一向往里边加 卡片 来完结咱们的需求。
⚡⚡ 那么,ObservableXXX类有什么用呢?
1.ObervableXXX的全体知道
其实这些 ObservableXXX
就像工厂中的一个个流水线拼装货品的机器,货品从 起点
向 结尾
传输,当货品传输到不同的机器上时,这台机器会操作货品完结这一层的拼装作业,终究会将其传输到下一个机器上去,完结那个机器的拼装使命,直至传输到 结尾
整个货品拼装结束。
回头看上面的比方,咱们参加的两张卡片就像是两台机器相同,map
机器完结水印作业后,将 货品 扔给后一个 map
机器,由它完结高斯含糊作业,终究 货品拼装完结,被传输到了结尾,在这儿咱们就能拿到终究期望的成果。
接下来我会举两个比方,带着咱们看看它们的内部逻辑。由于简直所有的 ObservableXXX
的逻辑和功用都相似,因而咱们在看完下面几个比方后,能够自行去源码中轻松查看你想看的部分。
2.ObservableMap内部工作逻辑
这样一说信任咱们对它有了一个全体的感受,那么咱们现在以 ObservableMap
为例,要点解说一下内部逻辑吧。
⚡ 在解说源码之前,咱们需要先知道一个点:代码傍边的 source
指的都是上一个操作符回来的 Observable
目标,function
指的是调用此操作符需要传递的参数,即匿名完结类,别的下文还会呈现 自定义source
,这个便是咱们在运用 create操作符
时传递的参数,也称 顶层source
,它是货品运送的 起点
。
// ObservableMap.java
// Observable承继自AbstractObservableWithUpstream,AbstractObservableWithUpstream又承继Observable
// Observable完结了ObservableSource接口
// 因而这儿会重写 subscribe 办法
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
// source 是上一层操作符回来的Observable目标
// function 是咱们运用此操作符传递的那个匿名完结类,<能够称其为这一个机器需要做的操作>
// 在本类中其实便是完结了Function接口的匿名类,里边有个办法叫作apply
// <本层机器处理货品时>,会看到apply办法
super(source);
this.function = function;
}
// 此办法是 subscribe 的详细完结办法
// 咱们进行订阅时,即调用subscribe时,终究会调用此办法,<终究告诉起点开端传输货品>
@Override
public void subscribeActual(Observer<? super U> t) {
// t代表咱们调用subscribe时传递的参数,即匿名完结类,<咱们能够称其为后一台机器要履行的操作>
// 将t和function包装一下,再调用 source 的 subscribe,<也便是前面那台机器的subscribe>
// 这儿的subscribe能够理解为告诉上层机器开端运送货品
source.subscribe(new MapObserver<T, U>(t, function));
}
// 内部类,<将后一台机器履行的操作和上一个机器绑定起来>
// 便是将后一个机器的操作做了层封装,包裹了一层
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
// <此办法是货品抵达本机器时,本机器的相应操作>
@Override
public void onNext(T t) {
// 这些判空直接不看
if (done) {
return;
}
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
U v;
// 这儿会履行apply办法,是咱们运用map操作符传递的完结了Function接口的匿名内部类中的办法
// <本层机器开端处理上层机器传递过来的货品>
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
// 本层机器的货品处理结束,扔给基层机器持续处理
// <actual代表基层机器>
actual.onNext(v);
}
}
总的来说 ObservableMap
做了两件事,第一便是调用 subscribeActual
告诉上层机器让其开端传递货品,第二便是调用 onNext
处理传递到本层的货品,终究再扔给下一个机器去处理。
其实 ObservableXXX
结构和逻辑都差不太多,我将最开端给的比方用图表示了出来:
信任咱们能够从这张图中看出,整个工作逻辑是一个 U型结构,当咱们调用 subscribe
时,就会告诉 起点
运送货品,其实也便是设置了观察者。然后,货品就会一层一层往下传递,终究流到咱们的结尾。假如咱们中途想持续添加需求,直接往中心那个进程添加机器(或者说是卡片)就行了,非常便利,并且整个链式调用下来,代码会比不用 RxJava
显得愈加干净整齐易读。
3.ObservableCreate内部工作逻辑
那么起点是怎么接纳下面的 机器
传递上来的运送货品的信号并开端运送货品的呢?
public final class ObservableCreate<T> extends Observable<T> {
// 咱们自己写的source
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
// 这个source是咱们自定义的,也便是运用create操作符时传递的那个参数
// 这便是咱们的所说的起点、源头
this.source = source;
}
// 这个便是起点触发传送使命的办法
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
// 这个observer是从下面机器经过一层层包装后传递过来的observer
// 实践上调用的是咱们运用subscribe传递参数傍边的onSubscribe办法,即结尾的onSubscribe办法
observer.onSubscribe(parent);
// 开端传输使命,将货品往下传递
try {
// 这儿的source便是顶层的自定义source
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
// 这个便是发射器,用于将货品从起点往下传递的东西
// 一般情况下,咱们在自定义source中都会调用此类的onNext办法开端向下传输货品
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
// 向下传输货品的办法
@Override
public void onNext(T t) {
...
if (!isDisposed()) {
// 将货品扔给下一层机器去处理
observer.onNext(t);
}
}
...
}
...
}
至此,两个典型的 ObservableXXX
解说结束,可能咱们对源码傍边的 source
和 subscribeActual
中的参数 observer/t
傻傻分不清楚,不知道这些是指 前一个机器 ,仍是 后一个机器 ,那么接下来请看下面这幅图。
source 是前一个操作符回来的 Observable
目标,这个信任咱们都简单理解。这儿着重说一下上述图的下半部分,咱们还记得结尾是怎么告诉起点开端传输 货品
的吗?没错,是经过每一个
Observable.subscribeActual
办法 (机器)中的参数不断向起点传递,每个 subscribeActual
办法中都会调用 source.subscribe()
,也便是告诉前面机器传输包裹,每向前传递一次就 封一层包裹 ,终究触发 起点
传输货品,之后就开端一层层 拆包裹 ,终究拆到结尾(subscribe)中咱们自己完结的 ObservableOnSubscribe
中。
小结
为了便于咱们理解前面所叙述的知识点,这儿咱们用文章最初的那个比方持续深入解说,咱们来看上述代码是怎么履行的,再贴一遍代码:
Observable<Object> observable = Observable.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(ObservableEmitter<Object> e) throws Exception {
e.onNext();
}
}).map(new Function<Bitmap, Bitmap>() {
@Override
public Object apply(Bitmap bitmap) throws Exception {
// todo
}
}).map(new Function<Bitmap, Bitmap>() {
@Override
public ObservableSource<?> apply(Bitmap bitmap) throws Exception {
// todo
}
}).subscribe(new Observer<Bitmap>() {
@Override
public void onSubscribe(Disposable d) {}
@Override
public void onNext(Bitmap bitmap) {}
@Override
public void onError(Throwable e) {}
@Override
public void onComplete() {}
});
1️⃣ 咱们经过 .subscribe()
,发起了订阅,开端观察被观察者,结合咱们之前讲的,咱们实践上调用的是map
回来目标中的 subscribe
办法,即 ObservableMap
。
// ObservableMap.java
//它会持续调用上层机器的subscribe,告诉上层机器传输货品
public void subscribeActual(Observer<? super U> t) {
// 封包裹,传递给上层机器
source.subscribe(new MapObserver<T, U>(t, function));
}
2️⃣ 这儿它会持续调用上层机器的 subscribe
,由于上个操作符仍是 map
,所以咱们直接越过,看 create
操作符。
// ObservableCreate.java
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
// 这儿是调用下一层机器的onSubscribe办法
observer.onSubscribe(parent);
try {
// 调用咱们自定义的soruce,开端传输货品
// 比方中咱们自定义soruce中写的代码是emitter.onNext,开端传输货品
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
3️⃣ 到这儿,咱们已经告诉 起点
发起传送使命,那么接下来便是传输货品的进程,逐渐 拆包 的进程。跟着货品的运送,代码会不断调用最外层包裹中的 observable
目标的 onNext
办法,看起来就像在拆包裹相同,这其实也是前面说的运送包裹的进程,如此重复,终究抵达咱们的结尾的 onNext
办法,咱们拿到终究期望的货品,至此,整个进程根本结束。
✔️ 现在是不是脑海里边又呈现了前面所说的 U型结构 呢?
RxJava是怎么完结线程切换的?
RxJava
能完结线程切换是经过 subscribeOn
,observeOn
。正常情况下咱们都是这么用的:
observable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(...)
前面重复强调整个结构的履行进程是一个 U型结构,请咱们考虑一下,subscribeOn
和observeOn
切换线程的中心代码是在 subscribeActual
中仍是 onNext
中?
回忆一下,咱们告诉 起点 开端传输货品时,是经过 subscribeActual
逐层往上传递的,紧接着就开端运送货品处理货品了,就比方 map
机器中的 apply
办法便是处理货品的详细办法。因而,这一段使命都应该在 子线程 中完结,所以 subscribeOn的切换线程的中心代码是在subscribeActual中的。
observeOn
的作用便是让结尾的 onNext
办法中的代码在主线程中履行,那么理应只需要在 observeOn操作符中的onNext中切换线程即可。
有了上述猜想,咱们现在进入源码验证一下。
1.subscribeOn的线程切换
// 不仅有io线程,还有以下这些线程
SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());
COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());
IO = RxJavaPlugins.initIoScheduler(new IOTask());
TRAMPOLINE = TrampolineScheduler.instance();
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
咱们首要来看看运用该操作符时传递的参数 Schedulers.io()
到底是个什么东西。咱们一层层点进去终究会发现它其实便是 new了一个线程池 :
// 一路点进去,会发现其实便是new了一个线程池
CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
其间的 CacheWorkerPool
的源码如下:
// IoSchedule.java,这个类将线程池中的创立、调度、履行等办法做了一下封装
...
// 这个办法便是得到线程池
public Worker createWorker() {
// EventLoopWorker承继自Worker,是对线程池做了一下封装,包裹了一层
return new EventLoopWorker(pool.get());
}
// 实践调用线程池的办法
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
// 里边便是调用线程池的submit等办法
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
...
// CacheWorkerPool是IoSchedule中的内部类
CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
this.allWorkers = new CompositeDisposable();
this.threadFactory = threadFactory;
ScheduledExecutorService evictor = null;
Future<?> task = null;
if (unit != null) {
// 这儿的中心线程数是1就足够了,由于咱们子线程中的使命都是链式调用的
evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime,
this.keepAliveTime, TimeUnit.NANOSECONDS);
}
evictorService = evictor;
evictorTask = task;
}
...
所以,其实 .io()
便是传了一个中心线程数为1的线程池,当然咱们还能够传入 .computation()
,.newThread()
等办法,这些其实都是创立了一个特别的线程池。
咱们知道,这些操作符都是在内部 new ObservableXXX类,所以源码咱们直接看ObservableSubscirbeOn
。
// ObservableSubscribeOn.java
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
// 这个是调度器,便是对线程池封装了一下
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
// 咱们要点看在这个办法中是怎么完结线程切换的
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
// onSubscribe的回调
s.onSubscribe(parent);
// subscribeTask实践上是一个runnable
// 调用scheduler.scheduleDirect(runnable),将runnable放入线程池中去履行
// 这儿是重中之重,详细细节咱们稍后解说
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
...
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
// 上面那个runnable其实便是这个
// 这儿是调用上层机器的subscirbe,告诉起点开端传输货品
source.subscribe(parent);
}
}
}
从上面咱们能够知道,咱们将 source.subscribe()
扔进了线程池中,那么就做到了在其之后的所有代码都是在子线程中履行了,直到代码走到 observeOn
,在 ObservableObserveOn
中的 onNext()
办法将线程切回主线程。
接下来咱们讲讲详细细节,看看 scheduleDirect
里边的逻辑。
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
// createWorker()办法眼熟吗?这便是在IoSchedule傍边的办法
// Worker类本质上便是线程池,仅仅对其做了一层封装
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
// 将传进来的runnable再做一层封装,这儿的runnable对应上面的比方便是source.subscribe
DisposeTask task = new DisposeTask(decoratedRun, w);
// 这个办法也眼熟吧?
// 在线程池中履行此使命
w.schedule(task, delay, unit);
return task;
}
由上面这一进程咱们能够知道,当告诉到 本层的机器
传输货品时,这一层机器会将 subscribeActual
办法扔进线程池中去履行,即后续的代码都是运行在子线程中了。
那么它又是怎么将线程切回来的呢?
2.observeOn的线程切换
理解了上面线程切换后,咱们很简单能理解下面切换主线程的代码,现在直接进入 ObservableObserveOn
源码。
// ObservableObserveOn.java
@Override
protected void subscribeActual(Observer<? super T> observer) {
// 同样能够理解为对线程的封装,只不过这儿内部是handler
Scheduler.Worker w = scheduler.createWorker();
// 同样是封包裹,把自己封装一层,扔给前面的机器
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
...
// 要点重视onNext办法,切回主线程的代码都在这儿面
@Override
public void onNext(T t) {
if (done) {
return;
}
...
// 切换线程的办法在这儿面,内部调用的是HandlerWorker的schedule
// 里边又调用的是HandlerSchedule的schedule
schedule();
}
当 起点 开端传输货品时,即开端逐层拆包裹,这时候会逐层调用 onNext
,终究就会在 ObservableObserveOn
中的 onNext
办法中切换线程。而在 onNext
中又调用的是 HandlerSchedule
中的 schedule
办法进行终究的线程切换。
// HandlerSchedule.java
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
...
// 这儿的run便是下一层机器的onNext办法,比方中便是结尾的onNext
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
// 调用Handler,完结终究的线程切换
handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
// 假如dispose,就中止RxJava后续流程,整个运送带不会再作业了
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}
画个图做个总结,让咱们理解更明晰一点:
写在终究
篇幅很长,能看到终究很不简单,给自己一个 大大的赞 吧!
假如觉得写的不错,就给个赞再走吧~
创造实属不易,你的肯定是我创造的动力,下次见!
假如本篇博客有任何过错的当地,请咱们批评指正,不胜感激。