敞开生长之旅!这是我参与「日新方案 12 月更文挑战」的第33天,点击检查活动概况

1.装修者形式

  • 装修者形式时在保存原有结构的前提下增加新的功用,这些功用作为其原有结构的包装。

2.RxJava的装修者形式

1.被观察者Observable

  • 依据Observerable的源码可知Observable的结构接口是Observerablesource<T>,里边有一个办法subscribe用于和观察者完成订阅,源码如下
/**
 * Represents a basic, non-backpressured {@link Observable} source base interface,
 * consumable via an {@link Observer}.
 *
 * @param <T> the element type
 * @since 2.0
 */
public interface ObservableSource<T> {
    /**
     * Subscribes the given Observer to this ObservableSource instance.
     * @param observer the Observer, not null
     * @throws NullPointerException if {@code observer} is null
     */
    void subscribe(Observer<? super T> observer);
}
  • 然后需求一个包装类,便是完成ObservableSource接口的类,便是Observable<T>,它完成了ObservableSource并在subscribe办法中调用了subscribeActual办法与观察者完成订阅联系,源码如下
public abstract class Observable<T> implements ObservableSource<T> {
	@Override
    public final void subscribe(Observer<? super T> observer) {
       	...
         subscribeActual(observer);
        ...
    }
    protected abstract void subscribeActual(Observer<? super T> observer);
}
  • 第三步便是包装类了,包装类有很多有一百多个,如ObservableAllObservableAnyObservableCache

RxJava装饰者模式

2.观察者Observer

  • 第一步,Observer的结构的接口有EmitterObserver,两个接口中的办法差不多,都是onNextOnErrorOnComplete,用于被观察者的回调
  • 第二步,完成Emitter或许Observer接口的包装类,观察者中没有完成这两个接口的根底包装类,而是直接封装了很多包装类

RxJava装饰者模式

3.被观察者和观察者的包装类有在创立的时分进行包装也有在调用的时分包装,那么他们的结构又是怎么样的

以RxJava的最根底用法来剖析,Observable.create().subscribeOn().observeOn().subscribe()为例,层层调用后它的结构如下:

  • 首先是Observable.create,经过创立ObservableCreate目标进行第一层包装,把ObservableOnSubscribe包在了里边

RxJava装饰者模式

  • 然后是Observable.create().subscribeOn(),调用时又进行了一层包装,把ObservableCreate包进去了

RxJava装饰者模式

  • 再然后就分别是observeOn()了,结构如下

RxJava装饰者模式

  • 总共进行了4层包装,能够理解为每调用一次操作符就会进行一层被观察者的包装,这样包装的好处便是为了增加额定的功用,那么每一层又增加了哪些额定的功用呢

4.被观察者的subscribe办法

调用subscribe办法后会从最外层的包装类一步一步的往里边调用,从被观察者的subscribe办法中能够得知额定功用的完成是在subscribeActual办法中,那么上面几层包装的subscribeActual办法中又做了什么呢,剖析如下

  • 先看最外层的包装observerOnsubscribeActual办法做了什么,先看源码:
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
    ...
}
  • 源码中有一个source,这个source是上一层包装类的实例,在source.subscribe()中对观察者进行了一层包装,也便是ObserveOnObserver,它在onNext办法里边完成了线程切换,这个onNext是在被观察者在通知观察者时会被回调,然后经过包装类完成额定的线程切换,这儿是切换到了主线程履行。此时观察者的结构如下:
@Override
public void onNext(T t) {
    if (done) {
        return;
    }
    if (sourceMode != QueueDisposable.ASYNC) {
        queue.offer(t);
    }
    schedule();
}

RxJava装饰者模式

  • 再看下一层的包装subscribeOnsubscribeActual办法做了什么,先看源码
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }
    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        s.onSubscribe(parent);
        parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
            @Override
            public void run() {
                source.subscribe(parent);
            }
        }));
    }
    ...
}

这儿又对观察者进行了一层包装,也便是SubscribeOnObserver,这儿边的额定功用便是资源开释,包装完后的结构如下

 static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
        private static final long serialVersionUID = 8094547886072529208L;
       		...
        @Override
        public void dispose() {
            DisposableHelper.dispose(s);
            DisposableHelper.dispose(this);
        }
        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }
        void setDisposable(Disposable d) {
            DisposableHelper.setOnce(this, d);
        }
    }

RxJava装饰者模式

subscribeActual办法中有一个调用是source.subscribe(parent),这个source便是它的上一层的包装类ObservableCreate,那么ObservableCreatesubscribeActual办法就会在子线程履行。

ObservableCreatesubscribeActual办法做了什么,先看源码

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;
    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);
        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
    	...
}

源码中的source便是创立最原始的ObservableOnSubscribe,这儿会回调到ObservableOnSubscribesubscribe办法,在subscribeActual办法中又对观察者进行了一层包装也便是CreateEmitter,这个类里边做的工作是判断线程是否被开释,假如开释了则不再进行回调,这时分结构如下图

 @Override
public void onNext(T t) {
    if (t == null) {
        onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
        return;
    }
    if (!isDisposed()) {
        observer.onNext(t);
    }
}

RxJava装饰者模式

这儿因为上面的包装类现已切换到了子线程所以ObservableOnSubscribesubscribe办法的履行也是在子线程;

3.总结

在创立被观察者的时分会对被观察者进行层层的包装,创立几回就包装几回,然后在被观察者调用subscribe办法时,一层层回调被观察者的subscribeActual办法,而在被观察者subscribeActual办法中会对观察者做一层包装。也便是说被观察者是创立的时分包装,在subscribeActual办法中完成额定的功用,观察者是在被观察者调用subscribeActual办法时进行包装的,然后针对观察者完成自己的额定的功用,流程图如下:

RxJava装饰者模式

终究的结构如下:

  • 第一步:创立被观察者时或许运用操作符时会对被观察者进行包装

RxJava装饰者模式

  • 第二步:当被观察者和观察者产生订阅联系后,被观察者会一层层的回调被观察者的subscribeActual办法,在这个办法中对观察者进行包装,此时被观察者的功用完成是在subscribeActual中,观察者的完成是在包装类里

RxJava装饰者模式

  • 第三步:被观察者和观察者不同的是,被观察者是在订阅成功后就履行了包装类相应的功用,而观察者是在事情回调的时分,会在观察者的包装类里完成相应的功用
  • 终究流程图

RxJava装饰者模式