本文以下面代码为例逐步解析
Observable.just("数据源")
.map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
return 1;
}
})
.filter(integer -> {
return integer == 1;
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Object o) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
从just开端
public static <T> Observable<T> just(T item) {
ObjectHelper.requireNonNull(item, "item is null");
return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
}
回来了一个将传入的参数封装成了一个 ObservableJust目标 其他的Rxjava创立操作符类似:比如create(), just(),fromArray(),fromIterable(),timer(),interval()等
ObservableJust类
public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {
private final T value;
public ObservableJust(final T value) { //将传入的参数赋值给value
this.value = value;
}
//要点办法 稍后看
@Override
protected void subscribeActual(Observer<? super T> observer) {
ScalarDisposable<T> sd = new ScalarDisposable<T>(observer, value);
observer.onSubscribe(sd);
sd.run();
}
@Override
public T call() {
return value;
}
}
map办法
因为just办法回来了一个ObservableJust目标,所以调用链的map办法调用的ObservableJust目标的map办法 可是咱们看到ObservableJust类中并没有map办法,所以去看他的父类Observable
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
在他的父类Observable中看到,map()依然是回来了一个ObservableMap目标,这个目标将当时目标(也便是上一步的ObservableJust目标)和map()传入的参数一起封装了起来 从上面的调用链来看便是这一段代码:
ObservableMap类
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source); //这儿的source也便是上一步的ObservableJust目标
this.function = function; //这儿的function便是map便是map()传入的参数
}
//这个办法相同待会剖析
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
downstream.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
downstream.onNext(v);
}
}
这时候发现ObservableMap和上面的ObservableJust类相同,都完成了subscribeActual()
filter办法
接着持续剖析调用链上的办法filter,相同咱们去ObservableMap父类里去找这个办法,他的父类AbstractObservableWithUpstream里面没有这个办法,可是AbstractObservableWithUpstream跟ObservableJust相同承继自Observable
public final Observable<T> filter(Predicate<? super T> predicate) {
ObjectHelper.requireNonNull(predicate, "predicate is null");
return RxJavaPlugins.onAssembly(new ObservableFilter<T>(this, predicate));
}
看到没 filter和前两个办法还是相同的套路,回来了一个ObservableFilter目标,不出意外这个ObservableFilter里面肯定也有一个subscribeActual办法,并且也是直接或许直接承继自Observable
public final class ObservableFilter<T> extends AbstractObservableWithUpstream<T, T> {
final Predicate<? super T> predicate;
public ObservableFilter(ObservableSource<T> source, Predicate<? super T> predicate) {
super(source);
this.predicate = predicate;
}
@Override
public void subscribeActual(Observer<? super T> observer) {
source.subscribe(new FilterObserver<T>(observer, predicate));
}
static final class FilterObserver<T> extends BasicFuseableObserver<T, T> {
final Predicate<? super T> filter;
FilterObserver(Observer<? super T> actual, Predicate<? super T> filter) {
super(actual);
this.filter = filter;
}
@Override
public void onNext(T t) {
if (sourceMode == NONE) {
boolean b;
try {
b = filter.test(t);
} catch (Throwable e) {
fail(e);
return;
}
if (b) {
downstream.onNext(t);
}
} else {
downstream.onNext(null);
}
}
}
一模相同的套路
subscribeOn
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = 8094547886072529208L;
final Observer<? super T> downstream;
final AtomicReference<Disposable> upstream;
SubscribeOnObserver(Observer<? super T> downstream) {
this.downstream = downstream;
this.upstream = new AtomicReference<Disposable>();
}
@Override
public void onSubscribe(Disposable d) {
DisposableHelper.setOnce(this.upstream, d);
}
@Override
public void onNext(T t) {
downstream.onNext(t);
}
@Override
public void onError(Throwable t) {
downstream.onError(t);
}
@Override
public void onComplete() {
downstream.onComplete();
}
@Override
public void dispose() {
DisposableHelper.dispose(upstream);
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}
}
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
}
这个subscribeOn用于切换上游线程: 主要是这一句parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); scheduler便是咱们传入的Schedulers.io(),上面代码能够看到SubscribeTask是一个Runnable,run()里调用的sourcesource.subscribe(parent),还记得source吗,source便是调用链上一步回来的目标,也便是上一步的 ObservableFilter; 去看看Schedulers.io()回来的是个什么类
public static Scheduler io() {
return RxJavaPlugins.onIoScheduler(IO);
}
看到他回来的是一个Scheduler,去Scheduler中找scheduleDirect
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
持续往下追踪会发现终究将这个Runable通过各种封装,终究提交到一个线程池(ScheduledExecutorService)中去履行使命,这样就完成了SubscribeOn上游数据源代码的线程切换
至于下流代码线程切换来看ObserveOn
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
}
ObserveOnObserver类:
mplements Observer<T>, Runnable {
private static final long serialVersionUID = 6576896619930983584L;
final Observer<? super T> downstream;
final Scheduler.Worker worker;
final boolean delayError;
final int bufferSize;
SimpleQueue<T> queue;
Disposable upstream;
Throwable error;
volatile boolean done;
volatile boolean disposed;
int sourceMode;
boolean outputFused;
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.downstream = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.upstream, d)) {
this.upstream = d;
if (d instanceof QueueDisposable) {
@SuppressWarnings("unchecked")
QueueDisposable<T> qd = (QueueDisposable<T>) d;
int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
if (m == QueueDisposable.SYNC) {
sourceMode = m;
queue = qd;
done = true;
downstream.onSubscribe(this);
schedule();
return;
}
if (m == QueueDisposable.ASYNC) {
sourceMode = m;
queue = qd;
downstream.onSubscribe(this);
return;
}
}
queue = new SpscLinkedArrayQueue<T>(bufferSize);
downstream.onSubscribe(this);
}
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
@Override
public void onError(Throwable t) {
if (done) {
RxJavaPlugins.onError(t);
return;
}
error = t;
done = true;
schedule();
}
@Override
public void onComplete() {
if (done) {
return;
}
done = true;
schedule();
}
@Override
public void dispose() {
if (!disposed) {
disposed = true;
upstream.dispose();
worker.dispose();
if (!outputFused && getAndIncrement() == 0) {
queue.clear();
}
}
}
@Override
public boolean isDisposed() {
return disposed;
}
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = downstream;
for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
boolean d = done;
T v;
try {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
disposed = true;
upstream.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {
break;
}
a.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
void drainFused() {
int missed = 1;
for (;;) {
if (disposed) {
return;
}
boolean d = done;
Throwable ex = error;
if (!delayError && d && ex != null) {
disposed = true;
downstream.onError(error);
worker.dispose();
return;
}
downstream.onNext(null);
if (d) {
disposed = true;
ex = error;
if (ex != null) {
downstream.onError(ex);
} else {
downstream.onComplete();
}
worker.dispose();
return;
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
if (disposed) {
queue.clear();
return true;
}
if (d) {
Throwable e = error;
if (delayError) {
if (empty) {
disposed = true;
if (e != null) {
a.onError(e);
} else {
a.onComplete();
}
worker.dispose();
return true;
}
} else {
if (e != null) {
disposed = true;
queue.clear();
a.onError(e);
worker.dispose();
return true;
} else
if (empty) {
disposed = true;
a.onComplete();
worker.dispose();
return true;
}
}
}
return false;
}
@Override
public int requestFusion(int mode) {
if ((mode & ASYNC) != 0) {
outputFused = true;
return ASYNC;
}
return NONE;
}
@Nullable
@Override
public T poll() throws Exception {
return queue.poll();
}
@Override
public void clear() {
queue.clear();
}
@Override
public boolean isEmpty() {
return queue.isEmpty();
}
}
不想看代码直接总结,从ObserveOnObserver类中发现他的onSubscribe,onNext,onError,OnNext办法都调用了schedule(),追踪schedule()发现,终究相同是把使命交给了线程池处理,在本例子中因为传递的是AndroidSchedulers.mainThread(),所以下流是切换到主线程履行,这儿是用了Handler将使命提交给主线程
final class HandlerScheduler extends Scheduler {
private final Handler handler;
private final boolean async;
HandlerScheduler(Handler handler, boolean async) {
this.handler = handler;
this.async = async;
}
@Override
@SuppressLint("NewApi") // Async will only be true when the API is available to call.
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
if (async) {
message.setAsynchronous(true);
}
handler.sendMessageDelayed(message, unit.toMillis(delay));
return scheduled;
}
}
终于到了终究一步suscribe
这儿调用的是Observable的subscribe
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
subscribeActual(observer);//要点看这儿
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
看到subscribeActual()办法没,原来subscribe()里会调用subscribeActual;
现在往回追溯:
在subscribe办法中会调用当时目标的subscribeActual(),所以往回追溯他首先会去调ObservableObserveOn的subscribeActual(),参数便是终究传入的Observer
回忆一下ObservableObserveOn的subscribeActual()
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
持续将Observer封装成ObserveOnObserver,然后调用source.subcribe(),source还记得吧,便是调用链上一步的回来的目标,也便是ObservableSubscribeOn,这个类没有完成subscribe,可是他的父类有这个办法,那不便是Observable的subcribe()吗?是的,也便是跟调用链终究一步调用的subcribe()是同一个办法,只不过他的参数是基于下流的参数的进一步封装,那么相同我他会调用到susscribeActual()
@Override
public void subscribeActual(final Observer<? super T> observer) { //这儿的Observer便是将下流封装后的Observer
//将oberser持续封装
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
observer.onSubscribe(parent);
//通过方才的剖析 这儿是将使命交给线程池处理,所以去看SubscribeTask的run()
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
// SubscribeTask的run()
@Override
public void run() {
source.subscribe(parent); //相同持续调用source.subscribe,那么他也是同意是调用到调用链上一步回来目标的subscribeActual(),,也便是ObservableFilter目标目标
}
不出意外ObservableFilter目标里也是将Observer持续封装,然后调用source.subscribe
@Override
public void subscribeActual(Observer<? super T> observer) {
source.subscribe(new FilterObserver<T>(observer, predicate));
}
现在来到了第一步ObservableJust的subscribeActual():
@Override
protected void subscribeActual(Observer<? super T> observer) {
ScalarDisposable<T> sd = new ScalarDisposable<T>(observer, value); //将Observer和value进行封装,value便是咱们第一步传入的数据源了
observer.onSubscribe(sd);
sd.run();
}
//ScalarDisposable的run办法
@Override
public void run() {
if (get() == START && compareAndSet(START, ON_NEXT)) {
observer.onNext(value); //这儿开端把数据源往下流传, value指数据源 observer便是下流一步一步封装的Observer啦
if (get() == ON_NEXT) {
lazySet(ON_COMPLETE);
observer.onComplete();
}
}
}
还记得回溯时封装的那些Observer吗?别离是MapObserver,FilterObserver,SubscribeOnObserver,ObserveOnObserver以及调用链上终究一步咱们自己自定义的Observer 别离再看他们的onNext(),其他办法套路一致
MapObserver:
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
downstream.onNext(null);
return;
}
U v;
try {
//处理数据源,将数据源转换成想要的类型
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
// 持续调用下流Observer的onNext
downstream.onNext(v);
}
FilterObserver
@Override
public void onNext(T t) {
if (sourceMode == NONE) {
boolean b;
try {
// 数据判别
b = filter.test(t);
} catch (Throwable e) {
fail(e);
return;
}
//满足过滤条件持续调用下流onNext
if (b) {
downstream.onNext(t);
}
} else {
downstream.onNext(null);
}
}
SubscribeOnObserver
因为subscribeOn只是起到切换上游线程的效果,所以对下流他不做任何操作,持续调用下流的onNext
@Override
public void onNext(T t) {
downstream.onNext(t);
}
ObserveOnObserver:
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule(); //切换下流线程,将使命交给线程池或许主线程handler,然后调用下流onNext
}
终究就调到咱们自定义的onNext()啦,整个流程就完毕了
总结一下
Rxjava的链式调用整个流程便是从下到上,由上而下 每一步的操作符都是将上游目标作为source封装成新的Observable,然后持续往下传递,直到终究的subsribe办法反向开端调用source.subscribe然后调用到每个soource目标的subscriActual(),每一步的subscribActual()又会将下流传递来的Observer一步步封装,直到传递到最上游,在最上游开端再一步步调用封装好的Observe的相关办法,这样就完成了将数据源传递到下流。
切换上游线程: 创立一个Task,承继自Runable,在Runable的run()里调用source.subscribe(),然后将这个Runable进一步封装,根据传递的参数创立对应的线程池或许主线程Handler,将Runable提交给线程池或许Handler去履行
切换下流线程: 封装的Observer的onSubscribe,onNext,onError,OnNext办法都调用了schedule(),追踪schedule()发现,终究相同是把使命交给了线程池处理,在本例子中因为传递的是AndroidSchedulers.mainThread(),所以下流是切换到主线程履行