前语

rxjava 能够很便利的进行线程切换, 那么rxjava是怎样进行线程切换的呢?阅读本文能够了解下rxjava 是怎样进行线程切换的及线程切换的影响点。


一个简略的代码:

Observable.create(new Observajava语言bleOnSubscribe<String>() {
@Override
public void subscribe(Obsejava面试题rvableEmitter<St源码编辑器编程猫下载ring> e) throws Exception {
Log.d("WanRxjava ", "subscrib  tandroid下载d ==" + Thread.currentThread().getName());
e.onNext("我在发送next");
e.onComplete()源码年代;
}
}).subscribeOn(SchedulerAndroids.io())
.observeOn(Androidandroid下载装置Schedulers.m线程安全ainThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d("WanRxjava ", "onSubscribe td ==" + Threadandroid下载装置.currentThread().getName());
}
@Override
public void onNeandroid体系xt(String value) {
Log.d("WanRxjava ", "onNext td ==" + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
}
@Override
publandroid下载ic vo线程池id oandroid体系nComplete() {
Log.d源码年代("WanRxjava ", "onComplete td ==" + Thandroid下载read.currentThread().getName());
}
});

如上代码,完成了线程切换和观察者被观察者绑定的逻辑。咱们分四源码编程部分看上述代码逻辑create、subscribeOn、observeOn、subscribe

1.create

create 顾名思议是 创立被观察者,这儿有一个java编译器参数是 ObservableOnSubscribejava环境变量装备,这是个接口类,咱们看下create 的源码

@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreat源码编辑器编程猫下载e<T>(source));
}

将ObservableOnSubscribe 传入后 又调用了 new ObservableCrea线程池的七个参数te(source)

publ线程和进程的差异是什么ic final class ObservableCreate<T> extends Obse线程池面试题rvable<T&g源码编辑器编程猫下载t; {
final ObservableOnSubscribe<T> source;
public Observjava根底知识点ableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
}

ObservableCreate 有一个变量是 source,这儿仅仅将传入的ObservableOn源码本钱Subscribe 赋值给soujava模拟器rce,也就是做了一层包装,然后回来。

2.subsandroid平板电脑价格cribeOn

调用完creat源码本钱e后回来了 ObservableCreate线程是什么意思(Observable),然后持续调用subscribeOn,传入了一个变量 Schedulers.io()

@SchedulerSupandroid平板电脑价格port(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPluginsandroid下载.onAssembly(new ObservableSubscribeOn&ltjava根本数据类型;T>(this, scheduler));
}

咱们看到调用了new ObservableSubscribeOn(源码this, scheduler) 将本身和 schedu源码年代ler 传android平板电脑价格

public final class ObservableSubscribeOn<T> extends AbstractObservabandroid/yunosleWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource&java根本数据类型lt;T> source, Scheduler scheduler) {
super(source);
this.scheduler = sched线程撕裂者uler;源码编辑器手机版下载
}
}

Ob线程池的七个参数servableSubscribeOn 将scheduler 和 create 回来的目标又包装了一层 回来ObservableSubscribeOn

3.observeOn

有一个参数是 Scheduler

@SchedulerSuppor线程的几种状况t(SchedulerSupport.CUSTOM)
public fina源码网站l Observable<T&gandroid平板电脑价格t; observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
@SchedulerSupport(SchedulerSupport.CUSTOM)
publi线程c final Observajava编译器ble<T> observeOn(Scheduler scheduler,android体系 boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bujavascriptfferSize, "bufferSize");
rjava面试题eturn RxJav线程和进程的差异是什么aPlugins.onAssembly(new ObservableObserveOn<T>(this, sched线程和进程的差异是什么uler, delayError, bufferSize));
}

Obsandroid是什么手机牌子ervableSubscribeOn(observable)android体系又调用了observeOn,然后调用了new ObservableObserveOn(this, scheduler, delayError, bufferSize)

public final class ObservableObserveOn<T> extends AbsjavascripttractObservableWithUpstream<T, T> {
final Scheduler scheduler;android什么意思
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T>源码编辑器手机版下载 source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.s线程池的创立方法有几种cheduler =java怎样读 scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize源码编辑器;
}
}

又是一个包装,将ObservableSubscribeOn 和 scheduler 包装成 Objava环境变量装备servajava怎样读bleObserv线程池的七个参数eOn

4.subjava环境变量装备scribe

上述毕竟一步即调用ObservableObserjavaeev线程安全eOn.subscribe,传入参数是一个 observer

//ObservableObserveOn.java
@SchedulerSupport(Sched源码编辑器ulerSupport.NONE)
@Override
pujava怎样读blic final void subjava根本数据类型scribe(Observer<? super T> obsandroid下载装置erver) {
ObjectHelper.r源码年代equireNonNull(observerandroid手机, "observer is null");
try {
observer = RxJavaPlugins.onSub线程撕裂者scribeandroid下载(this, observe线程和进程的差异是什么r);源码
ObjectHelper.requireNonandroid下载装置Null(obsejava怎样读rver, "Plugin returned null Observer");androidstudio装置教程
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
tandroid体系hrow e;
} catch (Throjavascriptwable e) {
Exceptions.throwIfFatal(e);
/java语言/ can't call onError because no way to know if a Di线程池sposable has bee源码编程n set or not
// can't call onSubscribe because the call might havandroidstudio装置教程e set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointeandroid是什么手机牌子rExcep源码tion("Actually not, but can't throw other exce线程池ptions due to RS");
npe.initCause(e);
throw npe;
}
}

能够看到调用androidstudio装置教程subscribe 后调用了subscribeActual(obsandroid体系erver);将observer 传入

咱们看下 subscribeActual(observer)

//ObservableObserveOandroid下载装置n.java
@Ove线程和进程的差异是什么rride
protected void subscribeActual(Observer&线程的几种状况lt;? super T> obJavaserver) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {java根本数据类型
Sc线程heduler.Wor线程是什么意思ker w = scheduler.createWorker();
source.subscribjavaeee(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}android体系

上面源码共享网的if 先不管,首要看下下面的逻辑,调用了 scheduler.createWorker(),这个scheduler 是 obs线程安全erveOn 传入的,然后调用

new ObserveOnObserver(observer, w, delayError, bufferSize);将worker /observer 又做了一次包装。

//Objavaeeser源码编辑器手机版下载vableObserveAndroidOn 内部类
static final class ObserveOnObserver<T> extends Basi源码集市cIntQueueDisposable<T&线程和进程的差异是什么gt;
implements Observer<T>, Runnable {
private static final long serialVeandroidstudio装置教程rsionUID = 65768966199线程的生命周期3098358java模拟器4L;源码本钱
final Observer<? super T> actual;
fina线程池的创立方法有几种l Schedu线程是什么意思ler.Worker worker;
final boolean delayError;
final int bufferSize;
SimpleQueue<T> queue;
D线程安全isposable s;
Throwable error;
volatile boolean done;
volatile boolean cancelled;
int sourceMode;
booleaandroid什么意思n outputFused源码集市;
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker线程安全 worker, boolean d源码网站elayError, int bufferSize) {
this.actual = aandroid的drawable类ctual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;线程池
if (s instanceof QueueDisposable) {
@SuppressWarningandroid/yunoss(java面试题"unchecked")
QueueDisposable<T> qd = (QueueDisposable线程池<T>) s;
int m = qd.requestFusion(QueueDispos线程和进程的差异是什么able.Aandroid下载装置NY | QueueDisposable.BOUNDARY);
if (m == QueueDisposable.SYNC) {
sourceMode = m;
queue = qd;
done = true;
actual.onSub源码网站scribe(this);
scheduandroid的drawable类le();
return;
}
if (m == QueueDisposjava语言able.ASYNC) {
sourceMode = m;
queue = qd;
actual.onSubscribe(this);
rAndroideturn;
}
}
queue = new SpscLinkedArrayQueue<T>(bufferSize);
actual.onSubscribe(this);
}
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
@Override
public vojavaeeid onError(Throwable t) {
if (do源码编辑器编程猫下载ne) {
RxJavaPlugins.onError(tjava面试题);
return;
}
error = t;
done = true;
schedule();
}
@Override
public void onComplete() {
if (done) {
return;
}
done = true;
schedule();
}
@Override
public void dispose() {
if (!cancelled) {
c源码共享网ancelled = true;
s.dispose();
worker.dispose();
if (getAndjavaeeIncrement() == 0) {
queue.clear();
}
}
}
@Override
public boolean isDisposed() {
return cancelled;
}
void schedule() {
if (getAndIncreme线程撕裂者nt() == 0) {
worker.schedule(this);
}
}
voidjava根本数据类型 drainNormal() {
int missedjava环境变量装备 = 1;
final Sijava面试题mpleQueue&ltandroid下载装置;T> q = queue;
final Observer<? super T> a = actual;
for (;;) {
if (checkTerminated(doandroidstudio装置教程ne, q.isEmpty(), a)) {
return;
}
for (;;) {线程池
boolean d = done;
T v;
try {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.dispoandroid平板电脑价格se();
q.clear();
a.onError(ex);
randroid下载装置eturn;
}
boo线程池的七个参数lean empty = v == null;
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {
break;
}
a.onNext(v线程池面试题);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
void draijava根底知识点nFused() {
int missed = 1;
for (;;) {
if (cancelled) {
return源码;
}
boolean d = done;
Throwable ex = error;
if (!delayError && d && ex != null) {
actual.onError(error);
worker.dispose();
return;
}
actual.onNext(null);
if (d)java根本数据类型 {
ex = error;
if (ex != null) {
actual.onError(ex);
} else {
act源码编辑器ual.onCjava编译器omplete源码网站();
}
worker.dispose();
return;
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
@Override
public void run() {
if (源码out线程安全putF线程池面试题used) {
drainFused();
} else {
drainNormal();
}
}
bandroid是什么手机牌子oolean checkTerminated(boolean d, boolean empty, Observer<? superjava面试题 T> a) {
if (cancelled) {
queue.clear();
return true;
}
if (d) {
Throwable e = error;
if (delayError源码网) {
if (empjava根底知识点ty) {
if (ejavascript != null) {
a.onError(e);
} e源码共享网lse {
a.onComplete();
}
worker.dispose();
reandroid下载装置turn true;
}
} else {
if (e != null) {
queue.clear();
a.onError(e);
worker.dispose();
return true;
} elsjavaeee
if (empty) {
a.onComplete();
worker.dispose();
return true源码本钱;
}
}
}
return falsejava根本数据类型;
}
@Override
public int requestFusion(int mode) {线程池
if ((mode & ASYNC) != 0) {
outputFused = true;
return ASYNC;
}
return NONE;
}
@Override
public T poll() throws Exceptjava编译器ion线程的几种状况 {
return queue.poll();
}
@Override
public void clear() {
queue.clear();
}
@Override
public boolean isEmandroid什么意思pty()源码年代 {
return qujava模拟器eue.isEmpty();
}
}

包装完Obsjava面试题er源码编辑器veOnObserver后,android体系调用了source.subscribe 这儿的source 即Obser线程池vableSubscribejava根底知识点On.subscribe,然后调用ObservableSubscribeOn.subscribeActual

//ObservableSubscribeOn.java
@Override
public void subscribeAcjava怎样读tual(final Observer<? super T> s) {
fina源码编辑器手机版下载l SubscribeOnObserver<T> parent = new scheduler<T>(s);
s.onSub线程是什么意思scribe(parent);
p线程的生命周期arent.setDisposable(schedule线程池的创立方法有几种r.scheduleDirect(new Runna源码编辑器ble() {
@Overjava环境变量装备ridandroid下载e
public void run() {
source.subscribe(parent);
}android/yunos
}));
}
static final class SubscribeOnObserver<T> extends AtomicRefe源码集市rence<Disposabl源码年代e> implements Observer<T>, Disposable源码编程 {
private static final long serialVersionUID = 809454android什么意思78860725292线程撕裂者08L;
final Obserandroid什么意思ver<? super T> actual;
final AtomicReference<Disposable> s;
SubscribeOnObserver(Observer<? super T> actual) {
this.actual = actuaandroid/yunosl;java面试题
this.s = new AtomicReference<Disposable>();
}
@Overrandroid是什么手机牌子ide
public void onSubscribe(Disposable s) {
DisposableHelper.setOnce(this.s, s);
}
@Override
public void onNext(T t) {
actual.onNext源码共享网(t);
}
@Override
public void onError(Throwable t) {
actual.onError(t);
}
@Override
public void onComplete() {
actual.onComplete();
}
@Override
public void dispose() {
DisposableHelper.djava怎样读ispose(s);
DisposableHelper.dispose(this);
}
@Ojava怎样读verride
public boolean isDispose源码网站d() {
r源码编程eturn DisposableHelper.isDisposed(get());
}
void setDisposable(Disposable d线程和进程的差异是什么) {
DisposableHelper.set源码本钱Once(this, d);
}
}

ObservableSubsandroid下载装置cri源码本钱beOn.subscribeActual

首先将传入的观察者封装成 Subandroid体系scribeOnObserver
然后触发了 onSubscribe,接着调用 schedulandroid体系er.scheduleDirect(new Runnable() 这儿的scheduler 是 subscribeOn 传android平板电脑价格入的
毕竟调用了 scheduler.setsetDiJavasposab源码网le方法。

咱们看到 run 的方法体即source.subscribe(parent);这儿的source 即线程 Observabl源码eCreate线程是什么意思(ObservableOnSubscribe),传入了observer,然后调用 observer的OnNext 和 OnComplete 方法。

5.小结:
a. 调用Observer.OnSubscribe 方法是 不受线程调度影响的
b.subscribeOn 影响的是发送源码作业的线程
c.observerOn 影响的是观察者处理承受数据的线程,假设没有调用observeOn 则不java根底知识点会进行包装成 ObserveOnObserver,也就是说不会实施观线程安全察者的线程切换,和 发送者的线程一同
d.多次调用subscribe线程的几种状况On切换线程,每次都会new ObservableSubscribeOn,触发作业发送时会往上调线程撕裂者用,也就是第一次调用的subscribeOn传入的线程 会实施发送工线程撕裂者作,后边的线程切换无效
e.Observer.OnSubscribe 只会实施一次,由于调用Disp源码本钱osaandroid什么意思bleHelper.setOnce(this.s, s)
f.处理完onComplete 或许onError 后就不会再发出作业,由于被观察者发送完这两个作业后 就会调用disposed