RxSwift调度者
案例引进
Schedulers
是RxSwift完成多线程的核心模块。它首要用于控制使命在哪个线程或行列运行。
咱们在平常的开发过程中,肯定都使用过网络恳求,网络恳求是在后台履行的,获取到数据之后,再在主线程更新UI。
先来一段代码引进
/// 子线程
DispatchQueue.global().async {
print("恳求数据")
let _ = self.actionBtn.rx.tap
.subscribe(onNext: { () in
/// 主线程(经过线程调度,调度回来,切换线程)
print("tap =--- \(Thread.current)")
})
}
再来一段网络恳求伪代码。
DispatchQueue.global(qos: .userInitiated).async {
let data = try? Data(contentsOf: url)
DispatchQueue.main.async {
// 更新UI
}
}
假如用RxSwift来完成上面的网络恳求,则大致是这样的:
let rxData: Observable<Data> = ...
rxData
.subscribeOn(ConcurrentDispatchQueueScheduler(qos: .userInitiated))
.observeOn(MainScheduler.instance)
.subscribe(onNext: { [weak self](data) in
// 更新UI
})
.disposed(by: disposeBag)
阐明:
- 咱们用
subscribeOn
来决议数据序列的构建函数在哪个 Scheduler 上运行。 在上面的比如中,因为获取Data
需求花费很长的时刻,所以用subsribeOn
切换到 后台Scheduler 来获取Data
。这样就能够防止阻塞主线程。 - 咱们用
observeOn
来决议在哪个 Scheduler 监听这个数据序列。 在上面的比如中,经过observerOn
办法切换到主线程来监听并处理结果。
MainScheduler
MainScheduler代表主线程。假如需求履行和UI相关的使命,就需求切换到该Scheduler运行。
能够明晰的知道,在初始化时,在MainScheduler
目标的内部,绑定了主行列DispatchQueue.main
。
SerialDispatchQueueScheduler
SerialDispatchQueueScheduler笼统了串行DispatchQueue。假如需求履行一些串行使命,能够切换到这个Scheduler履行。
在初始化SerialDispatchQueueScheduler
目标时,需求传入一个DispatchQueue
,保存在self.configuration
结构体中。
ConcurrentDispatchQueueScheduler
ConcurrentDispatchQueueScheduler笼统了并行DispatchQueue。假如需求履行一些并发使命,能够切换到这个Scheduler履行。
OperationQueueScheduler
OperationQueueScheduler笼统了NSOperationQueue。它具备一些NSOperationQueue的特色。例如,能够经过设置maxConcurrentOperationCount
来控制一起履行并发使命的最大数量。
在初始化OperationQueueScheduler
目标时,需求传入OperationQueue
和优先级queuePriority
,作为初始化参数。
Scheduler的调度履行
所有的调度器Scheduler
都继承自ImmediateSchedulerType
协议。
而这个协议只声明晰一个schedule
办法,而经过注释能够知道,在调度器调度履行的时分都会调用这个schedule
办法。
咱们现在以SerialDispatchQueueScheduler
调度器为例:
/**
Schedules an action to be executed immediately.
- parameter state: State passed to the action to be executed.
- parameter action: Action to be executed.
- returns: The disposable object used to cancel the scheduled action (best effort).
*/
public final func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
return self.scheduleInternal(state, action: action)
}
func scheduleInternal<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
return self.configuration.schedule(state, action: action)
}
/**
Schedules an action to be executed.
- parameter state: State passed to the action to be executed.
- parameter dueTime: Relative time after which to execute the action.
- parameter action: Action to be executed.
- returns: The disposable object used to cancel the scheduled action (best effort).
*/
public final func scheduleRelative<StateType>(_ state: StateType, dueTime: RxTimeInterval, action: @escaping (StateType) -> Disposable) -> Disposable {
return self.configuration.scheduleRelative(state, dueTime: dueTime, action: action)
}
/**
Schedules a periodic piece of work.
- parameter state: State passed to the action to be executed.
- parameter startAfter: Period after which initial work should be run.
- parameter period: Period for running the work periodically.
- parameter action: Action to be executed.
- returns: The disposable object used to cancel the scheduled action (best effort).
*/
public func schedulePeriodic<StateType>(_ state: StateType, startAfter: RxTimeInterval, period: RxTimeInterval, action: @escaping (StateType) -> StateType) -> Disposable {
return self.configuration.schedulePeriodic(state, startAfter: startAfter, period: period, action: action)
}
剖析以上办法会发现,终究都会调用 self.configuration
的某个办法。而且检查几种调度器的源码能够知道,Scheduler 中都有一个重要的属性 let configuration: DispatchQueueConfiguration
。其间保存了咱们需求的行列和leeway信息。
那么,咱们就来剖析 DispatchQueueConfiguration
中的办法。
首先剖析schedule
办法,虽然schedule
办法中只要寥寥几句代码,可是也明晰的展现其核心逻辑:就是在当时行列下面,异步调度履行了闭包 action(state) 。
func scheduleRelative<StateType>(_ state: StateType, dueTime: Foundation.TimeInterval, action: @escaping (StateType) -> Disposable) -> Disposable {
let deadline = DispatchTime.now() + dispatchInterval(dueTime)
let compositeDisposable = CompositeDisposable()
let timer = DispatchSource.makeTimerSource(queue: self.queue)
timer.schedule(deadline: deadline, leeway: self.leeway)
// 因篇幅原因,省掉部分代码 ...
timer.setEventHandler(handler: {
if compositeDisposable.isDisposed {
return
}
_ = compositeDisposable.insert(action(state))
cancelTimer.dispose()
})
timer.resume()
_ = compositeDisposable.insert(cancelTimer)
return compositeDisposable
}
func schedulePeriodic<StateType>(_ state: StateType, startAfter: TimeInterval, period: TimeInterval, action: @escaping (StateType) -> StateType) -> Disposable {
let initial = DispatchTime.now() + dispatchInterval(startAfter)
var timerState = state
let timer = DispatchSource.makeTimerSource(queue: self.queue)
timer.schedule(deadline: initial, repeating: dispatchInterval(period), leeway: self.leeway)
// 因篇幅原因,省掉部分代码 ...
timer.setEventHandler(handler: {
if cancelTimer.isDisposed {
return
}
timerState = action(timerState)
})
timer.resume()
return cancelTimer
}
以上两个办法中虽然没有直接在当时行列中异步调用闭包,可是创立timer
时,却是在当时行列中创立的,因此timer
回调时也是在当时行列履行eventHandler
,间接完成当时行列下的调度。
录入自|原文链接