敞开成长之旅!这是我参加「日新方案 12 月更文挑战」的第33天,点击检查活动概况
1.RxJava的观察者形式
RxJava的观察者形式是扩展的观察者形式,扩展的当地首要体现在事情告诉的办法有许多种
2.RxJava的观察者形式涉及到几个类
- Observable:被观察者
- Observer:观察者
- Subscribe:订阅
- Event:被观察者告诉观察者的事情
3.Obsercerable与Observer经过Subscribe完成相关,Event首要向Observer告诉Observeble的变化,Event有几个告诉办法
- Next:惯例事情,可以传递各式各样的数据
- Error:异常事情,当被观察者发送异常事情后那么其他的事情就不会再继续发送了
- Completed:完毕事情,当观察者接纳到这个事情后就不会再接纳后续被观察者发送过来的事情
4.代码完成
- 首要定义一个观察者Observer
public abstract class Observer<T> {
//和被观察者订阅后,会回调这个办法
public static void onSubscribe(Emitter emitter);
// 传递惯例事情,用于传递数据
public abstract void onNext(T t);
// 传递异常事情
public abstract void onError(Throwable e);
// 传递完毕事情
public abstract void onComplete();
}
Observer中的办法都是回调,其间多了一个Emitter的接口类,他是一个发射器
public interface Emitter<T> {
void onNext(T t);
void onError(Throwable error);
void onCompleted();
}
完成逻辑便是经过包装Observer,里边终究是经过Observer进行回调的
public class CreateEmitter<T> implements Emitter<T> {
final Observer<T> observer;
CreateEmitter(Observer<T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
observer.onNext(t);
}
@Override
public void onError(Throwable error) {
observer.onError(error);
}
@Override
public void onComplete() {
observer.onComplete();
}
}
- 被观察者的完成
public abstract class Observable<T>{
public void subscribe(Observer<T> observer) {
//经过传入的Observer包装成CreateEmitter,用于回调
CreateEmitter emitter = new CreateEmitter(observer);
//回调订阅成功的办法
observer.onSubscribe(emitter);
//回调发射器emitter
subscribe(emitter);
}
/**
* 订阅成功后,进行回调
*/
public abstract void subscribe(Emitter<T> emitter);
}
就两步,第一步用于订阅,第二步用于回调
- 具体的使用
private void observer() {
// 第一步,创立被观察者
Observable<String> observable = new Observable<String>() {
@Override
public void subscribe(Emitter<String> emitter) {
emitter.onNext("第一次");
emitter.onNext("第二次");
emitter.onNext("第三次");
emitter.onComplete();
}
};
// 第二步,创立观察者
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Emitter emitter) {
Log.i("TAG", " onSubscribe ");
}
@Override
public void onNext(String s) {
Log.i("TAG", " onNext s:" + s);
}
@Override
public void onError(Throwable e) {
Log.i("TAG", " onError e:" + e.toString());
}
@Override
public void onComplete() {
Log.i("TAG", " onComplete ");
}
};
// 第三步,被观察者订阅观察者
observable.subscribe(observer);
}
被订阅成功后,被观察者的subscribe里边就可以经过发射器发送事情了,终究在观察者的办法里进行回调。
RxJava也是观察者和被观察者订阅的进程,只不过被观察者有变化的时候是由发射器进行发送的,这样就不止有一种事情了
1.RxJava的装修者形式
-
- 装修者形式:在不改动原有的架构根底上添加一些新的功用,是作为其原有结构的包装,这个进程称为装修。
- RxJava的装修者形式首要是用于完成Observable和Observer的包装,首要是为了与RxJava的观察者形式合作完成代码的办法更简洁。
- 拆解RxJava的装修器形式
-
-
- 被观察者Observable
-
参考手机包装的比如
第一步:要有一个笼统接口,在RxJava中这个笼统接口是ObservableSource,里边有一个办法subscribe
public interface ObservableSource<T> {
/**
* Subscribes the given Observer to this ObservableSource instance.
* @param observer the Observer, not null
* @throws NullPointerException if {@code observer} is null
*/
void subscribe(@NonNull Observer<? super T> observer);
}
第二步:要有一个包装类,完成了ObservableSource的,RxJava的包装类是Observable,完成了对应的接口,
并且在subscribe办法里经过调用笼统办法subscribeActual,来对观察者进行订阅
public abstract class Observable<T> implements ObservableSource<T> {
...
@Override
public final void subscribe(Observer<? super T> observer) {
...
subscribeActual(observer);
...
}
protected abstract void subscribeActual(Observer<? super T> observer);
...
}
第三步:这便是具体的包装类了如图所示
2.观察者Observer:
- 第一步:要有一个笼统接口,而RxJava的接口是Emitter和Observer,里边有好几个办法基本一样,onNext,onError,onComplete,用于被观察者进行回调;
- 第二步:要有一个包装类,完成了Emitter或许Observer,但是观察者比较特殊,没有一个根底的包装类,而是直接封装了许多的包装类
RxJava的的被观察者是在创立的时候进行包装的,例如第一步的Observable.create办法,经过Observable.create的创立后进行了第一层包装,结构如下
第二步的subscribeO办法调用时进行了第二层的包装,此刻结构如下:
第三步的observerOn办法调用时,进行了第四层的包装,那么结构便是下面的样子
终究调用订阅办法的时候已经进行了四次包装,那么可以了解每调用一次操作符就会进行一层被观察者的包装。
那么这样包装的优点是什么呢?
这便是装修者形式的特性,在不改动原有功用的根底上添加额定的功用。
5.总结
咱们在创立被观察者的时候,会对被观察者做一层包装,创立几回就包装几回,然后在被观察者调用subscribe办法时,一层层回调被观察者的subscribeAcutal办法,而在被观察者的subscribeAcutal办法里,会对观察者做一层包装;
也便是说被观察者是在创立的时候进行包装,然后在subscribeActual中完成额定的功用;
而观察者是在被观察者调用subscribeActual办法里进行包装的,然后针对观察者完成自己额定的功用;
流程图如下: