一、RxSwift中心
1、observable订阅流程
RxObservable承继链:
Producer:Observable
> subscribe > currentThreadScheduler > self.run() > setSinkAndSubscription()
subscribe
:AnonymousObserver(内部观察者)<eventHandler>
> [ObserverBase](on onCore dispose)
> [ObserverType](on)
public class Observable<Element> : ObservableType {
init() {
#if TRACE_RESOURCES
_ = Resources.incrementTotal()
#endif
}
public func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
rxAbstractMethod()
}
public func asObservable() -> Observable<Element> { self }
deinit {
#if TRACE_RESOURCES
_ = Resources.decrementTotal()
#endif
}
}
-
Observable
完结了一个协议ObservableType
,并且ObservableType
协议承继自ObservableConvertibleType
协议,所以在Observable
中完结了两个协议办法:subscribe
和asObservable
。 -
subscribe
办法没有详细完结的逻辑,需求子类去完结。 -
asObservable
办法回来的是self,看似用处不大,其实不是这样的。asObservable
是非常有用的,假如一类是Observable
的子类,咱们能够直接回来self,假如不是Observable
的子类,咱们能够经过重写这个协议办法来回来一个Observable
对象,这样保证了协议的一致性。在运用的时分咱们能够直接写类似self.asObservable().subscribe(observer)
这样的代码,有利于保持代码的简洁性,是杰出的封装性的体现。 -
_ = Resources.incrementTotal()
和_ = Resources.decrementTotal()
这两行代码其实是RxSwift内部完结的一个引证计数。这部分内容我会在后面的文章中再详解。
AnonymousObservableSink 的作用
final private class AnonymousObservableSink<Observer: ObserverType>: Sink<Observer>, ObserverType {
typealias Element = Observer.Element
typealias Parent = AnonymousObservable<Element>
// state
private let isStopped = AtomicInt(0)
#if DEBUG
private let synchronizationTracker = SynchronizationTracker()
#endif
override init(observer: Observer, cancel: Cancelable) {
super.init(observer: observer, cancel: cancel)
}
func on(_ event: Event<Element>) {
#if DEBUG
self.synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self.synchronizationTracker.unregister() }
#endif
switch event {
case .next:
if load(self.isStopped) == 1 {
return
}
self.forwardOn(event)
case .error, .completed:
if fetchOr(self.isStopped, 1) == 0 {
self.forwardOn(event)
self.dispose()
}
}
}
func run(_ parent: Parent) -> Disposable {
parent.subscribeHandler(AnyObserver(self))
}
}
-
AnonymousObservableSink
是Sink
的子类,AnonymousObservableSink
自身遵守ObseverType
协议,与此同时完结了run
办法,尽管没有完结subscribe
办法,但是已经足够了,这样AnonymousObservableSink
从某种程度来说也是Observable
。 -
AnonymousObservableSink
是Observer和Observable的联接的桥梁,也能够理解成管道。它存储了_observer
和销毁者_cancel
。经过sink就能够完结从Observable到Obsever的转变。 -
在
run
办法中的这行代码parent._subscribeHandler(AnyObserver(self))
,其中parent是一个AnonymousObservable
对象。_subscribeHandler
这个block调用,代码会履行到创立序列时的block。然后会调用发送信号的代码obserber.onNext("发送信号")
,然后代码会经过几个中间步骤会来到AnonymousObservableSink
类的on
办法。 -
其中
AnyObserver(self)
中的 self 便是observer是一个函数(function)on 。
完善中心流程图:
2、timer内部剖析
咱们常用的定时器:
NSTimer
GCD
CADiplayLink
直接上代码剖析:
//创立定时器
timer = Timer.init(timeInterval: 1, target: self, selector: #selector(timerFire), userInfo: nil, repeats: true)
timer.fire()
RunLoop.current.add(timer, forMode: .common)//加入runloop
//函数式写法
timer = Timer.scheduledTimer(withTimeInterval: 1, repeats: true, block: { timer in
print(timer)
})
//GCD写法
gcdTimer = DispatchSource.makeTimerSource()
gcdTimer?.schedule(deadline: .now(), repeating: .seconds(1))
gcdTimer?.setEventHandler(handler: {
print("hello GCD")
})
gcdTimer?.resume()
gcdTimer?.suspend()
gcdTimer?.cancel()
gcdTimer = nil
//CADisplayLink写法
cadTimer = CADisplayLink写法(target: self, selector: #selector(timerFire))
cadTimer?.preferredFramesPerSecond = 1
cadTimer?.add(to: .current, forMode: .common)
那假如运用RxSwift
要怎么完结呢?怎么保证定时器的精确性呢?
Observable<Int>.timer(.seconds(1), period: .seconds(1), scheduler: MainScheduler.instance)
.subscribe { result in
print(result)
}
.disposed(by: disposeBag)
RxSwift的timer
底层完结是GCD,咱们看下源码:
Timer 序列分为两种类型.
-
一次性履行
: 创立一个 Observable 在一段延时后,发生唯一的一个信号.一般用于某些只需求履行一次的推迟操作.如超时判别. -
重复屡次履行
: 创立一个 Observable 在一段延时后,每隔一段时间发生一个信号.
跟踪点开timer
:
public static func timer(_ dueTime: RxTimeInterval, period: RxTimeInterval? = nil, scheduler: SchedulerType)
-> Observable<Element> {
return Timer(
dueTime: dueTime,
period: period,
scheduler: scheduler
)
}
发现Timer
承继着Producer
(生产者
),在RxSwift 中,序列大部分都是承继者Producer
的.咱们在规划一些代码时,经过会让功用类似
的类承继着同一基类BaseClass
,这儿能够理解为Producer便是这些序列的基类,在基类中完结了序列的一些根本操作.例如订阅功用Subscribe()
.
运转代码调试:
发现self.period有值履行如上代码,创立履行TimerSink > sink.run()
然后持续进入代码(run() -> Disposable):
func run() -> Disposable {
return self.parent.scheduler.schedulePeriodic(0 as Observer.Element, startAfter: self.parent.dueTime, period: self.parent.period!) { state in
self.lock.performLocked { // 运用递归锁,避免互斥 RecursiveLock
self.forwardOn(.next(state)) //发送音讯 - 履行回调
return state &+ 1 //state 加1
}
}
}
持续跟踪断点: 判别假如disposed值发送改动,大概意思便是假如开端履行回收了,则直接回来,不履行 on发送音讯了。
归纳百川汇de->on函数
self.observer.on(event)
这段代码便是履行(->onCore),回调闭包代码,便是咱们上面剖析的中心流程,作者的思维确实凶猛,归纳百川回流大海=on
开始了解sink思维
:(AnonymousObserver水管(管理者或者生产者) -> (不同的管道)sink -><完结ObserverType>)on 函数在依据不同的 Element 类型进行分发履行回调函数(.next .error .completed)。
弥补:Observable的各种创立方式
// 首先来一个空的序列 - 本来序列事情是Int类型的,这儿调用emty函数 没有序列,只能complete
print("******** empty ********")
let emtyOb = Observable<Int>.empty()
_ = emtyOb.subscribe(onNext: { number in
print("订阅:", number)
}, onError: { error in
print("error:", error)
}, onCompleted: {
print("完结回调")
}) {
print("释放回调")
}
// 单个信号序列创立
let array = ["NY_EDU", "YYDS"]
Observable<[String]>.just(array)
.subscribe { event in
print(event)
}.disposed(by: disposeBag)
_ = Observable<[String]>.just(array)
.subscribe(onNext: { number in
print("订阅:", number)
}, onError: { error in
print("error:", error)
}, onCompleted: {
print("完结回调")
}) {
print("释放回调")
}
// 多个元素 - 针对序列处理
Observable<String>.of("NY_EDU", "YYDS")
.subscribe { event in
print(event)
}.disposed(by: disposeBag)
// 字典
Observable<[String: Any]>.of(["name":"LG_EDU", "age":18])
.subscribe { event in
print(event)
}.disposed(by: disposeBag)
// 数组
Observable<[String]>.of(["NY_EDU", "YYDS"])
.subscribe { event in
print(event)
}.disposed(by: disposeBag)
// 从调集中获取序列:数组,调集,set 获取序列 - 有可选项处理 - 更安全
Observable<[String]>.from(optional: ["NY_EDU", "YYDS"])
.subscribe { event in
print(event)
}.disposed(by: disposeBag)
总结
RXSwift中心原理
-
三部曲
:创立序列(observable) ,订阅序列,发送信号(音讯) -
中心中的订阅流程
:订阅subscribe->(创立AnonymousObserver点阅者,保存event闭包,创立AnonymousObservableSink 管道生产者) –AnonymousObservableSink
:这个家伙干了很多事,咱们称它大号下水管道,依据不同的需求分流到不同的(sink)管道。 -
on函数
:AnyObserver.on(‘我胡汉三又回来了
‘)遵循了ObservableType协议 -
分发(发送呼应)
:回调到详细的函数(.next .error .completed)代码块
然后咱们用RxSwift.timer 定时器来剖析了(内部完结原理):
-
Timer序列
:分两种,一次性履行,重复屡次履行。 -
Timer是GCD
:DispatchSourceTimer完结的定时器。 -
forwardOn
:终究回到AnyObserver.on 回调函数。