我正在参与「启航方案」

前语

Rx 是一种响应式编程的思想,现在有很多语言都支持这种思想:RxJavaRxJsRxSwift...。它是根据特别的观察者方式来完结的,能够轻松的完结异步事件响应流,避免回调阴间的发生。这种思想和咱们平常的编程思想不太相同,它以数据流为中心,处理数据的输入,处理以及输出,因而这个结构学习起来是有一定难度的,加之 RxJava 操作符很多,这又给初学者立了个下马威。

本篇博客会 剖析其原理与思想 ,从源码层次对其进行深入剖析。

✔️ 本文阅读时长约为:10min

本篇博客适合已经会简略运用 RxJava 结构并想了解其原理与思想的同学~

RxJava的观察者方式

规范的观察者规划方式 中,一般都是一个被观察者,多个观察者。例如,在抖音中ABCD四人都重视了一个博主 。那么当博主 发布新视频时,甲会去告诉ABCD四人,告知他们我已经发布了新的视频,你们现在能够前去观看。

你可以不用RxJava,但必须得领悟它的思想!

RxJava的观察者规划方式 与前者有少许差异,我的理解是在 RxJava 中只要一个被观察者和一个观察者,被观察者便是数据源也便是 起点 ,而观察者便是 结尾起点结尾 这一段进程做什么作业由咱们自己定义,像加卡片相同往里堆,但一向只要一个 起点 和一个 结尾 ,这也便是俗称的卡片式编程。

你可以不用RxJava,但必须得领悟它的思想!

举个栗子

咱们的起点是发起网络恳求得到一张图片,结尾是得到一张加了 水印高斯含糊 的图片。那么咱们就需要在 起点结尾 这一进程中往里边加两张卡片。一张卡片是加水印的功用,另一张卡片是加高斯含糊的功用,终究咱们会在 结尾 得到咱们的想要的效果,这便是 RxJava 的一个进程,咱们看是不是比 规范观察者方式 的耦合度更低呢?

现在结合代码来看一下呢:

    // 示例:创立Observable
    // create办法中的参数是咱们自定义的source,下文会说到
    Observable<Object> observable = Observable.create(new ObservableOnSubscribe<Object>() {
    //----------------------------------被观察者--------------------------------------
        @Override
        public void subscribe(ObservableEmitter<Object> e) throws Exception {
            // 数据源(起点),假定咱们这儿得到了一张图片,开端向下传递
            e.onNext();
        }
    //-----------------------------------卡片1----------------------------------------
    }).map(new Function<Bitmap, Bitmap>() {
        // 1.加水印的卡片
        @Override
        public Bitmap apply(Bitmap bitmap) throws Exception {
            // TODO 这儿咱们将图片加上水印
        }
    //-----------------------------------卡片2----------------------------------------
    }).map(new Function<Bitmap, Bitmap>() {
        // 2.加高斯含糊的卡片
        @Override
        public Bitmap apply(Bitmap bitmap) throws Exception {
            // TODO 这儿咱们将图片加上高斯含糊
        }
    //-----------------------------------观察者---------------------------------------
    // 订阅被观察者
    }).subscribe(new Observer<Bitmap>() {
        @Override
        public void onSubscribe(Disposable d) {}
        @Override
        public void onNext(Bitmap bitmap) {
            // 终究获得加了水印和高斯含糊的图片
            // do what you want...
        }
        @Override
        public void onError(Throwable e) {}
        @Override
        public void onComplete() {}
    });

咱们能够看到,RxJava 链式调用后在结尾能得到咱们想要的成果,咱们对 Bitmap 的两个操作以 卡片 方式加到了 起点结尾 的进程中,这便是 RxJava 的魅力之一。

了解 RxJava 大体的履行流程后,现在咱们就从源码视点来解说整个结构内部逻辑是怎样工作的。

RxJava怎么将事件逐渐传递

首要咱们先来看看这些操作符内部逻辑是什么姿态的,这儿以 createmap 为比方展现:

1️⃣. create

// Observable.java
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    // 常规空查看
    ObjectHelper.requireNonNull(source, "source is null");
    // 这儿是个hook,其实直接放回的是 ObservableCreate
    // 回来的其实便是咱们传进来的参数
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

2️⃣. map

// Observable.java
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
    // 常规空查看
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    // 这儿是个hook,其实直接放回的是 ObservableMap
    // 回来的其实便是咱们传进来的参数
    return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}

从上面的比方中能够看出,这两个操作符不管是结构仍是逻辑简直是如出一辙,仅仅 onAssembly 办法中传递的参数不同(其实也能够说是相同的,他们都是 Observable 的子类)。实践上,简直所有的操作符都是这样的结构,会在内部 new ObservableXXX()

为什么咱们的操作符都能链式调用呢?这就跟上面说到的特点有关了,由于简直所有操作符都是上述结构,回来值的都是 ObservableXXX,而他们都是 Observable 的子类,所以咱们能一向经过 .XXXX的方式调用 Observable 中的操作符,一向往里边加 卡片 来完结咱们的需求。

⚡⚡ 那么,ObservableXXX类有什么用呢?


1.ObervableXXX的全体知道

其实这些 ObservableXXX 就像工厂中的一个个流水线拼装货品的机器,货品从 起点结尾 传输,当货品传输到不同的机器上时,这台机器会操作货品完结这一层的拼装作业,终究会将其传输到下一个机器上去,完结那个机器的拼装使命,直至传输到 结尾 整个货品拼装结束。

回头看上面的比方,咱们参加的两张卡片就像是两台机器相同,map 机器完结水印作业后,将 货品 扔给后一个 map 机器,由它完结高斯含糊作业,终究 货品拼装完结,被传输到了结尾,在这儿咱们就能拿到终究期望的成果。

接下来我会举两个比方,带着咱们看看它们的内部逻辑。由于简直所有的 ObservableXXX 的逻辑和功用都相似,因而咱们在看完下面几个比方后,能够自行去源码中轻松查看你想看的部分。

2.ObservableMap内部工作逻辑

这样一说信任咱们对它有了一个全体的感受,那么咱们现在以 ObservableMap 为例,要点解说一下内部逻辑吧。

⚡ 在解说源码之前,咱们需要先知道一个点:代码傍边的 source 指的都是上一个操作符回来的 Observable 目标,function 指的是调用此操作符需要传递的参数,即匿名完结类,别的下文还会呈现 自定义source,这个便是咱们在运用 create操作符 时传递的参数,也称 顶层source ,它是货品运送的 起点

// ObservableMap.java
// Observable承继自AbstractObservableWithUpstream,AbstractObservableWithUpstream又承继Observable
// Observable完结了ObservableSource接口
// 因而这儿会重写 subscribe 办法
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;
    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        // source 是上一层操作符回来的Observable目标
        // function 是咱们运用此操作符传递的那个匿名完结类,<能够称其为这一个机器需要做的操作>
        // 在本类中其实便是完结了Function接口的匿名类,里边有个办法叫作apply
        // <本层机器处理货品时>,会看到apply办法
        super(source);
        this.function = function;
    }
    // 此办法是 subscribe 的详细完结办法
    // 咱们进行订阅时,即调用subscribe时,终究会调用此办法,<终究告诉起点开端传输货品>
    @Override
    public void subscribeActual(Observer<? super U> t) {
        // t代表咱们调用subscribe时传递的参数,即匿名完结类,<咱们能够称其为后一台机器要履行的操作>
        // 将t和function包装一下,再调用 source 的 subscribe,<也便是前面那台机器的subscribe>
        // 这儿的subscribe能够理解为告诉上层机器开端运送货品
        source.subscribe(new MapObserver<T, U>(t, function));
    }
    // 内部类,<将后一台机器履行的操作和上一个机器绑定起来>
    // 便是将后一个机器的操作做了层封装,包裹了一层
    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;
        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }
        // <此办法是货品抵达本机器时,本机器的相应操作>
        @Override
        public void onNext(T t) {
            // 这些判空直接不看
            if (done) {
                return;
            }
            if (sourceMode != NONE) {
                actual.onNext(null);
                return;
            }
            U v;
            // 这儿会履行apply办法,是咱们运用map操作符传递的完结了Function接口的匿名内部类中的办法
            // <本层机器开端处理上层机器传递过来的货品>
            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            // 本层机器的货品处理结束,扔给基层机器持续处理
            // <actual代表基层机器>
            actual.onNext(v);
        }
    }

总的来说 ObservableMap 做了两件事,第一便是调用 subscribeActual 告诉上层机器让其开端传递货品,第二便是调用 onNext 处理传递到本层的货品,终究再扔给下一个机器去处理。

其实 ObservableXXX 结构和逻辑都差不太多,我将最开端给的比方用图表示了出来:

你可以不用RxJava,但必须得领悟它的思想!

信任咱们能够从这张图中看出,整个工作逻辑是一个 U型结构,当咱们调用 subscribe 时,就会告诉 起点 运送货品,其实也便是设置了观察者。然后,货品就会一层一层往下传递,终究流到咱们的结尾。假如咱们中途想持续添加需求,直接往中心那个进程添加机器(或者说是卡片)就行了,非常便利,并且整个链式调用下来,代码会比不用 RxJava 显得愈加干净整齐易读。

3.ObservableCreate内部工作逻辑

那么起点是怎么接纳下面的 机器 传递上来的运送货品的信号并开端运送货品的呢?

public final class ObservableCreate<T> extends Observable<T> {
    // 咱们自己写的source
    final ObservableOnSubscribe<T> source;
    public ObservableCreate(ObservableOnSubscribe<T> source) {
        // 这个source是咱们自定义的,也便是运用create操作符时传递的那个参数
        // 这便是咱们的所说的起点、源头
        this.source = source;
    }
    // 这个便是起点触发传送使命的办法
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        // 这个observer是从下面机器经过一层层包装后传递过来的observer
        // 实践上调用的是咱们运用subscribe传递参数傍边的onSubscribe办法,即结尾的onSubscribe办法
        observer.onSubscribe(parent);
        // 开端传输使命,将货品往下传递
        try {
            // 这儿的source便是顶层的自定义source
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
    // 这个便是发射器,用于将货品从起点往下传递的东西
    // 一般情况下,咱们在自定义source中都会调用此类的onNext办法开端向下传输货品
    static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {
        private static final long serialVersionUID = -3434801548987643227L;
        final Observer<? super T> observer;
        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }
        // 向下传输货品的办法
        @Override
        public void onNext(T t) {
            ...
            if (!isDisposed()) {
                // 将货品扔给下一层机器去处理
                observer.onNext(t);
            }
        }
        ...
    }
    ...
}

至此,两个典型的 ObservableXXX 解说结束,可能咱们对源码傍边的 sourcesubscribeActual 中的参数 observer/t 傻傻分不清楚,不知道这些是指 前一个机器 ,仍是 后一个机器 ,那么接下来请看下面这幅图。

你可以不用RxJava,但必须得领悟它的思想!

source 是前一个操作符回来的 Observable 目标,这个信任咱们都简单理解。这儿着重说一下上述图的下半部分,咱们还记得结尾是怎么告诉起点开端传输 货品 的吗?没错,是经过每一个 Observable.subscribeActual 办法 (机器)中的参数不断向起点传递,每个 subscribeActual 办法中都会调用 source.subscribe(),也便是告诉前面机器传输包裹,每向前传递一次就 封一层包裹 ,终究触发 起点 传输货品,之后就开端一层层 拆包裹 ,终究拆到结尾(subscribe)中咱们自己完结的 ObservableOnSubscribe 中。

小结

为了便于咱们理解前面所叙述的知识点,这儿咱们用文章最初的那个比方持续深入解说,咱们来看上述代码是怎么履行的,再贴一遍代码:

Observable<Object> observable = Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(ObservableEmitter<Object> e) throws Exception {
                e.onNext();
            }
        }).map(new Function<Bitmap, Bitmap>() {
            @Override
            public Object apply(Bitmap bitmap) throws Exception {
                // todo
            }
        }).map(new Function<Bitmap, Bitmap>() {
            @Override
            public ObservableSource<?> apply(Bitmap bitmap) throws Exception {
                // todo
            }
        }).subscribe(new Observer<Bitmap>() {
            @Override
            public void onSubscribe(Disposable d) {}
            @Override
            public void onNext(Bitmap bitmap) {}
            @Override
            public void onError(Throwable e) {}
            @Override
            public void onComplete() {}
    });

1️⃣ 咱们经过 .subscribe(),发起了订阅,开端观察被观察者,结合咱们之前讲的,咱们实践上调用的是map 回来目标中的 subscribe 办法,即 ObservableMap

// ObservableMap.java
//它会持续调用上层机器的subscribe,告诉上层机器传输货品
public void subscribeActual(Observer<? super U> t) {
    // 封包裹,传递给上层机器
    source.subscribe(new MapObserver<T, U>(t, function));
}

2️⃣ 这儿它会持续调用上层机器的 subscribe,由于上个操作符仍是 map,所以咱们直接越过,看 create 操作符。

// ObservableCreate.java
protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    // 这儿是调用下一层机器的onSubscribe办法
    observer.onSubscribe(parent);
    try {
        // 调用咱们自定义的soruce,开端传输货品
        // 比方中咱们自定义soruce中写的代码是emitter.onNext,开端传输货品
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

3️⃣ 到这儿,咱们已经告诉 起点 发起传送使命,那么接下来便是传输货品的进程,逐渐 拆包 的进程。跟着货品的运送,代码会不断调用最外层包裹中的 observable 目标的 onNext 办法,看起来就像在拆包裹相同,这其实也是前面说的运送包裹的进程,如此重复,终究抵达咱们的结尾的 onNext 办法,咱们拿到终究期望的货品,至此,整个进程根本结束。

✔️ 现在是不是脑海里边又呈现了前面所说的 U型结构 呢?

RxJava是怎么完结线程切换的?

RxJava 能完结线程切换是经过 subscribeOnobserveOn。正常情况下咱们都是这么用的:

observable.subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(...)

前面重复强调整个结构的履行进程是一个 U型结构,请咱们考虑一下,subscribeOnobserveOn 切换线程的中心代码是在 subscribeActual 中仍是 onNext 中?

回忆一下,咱们告诉 起点 开端传输货品时,是经过 subscribeActual 逐层往上传递的,紧接着就开端运送货品处理货品了,就比方 map 机器中的 apply 办法便是处理货品的详细办法。因而,这一段使命都应该在 子线程 中完结,所以 subscribeOn的切换线程的中心代码是在subscribeActual中的。

observeOn 的作用便是让结尾的 onNext 办法中的代码在主线程中履行,那么理应只需要在 observeOn操作符中的onNext中切换线程即可。

有了上述猜想,咱们现在进入源码验证一下。

1.subscribeOn的线程切换

// 不仅有io线程,还有以下这些线程
SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());
COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());
IO = RxJavaPlugins.initIoScheduler(new IOTask());
TRAMPOLINE = TrampolineScheduler.instance();
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());

咱们首要来看看运用该操作符时传递的参数 Schedulers.io() 到底是个什么东西。咱们一层层点进去终究会发现它其实便是 new了一个线程池 :

// 一路点进去,会发现其实便是new了一个线程池
CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);

其间的 CacheWorkerPool 的源码如下:

// IoSchedule.java,这个类将线程池中的创立、调度、履行等办法做了一下封装
...
// 这个办法便是得到线程池
public Worker createWorker() {
    // EventLoopWorker承继自Worker,是对线程池做了一下封装,包裹了一层
    return new EventLoopWorker(pool.get());
}
// 实践调用线程池的办法
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
    if (tasks.isDisposed()) {
        // don't schedule, we are unsubscribed
        return EmptyDisposable.INSTANCE;
    }
    // 里边便是调用线程池的submit等办法
    return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
...
// CacheWorkerPool是IoSchedule中的内部类
CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
    this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
    this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
    this.allWorkers = new CompositeDisposable();
    this.threadFactory = threadFactory;
    ScheduledExecutorService evictor = null;
    Future<?> task = null;
    if (unit != null) {
        // 这儿的中心线程数是1就足够了,由于咱们子线程中的使命都是链式调用的
        evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
        task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, 
            this.keepAliveTime, TimeUnit.NANOSECONDS);
    }
    evictorService = evictor;
    evictorTask = task;
}
...

所以,其实 .io() 便是传了一个中心线程数为1的线程池,当然咱们还能够传入 .computation().newThread()等办法,这些其实都是创立了一个特别的线程池。

咱们知道,这些操作符都是在内部 new ObservableXXX类,所以源码咱们直接看ObservableSubscirbeOn

// ObservableSubscribeOn.java
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    // 这个是调度器,便是对线程池封装了一下
    final Scheduler scheduler;
    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }
    // 咱们要点看在这个办法中是怎么完结线程切换的
    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        // onSubscribe的回调
        s.onSubscribe(parent);
        // subscribeTask实践上是一个runnable
        // 调用scheduler.scheduleDirect(runnable),将runnable放入线程池中去履行
        // 这儿是重中之重,详细细节咱们稍后解说
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
    ...
    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;
        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }
        @Override
        public void run() {
            // 上面那个runnable其实便是这个
            // 这儿是调用上层机器的subscirbe,告诉起点开端传输货品
            source.subscribe(parent);
        }
    }
}

从上面咱们能够知道,咱们将 source.subscribe() 扔进了线程池中,那么就做到了在其之后的所有代码都是在子线程中履行了,直到代码走到 observeOn,在 ObservableObserveOn 中的 onNext() 办法将线程切回主线程。

接下来咱们讲讲详细细节,看看 scheduleDirect 里边的逻辑。

public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    // createWorker()办法眼熟吗?这便是在IoSchedule傍边的办法
    // Worker类本质上便是线程池,仅仅对其做了一层封装
    final Worker w = createWorker();
    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    // 将传进来的runnable再做一层封装,这儿的runnable对应上面的比方便是source.subscribe
    DisposeTask task = new DisposeTask(decoratedRun, w);
    // 这个办法也眼熟吧?
    // 在线程池中履行此使命
    w.schedule(task, delay, unit);
    return task;
}

由上面这一进程咱们能够知道,当告诉到 本层的机器 传输货品时,这一层机器会将 subscribeActual 办法扔进线程池中去履行,即后续的代码都是运行在子线程中了。

那么它又是怎么将线程切回来的呢?

2.observeOn的线程切换

理解了上面线程切换后,咱们很简单能理解下面切换主线程的代码,现在直接进入 ObservableObserveOn 源码。

// ObservableObserveOn.java
@Override
protected void subscribeActual(Observer<? super T> observer) {
   // 同样能够理解为对线程的封装,只不过这儿内部是handler
   Scheduler.Worker w = scheduler.createWorker();
   // 同样是封包裹,把自己封装一层,扔给前面的机器
   source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
   ...
    // 要点重视onNext办法,切回主线程的代码都在这儿面
    @Override
    public void onNext(T t) {
        if (done) {
            return;
        }
        ...
        // 切换线程的办法在这儿面,内部调用的是HandlerWorker的schedule
        // 里边又调用的是HandlerSchedule的schedule
        schedule();
    }

起点 开端传输货品时,即开端逐层拆包裹,这时候会逐层调用 onNext,终究就会在 ObservableObserveOn 中的 onNext 办法中切换线程。而在 onNext 中又调用的是 HandlerSchedule 中的 schedule 办法进行终究的线程切换。

// HandlerSchedule.java
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
    ...
    // 这儿的run便是下一层机器的onNext办法,比方中便是结尾的onNext
    ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
    Message message = Message.obtain(handler, scheduled);
    message.obj = this; // Used as token for batch disposal of this worker's runnables.
    // 调用Handler,完结终究的线程切换
    handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
    // 假如dispose,就中止RxJava后续流程,整个运送带不会再作业了
    if (disposed) {
        handler.removeCallbacks(scheduled);
        return Disposables.disposed();
    }
    return scheduled;
}

画个图做个总结,让咱们理解更明晰一点:

你可以不用RxJava,但必须得领悟它的思想!

写在终究

篇幅很长,能看到终究很不简单,给自己一个 大大的赞 吧!

假如觉得写的不错,就给个赞再走吧~

创造实属不易,你的肯定是我创造的动力,下次见!

假如本篇博客有任何过错的当地,请咱们批评指正,不胜感激。