跨线程订阅是 RxSwift Slack 频道上反复出现的一个问题。解说十分简单,所以我觉得把它放在 Blog 中是个好主意,这样不管何时需求,我都能够链接到它,而不必一遍又一遍地输入所有内容。
Observable 订阅
有关订阅(subscribing)和调查(observing)的术语在我看来有点紊乱,所以我们先把这个问题解决掉(不要跳过这一部分!)。
- 首先,你需求界说一个
Observable
,在某些情况下,你需求在闭包中供给一些代码来履行作业并向任何调查者发送元素。当你创立Observable
时,这些代码会被保存起来,以备将来运用,但不会当即履行。假如还没有调查者——Observable
只是静静地等待,不会做任何事情。 - 当你为订阅建模时,您能够运用一些操作符(如
map
、filter
等)来处理发射的元素。增加操作符依然不会履行任何作业——你只是在原始可调查目标的基础上创立了一个 “更专业” 的可调查目标。 - 只有在调用可调查目标的
subscribe(...)
方法时,才能 “启动” 可调查目标。调用subscribe(...)
将实践履行你在第一部分(上文)代码块中供给的代码。
因而,从这个意义上讲,这里有两个要点需求阐明:
-
subscription code 是通过
subscribe()
调用的代码,而且位于Observable.create { ...}
中。这是创立订阅并生成元素的代码。 -
observation code 是你调查发射元素的当地 – 这是你在
onNext: {...}
,onCompleted:{...}
等中供给的代码。这是你进行调查的当地。
我们用
subscribeOn
来决议数据序列的构建函数在哪个Scheduler上运行。
我们用
observeOn
来决议在哪个Scheduler监听这个数据序列。
Schedulers
RxSwift 自带了大量预界说调度程序,能够满意大多数情况下的需求。在 RxSwift 软件库房中有一个关于此主题的简短文档:schedulers。
4.6 Schedulers – 调度器 RxSwift 中文文档
本文将运用两个调度程序:
-
MainScheduler.instance
:调度程序在主线程上履行; -
ConcurrentDispatchQueueScheduler
:运用 GCD 在给定队列上履行作业;
Subscribing 和 subscribeOn
让我们来看看 subscribeOn
操作符——它答应你更改履行 subscription code 的 Schedulers。
默许情况下,订阅代码将在与调用 subscribe()
代码的相同线程上运行,除非运用 subscribeOn (...)
改变上下文。
例如:
Observable<Int>.create { observer in
observer.onNext(1)
sleep(1)
observer.onNext(2)
return Disposables.create()
}
.subscribe(onNext: { el in
print(Thread.isMainThread)
})
假如将此代码放在 viewDidLoad
中,由于订阅代码(subscription code)中运用了 sleep
,因而会阻塞主线程。
你的 onNext
代码将打印为 true
,由于你将始终停留在主线程上:
[main] subscribe() -> [main] create{ ... } -> [main] onNext { ... }
现在,你能够通过插入 subscribeOn
操作符来更改订阅的 scheduler:
Observable<Int>.create { observer in
observer.onNext(1)
sleep(1)
observer.onNext(2)
return Disposables.create()
}
.subscribeOn(ConcurrentDispatchQueueScheduler(qos: .background))
.subscribe(onNext: { el in
print(Thread.isMainThread)
})
这一次,你将在订阅的一起切换线程:
[main] subscribe() -> [background] create{ ... } -> [background] onNext { ... }
而 onNext
中的打印将输出 false
。
Observing 和 observeOn
现在我们来调查序列中的元素。这部分涉及 observation code。
在我们之前的示例中,你将订阅切换到了后台队列,由于它做了一些阻塞性作业。但实践上,你想要的是在主线程上运行 onNext {…}
中的代码,以便更新应用程序的用户界面。
这能够通过运用 observeOn
操作符来实现。顺便说一下,你能够将 observeOn
和 subscribeOn
放在操作符链中的任何方位,次序并不重要。
RxSwift 包括一个共享调度器,它运用主线程 MainScheduler.instance
,因而你能够像这样运用它来轻松调查元素:
Observable<Int>.create { observer in
observer.onNext(1)
sleep(1)
observer.onNext(2)
return Disposables.create()
}
.observeOn(MainScheduler.instance) // 在主线程监听
.subscribeOn(ConcurrentDispatchQueueScheduler(qos: .background)) // 在后台线程创立订阅
.subscribe(onNext: { el in
print(Thread.isMainThread)
})
这将像这样履行代码:
[main] subscribe() -> [background] create{ ... } -> [main] onNext { ... }
假如你常常做异步作业,就会常常用到这种形式,因而越早习惯越好。
我希望这篇文章能让我们对命名和用法有更明晰的认识。