RxSwift调度者

案例引进

Schedulers是RxSwift完成多线程的核心模块。它首要用于控制使命在哪个线程或行列运行。

咱们在平常的开发过程中,肯定都使用过网络恳求,网络恳求是在后台履行的,获取到数据之后,再在主线程更新UI。

先来一段代码引进

/// 子线程
DispatchQueue.global().async {
    print("恳求数据")
   let _ = self.actionBtn.rx.tap
    .subscribe(onNext: { () in
        /// 主线程(经过线程调度,调度回来,切换线程)
        print("tap =--- \(Thread.current)")
    })
}

RxSwift学习之 调度者

再来一段网络恳求伪代码。

DispatchQueue.global(qos: .userInitiated).async {
    let data = try? Data(contentsOf: url)
    DispatchQueue.main.async {
        // 更新UI
    }
}

假如用RxSwift来完成上面的网络恳求,则大致是这样的:

let rxData: Observable<Data> = ...
rxData
	.subscribeOn(ConcurrentDispatchQueueScheduler(qos: .userInitiated))
	.observeOn(MainScheduler.instance)
	.subscribe(onNext: { [weak self](data) in
    	// 更新UI
	})
	.disposed(by: disposeBag)

阐明:

  1. 咱们用 subscribeOn 来决议数据序列的构建函数在哪个 Scheduler 上运行。 在上面的比如中,因为获取 Data 需求花费很长的时刻,所以用 subsribeOn 切换到 后台Scheduler 来获取 Data 。这样就能够防止阻塞主线程。
  2. 咱们用 observeOn 来决议在哪个 Scheduler 监听这个数据序列。 在上面的比如中,经过 observerOn 办法切换到主线程来监听并处理结果。

MainScheduler

MainScheduler代表主线程。假如需求履行和UI相关的使命,就需求切换到该Scheduler运行。

RxSwift学习之 调度者

能够明晰的知道,在初始化时,在MainScheduler目标的内部,绑定了主行列DispatchQueue.main

SerialDispatchQueueScheduler

SerialDispatchQueueScheduler笼统了串行DispatchQueue。假如需求履行一些串行使命,能够切换到这个Scheduler履行。

RxSwift学习之 调度者

在初始化SerialDispatchQueueScheduler目标时,需求传入一个DispatchQueue,保存在self.configuration结构体中。

ConcurrentDispatchQueueScheduler

ConcurrentDispatchQueueScheduler笼统了并行DispatchQueue。假如需求履行一些并发使命,能够切换到这个Scheduler履行。

RxSwift学习之 调度者

OperationQueueScheduler

OperationQueueScheduler笼统了NSOperationQueue。它具备一些NSOperationQueue的特色。例如,能够经过设置maxConcurrentOperationCount来控制一起履行并发使命的最大数量。

RxSwift学习之 调度者

在初始化OperationQueueScheduler目标时,需求传入OperationQueue优先级queuePriority,作为初始化参数。

Scheduler的调度履行

所有的调度器Scheduler都继承自ImmediateSchedulerType协议。

RxSwift学习之 调度者

而这个协议只声明晰一个schedule办法,而经过注释能够知道,在调度器调度履行的时分都会调用这个schedule办法。

咱们现在以SerialDispatchQueueScheduler调度器为例:

/**
Schedules an action to be executed immediately.
- parameter state: State passed to the action to be executed.
- parameter action: Action to be executed.
- returns: The disposable object used to cancel the scheduled action (best effort).
    */
public final func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
    return self.scheduleInternal(state, action: action)
}
func scheduleInternal<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
    return self.configuration.schedule(state, action: action)
}
/**
  Schedules an action to be executed.
  - parameter state: State passed to the action to be executed.
  - parameter dueTime: Relative time after which to execute the action.
  - parameter action: Action to be executed.
  - returns: The disposable object used to cancel the scheduled action (best effort).
  */
  public final func scheduleRelative<StateType>(_ state: StateType, dueTime: RxTimeInterval, action: @escaping (StateType) -> Disposable) -> Disposable {
      return self.configuration.scheduleRelative(state, dueTime: dueTime, action: action)
  }
/**
  Schedules a periodic piece of work.
  - parameter state: State passed to the action to be executed.
  - parameter startAfter: Period after which initial work should be run.
  - parameter period: Period for running the work periodically.
  - parameter action: Action to be executed.
  - returns: The disposable object used to cancel the scheduled action (best effort).
  */
  public func schedulePeriodic<StateType>(_ state: StateType, startAfter: RxTimeInterval, period: RxTimeInterval, action: @escaping (StateType) -> StateType) -> Disposable {
      return self.configuration.schedulePeriodic(state, startAfter: startAfter, period: period, action: action)
  }

剖析以上办法会发现,终究都会调用 self.configuration 的某个办法。而且检查几种调度器的源码能够知道,Scheduler 中都有一个重要的属性 let configuration: DispatchQueueConfiguration。其间保存了咱们需求的行列和leeway信息。

那么,咱们就来剖析 DispatchQueueConfiguration 中的办法。

RxSwift学习之 调度者

首先剖析schedule办法,虽然schedule办法中只要寥寥几句代码,可是也明晰的展现其核心逻辑就是在当时行列下面,异步调度履行了闭包 action(state)

func scheduleRelative<StateType>(_ state: StateType, dueTime: Foundation.TimeInterval, action: @escaping (StateType) -> Disposable) -> Disposable {
    let deadline = DispatchTime.now() + dispatchInterval(dueTime)
    let compositeDisposable = CompositeDisposable()
    let timer = DispatchSource.makeTimerSource(queue: self.queue)
    timer.schedule(deadline: deadline, leeway: self.leeway)
    // 因篇幅原因,省掉部分代码 ...
    timer.setEventHandler(handler: {
        if compositeDisposable.isDisposed {
            return
        }
        _ = compositeDisposable.insert(action(state))
        cancelTimer.dispose()
    })
    timer.resume()
    _ = compositeDisposable.insert(cancelTimer)
    return compositeDisposable
}
func schedulePeriodic<StateType>(_ state: StateType, startAfter: TimeInterval, period: TimeInterval, action: @escaping (StateType) -> StateType) -> Disposable {
    let initial = DispatchTime.now() + dispatchInterval(startAfter)
    var timerState = state
    let timer = DispatchSource.makeTimerSource(queue: self.queue)
    timer.schedule(deadline: initial, repeating: dispatchInterval(period), leeway: self.leeway)
    // 因篇幅原因,省掉部分代码 ...
    timer.setEventHandler(handler: {
        if cancelTimer.isDisposed {
            return
        }
        timerState = action(timerState)
    })
    timer.resume()
    return cancelTimer
}

以上两个办法中虽然没有直接在当时行列中异步调用闭包,可是创立timer时,却是在当时行列中创立的,因此timer回调时也是在当时行列履行eventHandler,间接完成当时行列下的调度。

录入自|原文链接

分享:材料下载