一、RxSwift中心

1、observable订阅流程

RxObservable承继链:

iOS探索RxSwift核心续
Producer:Observable > subscribe > currentThreadScheduler > self.run() > setSinkAndSubscription()

subscribe:AnonymousObserver(内部观察者)<eventHandler> > [ObserverBase](on onCore dispose) > [ObserverType](on)

iOS探索RxSwift核心续

public class Observable<Element> : ObservableType {
  init() {
#if TRACE_RESOURCES
    _ = Resources.incrementTotal()
#endif
  }
  public func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
    rxAbstractMethod()
  }
  public func asObservable() -> Observable<Element> { self }
  deinit {
#if TRACE_RESOURCES
    _ = Resources.decrementTotal()
#endif
  }
}
  • Observable完结了一个协议ObservableType,并且ObservableType协议承继自ObservableConvertibleType协议,所以在Observable中完结了两个协议办法:subscribeasObservable

  • subscribe办法没有详细完结的逻辑,需求子类去完结。

  • asObservable办法回来的是self,看似用处不大,其实不是这样的。asObservable是非常有用的,假如一类是Observable的子类,咱们能够直接回来self,假如不是Observable的子类,咱们能够经过重写这个协议办法来回来一个Observable对象,这样保证了协议的一致性。在运用的时分咱们能够直接写类似self.asObservable().subscribe(observer)这样的代码,有利于保持代码的简洁性,是杰出的封装性的体现。

  • _ = Resources.incrementTotal()_ = Resources.decrementTotal()这两行代码其实是RxSwift内部完结的一个引证计数。这部分内容我会在后面的文章中再详解。

AnonymousObservableSink 的作用

final private class AnonymousObservableSink<Observer: ObserverType>: Sink<Observer>, ObserverType {
  typealias Element = Observer.Element
  typealias Parent = AnonymousObservable<Element>
  // state
  private let isStopped = AtomicInt(0)
  #if DEBUG
    private let synchronizationTracker = SynchronizationTracker()
  #endif
  override init(observer: Observer, cancel: Cancelable) {
    super.init(observer: observer, cancel: cancel)
  }
  func on(_ event: Event<Element>) {
    #if DEBUG
      self.synchronizationTracker.register(synchronizationErrorMessage: .default)
      defer { self.synchronizationTracker.unregister() }
    #endif
    switch event {
    case .next:
      if load(self.isStopped) == 1 {
        return
      }
      self.forwardOn(event)
    case .error, .completed:
      if fetchOr(self.isStopped, 1) == 0 {
        self.forwardOn(event)
        self.dispose()
      }
    }
  }
  func run(_ parent: Parent) -> Disposable {
    parent.subscribeHandler(AnyObserver(self))
  }
}
  • AnonymousObservableSinkSink的子类,AnonymousObservableSink自身遵守ObseverType协议,与此同时完结了run办法,尽管没有完结subscribe办法,但是已经足够了,这样AnonymousObservableSink从某种程度来说也是Observable

  • AnonymousObservableSink是Observer和Observable的联接的桥梁,也能够理解成管道。它存储了_observer和销毁者_cancel。经过sink就能够完结从Observable到Obsever的转变。

  • run办法中的这行代码parent._subscribeHandler(AnyObserver(self)) ,其中parent是一个AnonymousObservable对象。_subscribeHandler这个block调用,代码会履行到创立序列时的block。然后会调用发送信号的代码obserber.onNext("发送信号"),然后代码会经过几个中间步骤会来到AnonymousObservableSink类的on办法。

  • 其中AnyObserver(self) 中的 self 便是observer是一个函数(function)on 。

iOS探索RxSwift核心续
完善中心流程图:
iOS探索RxSwift核心续

2、timer内部剖析

咱们常用的定时器:

  • NSTimer
  • GCD
  • CADiplayLink

直接上代码剖析:

//创立定时器
timer = Timer.init(timeInterval: 1, target: self, selector: #selector(timerFire), userInfo: nil, repeats: true)
timer.fire()
RunLoop.current.add(timer, forMode: .common)//加入runloop
//函数式写法
timer = Timer.scheduledTimer(withTimeInterval: 1, repeats: true, block: { timer in
     print(timer)
        })
//GCD写法
gcdTimer = DispatchSource.makeTimerSource()
gcdTimer?.schedule(deadline: .now(), repeating: .seconds(1))
gcdTimer?.setEventHandler(handler: {
      print("hello GCD")
    })
gcdTimer?.resume()
gcdTimer?.suspend()
gcdTimer?.cancel()
gcdTimer = nil
//CADisplayLink写法
cadTimer = CADisplayLink写法(target: self, selector: #selector(timerFire))
cadTimer?.preferredFramesPerSecond = 1
cadTimer?.add(to: .current, forMode: .common)

那假如运用RxSwift要怎么完结呢?怎么保证定时器的精确性呢?

Observable<Int>.timer(.seconds(1), period: .seconds(1), scheduler: MainScheduler.instance)
      .subscribe { result in
        print(result)
      }
      .disposed(by: disposeBag)

iOS探索RxSwift核心续
RxSwift的timer底层完结是GCD,咱们看下源码:
iOS探索RxSwift核心续
Timer 序列分为两种类型.

  • 一次性履行: 创立一个 Observable 在一段延时后,发生唯一的一个信号.一般用于某些只需求履行一次的推迟操作.如超时判别.
  • 重复屡次履行: 创立一个 Observable 在一段延时后,每隔一段时间发生一个信号.

跟踪点开timer

public static func timer(_ dueTime: RxTimeInterval, period: RxTimeInterval? = nil, scheduler: SchedulerType)
    -> Observable<Element> {
    return Timer(
      dueTime: dueTime,
      period: period,
      scheduler: scheduler
    )
  }

发现Timer 承继着Producer(生产者),在RxSwift 中,序列大部分都是承继者Producer的.咱们在规划一些代码时,经过会让功用类似的类承继着同一基类BaseClass,这儿能够理解为Producer便是这些序列的基类,在基类中完结了序列的一些根本操作.例如订阅功用Subscribe().

iOS探索RxSwift核心续
运转代码调试:
iOS探索RxSwift核心续
发现self.period有值履行如上代码,创立履行TimerSink > sink.run() 然后持续进入代码(run() -> Disposable):

func run() -> Disposable {
    return self.parent.scheduler.schedulePeriodic(0 as Observer.Element, startAfter: self.parent.dueTime, period: self.parent.period!) { state in
      self.lock.performLocked {    // 运用递归锁,避免互斥 RecursiveLock 
        self.forwardOn(.next(state)) //发送音讯 - 履行回调
        return state &+ 1 //state 加1
      }
    }
  }

持续跟踪断点:

iOS探索RxSwift核心续
判别假如disposed值发送改动,大概意思便是假如开端履行回收了,则直接回来,不履行 on发送音讯了。

归纳百川汇de->on函数

self.observer.on(event)这段代码便是履行(->onCore),回调闭包代码,便是咱们上面剖析的中心流程,作者的思维确实凶猛,归纳百川回流大海=on

开始了解sink思维:(AnonymousObserver水管(管理者或者生产者) -> (不同的管道)sink -><完结ObserverType>)on 函数在依据不同的 Element 类型进行分发履行回调函数(.next .error .completed)。

弥补:Observable的各种创立方式

// 首先来一个空的序列 - 本来序列事情是Int类型的,这儿调用emty函数 没有序列,只能complete
print("******** empty ********")
let emtyOb = Observable<Int>.empty()
_ = emtyOb.subscribe(onNext: { number in
    print("订阅:", number)
}, onError: { error in
    print("error:", error)
}, onCompleted: {
    print("完结回调")
}) {
    print("释放回调")
}
// 单个信号序列创立
let array = ["NY_EDU", "YYDS"]
Observable<[String]>.just(array)
            .subscribe { event in
        print(event)
      }.disposed(by: disposeBag)
    _ = Observable<[String]>.just(array)
      .subscribe(onNext: { number in
        print("订阅:", number)
      }, onError: { error in
        print("error:", error)
      }, onCompleted: {
        print("完结回调")
      }) {
        print("释放回调")
      }
// 多个元素 - 针对序列处理
Observable<String>.of("NY_EDU", "YYDS")
      .subscribe { event in
        print(event)
      }.disposed(by: disposeBag)
// 字典
Observable<[String: Any]>.of(["name":"LG_EDU", "age":18])
      .subscribe { event in
        print(event)
      }.disposed(by: disposeBag)
// 数组
Observable<[String]>.of(["NY_EDU", "YYDS"])
            .subscribe { event in
        print(event)
      }.disposed(by: disposeBag)
// 从调集中获取序列:数组,调集,set 获取序列 - 有可选项处理 - 更安全
Observable<[String]>.from(optional: ["NY_EDU", "YYDS"])
      .subscribe { event in
        print(event)
      }.disposed(by: disposeBag)    

总结

RXSwift中心原理

  • 三部曲 :创立序列(observable) ,订阅序列,发送信号(音讯)
  • 中心中的订阅流程:订阅subscribe->(创立AnonymousObserver点阅者,保存event闭包,创立AnonymousObservableSink 管道生产者) –AnonymousObservableSink:这个家伙干了很多事,咱们称它大号下水管道,依据不同的需求分流到不同的(sink)管道。
  • on函数:AnyObserver.on(‘我胡汉三又回来了‘)遵循了ObservableType协议
  • 分发(发送呼应):回调到详细的函数(.next .error .completed)代码块

然后咱们用RxSwift.timer 定时器来剖析了(内部完结原理):

  • Timer序列:分两种,一次性履行,重复屡次履行。
  • Timer是GCD:DispatchSourceTimer完结的定时器。
  • forwardOn:终究回到AnyObserver.on 回调函数。