前语
上篇文章介绍了RxSwift的相关概念,了解了RxSwift的基本编程思维,函数呼应式编程思维,
那么它的底层是怎样完成的呐?为什么它能够呼应,为什么能够将改变的值传递给呼应的函数当中去然后依据改变的值完成自己想完成的工能?下面我来探究下RxSwift的中心逻辑,解读下其部分源码。
1、observable可调查序列
observable是Rx中的一个重要的概念,这个概念有一个细节,有时分可调查序列不一定是完整的,或者说能够是有限的也能够是无限的,比方下面timwer这个例子:
例子
- 无限的
// 无限的
//加上period: .seconds(2)两秒履行一次
let ob = Observable<Int>.timer(.seconds(1), period: .seconds(2), scheduler: MainScheduler.instance)
ob.subscribe { num in
print(num)
} onCompleted: {
print("done")
}
.disposed(by: disposeBag)
打印是每格两秒打印一次,无限循环
- 有限的
//有限的
//打印到4就结束
let ob = Observable.of(1, 2, 3, 4)//just/from
ob.subscribe { num in
print(num)
} onCompleted: {
print("done")
}
.disposed(by: disposeBag)
打印成果
\
- 错误请求
// 网络请求错误
URLSession.shared.rx.response(request: URLRequest.init(url: URL.init(string: "http://www.baidu.xxx")!))
.subscribe { response, data in
}
.onCompleted: {
print("1")
}
.onError: { error in
print(error)
}
.disposed(by: disposeBag)
// 成果会打印错误信息,不会打印1
考虑
那么问题来为什么只打印错误信息,而不打印1,还有这个信号是怎样来的,怎样能够来到onCompleted或者onError的,它是怎样进行订阅到的?
这些问题能够理解为三步曲:发生、订阅、销毁
- 序列信号的发生
let ob = Observable<Any>.create { observer in
return Disposables.create()
}
- 订阅信号
//
let _ = ob.subscribe { text in
print("订阅的广东爱情故事到了:(text)")
} onError: { error in
print("error:(error)")
} onCompleted: {
print("完结")
} onDisposed: {
print("销毁")
}
.disposed(by: disposeBag)
error、onCompleted等这些是怎样来的,其实是从信号发生的observer中来的,接下来是第三步发送信号
- 发送信号
//第一步信号发生中插入发送信息号
let ob = Observable<Any>.create { observer in
// 3、发送信号
observer.onNext("人在广东流浪现已...")
observer.onError(NSError.init(domain: "myError", code: 10080, userInfo: nil))
//observer.onCompleted()
return Disposables.create()
}
这儿需要注意的是:一般完结onCompleted和错误onNext只能二选一。
看下面这张图
那么UI是怎样进行呼应的呐?比方一个UIControl的点击事情用Rx办法完成,他是怎样完成,基本思路是差不多
拓宽
这儿顺便拓宽下observable的创立:
(图片出自逻辑教育)
2、源码探索
信号发生
源码:
extension ObservableType {
public static func create(_ subscribe: @escaping (AnyObserver<Element>) -> Disposable) -> Observable<Element> {
AnonymousObservable(subscribe)//是一个匿名序列内部类
}
}
// 承继自Producer
final private class AnonymousObservable<Element>: Producer<Element> {
typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
let subscribeHandler: SubscribeHandler
// 初始化的时分,做的是一个subscribe的回调记录,保存了当前的逃逸闭包,将外部的闭包当作参数
//这便是函数式编程思维
init(_ subscribeHandler: @escaping SubscribeHandler) {
self.subscribeHandler = subscribeHandler
}
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
}
从这段源码能够看到,create回来一个匿名的序列AnonymousObservable,这个序列承继自Producer,初始化时保存了闭包SubscribeHandler。这个闭包便是前面的闭包:
订阅信号
public func subscribe(
onNext: ((Element) -> Void)? = nil,
onError: ((Swift.Error) -> Void)? = nil,
onCompleted: (() -> Void)? = nil,
onDisposed: (() -> Void)? = nil
) -> Disposable {
let disposable: Disposable
if let disposed = onDisposed {
disposable = Disposables.create(with: disposed)
}
else {
disposable = Disposables.create()
}
#if DEBUG
let synchronizationTracker = SynchronizationTracker()
#endif
let callStack = Hooks.recordCallStackOnError ? Hooks.customCaptureSubscriptionCallstack() : []
// 这儿是个要点
let observer = AnonymousObserver<Element> { event in
#if DEBUG
synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { synchronizationTracker.unregister() }
#endif
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
return Disposables.create(
//这儿是个要点
// 这儿是Producer中的subsceibe办法
// observer
self.asObservable().subscribe(observer),
disposable
)
}
这个subscribe办法回来的是一个Disposable协议,要点在里边的let observer = AnonymousObserver { event in …},看下这个AnonymousObserver源码:
final class AnonymousObserver<Element>: ObserverBase<Element> {
typealias EventHandler = (Event<Element>) -> Void
private let eventHandler : EventHandler
init(_ eventHandler: @escaping EventHandler) {
#if TRACE_RESOURCES
_ = Resources.incrementTotal()
#endif
self.eventHandler = eventHandler//这儿是要点:也是保存了一个闭包
}
override func onCore(_ event: Event<Element>) {
self.eventHandler(event)
}
#if TRACE_RESOURCES
deinit {
_ = Resources.decrementTotal()
}
#endif
}
AnonymousObserver类也是在初始化的时分EventHandler保存了外部进来的闭包,这个闭包是:
这个EventHandler包含了event,event其实便是subscribe函数传进来的onNext、onError、onCompleted、onDisposed,也便是EventHandler保存这些事情回调。
这个EventHandler是个闭包前面一句讲过了,那么它闭包的参数是什么呐?点击进去这Event
能够看到这是一个枚举,next、error、completed。它便是闭包EventHandler的参数event
回到信号发生的AnonymousObservable这个类这儿
上图括号里的参数便是create办法的闭包函数的那个参数observer
看这个AnyObserver,里边也是保存了一个闭包,这个闭包的参数也是这Event
还有很重要的一点AnyObserver承继自ObserverType。看下ObserverType源码,
create办法的闭包函数的那个参数observer的祖宗类便是这个ObserverType
public protocol ObserverType {
associatedtype Element
func on(_ event: Event<Element>)
}
/// Convenience API extensions to provide alternate next, error, completed events
extension ObserverType {
public func onNext(_ element: Element) {
self.on(.next(element))
}
public func onCompleted() {
self.on(.completed)
}
public func onError(_ error: Swift.Error) {
self.on(.error(error))
}
}
很好,这个时分就能够到第三 步了
发送信号
第二步中回来的Disposables中的subscribe的源码,如下:
class Producer<Element>: Observable<Element> {
override init() {
super.init()
}
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
if !CurrentThreadScheduler.isScheduleRequired {
// The returned disposable needs to release all references once it was disposed.
let disposer = SinkDisposer()
// 首要看到这个run办法,是由子类来完成具体需要完成的内容
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
}
func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
rxAbstractMethod()
}
}
第一步中的AnonymousObservable承继自现在的这个Producer,一起重写了这个run办法,把传过来的observer给AnonymousObservableSink,然后调sink.run(self)
这个subscribeHandler便是第一步中的闭包,AnyObserver(self)则是第一步中闭包的参数observer,这个时分闭包跳转终于串起来了。
这儿很显然是调了ObserverType的on办法,on办法的参数Event便是之前的看到的那个枚举,因而只有我调用这个onNext,那么就回调用第二个过程中的onNext,其间参数text便是”人在广东流浪现已…”。
小结