敞开生长之旅!这是我参与「日新方案 12 月更文挑战」的第33天,点击检查活动概况
1.装修者形式
- 装修者形式时在保存原有结构的前提下增加新的功用,这些功用作为其原有结构的包装。
2.RxJava的装修者形式
1.被观察者Observable
- 依据
Observerable
的源码可知Observable
的结构接口是Observerablesource<T>
,里边有一个办法subscribe
用于和观察者完成订阅,源码如下
/**
* Represents a basic, non-backpressured {@link Observable} source base interface,
* consumable via an {@link Observer}.
*
* @param <T> the element type
* @since 2.0
*/
public interface ObservableSource<T> {
/**
* Subscribes the given Observer to this ObservableSource instance.
* @param observer the Observer, not null
* @throws NullPointerException if {@code observer} is null
*/
void subscribe(Observer<? super T> observer);
}
- 然后需求一个包装类,便是完成
ObservableSource
接口的类,便是Observable<T>
,它完成了ObservableSource
并在subscribe办法中调用了subscribeActual
办法与观察者完成订阅联系,源码如下
public abstract class Observable<T> implements ObservableSource<T> {
@Override
public final void subscribe(Observer<? super T> observer) {
...
subscribeActual(observer);
...
}
protected abstract void subscribeActual(Observer<? super T> observer);
}
- 第三步便是包装类了,包装类有很多有一百多个,如
ObservableAll
、ObservableAny
、ObservableCache
2.观察者Observer
- 第一步,
Observer
的结构的接口有Emitter
和Observer
,两个接口中的办法差不多,都是onNext
、OnError
、OnComplete
,用于被观察者的回调 - 第二步,完成
Emitter
或许Observer
接口的包装类,观察者中没有完成这两个接口的根底包装类,而是直接封装了很多包装类
3.被观察者和观察者的包装类有在创立的时分进行包装也有在调用的时分包装,那么他们的结构又是怎么样的
以RxJava的最根底用法来剖析,Observable.create().subscribeOn().observeOn().subscribe()
为例,层层调用后它的结构如下:
- 首先是
Observable.create
,经过创立ObservableCreate
目标进行第一层包装,把ObservableOnSubscribe
包在了里边
- 然后是
Observable.create().subscribeOn()
,调用时又进行了一层包装,把ObservableCreate包进去了
- 再然后就分别是
observeOn()
了,结构如下
- 总共进行了4层包装,能够理解为每调用一次操作符就会进行一层被观察者的包装,这样包装的好处便是为了增加额定的功用,那么每一层又增加了哪些额定的功用呢
4.被观察者的subscribe
办法
调用subscribe
办法后会从最外层的包装类一步一步的往里边调用,从被观察者的subscribe
办法中能够得知额定功用的完成是在subscribeActual
办法中,那么上面几层包装的subscribeActual
办法中又做了什么呢,剖析如下
- 先看最外层的包装
observerOn
的subscribeActual
办法做了什么,先看源码:
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
...
}
- 源码中有一个
source
,这个source
是上一层包装类的实例,在source.subscribe()
中对观察者进行了一层包装,也便是ObserveOnObserver
,它在onNext
办法里边完成了线程切换,这个onNext
是在被观察者在通知观察者时会被回调,然后经过包装类完成额定的线程切换,这儿是切换到了主线程履行。此时观察者的结构如下:
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
- 再看下一层的包装
subscribeOn
的subscribeActual
办法做了什么,先看源码
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
@Override
public void run() {
source.subscribe(parent);
}
}));
}
...
}
这儿又对观察者进行了一层包装,也便是SubscribeOnObserver
,这儿边的额定功用便是资源开释,包装完后的结构如下
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = 8094547886072529208L;
...
@Override
public void dispose() {
DisposableHelper.dispose(s);
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}
}
在subscribeActual
办法中有一个调用是source.subscribe(parent)
,这个source
便是它的上一层的包装类ObservableCreate
,那么ObservableCreate
的subscribeActual
办法就会在子线程履行。
ObservableCreate
的subscribeActual
办法做了什么,先看源码
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
...
}
源码中的source
便是创立最原始的ObservableOnSubscribe
,这儿会回调到ObservableOnSubscribe
的subscribe办法
,在subscribeActual
办法中又对观察者进行了一层包装也便是CreateEmitter
,这个类里边做的工作是判断线程是否被开释,假如开释了则不再进行回调,这时分结构如下图
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
这儿因为上面的包装类现已切换到了子线程所以ObservableOnSubscribe
的subscribe
办法的履行也是在子线程;
3.总结
在创立被观察者的时分会对被观察者进行层层的包装,创立几回就包装几回,然后在被观察者调用subscribe
办法时,一层层回调被观察者的subscribeActual
办法,而在被观察者subscribeActual
办法中会对观察者做一层包装。也便是说被观察者是创立的时分包装,在subscribeActual
办法中完成额定的功用,观察者是在被观察者调用subscribeActual
办法时进行包装的,然后针对观察者完成自己的额定的功用,流程图如下:
终究的结构如下:
- 第一步:创立被观察者时或许运用操作符时会对被观察者进行包装
- 第二步:当被观察者和观察者产生订阅联系后,被观察者会一层层的回调被观察者的
subscribeActual
办法,在这个办法中对观察者进行包装,此时被观察者的功用完成是在subscribeActual
中,观察者的完成是在包装类里
- 第三步:被观察者和观察者不同的是,被观察者是在订阅成功后就履行了包装类相应的功用,而观察者是在事情回调的时分,会在观察者的包装类里完成相应的功用
- 终究流程图