前语
NSOperation是Foundation中用于多线程并发使命的抽象类,它能够放到NSOperationQueue使命行列中去履行。它底层还是基于GCD来完结的,相比GCD它支撑撤销操作和依靠管理。
Apple用swift重写了Foundation库并且开源,能够经过阅览NSOperation的swift源代码来了解其完结原理。
github.com/apple/swift…
在剖析源代码之前也请阅览一下苹果的《Concurrency Programming Guide》的Operation Queues一章,要了解怎么运用Operation。
同步和异步
NSOperation是抽象类,运用它需求子类承继它,并覆写一些成员办法。
它有两种形式,一种是同步履行,一种是异步履行。
同步履行,需求覆写main办法即可。
@interface MyOperation : NSOperation
@end
@implementation MyOperation
- (void)main {
// 要完结的事务逻辑写这儿即可
}
@end
当main办法回来的时分,使命也就完毕了。
异步履行,需求覆写start办法,一起完结asynchronous、concurrent、executing和finished属性。
@interface MyOperation : NSOperation {
BOOL _executing;
BOOL _finished;
}
@property (nonatomic, assign, getter = isExecuting) BOOL executing;
@property (nonatomic, assign, getter = isFinished) BOOL finished;
@end
@implementation MyOperation
@synthesize executing = _executing;
@synthesize finished = _finished;
- (void)start {
// 要完结的事务逻辑写这儿即可
}
- (BOOL)isAsynchronous {
return YES;
}
- (BOOL)isConcurrent {
return YES;
}
- (void)setFinished:(BOOL)finished {
[self willChangeValueForKey:@"isFinished"];
_finished = finished;
[self didChangeValueForKey:@"isFinished"];
}
- (void)setExecuting:(BOOL)executing {
[self willChangeValueForKey:@"isExecuting"];
_executing = executing;
[self didChangeValueForKey:@"isExecuting"];
}
@end
当start办法回来的时分,使命能够持续履行,直到使命真正履行完毕,我们履行self.finished=YES; self.executing=YES;经过KVO的办法告诉NSOperation使命现已履行完毕,能够从NSOperationQueue中移除此使命了。
OperationQueue
OperationQueue是使命的履行行列
初始化
OperationQueue有两个初始化办法:
一个是public办法给大家创建使命行列运用的
open class OperationQueue : NSObject, ProgressReporting {
// 使命行列名字
var __name: String?
// 对外的初始化办法
public override init() {
super.init()
__name = "NSOperationQueue (Unmanaged<OperationQueue>.passUnretained(self).toOpaque())"
}
}
另一个是internal办法,专门用来初始化mainQueue的
extension OperationQueue {
public static let defaultMaxConcurrentOperationCount: Int = -1
}
open class OperationQueue : NSObject, ProgressReporting {
// 是否是主线程行列
var __mainQ: Bool = false
// 并发数上限
var __maxNumOps: Int = OperationQueue.defaultMaxConcurrentOperationCount
// 实践并发数上限
var __actualMaxNumOps: Int32 = .max
// 使命行列名字
var __name: String?
// QOS,使命行列的优先级
var __propertyQoS: QualityOfService?
// asMainQueue是一个空元组,仅仅为了重载init办法,便利内部运用
internal init(asMainQueue: ()) {
super.init()
__mainQ = true
__maxNumOps = 1
__actualMaxNumOps = 1
__name = "NSOperationQueue Main Queue"
#if canImport(Darwin)
__propertyQoS = QualityOfService(qos_class_main())
#else
__propertyQoS = QualityOfService.userInteractive
#endif
}
// 回来主线程使命行列
open class var main: OperationQueue {
get {
struct Once {
// 运用static确保了变量只初始化一次
static let mainQ = OperationQueue(asMainQueue: ())
}
return Once.mainQ
}
}
}
增加使命
增加一个Operation到使命行列中,对外提供了两个办法,能够增加单个使命或许批量使命。
open class OperationQueue : NSObject, ProgressReporting {
// 增加单个使命
open func addOperation(_ op: Operation) {
_addOperations([op], barrier: false)
}
// 增加多个使命,一起支撑设置是否同步等候
open func addOperations(_ ops: [Operation], waitUntilFinished wait: Bool) {
_addOperations(ops, barrier: false)
if wait {
for op in ops {
op.waitUntilFinished()
}
}
}
}
对内提供了一个下划线最初的批量增加使命办法,一切的增加使命,都最终会走到此办法中
open class OperationQueue : NSObject, ProgressReporting {
internal func _addOperations(_ ops: [Operation], barrier: Bool = false) {
// 判空维护
if ops.isEmpty { return }
// 失利数
var failures = 0
// 成功数
var successes = 0
// 第一个新使命
var firstNewOp: Unmanaged<Operation>?
// 上一个新使命
var lastNewOp: Unmanaged<Operation>?
// 循环传入的使命数组
for op in ops {
// 对op的状况进行CAS修正,从initialized修正成enqueuing
if op._compareAndSwapState(.initialized, .enqueuing) {
// 假如修正成功,成功数累加1
successes += 1
// 判别之前的修正是否都成功
if 0 == failures {
// 走到这儿,阐明一切的使命的状况都正确修正成enqueuing
// retain一下使命目标
let retained = Unmanaged.passRetained(op)
// 假如使命的没有未完结的依靠使命,则为isReady为YES,缓存到_cachedIsReady
op._cachedIsReady = op.isReady
// GCD的工作目标
let schedule: DispatchWorkItem
// 创建GCD的工作目标
if let qos = op.__propertyQoS?.qosClass {
// 使命有设置过QoS,enforceQoS表明运用传入的qos
schedule = DispatchWorkItem.init(qos: qos, flags: .enforceQoS, block: {
// 当工作目标放入DispatchQueue的时分,会开端履行此block
// self是OperationQueue目标,履行其_schedule办法,来调度使命
self._schedule(op)
})
} else {
// 使命没有设置过Qos,assignCurrentContext表明运用当时线程的Qos
schedule = DispatchWorkItem(flags: .assignCurrentContext, block: {
// 重复代码,同上
self._schedule(op)
})
}
// 调用Operation的_adopt办法,设置__queue和__schedule成员变量
op._adopt(queue: self, schedule: schedule)
// 使命行列实践上是双向链表存储,设置使命的前后链表节点
op.__previousOperation = lastNewOp
op.__nextOperation = nil
if let lastNewOperation = lastNewOp?.takeUnretainedValue() {
lastNewOperation.__nextOperation = retained
} else {
firstNewOp = retained
}
lastNewOp = retained
} else {
// 前面的使命状况有修正失利的场景,回滚当时使命的状况修正
_ = op._compareAndSwapState(.enqueuing, .initialized)
}
} else {
// 假如修正失利,失利数累加1
failures += 1
}
}
// 判别是否有使命入队失利
if 0 < failures {
// 由于有失利状况产生,要回滚前面的操作,firstNewOp便是双向链表的头节点
while let firstNewOperation = firstNewOp?.takeUnretainedValue() {
// 取出下一个节点
let nextNewOp = firstNewOperation.__nextOperation
// 对当时使命目标做重置操作
firstNewOperation._invalidateQueue()
// 清理链接节点指针
firstNewOperation.__previousOperation = nil
firstNewOperation.__nextOperation = nil
// 回滚使命的状况
_ = firstNewOperation._compareAndSwapState(.enqueuing, .initialized)
// 释放使命
firstNewOp?.release()
// 持续处理后继节点
firstNewOp = nextNewOp
}
// 回滚完,还是要报fatalError
fatalError("operations finished, executing or already in a queue cannot be enqueued")
}
// Attach any operations pending attachment to main list
// 非堵塞使命
if !barrier {
// 加锁,由于后续要修正OperationQueue的成员变量
_lock()
// OperationCount自增1,这儿自增1不是很对啊
_incrementOperationCount()
}
// 前面现已处理好使命的状况,并生成了一个双向链表
// firstNewOp便是链表头,lastNewOp便是链表最后一个元素
// __firstOperation和__lastOperation是OperationQueue现已存在的双向链表
// 这儿是要把此次传入的使命链表,挂到OperationQueue的双向链表上去
var pending = firstNewOp
if let pendingOperation = pending?.takeUnretainedValue() {
let old_last = __lastOperation
pendingOperation.__previousOperation = old_last
if let old = old_last?.takeUnretainedValue() {
old.__nextOperation = pending
} else {
__firstOperation = pending
}
__lastOperation = lastNewOp
}
// pending便是firstNewOp,是链表表头,这儿便是循环处理使命链表
while let pendingOperation = pending?.takeUnretainedValue() {
if !barrier {
// 假如使命中有要堵塞的barrie使命,是经过把它增加到依靠使命中,来完结的
var barrierOp = _firstPriorityOperation(Operation.QueuePriority.barrier)
while let barrierOperation = barrierOp?.takeUnretainedValue() {
pendingOperation._addDependency(barrierOperation)
barrierOp = barrierOperation.__nextPriorityOperation
}
}
// 使命的状况从enqueuing改成enqueued
_ = pendingOperation._compareAndSwapState(.enqueuing, .enqueued)
// 依据__priorityValue和__propertyQoS设置一下使命的优先级
// 假如未设置过优先级,默许是Operation.QueuePriority.normal等级
var pri = pendingOperation.__priorityValue
if pri == nil {
let v = __actualMaxNumOps == 1 ? nil : pendingOperation.__propertyQoS
if let qos = v {
switch qos {
case .default: pri = Operation.QueuePriority.normal.rawValue
case .userInteractive: pri = Operation.QueuePriority.veryHigh.rawValue
case .userInitiated: pri = Operation.QueuePriority.high.rawValue
case .utility: pri = Operation.QueuePriority.low.rawValue
case .background: pri = Operation.QueuePriority.veryLow.rawValue
}
} else {
pri = Operation.QueuePriority.normal.rawValue
}
}
// 按照使命的优先级,还有一个单链表,链表节点字段是Operation的__nextPriorityOperation
// 单链表的头节点和尾节点是OperationQueue的__firstPriorityOperation和__lastPriorityOperation
// 它俩是元组目标,包含一切等级的单链表的头节点和尾节点
pendingOperation.__nextPriorityOperation = nil
if let old_last = _lastPriorityOperation(pri)?.takeUnretainedValue() {
old_last.__nextPriorityOperation = pending
} else {
_setFirstPriorityOperation(pri!, pending)
}
_setlastPriorityOperation(pri!, pending)
// 循环下一个链表
pending = pendingOperation.__nextOperation
}
// 非堵塞使命
if !barrier {
// 解锁,使命增加完毕
_unlock()
}
// 堵塞使命由调用方完结调度逻辑
if !barrier {
// 调度使命
_schedule()
}
}
}
调度使命
internal func _schedule() {
// 重试使命数组
var retestOps = [Operation]()
// 由于要对OperationQueue操作,所以要先加锁
_lock()
// 可用的槽位数
var slotsAvail = __actualMaxNumOps - __numExecOps
// 要循环一下优先级单链表上的使命
for prio in Operation.QueuePriority.priorities {
// 假如没有可用的槽位,或许当时使命行列被暂停,则跳出循环
if 0 >= slotsAvail || _suspended {
break
}
// 依据使命优先级获取使命优先级单链表的表头
var op = _firstPriorityOperation(prio)
// 前驱节点,移除当时节点的时分要运用
var prev: Unmanaged<Operation>?
while let operation = op?.takeUnretainedValue() {
// 维护判别一下
if 0 >= slotsAvail || _suspended {
break
}
// 后继节点
let next = operation.__nextPriorityOperation
// 是否要重试
var retest = false
// if the cached state is possibly not valid then the isReady value needs to be re-updated
// 假如使命状况现已是enqueued,并且没有未完结的依靠,则开端履行此使命
if Operation.__NSOperationState.enqueued == operation._state && operation._fetchCachedIsReady(&retest) {
// 从使命优先级单链表上移除此使命
if let previous = prev?.takeUnretainedValue() {
previous.__nextPriorityOperation = next
} else {
_setFirstPriorityOperation(prio, next)
}
if next == nil {
_setlastPriorityOperation(prio, prev)
}
// 现已从单链表上移除,清空后继节点
operation.__nextPriorityOperation = nil
// 状况修正成dispatching
operation._state = .dispatching
// __numExecOps自增1
_incrementExecutingOperations()
// 可用的槽位数减1
slotsAvail -= 1
// 选择GCD的Queue
let queue: DispatchQueue
if __mainQ {
// 假如是主线程,则运用主线程的GCD的Queue
queue = DispatchQueue.main
} else {
// 假如未设置__dispatch_queue,会依据QoS合成一个对应的GCD DispatchQueue
queue = __dispatch_queue ?? _synthesizeBackingQueue()
}
// 取出使命的GCD工作目标
if let schedule = operation.__schedule {
if operation is _BarrierOperation {
// 假如是Barrier堵塞使命,则调用GCD的时分也设置barrier参数
queue.async(flags: .barrier, execute: {
schedule.perform()
})
} else {
// 异步履行使命
queue.async(execute: schedule)
}
}
// 处理后续使命
op = next
} else {
// 假如使命还未准备好
if retest {
// 放到重试数组中
retestOps.append(operation)
}
// 持续循环后续使命
prev = op
op = next
}
}
}
// 解锁
_unlock()
// 尝试从头计算未准备好的使命的ready状况
for op in retestOps {
if op.isReady {
op._cachedIsReady = true
}
}
}
DispatchWorkItem的block会履行OperationQueue的_schedule(_ op: Operation)办法,便是下面这个办法,这个block是_addOperations的时分设置的
internal func _schedule(_ op: Operation) {
// 设置使命状况为starting
op._state = .starting
// set current tsd
// 这样设置今后,在当时线程中的start办法能够找到OperationQueue的引证,Operation没有对外露出其引证
// 便是OperationQueue.current
OperationQueue._currentQueue.set(self)
// 开端履行Operation的start办法
op.start()
// 重置currentQueue
OperationQueue._currentQueue.clear()
// We've just cleared _currentQueue storage.
// NSThreadSpecific doesn't release stored value on clear.
// This means `self` will leak if we don't release manually.
Unmanaged.passUnretained(self).release()
// 假如使命的isFinished回来YES,可是状况还不是finished,触发一下KVO,完毕当时使命
// 这个归于补偿逻辑,一般设置了finished = YES,按理说就应该触发其KVO
// unset current tsd
if op.isFinished && op._state.rawValue < Operation.__NSOperationState.finishing.rawValue {
Operation.observeValue(forKeyPath: _NSOperationIsFinished, ofObject: op)
}
}
履行使命
在同步形式的时分,还有一个_execute办法会履行
internal func _execute(_ op: Operation) {
// 同步形式能够反应使命进展
var operationProgress: Progress? = nil
if !(op is _BarrierOperation) && _isReportingProgress {
let opProgress = Progress(parent: nil, userInfo: nil)
opProgress.totalUnitCount = 1
progress.addChild(opProgress, withPendingUnitCount: 1)
operationProgress = opProgress
}
operationProgress?.becomeCurrent(withPendingUnitCount: 1)
defer { operationProgress?.resignCurrent() }
// 履行使命的main办法
op.main()
}
完毕使命
internal func _operationFinished(_ op: Operation, _ previousState: Operation.__NSOperationState) {
// There are only three cases where an operation might have a nil queue
// A) The operation was never added to a queue and we got here by a normal KVO change
// B) The operation was somehow already finished
// C) the operation was attempted to be added to a queue but an exception occured and was ignored...
// Option C is NOT supported!
// 判别使命是否是堵塞使命
let isBarrier = op is _BarrierOperation
// 加锁
_lock()
// 取后续使命,移除链表的时分要运用
let nextOp = op.__nextOperation
// 判别使命状况是否已完毕
if Operation.__NSOperationState.finished == op._state {
// 使命已履行完,讲使命移除链表
let prevOp = op.__previousOperation
if let prev = prevOp {
prev.takeUnretainedValue().__nextOperation = nextOp
} else {
__firstOperation = nextOp
}
if let next = nextOp {
next.takeUnretainedValue().__previousOperation = prevOp
} else {
__lastOperation = prevOp
}
// only decrement execution count on operations that were executing! (the execution was initially set to __NSOperationStateDispatching so we must compare from that or later)
// else the number of executing operations might underflow
// 减少正在履行的使命数
if previousState.rawValue >= Operation.__NSOperationState.dispatching.rawValue {
_decrementExecutingOperations()
}
// 重置使命的前后链表节点
op.__previousOperation = nil
op.__nextOperation = nil
// 重置使命行列
op._invalidateQueue()
}
// 减少使命数
if !isBarrier {
_decrementOperationCount()
}
// 解锁
_unlock()
// 在调度一下其他使命
_schedule()
// 释放使命
if previousState.rawValue >= Operation.__NSOperationState.enqueuing.rawValue {
Unmanaged.passUnretained(op).release()
}
}
Operation
使命状况
Operation有一个__state成员变量,表明当时使命状况,__NSOperationState枚举有8个值,初始化的时分initialized,每次对__state状况修正,都有__atomicLoad锁来维护。
open class Operation : NSObject {
// 使命状况枚举
enum __NSOperationState : UInt8 {
case initialized = 0x00
case enqueuing = 0x48
case enqueued = 0x50
case dispatching = 0x88
case starting = 0xD8
case executing = 0xE0
case finishing = 0xF0
case finished = 0xF4
}
// 使命状况
internal var __state: __NSOperationState = .initialized
// 负责同步使命状况的锁
var __atomicLoad = NSLock()
// 使命状况的get和set办法都由__atomicLoad来同步
internal var _state: __NSOperationState {
get {
__atomicLoad.lock()
defer { __atomicLoad.unlock() }
return __state
}
set(newValue) {
__atomicLoad.lock()
defer { __atomicLoad.unlock() }
__state = newValue
}
}
// CAS办法,用来修正使命状况
internal func _compareAndSwapState(_ old: __NSOperationState, _ new: __NSOperationState) -> Bool {
__atomicLoad.lock()
defer { __atomicLoad.unlock() }
if __state != old { return false }
__state = new
return true
}
}
初始化
Operation的init办法是一个空完结,子类完结的时分能够重载init办法,带一些事务数据字段,将来能够在start办法中运用,比方,下载的链接地址NSURL目标。
open class Operation : NSObject {
public override init() { }
}
start
OperationQueue调度使命履行的时分,最终会履行使命的start办法,假如是异步形式,子类是要覆写start办法的,假如是同步形式,start办法不用动,start办法内部最终会调用main办法。
open func start() {
let state = _state
// 判别一下使命的状况,假如现已完毕就直接回来
if __NSOperationState.finished == state { return }
// 判别使命是否是重复履行
if !_compareAndSwapState(__NSOperationState.initialized, __NSOperationState.starting) && !(__NSOperationState.starting == state && __queue != nil) {
switch state {
case .executing: fallthrough
case .finishing:
fatalError("(self): receiver is already executing")
default:
fatalError("(self): something is trying to start the receiver simultaneously from more than one thread")
}
}
// 使命状况是已入队,可是还没有ready,就开端履行,也要报错
if state.rawValue < __NSOperationState.enqueued.rawValue && !isReady {
_state = state
fatalError("(self): receiver is not yet ready to execute")
}
// 判别使命是否已撤销
let isCanc = _isCancelled
if !isCanc {
// 使命未撤销,状况改成executing开端履行
_state = .executing
Operation.observeValue(forKeyPath: _NSOperationIsExecuting, ofObject: self)
// 假如使命放到了行列中,就调用行列的_execute办法,不然直接履行main办法
_queue?._execute(self) ?? main()
}
// 完毕使命
if __NSOperationState.executing == _state {
_state = .finishing
Operation.observeValue(forKeyPath: _NSOperationIsExecuting, ofObject: self)
Operation.observeValue(forKeyPath: _NSOperationIsFinished, ofObject: self)
} else {
_state = .finishing
Operation.observeValue(forKeyPath: _NSOperationIsFinished, ofObject: self)
}
}
KVO监控处理
internal static func observeValue(forKeyPath keyPath: String, ofObject op: Operation) {
enum Transition {
case toFinished
case toExecuting
case toReady
}
// 依据keyPath来设置kind
let kind: Transition?
if keyPath == _NSOperationIsFinished || keyPath == _NSOperationIsFinishedAlternate {
kind = .toFinished
} else if keyPath == _NSOperationIsExecuting || keyPath == _NSOperationIsExecutingAlternate {
kind = .toExecuting
} else if keyPath == _NSOperationIsReady || keyPath == _NSOperationIsReadyAlternate {
kind = .toReady
} else {
kind = nil
}
if let transition = kind {
switch transition {
case .toFinished: // we only care about NO -> YES
// 触发了isFinished KVO
if !op.isFinished {
// 误触发,isFinished回来NO,先回来
return
}
// 以及ready的依靠使命
var ready_deps = [Operation]()
op._lock()
let state = op._state
// 打印警告信息,使命还未开端履行,就现已设置了isFinished KVO
if op.__queue != nil && state.rawValue < __NSOperationState.starting.rawValue {
print("*** (type(of: op)) (Unmanaged.passUnretained(op).toOpaque()) went isFinished=YES without being started by the queue it is in")
}
if state.rawValue < __NSOperationState.finishing.rawValue {
op._state = .finishing
} else if state == .finished {
op._unlock()
return
}
let down_deps = op.__downDependencies
op.__downDependencies.removeAll()
if 0 < down_deps.count {
for down in down_deps {
let idown = down.contents.takeUnretainedValue()
idown._lock()
if idown._unfinishedDependencyCount == 1 {
ready_deps.append(idown)
} else if idown._unfinishedDependencyCount > 1 {
idown._decrementUnfinishedDependencyCount()
} else {
assert(idown._unfinishedDependencyCount == 0)
assert(idown._isCancelled == true)
}
idown._unlock()
}
}
// 使命完毕,修正状况
op._state = .finished
// 重置使命行列引证
let oq = op.__queue
op.__queue = nil
// 是否使命锁
op._unlock()
if 0 < ready_deps.count {
for down in ready_deps {
down._lock()
if down._unfinishedDependencyCount >= 1 {
down._decrementUnfinishedDependencyCount()
}
down._unlock()
Operation.observeValue(forKeyPath: _NSOperationIsReady, ofObject: down)
}
}
// waitUntilFinished办法能够回来了
op.__waitCondition.lock()
op.__waitCondition.broadcast()
op.__waitCondition.unlock()
// 假如设置__completion回调,这异步履行一下回调
if let complete = op.__completion {
let held = Unmanaged.passRetained(op)
DispatchQueue.global(qos: .default).async {
complete()
held.release()
}
}
// 从使命行列中移除现已完结的使命
if let queue = oq {
queue.takeUnretainedValue()._operationFinished(op, state)
queue.release()
}
case .toExecuting:
let isExecuting = op.isExecuting
op._lock()
// 更新状况为正在履行
if op._state.rawValue < __NSOperationState.executing.rawValue && isExecuting {
op._state = .executing
}
op._unlock()
case .toReady:
// 使命现已ready,调度履行一下使命
let r = op.isReady
op._cachedIsReady = r
let q = op._queue
if r {
q?._schedule()
}
}
}
}
总结
以上便是Operation的核心处理流程,通读一遍源代码,对了解Operation Queue有非常大的协助。