过错处理

到目前为止,在咱们编写的大部分代码中,咱们没有处理过错,而处理的都是“happy path”。在前面的文章中,咱们了解到,Combine Publisher 声明了两个约束:

  • Output界说 Publisher 宣布的值的类型;
  • Failure 界说 Publisher 宣布的失利的类型。

现在,咱们将深入了解 Failure 在 Publisher 中的作用。

过错

Never

失利类型为 Never 的 Publisher 表明永久不会宣布失利。它为这些 Publisher 供给了强壮的确保。这类 Publisher 可让咱们专注于运用值,一起必定确保 Publisher 只有成功完结的事情。

在新的 Playground 页面增加以下代码:

import Combine
import Foundation
import PlaygroundSupport
PlaygroundPage.current.needsIndefiniteExecution = truefunc example(_ desc: String, _ action:() -> Void) {
  print("--- (desc) ---")
  action()
}
​
var subscriptions = Set<AnyCancellable>()
​
example("Just") {
 Just("Hello")
}

咱们创立了一个带有 Hello 字符串值的 Just。 Just 是不会宣布失利的。 请按住 Command 并单击 Just 初始化程序并挑选 Jump to Definition,检查界说:

In contrast with Result.Publisher, a Just publisher can’t fail with an error. And unlike Optional.Publisher, a Just publisher always produces a value.

Combine 对 Never 的障确保不仅是理论上的,而是深深植根于结构及其各种 API 中。Combine 供给了几个 Operator,这些 Operator 仅在确保 Publisher 永久不会宣布失利事情时才可用。第一个是 sink 的变体,只处理值:

example("Just") {
 Just("Hello")
  .sink(receiveValue: { print($0) })
  .store(in: &subscriptions)
}

在上面的示例中,咱们运用 sink(receiveValue:) ,这种特定的重载使咱们能够忽略 Publisher 的完结事情,而只处理其宣布的值。

此重载仅适用于这类“牢靠”的 Publisher。在过错处理方面,Combine 是智能且安全的,假如或许抛出过错,它会强制咱们处理完结事情。要看到这一点,咱们需求将 Never 的 Publisher 变成或许宣布失利事情的 Publisher。

setFailureType(to:)

func setFailureType<E>(to failureType: E.Type) -> Publishers.SetFailureType<Self, E> where E : Error

Never Publisher 转变为或许宣布失利事情的 Publisher 的第一种办法是运用 setFailureType。这是另一个仅适用于失利类型为 Never 的 Publisher 的 Operator:

example("setFailureType") {
 Just("Hello")
  .setFailureType(to: MyError.self)
}

能够运用 .eraseToAnyPublisher(),来承认已改变的 Publisher 类型:

Combine | (V) Combine 中的错误处理和 Scheduler

持续修正上述代码:

enum MyError: Error {
  case ohNo
}
​
example("setFailureType") {
  Just("Hello")
    .setFailureType(to: MyError.self)
    .sink(
      receiveCompletion: { completion in
        switch completion {
        case .failure(.ohNo):
          print("Finished with OhNo!")
        case .finished:
          print("Finished successfully!")
        }
      },
      receiveValue: { value in
        print("Got value: (value)")
      }
    )
    .store(in: &subscriptions)
}

现在咱们只能运用 sink(receiveCompletion:receiveValue:)sink(receiveValue:) 重载不再可用,由于此 Publisher 或许会宣布失利事情。能够测验注释掉 receiveCompletion检查编译过错。

此外,失利类型为为 MyError,这使咱们能够针对.failure(.ohNo) 状况而无需进行不必要的强制转换来处理该过错。

当然,setFailureType 的作用只是类型界说。 由于原始 Publisher 是 Just,因而实践上也不会引发任何过错。

assign(to:on:)

assign Operator 仅适用于不会宣布失利事情的 Publisher,与 setFailureType 相同。 向供给的 keypath 发送过错会导致未界说的行为。增加以下示例进行测验:

example("assign(to:on:)") {
  class Person {
    var name = "Unknown"
  }
  let person = Person()
  print(person.name)
  Just("Layer")
    .handleEvents(
      receiveCompletion: { _ in 
        print(person.name) 
      }
    )
    .assign(to: .name, on: person)
    .store(in: &subscriptions)
}

咱们界说一个具有 name 特点的 Person 类。创立一个 Person 实例并当即打印其 name。一旦 Publisher 发送完结事情,运用 handleEvents 再次打印此 name。终究,运用 assignname 设置为 Publisher 宣布的值:

--- assign(to:on:) ---
Unknown
Layer

Just("Layer") 正下方增加以下行:

.setFailureType(to: Error.self)

这意味着它不再是 Publisher<String, Never>,而是现在的 Publisher<String, Error>。运转 Playground,咱们将进行验证:

Referencing instance method 'assign(to:on:)' on 'Publisher' requires the types 'any Error' and 'Never' be equivalent

assign(to:)

assign(to:on:) 有一个扎手的部分——它会 strong 捕获供给给 on 参数的目标。在上一个示例之后增加以下代码:

example("assign(to:)") {
 class MyViewModel: ObservableObject {
  @Published var currentDate = Date()
​
  init() {
   Timer.publish(every: 1, on: .main, in: .common)
    .autoconnect() 
    .prefix(3)
    .assign(to: .currentDate, on: self)
    .store(in: &subscriptions)
  }
 }
​
 let vm = MyViewModel()
 vm.$currentDate
  .sink(receiveValue: { print($0) })
  .store(in: &subscriptions)
}

咱们 MyViewModel 中界说一个 @Published 特点。 它的初始值为当时日期。在 init 中创立一个 Timer Publisher,它每秒宣布当时日期。运用 prefix Operator 只承受 3 个更新。运用 assign(to:on:) 将每个日期更新给@Published 特点。实例化 MyViewModelsink vm.$currentDate,并打印出每个值:

--- assign(to:) ---
2022-12-24 07:32:33 +0000
2022-12-24 07:32:34 +0000
2022-12-24 07:32:35 +0000
2022-12-24 07:32:36 +0000

看起来一切都很好。可是对assign(to:on:) 的调用创立了一个 strong 持有 self 的 Subscription。 导致 self 挂在Subscription 上,而 Subscription 挂在 self 上,创立了一个导致内存走漏的引证循环。

因而引入了该 Operator 的另一个重载 assign(to:)。该 Operator 经过对 Publisher 的 inout 引证来将值分配给 @Published 特点。因而以下两行:

.assign(to: .currentDate, on: self)
.store(in: &subscriptions)

能够被替换为:

.assign(to: &$currentDate)

运用 assign(to:) Operator 将 inout 引证 Publisher 会打破引证循环。此外,它会在内部主动处理 Subscription 的内存办理,这样咱们就能够省掉 store(in: &subscriptions)

assertNoFailure(_:file:line:)

当咱们在开发过程承认 Publisher 以失利事情完结时,assertNoFailure Operator 十分有用。它不会阻挠上游宣布失利事情。可是,假如它检测到过错,它会因过错而溃散:

example("assertNoFailure") {
 Just("Hello")
  .setFailureType(to: MyError.self)
  .assertNoFailure()
  .sink(receiveValue: { print("Got value: ($0) ")}) 
  .store(in: &subscriptions)
}

咱们运用 Just 创立一个“牢靠”的 Publisher 并将其过错类型设置为 MyError。假如 Publisher 以过错事情完结,则运用 assertNoFailure 以溃散。这会将 Publisher 的失利类型转回 Never。运用 sink 打印出任何接纳到的值。请留意,由于 assertNoFailure 将失利类型设置回 Never,因而 sink(receiveValue:) 重载能够直接运用。

运转 Playground,它能够正常作业:

--- assertNoFailure ---
Got value: Hello 

setFailureType 之后,增加以下行:

.tryMap { _ in throw MyError.ohNo }

一旦 Hello 被推送到下流,运用 tryMap 抛出过错。再次运转 Playground:

Playground execution failed:

error: Execution was interrupted, reason: EXC_BAD_INSTRUCTION (code=EXC_I386_INVOP, subcode=0x0).

...

frame #0: 0x00007fff232fbbf2 Combine`Combine.Publishers.AssertNoFailure...

由于 Publisher 宣布失利事情,playground 会 crash。 在某种程度上,咱们能够将 assertNoFailure() 视为代码的保护机制。 尽管咱们不该该在生产环境中运用它,但在开发过程中提前发现问题十分有用。

处理过错

try* Operator

Combine 供给了一个差异或许引发过错和或许不会引发过错的 Operator 的办法:try 前缀。

留意:Combine 中一切以 try 为前缀的 Operator 在遇到过错时的行为相同。咱们将只在本章中测验运用 tryMap Operator。

example("tryMap") {
  enum NameError: Error {
    case tooShort(String)
    case unknown
  }
  
  ["Aaaa", "Bbbbb", "Cccccc"]
    .publisher
    .map { value in
      return value.count
    }
    .sink(
      receiveCompletion: { print("Completed with ($0)") },
      receiveValue: { print("Got value: ($0)") }
    )
}

在上面的示例中,咱们界说一个 NameError 过错枚举。创立发布三个字符串的 Publisher。将每个字符串映射到它的长度。运转示例并检查操控台输出:

--- tryMap ---
Got value: 4
Got value: 5
Got value: 6
Completed with finished

将上面示例中的 map 替换为以下内容:

.tryMap { value -> Int in
  let length = value.count
  guard length >= 5 else {
    throw NameError.tooShort(value)
  }
  return value.count
}

咱们检查字符串的长度是否大于等于 5。否则,咱们会抛出过错:

--- tryMap ---
Completed with failure(Page_Contents.(unknown context at $10e3cb984).(unknown context at $10e3cba6c).(unknown context at $10e3cbaa8).NameError.tooShort("Aaaa"))

映射过错

maptryMap 之间的差异不仅仅是后者答应抛出过错。 map 继承了现有的失利类型并且只操作 Publisher 的值,但 tryMap 没有——它实践大将过错类型擦除为普通的 Swift 过错。 与带有 try 前缀的一切 Operator 都是如此。

example("map vs tryMap") {
 enum NameError: Error {
  case tooShort(String)
  case unknown
 }
​
 Just("Hello")
  .setFailureType(to: NameError.self)
  .map { $0 + " World!" }
  .sink(
   receiveCompletion: { completion in
    switch completion {
    case .finished:
     print("Done!")
    case .failure(.tooShort(let name)):
     print("(name) is too short!")
    case .failure(.unknown):
     print("An unknown name error occurred")
    }
   },
   receiveValue: { print("Got value ($0)") }
  )
  .store(in: &subscriptions)
}

咱们界说一个用于此示例的 NameError。创立一个只宣布字符串 HelloJust。运用 setFailureType 设置失利类型为 NameError。运用 map 将另一个字符串附加。终究,运用 sinkreceiveCompletionNameError 的每个状况打印出适当的消息。运转 Playground:

--- map vs tryMap ---
Got value Hello World!
Done!

Combine | (V) Combine 中的错误处理和 Scheduler

Completion 的失利类型是 NameError,这正是咱们想要的。 setFailureType 答应咱们专门针对 NameError 进行处理,例如 failure(.tooShort(let name))

map 更改为 tryMap

.tryMap { throw NameError.tooShort($0) }

咱们会当即留意到 Playground 不再编译。 再次点击 completion

Combine | (V) Combine 中的错误处理和 Scheduler

tryMap 删除了咱们的类型过错并将其替换为通用 Swift.Error 类型。即便咱们实践上并没有从 tryMap 中抛出过错,也会产生这种状况。

原因很简单:Swift 还不支持类型化 throws,尽管自 2015 年以来 Swift Evolution 中一向在讨论这个主题。这意味着当咱们运用带有 try 前缀的 Operator 时,咱们的过错类型将总是被抹去到最常见的父类:Swift.Error

一种办法是将通用过错手动转换为特定的过错类型,但这不是最理想的。它打破了严厉类型过错的整个意图。幸运的是,Combine 为这个问题供给了一个很好的解决方案,称为 mapError

在调用 tryMap 之后,增加以下行:

.mapError { $0 as? NameError ?? .unknown }

mapError 接纳上游 Publisher 抛出的任何过错,并将其映射到咱们想要的任何过错。在这种状况下,咱们能够运用它将过错转换回 NameError。这会将 Failure 康复为所需求的类型,并将咱们的 Publisher 转回 Publisher<String, NameError>。构建并运转 Playground,终究能够按预期编译和作业:

--- map vs tryMap ---
Hello is too short!

捕获过错并重试

很多时分,当咱们请求资源或履行某些核算时,失利或许是由于网络不稳定或其他资源不可用而导致的一次性事情。

在这些状况下,咱们通常会编写一个机制来重试不同的作业,盯梢测验次数,并处理假如一切测验都失利的状况。Combine 让这一切变得十分简单。

retry Operator 承受一个数字。假如 Publisher 失利,它将从头订阅上游并重试至咱们指定的次数。假如一切重试都失利,它将过错推送到下流,就像没有 retry Operator 一样:

example("Catching and retrying") {
  enum MyError: Error {
    case network
  }
  var service1 = PassthroughSubject<Int, MyError>()
  service1.send(completion: .failure(.network))
 
  service1
    .handleEvents(
      receiveSubscription: { _ in print("Trying ...") },
      receiveCompletion: {
        guard case .failure(let error) = $0 else { return }
        print("Got error: (error)")
      }
    )
    .retry(3)
    .sink(
      receiveCompletion: { print("($0)") },
      receiveValue: { number in
        print("Got Number: (number)")
      }
    )
    .store(in: &subscriptions)
}

咱们有一个 service1,它宣布了失利事情。因而,订阅 service1 必定会取得失利事情。咱们测验三次,并经过 handleEvents 打印订阅和完结:

--- Catching and retrying ---
Trying ...
Got error: network
Trying ...
Got error: network
Trying ...
Got error: network
Trying ...
Got error: network
failure(Page_Contents.(unknown context at $10fc7b584).(unknown context at $10fc7b77c).(unknown context at $10fc7b7b8).MyError.network)

运转 Playerground,咱们会看到有四次 Trying。初始 Trying,加上由 retry Operator 触发的三次重试。 由于 service1 不断失利,因而 Operator 会耗尽一切重试测验并将过错推送到 sink

调整代码:

example("Catching and retrying") {
  enum MyError: Error {
    case network
  }
  var service1 = PassthroughSubject<Int, MyError>()
  service1.send(completion: .failure(.network))
  
  service1
    .handleEvents(
      receiveSubscription: { _ in print("Trying ...") },
      receiveCompletion: {
        guard case .failure(let error) = $0 else { return }
        print("Got error: (error)")
      }
    )
    .retry(3)
    .replaceError(with: 1)
    .sink(
      receiveCompletion: { print("($0)") },
      receiveValue: { number in
        print("Got Number: (number)")
      }
    )
    .store(in: &subscriptions)
}

service1 重试后,若还是失利,咱们将经过 replaceError 将失利替换为 1:

--- Catching and retrying ---
Trying ...
Got error: network
Trying ...
Got error: network
Trying ...
Got error: network
Trying ...
Got error: network
Got Number: 1
finished

或者,咱们能够运用 catch 捕获 service1 的失利,并为下流供给另一个 Publisher:

example("Catching and retrying") {
  enum MyError: Error {
    case network
  }
  var service1 = PassthroughSubject<Int, MyError>()
  service1.send(completion: .failure(.network))
  var service2 = PassthroughSubject<Int, MyError>()
  
  service1
    .handleEvents(
      receiveSubscription: { _ in print("Trying ...") },
      receiveCompletion: {
        guard case .failure(let error) = $0 else { return }
        print("Got error: (error)")
      }
    )
    .retry(3)
    .catch { error in
      return service2
    }
    .sink(
      receiveCompletion: { print("($0)") },
      receiveValue: { number in
        print("Got Number: (number)")
      }
    )
    .store(in: &subscriptions)
  
  service2.send(2)
  service2.send(completion: .finished)
}

此时,下流将取得到 service2 宣布的值 2 和完结事情:

--- Catching and retrying ---
Trying ...
Got error: network
Trying ...
Got error: network
Trying ...
Got error: network
Trying ...
Got error: network
Got Number: 2
finished

cheduler

咱们现已遇到了一些将 Scheduler 作为参数的 Operator。大多数状况下,咱们会简单地运用 DispatchQueue.main,由于它方便、易于了解。除了 DispatchQueue.main,咱们必定现已运用了大局并发行列,或创立一个串行调度行列来运转操作。

可是为什么 Combine 需求一个新的类似概念呢?咱们接着将了解为什么会呈现 Scheduler 的概念,将探究 Combine 怎么使异步事情和操作更易于运用,当然,咱们还会试运用 Combine 供给的一切 Scheduler。

Scheduler 简介

根据 Apple 的文档,Scheduler 是一种界说何时及怎么履行闭包的协议。Scheduler 供给上下文以尽快或在将来的某个事情履行未来的操作。该操作便是协议自身中界说的闭包。闭包也能够躲藏 Publisher 在特定 Scheduler 上履行的某些值的传递。

咱们会留意到此界说有意防止对线程的任何引证,这是由于详细的完成是在 Scheduler 协议中,供给的“上下文”中的。因而,咱们的代码将在哪个线程上履行取决于挑选的 Scheduler。

记住这个重要的概念:Scheduler 不等于线程。咱们将在后面详细了解这对每个 Scheduler 意味着什么。让咱们从事情流的角度来看 Scheduler 的概念:

Combine | (V) Combine 中的错误处理和 Scheduler

咱们在上图中看到的内容:

  • 在主 (UI) 线程上产生用户操作,如按钮按下;

  • 它会触发一些作业在 Background Scheduler 上进行处理;

  • 要显现的终究数据在主线程上传递给 Subscriber,Subscriber 能够更新 UI。

咱们能够看到 Scheduler 的概念深深植根于前台/后台履行的概念。此外,根据咱们挑选的完成,作业能够串行化或并行化。

因而,要全面了解 Scheduler,需求检查哪些类契合 Scheduler 协议。首先,咱们需求了解与 Scheduler 相关的两个重要 Operator。

Scheduler Operator

Combine 供给了两个基本的 Operator 来运用 Scheduler:

  • subscribe(on:)subscribe(on:options:) 在指定的 Scheduler 上创立 Subscription(开端作业);
  • receive(on:)receive(on:options:) 在指定的 Scheduler 上传递值。

此外,以下 Operator 将 Scheduler 和 Scheduler options 作为参数:

  • debounce(for:scheduler:options:)

  • delay(for:tolerance:scheduler:options:)

  • measureInterval(using:options:)

  • throttle(for:scheduler:latest:)

  • timeout(_:scheduler:options:customError:)

subscribe(on:)receive(on:)

在咱们订阅它之前,Publisher 是一个无生命的实体。可是当咱们订阅 Publisher 时会产生什么?有几个过程:

Combine | (V) Combine 中的错误处理和 Scheduler

  1. Publiser receive Subscriber 并创立 Subscription;
  2. Subscriber receive Subscription 并从 Publiser 请求值(虚线);
  3. Publiser 开端作业(经过 Subscription);
  4. Publiser 宣布值(经过 Subscription);
  5. Operator 转换值;
  6. Subscriber 收到终究值。

当代码订阅 Publiser 时,过程一、二和三通常产生在当时线程上。 可是当咱们运用 subscribe(on:) Operator 时,一切这些操作都在咱们指定的 Scheduler 上运转。

咱们或许希望 Publiser 在后台履行一些昂贵的核算以防止堵塞主线程。 履行此操作的简单办法是运用 subscribe(on:)。以下是伪代码:

let queue = DispatchQueue(label: "serial queue")
let subscription = publisher
  .subscribe(on: queue)
  .sink { value in ...

假如咱们收到值后,想更新一些 UI 怎么办?咱们能够在闭包中履行类似 DispatchQueue.main.async { ... } 的操作,从主线程履行 UI 更新。有一种更有效的办法能够运用 Combine 的 receive(on:):

let subscription = publisher
  .subscribe(on: queue)
  .receive(on: DispatchQueue.main)
  .sink { value in ...

即便核算作业正常并从后台线程宣布结果,咱们现在也能够确保始终在主行列上接纳值。这是安全地履行 UI 更新所需求的。

Scheduler 完成

Apple 供给了几种 Scheduler 协议的详细完成:

  • ImmediateScheduler:一个简单的 Scheduler,它当即在当时线程上履行代码,这是默认的履行上下文,除非运用 subscribe(on:)receive(on:) 或任何其他将 Scheduler 作为参数的 Operator 进行修正。
  • RunLoop:绑定到 Foundation 的 Thread 目标。
  • DispatchQueue:能够是串行的或并发的。
  • OperationQueue:标准作业项履行的行列。

这里省掉了 TestScheduler,是一个虚拟的、模仿的 Scheduler,它是任何响应式编程结构测验时不可或缺的一部分。

ImmediateScheduler

在 Playground 中新增代码:

example("ImmediateScheduler") {
    let source = Timer
      .publish(every: 1.0, on: .main, in: .common)
      .autoconnect()
      .scan(0) { counter, _ in counter + 1 }
    let publisher = source
        .receive(on: ImmediateScheduler.shared)
        .eraseToAnyPublisher()
    publisher.sink(receiveValue: { _ in
        print(Thread.current)
    })
    .store(in: &subscriptions)
}

运转 Playground,咱们会看到 Publisher 宣布的每个值,都是在 MainThread 上:

--- ImmediateScheduler ---
<_NSMainThread: 0x129617390>{number = 1, name = main}
<_NSMainThread: 0x129617390>{number = 1, name = main}
<_NSMainThread: 0x129617390>{number = 1, name = main}
<_NSMainThread: 0x129617390>{number = 1, name = main}
<_NSMainThread: 0x129617390>{number = 1, name = main}

当时线程是主线程, ImmediateScheduler 当即在当时线程上调度。当咱们在 .receive(on: ImmediateScheduler.shared) 前增加一行:

.receive(on: DispatchQueue.global())

履行 Playground,咱们将在不同的线程收到值:

--- ImmediateScheduler ---
<NSThread: 0x12e7286c0>{number = 4, name = (null)}
<NSThread: 0x12e7286c0>{number = 4, name = (null)}
<NSThread: 0x11f005310>{number = 2, name = (null)}
<NSThread: 0x11f005310>{number = 2, name = (null)}
<NSThread: 0x12e7286c0>{number = 4, name = (null)}

ImmediateScheduler options 由于大多数 Operator 在其参数中承受 Scheduler,咱们还能够找到一个承受 SchedulerOptions 值的参数。在 ImmediateScheduler 的状况下,此类型被界说为 Never,因而在运用 ImmediateScheduler 时,咱们永久不该该为 Operator 的 options 参数传递值。

ImmediateScheduler 的圈套 关于 ImmediateScheduler 的一件事是它是即时的。咱们无法运用 Scheduler 协议的任何 schedule(after:) 变体,由于咱们需求指定的 SchedulerTimeType 没有初始化办法,关于 ImmediateScheduler 无意义。

RunLoop scheduler

RunLoop 早于 DispatchQueue,它是一种在线程等级办理输入源的办法。主线程有一个相关的 RunLoop,咱们还能够经过从当时线程调用 RunLoop.current 为任何线程获取一个 RunLoop。

在 Playground 中增加此代码:

example("RunLoop") {
    let source = Timer
      .publish(every: 1.0, on: .main, in: .common)
      .autoconnect()
      .scan(0) { counter, _ in counter + 1 }
    let publisher = source
        .receive(on: DispatchQueue.global())
        .handleEvents(receiveOutput: { _ in
            print("DispatchQueue.global: \(Thread.current)")
        })
        .receive(on: RunLoop.current)
        .handleEvents(receiveOutput: { _ in
            print("RunLoop.current: \(Thread.current)")
        })
        .eraseToAnyPublisher()
    publisher.sink(receiveValue: { _ in
    })
    .store(in: &subscriptions)
}

当时 RunLoop.current 便是主线程的 RunLoop。履行 Playground:

--- RunLoop ---
DispatchQueue.global: <NSThread: 0x12a71cd20>{number = 3, name = (null)}
RunLoop.current: <_NSMainThread: 0x12a705760>{number = 1, name = main}
DispatchQueue.global: <NSThread: 0x12a71cd20>{number = 3, name = (null)}
RunLoop.current: <_NSMainThread: 0x12a705760>{number = 1, name = main}
DispatchQueue.global: <NSThread: 0x12a71cd20>{number = 3, name = (null)}
RunLoop.current: <_NSMainThread: 0x12a705760>{number = 1, name = main}

每宣布一个值,都经过一个大局并发行列的线程,然后在主线程上持续。

RunLoop OptionsImmediateScheduler 一样,RunLoop 不供给 SchedulerOptions 参数。

RunLoop 圈套 RunLoop 的运用应仅限于主线程的 RunLoop,以及咱们在需求时操控的 Foundation 线程中可用的 RunLoop。要防止的一个是在 DispatchQueue 上履行的代码中运用 RunLoop.current。这是由于 DispatchQueue 线程或许是时间短的,这使得它们简直不或许依赖 RunLoop。

DispatchQueue Scheduler

DispatchQueue 契合 Scheduler 协议,并且完全可用于一切将 Scheduler 作为参数的 Operator。Dispatch 结构是 Foundation 的一个强壮组件,它答应咱们经过向体系办理的调度行列提交作业来在多核硬件上一起履行代码。DispatchQueue 能够是串行的(默认)或并发的。串行行列按次序履行你供给给它的一切作业项。并发行列将并行启动多个作业项,以最大极限地进步 CPU 运用率:

  • 串行行列通常用于确保某些操作不堆叠。因而,假如一切操作都产生在同一个行列中,他们能够运用共享资源而无需加锁。
  • 并发行列将一起履行尽或许多的操作。因而,它更适合纯核算。

咱们一向运用的最熟悉的行列是 DispatchQueue.main。它直接映射到主线程,在这个行列上履行的一切操作都能够自由地更新用户界面。 当然,UI 更新只能在主线程进行。一切其他行列,无论是串行的还是并发的,都在体系办理的线程池中履行它们的代码。这意味着咱们永久不该该对行列中运转的代码中的当时线程做出任何假定。尤其不该运用 RunLoop.current 来组织作业,由于 DispatchQueue 办理其线程的方式有不同。

一切调度行列共享同一个线程池,履行的串行行列将运用该池中的任何可用线程。一个直接的结果是,来自同一行列的两个连续作业项或许运用不同的线程,但仍能够按次序履行。这是一个重要的差异:当运用 subscribe(on:)receive(on:) 或任何其他有 Scheduler 参数的 Operator 时,咱们永久不该假定线程每次都是相同的。

在 Playground 中增加代码:

example("DispatchQueue") {
    let source = PassthroughSubject<Void, Never>()
    let sourceQueue = DispatchQueue.main
    let subscription = sourceQueue.schedule(after: sourceQueue.now,
                                            interval: .seconds(1)) {
        source.send()
    }
    .store(in: &subscriptions)
    let serialQueue = DispatchQueue(label: "Serial queue")
    source
        .handleEvents(receiveOutput: { _ in
            print("\(Thread.current)")
        })
        .receive(on: serialQueue)
        .handleEvents(receiveOutput: { _ in
            print("\(Thread.current)")
        })
        .sink(receiveValue: { _ in
        })
        .store(in: &subscriptions)
}

Timer 在主行列 sourceQueue 上触发并经过 source 发送 Void 值。接着在串行行列 serialQueue 上接纳值:

--- DispatchQueue ---
<_NSMainThread: 0x126f0a250>{number = 1, name = main}
<NSThread: 0x128025cd0>{number = 2, name = (null)}
<_NSMainThread: 0x126f0a250>{number = 1, name = main}
<NSThread: 0x1178243e0>{number = 6, name = (null)}
<_NSMainThread: 0x126f0a250>{number = 1, name = main}
<NSThread: 0x117904d90>{number = 5, name = (null)}
<_NSMainThread: 0x126f0a250>{number = 1, name = main}
<NSThread: 0x1178243e0>{number = 6, name = (null)}
<_NSMainThread: 0x126f0a250>{number = 1, name = main}
<NSThread: 0x1178243e0>{number = 6, name = (null)}

将 sourceQueue 也改为 DispatchQueue(label: "Serial queue"),也将在大局并发行列上宣布值:

--- DispatchQueue ---
<NSThread: 0x137e275b0>{number = 6, name = (null)}
<NSThread: 0x130905310>{number = 2, name = (null)}
<NSThread: 0x130905310>{number = 2, name = (null)}
<NSThread: 0x130905310>{number = 2, name = (null)}
<NSThread: 0x127e0f400>{number = 4, name = (null)}
<NSThread: 0x137e275b0>{number = 6, name = (null)}

DispatchQueue Options DispatchQueue 是仅有供给一组 Options 的 Scheduler,当 Operator 需求 SchedulerOptions 参数时,咱们能够传递这些 Options。首要围绕 QoS(服务质量)值,独立于 DispatchQueue 上已设置的值。例如:

.receive(
  on: serialQueue,
  options: DispatchQueue.SchedulerOptions(qos: .userInteractive)
)

咱们将 DispatchQueue.SchedulerOptions 的实例传递.userInteractive。在实践开发中运用这些 Options 有助于操作体系决定在一起有许多行列繁忙的状况下首先组织哪个使命。

OperationQueue Scheduler

由于 OperationQueue 在内部运用 Dispatch,因而在表面上简直没有差异:

example("OperationQueue") {
    let queue = OperationQueue()
    let subscription = (1...10).publisher
        .receive(on: queue)
        .print()
        .sink { value in
            print("Received \(value)")
        }
        .store(in: &subscriptions)
}

创立一个简单的 Publisher 宣布 1 到 10 之间的数字,然后打印该值,履行 Playground:

--- OperationQueue ---
receive subscription: (ReceiveOn)
request unlimited
receive value: (1)
Received 1
receive value: (8)
Received 8
receive value: (9)
Received 9
receive value: (6)
Received 6
receive value: (3)
Received 3
receive value: (5)
Received 5
receive finished
receive value: (10)
receive value: (4)
receive value: (7)
receive value: (2)

按次序宣布但无序抵达!咱们能够更改打印行以显现当时线程:

print("Received \(value) on thread \(Thread.current)")

再次履行 Playground:

--- OperationQueue ---
receive subscription: (ReceiveOn)
request unlimited
receive value: (4)
Received 4 on thread <NSThread: 0x14d720980>{number = 2, name = (null)}
receive value: (10)
Received 10 on thread <NSThread: 0x14d720980>{number = 2, name = (null)}
receive value: (3)
Received 3 on thread <NSThread: 0x14e833620>{number = 6, name = (null)}
receive value: (5)
Received 5 on thread <NSThread: 0x14e80dfd0>{number = 4, name = (null)}
receive value: (1)
Received 1 on thread <NSThread: 0x14d70d840>{number = 5, name = (null)}
receive finished
receive value: (2)
receive value: (9)
receive value: (8)
receive value: (6)

每个值都是在不同的线程上接纳的!假如咱们检查有关 OperationQueue 的文档,有一条关于线程的说明,OperationQueue 运用 Dispatch 结构(因而是 DispatchQueue)来履行操作。这意味着它不确保它会为每个交给的值运用相同的底层线程。

此外,每个 OperationQueue 中都有一个参数能够解释一切:它是 maxConcurrentOperationCount。它默认为体系界说的数字,答应操作行列一起履行很多操作。由于 Publisher 简直在同一时间宣布一切值,它们被 Dispatch 的并发行列分派到多个线程。

对代码进行一些修正:

queue.maxConcurrentOperationCount = 1

再次履行 Playground:

--- OperationQueue ---
receive subscription: (ReceiveOn)
request unlimited
receive value: (1)
Received 1 on thread <NSThread: 0x117609390>{number = 4, name = (null)}
receive value: (2)
Received 2 on thread <NSThread: 0x117609390>{number = 4, name = (null)}
receive value: (3)
Received 3 on thread <NSThread: 0x117609390>{number = 4, name = (null)}
receive value: (4)
Received 4 on thread <NSThread: 0x117609390>{number = 4, name = (null)}
receive value: (5)
Received 5 on thread <NSThread: 0x117627160>{number = 6, name = (null)}
receive value: (6)
Received 6 on thread <NSThread: 0x117627160>{number = 6, name = (null)}
receive value: (7)
Received 7 on thread <NSThread: 0x117627160>{number = 6, name = (null)}
receive value: (8)
Received 8 on thread <NSThread: 0x117627160>{number = 6, name = (null)}
receive value: (9)
Received 9 on thread <NSThread: 0x117627160>{number = 6, name = (null)}
receive value: (10)
Received 10 on thread <NSThread: 0x117627160>{number = 6, name = (null)}
receive finished

这一次,咱们将取得真实的次序履行——将 maxConcurrentOperationCount 设置为 1 相当于运用串行行列。

OperationQueue Options OperationQueue 没有可用的 SchedulerOptions。它实践上是 RunLoop.SchedulerOptions 类型,自身没有供给任何 Options。

OperationQueue 圈套 咱们刚刚看到 OperationQueue 默认并发履行操作,咱们需求十分清楚这一点,由于它或许会给咱们带来费事。当咱们的 Publisher 宣布值时都有很多作业要履行时,它或许是一个很好的工具。咱们能够经过调整 maxConcurrentOperationCount 参数来操控负载。

内容参阅

  • Combine | Apple Developer Documentation;

  • 来自 Kodeco 的书籍《Combine: Asynchronous Programming with Swift》;

  • 对上述 Kodeco 书籍的汉语自译版 《Combine: Asynchronous Programming with Swift》收拾与补充。