时刻操作操作符(Time Manipulation Operators)

反应式编后的中心思维是跟着时刻的推移处理异步事情流。Combine 供给了一系列答应咱们和时刻相关的 Operator:跟着时刻的推移,序列对值做出处理。Combine 办理序列的时刻维度简略直接,这是 Combine 这的优势。

本节演示运用的 Playground 根底代码:

import Combine
import Foundation
import PlaygroundSupport
PlaygroundPage.current.needsIndefiniteExecution = true
func example(_ desc: String, _ action:() -> Void) {
    print("--- \(desc) ---")
    action()
}
var subscriptions = Set<AnyCancellable>()
Timer.publish(every: 1.0, on: .main, in: .common)
    .autoconnect()
    .scan(-1, { last, _ in return last + 1 })
    .sink { print("\($0) second has passed...") }
    .store(in: &subscriptions)

咱们运用 needsIndefiniteExecution 使 Playground 无限的执行。example 是现已熟知的帮咱们封装每个 example 的办法。subscriptions 帮咱们处理 Subscription。最终的 Timer 帮咱们在每 1 秒曩昔后打印信息。

Timer 是 Foundation Timer 类的 Combine 扩展。 它需求一个 RunLoop 和 RunLoop.Mode,Timer 是可衔接的(connectable) Publisher 类的一部分,需求在开始宣布值之前被衔接。咱们运用 autoconnect() 在第一次订阅时当即衔接。咱们将在后文了解更多 Timer 的信息。

时移值

delay(for:tolerance:scheduler:options)

func delay<S>(
    for interval: S.SchedulerTimeType.Stride,
    tolerance: S.SchedulerTimeType.Stride? = nil,
    scheduler: S,
    options: S.SchedulerOptions? = nil
) -> Publishers.Delay<Self, S> where S : Scheduler

最基本的时刻操作 Operator 是推迟来自 Publisher 的值,使其比实际呈现的时刻晚。delay(for:tolerance:scheduler:options) Operator 对整个值序列进行时移:每次上游 Publisher 宣布一个值时,该 Operator 都会将其保存一段时刻,然后在要求的推迟后,在指定的 Scheduler 上宣布值。

在 Playground 上增加代码:

example("delay") {
    let valuesPerSecond = 1.0
    let delayInSeconds = 1.2
    let sourcePublisher = PassthroughSubject<Date, Never>()
    let delayedPublisher = sourcePublisher.delay(for: .seconds(delayInSeconds), scheduler: DispatchQueue.main)
    let subscription = Timer
        .publish(every: 1.0 / valuesPerSecond, on: .main, in: .common)
        .autoconnect()
        .subscribe(sourcePublisher)
        .store(in: &subscriptions)
    sourcePublisher
        .sink {  print("sourcePublisher: \($0)") }
        .store(in: &subscriptions)
    delayedPublisher
        .sink {  print("delayedPublisher: \($0)") }
        .store(in: &subscriptions)
}

咱们界说了两个常量,别离表明每秒宣布值的数量、推迟宣布值的秒。

咱们界说了 sourcePublisher,咱们将供给 Timer 宣布的 Date 类型的值。 值实际的类型其实并不重要,咱们只关心 Publisher 在宣布值被推迟。

delayPublisher 将推迟来自 sourcePublisher 的值,并将它们发送到 DispatchQueue.main。咱们后续将了解所有关于 Scheduler 的更多内容。

创立一个在主线程上每秒发送一个值的 Timer。 运用 autoconnect() 当即发动它,并经过 sourcePublisher 接纳它宣布的值。

最终,咱们再别离订阅 sourcePublisherdelayPublisher,增加一些 print 了解正在有值被宣布。

咱们用弹珠图描绘上述进程,sourcePublisher 宣布的值,将被推迟 1.2 秒后,被 delayPublisher 宣布:

delay.png

运转 Playground,咱们将看到:

--- delay ---
0 second has passed...
sourcePublisher: 2022-12-10 07:46:26 +0000
1 second has passed...
sourcePublisher: 2022-12-10 07:46:27 +0000
delayedPublisher: 2022-12-10 07:46:26 +0000
2 second has passed...
sourcePublisher: 2022-12-10 07:46:28 +0000
delayedPublisher: 2022-12-10 07:46:27 +0000
3 second has passed...
sourcePublisher: 2022-12-10 07:46:29 +0000
delayedPublisher: 2022-12-10 07:46:28 +0000
4 second has passed...
sourcePublisher: 2022-12-10 07:46:30 +0000
delayedPublisher: 2022-12-10 07:46:29 +0000
...

搜集值

collect(_:options:)

func collect<S>(
    _ strategy: Publishers.TimeGroupingStrategy<S>,
    options: S.SchedulerOptions? = nil
) -> Publishers.CollectByTime<Self, S> where S : Scheduler

在某些情况下,咱们或许需求以指定的时刻距离从 Publisher 那里搜集值,这是一种缓冲方式。 例如,咱们想要搜集某时刻段内的一组值并核算平均值并输出。

将之前 Playground 的 example 代码注释或删除,增加以下代码:

example("collect") {
    let valuesPerSecond = 1.0
    let collectTimeStride = 3.0
    let sourcePublisher = PassthroughSubject<Date, Never>()
    let collectedPublisher = sourcePublisher
        .collect(.byTime(DispatchQueue.main, .seconds(collectTimeStride)))
    let subscription = Timer
        .publish(every: 1.0 / valuesPerSecond, on: .main, in: .common)
        .autoconnect()
        .subscribe(sourcePublisher)
        .store(in: &subscriptions)
    sourcePublisher
        .sink {  print("sourcePublisher: \($0)") }
        .store(in: &subscriptions)
    collectedPublisher
        .sink {  print("collectedPublisher:\($0)") }
        .store(in: &subscriptions)
}

咱们界说了两个常量,别离表明每秒宣布值的数量、期望搜集一次值的时刻距离。

咱们界说了 sourcePublisher,将接纳 Timer 宣布的 Date 类型的值并再宣布。

collectedPublisher 将搜集来自 sourcePublisher 的值,并将它们发送到 DispatchQueue.main

最终,咱们再别离订阅 sourcePublishercollectedPublisher

咱们用弹珠图描绘上述进程,sourcePublisher 宣布的值,将被 collectedPublisher 搜集宣布:

collect.png

运转 Playground,咱们将看到:

--- collect ---
0 second has passed...
sourcePublisher: 2022-12-10 08:16:19 +0000
1 second has passed...
sourcePublisher: 2022-12-10 08:16:20 +0000
2 second has passed...
sourcePublisher: 2022-12-10 08:16:21 +0000
collectedPublisher:[2022-12-10 08:16:19 +0000, 2022-12-10 08:16:20 +0000, 2022-12-10 08:16:21 +0000]
3 second has passed...
sourcePublisher: 2022-12-10 08:16:22 +0000
4 second has passed...
sourcePublisher: 2022-12-10 08:16:23 +0000
5 second has passed...
sourcePublisher: 2022-12-10 08:16:24 +0000
collectedPublisher:[2022-12-10 08:16:22 +0000, 2022-12-10 08:16:23 +0000, 2022-12-10 08:16:24 +0000]
6 second has passed...
sourcePublisher: 2022-12-10 08:16:25 +0000
7 second has passed...
sourcePublisher: 2022-12-10 08:16:26 +0000

推迟事情

咱们或许期望在用户查找时,请求回来输入联想的展现。当然,不能在用户输入一个字母时都发送请求,需求有某种机制来操控请求的时机:能够仅在用户完结一段时刻的输入后。

Combine 供给了两个能够在这里为咱们供给帮助的 Operator:防抖(Debounce)和节省(Throttle)。

debounce(for:scheduler:options:)

func debounce<S>(
    for dueTime: S.SchedulerTimeType.Stride,
    scheduler: S,
    options: S.SchedulerOptions? = nil
) -> Publishers.Debounce<Self, S> where S : Scheduler

debounce Operator 经过指定的时刻距离后才会发布值。用来对从上游 Publisher 传递值,和传递值之间的时刻做操控。此 Operator 可用于处理突发事情流或大量事情流,咱们需求将传递到下流的值的数量削减到指定的速率。

注释调之前的代码,包含咱们记时用的 Timer,增加以下代码:

example("debounce") {
    let subject = PassthroughSubject<Int, Never>()
    let debounced = subject.debounce(for: .seconds(0.5), scheduler: DispatchQueue.main)
    subject
        .sink {  print("subject: \($0)") }
        .store(in: &subscriptions)
    debounced
        .sink {  print("debounced: \($0)") }
        .store(in: &subscriptions)
    let bounces:[(Int,TimeInterval)] = [
        (0, 0),
        (1, 0.3),   // 0.3s interval since last index
        (2, 1),     // 0.7s interval since last index
        (3, 1.3),   // 0.3s interval since last index
        (4, 1.5),   // 0.2s interval since last index
        (5, 2.1)    // 0.6s interval since last index
    ]
    for bounce in bounces {
        DispatchQueue.main.asyncAfter(deadline: .now() + bounce.1) {
            subject.send(bounce.0)
        }
    }
}

咱们让 subject 每隔 bounces 元组的第二个 TimeInterval 值宣布元组的第二个 Int 值。

咱们运用 debounced 限定了抵达速率为 0.5。即值在 0.5 秒内,没有下一个值被宣布,则该值可传递到下流。

只要值 1、值 4、值 5 后 0.5 秒内,没有值宣布。因而运转 Playground,将输出:

--- debounce ---
subject: 0
subject: 1
debounced: 1
subject: 2
subject: 3
subject: 4
debounced: 4
subject: 5
debounced: 5

上述进程,咱们运用弹珠图来描绘:

debounced.png

throttle(for:scheduler:latest:)

func throttle<S>(
    for interval: S.SchedulerTimeType.Stride,
    scheduler: S,
    latest: Bool
) -> Publishers.Throttle<Self, S> where S : Scheduler

throttle 在指定的时刻距离内,发布由上游 Publisher 发布的第一个(latest 为 false)或最终一个(latest 为 true)值。

以上一个例子为根底,咱们在 Playground 中增加代码:

example("throttle") {
    let subject = PassthroughSubject<Int, Never>()
    let throttled = subject.throttle(for: .seconds(1), scheduler: DispatchQueue.main, latest: false)
    subject
        .sink {  print("subject: \($0)") }
        .store(in: &subscriptions)
    throttled
        .sink {  print("throttled: \($0)") }
        .store(in: &subscriptions)
    let values:[(Int,TimeInterval)] = [
        (0, 0),
        (1, 0.1),
        (2, 0.5),
        (3, 3.5),
        (4, 3.9),
        (5, 4.1),
        (6, 4.4),
    ]
    for bounce in bounces {
        DispatchQueue.main.asyncAfter(deadline: .now() + bounce.1) {
            subject.send(bounce.0)
        }
    }
}

因为异步,时刻距离过短或许卡时刻点,都会有不准确的问题。咱们放大了示例代码中的时刻。

咱们指定的时刻为 1 秒。throttled 直接发布 0,在 1 秒时,从 (0, 1) 秒区间找到并发布 1。后续因为超越了 1 秒,收到 3 就直接发布,在 (3.5, 4.5) 秒区间找到并发布 1:

--- throttle ---
subject: 0
throttled: 0
subject: 1
subject: 2
throttled: 1
subject: 3
throttled: 3
subject: 4
subject: 5
subject: 6
throttled: 4

上述进程用弹珠图来描绘为:

throttle.png

咱们将 latest 改为 true:

 let throttled = subject.throttle(for: .seconds(1), scheduler: DispatchQueue.main, latest: true)

则会输出每个区间中最新的值:

--- throttle ---
subject: 0
throttled: 0
subject: 1
subject: 2
throttled: 2
subject: 3
throttled: 3
subject: 4
subject: 5
subject: 6
throttled: 6

throttle2.png

现在咱们有了防抖动和节省的底子差异:

  • 防抖等候它接纳到的值的事情暂停,在指定的时刻距离后宣布最新的值。
  • 节省等候指定的时刻距离,然后宣布它在该时刻距离内收到的第一个或最新的值。

超时

timeout(_:scheduler:options:customError:)

func timeout<S>(
    _ interval: S.SchedulerTimeType.Stride,
    scheduler: S,
    options: S.SchedulerOptions? = nil,
    customError: (() -> Self.Failure)? = nil
) -> Publishers.Timeout<Self, S> where S : Scheduler

当超时触发时, Publisher 要不宣布 .finished ,要不宣布咱们指定的错误:

example("timeout") {
    enum TimeoutError: Error {
        case timedOut
    }
    let subject = PassthroughSubject<Void, TimeoutError>()
    let timedOutSubject1 = subject.timeout(.seconds(3), scheduler: DispatchQueue.main)
    let timedOutSubject2 = subject.timeout(.seconds(3), scheduler: DispatchQueue.main, customError: { .timedOut })
    timedOutSubject1
        .sink(
            receiveCompletion: { print("timedOutSubject1: \($0)") },
            receiveValue: {  print("timedOutSubject1: \($0)") }
        )
        .store(in: &subscriptions)
    timedOutSubject2
        .sink(
            receiveCompletion: { print("timedOutSubject2: \($0)") },
            receiveValue: {  print("timedOutSubject2: \($0)") }
        )
        .store(in: &subscriptions)
}

在上述代码中,timedOutSubject1 没有供给 customError 字段,而 timedOutSubject2 供给了。因而,当 subject 超越 3 秒未宣布事情,将触发超时:

--- timeout ---
timedOutSubject1: finished
timedOutSubject2: failure(Page_Contents.(unknown context at $10f5bfc44).(unknown context at $10f5bfc6c).(unknown context at $10f5bfcac).TimeoutError.timedOut)

timeout.png

时刻丈量

measureInterval(using:options:)

有时咱们需求找出 Publisher 宣布的两个连续值之间经过的时刻时,measureInterval Operator 是咱们的工具。

增加代码:

example("measureInterval") {
    let subject = PassthroughSubject<Int, Never>()
    let measureSubject = subject.measureInterval(using: DispatchQueue.main)
    subject.sink { print("emitted: \($0)") }
    .store(in: &subscriptions)
    measureSubject.sink { print("Measure emitted: \(Double($0.magnitude) / 1_000_000_000.0)") }
    .store(in: &subscriptions)
    let bounces:[(Int,TimeInterval)] = [
        (0, 0),
        (1, 0.1),
        (2, 0.5),
        (3, 3.5),
        (4, 3.9),
        (5, 4.1),
        (6, 4.4),
    ]
    for bounce in bounces {
        DispatchQueue.main.asyncAfter(deadline: .now() + bounce.1) {
            subject.send(bounce.0)
        }
    }
}

measureSubject 将宣布 subject 每次宣布的值距离上次宣布值的距离。

因为measureIntervalDispatchQueue 的情况下,TimeInterval 解释为:运用此类型的值创立的 DispatchTimeInterval,以纳秒为单位。因而进行了 Double($0.magnitude) / 1_000_000_000.0 转化:

--- measureInterval ---
Measure emitted: 0.010662459
emitted: 0
Measure emitted: 0.09147425
emitted: 1
Measure emitted: 0.421239375
emitted: 2
Measure emitted: 3.15006375
emitted: 3
Measure emitted: 0.4142835
emitted: 4
Measure emitted: 0.014388208
emitted: 5
Measure emitted: 0.517094042
emitted: 6

假如咱们进行以下更改,运用 RunLoop:

let measureSubject2 = subject.measureInterval(using: RunLoop.main)

则无需进行上述 Double($0.magnitude) / 1_000_000_000.0 的转化:

--- measureInterval ---
emitted: 0
Measure emitted: Stride(magnitude: 0.008463025093078613)
emitted: 1
Measure emitted: Stride(magnitude: 0.0957329273223877)
emitted: 2
Measure emitted: Stride(magnitude: 0.4204070568084717)
emitted: 3
Measure emitted: Stride(magnitude: 3.1296679973602295)
emitted: 4
Measure emitted: Stride(magnitude: 0.4410020112991333)
emitted: 5
Measure emitted: Stride(magnitude: 0.2083679437637329)
emitted: 6
Measure emitted: Stride(magnitude: 0.108254075050354)

序列操作符(Sequence Operators)

Publisher 自身便是序列。序列 Operator 与 Publisher 的值一同运用,就像 Array 或 Set 相同——当然,它们是有限序列。考虑到这一点,序列 Operator 主要将 Publisher 作为一个全体来处理,而不是像其他 Operator 那样处理单个值。此类中的许多 Operator 的名称和行为与 Swift 规范库中的对应办法几乎相同。

寻觅值

min() max()

func min() -> Publishers.Comparison<Self>
func max() -> Publishers.Comparison<Self>

min Operator 帮咱们找到 Publisher 宣布的最小值。 它是贪婪的,意味着必须等候 Publisher 发送一个完结事情后, Operator 的下流会宣布最小值:

min.png

上述弹珠图用代码表明为:

example("min") {
    let publisher = [1, -5, 10, 0].publisher
    publisher
        .print("publisher: ")
        .min()
        .sink(receiveCompletion: { print($0) }, 
              receiveValue: { print("Lowest value is \($0)") })
        .store(in: &subscriptions)
}

运转 Playground,将输出:

--- min ---
publisher: : receive subscription: ([1, -5, 10, 0])
publisher: : request unlimited
publisher: : receive value: (1)
publisher: : receive value: (-5)
publisher: : receive value: (10)
publisher: : receive value: (0)
publisher: : receive finished
Lowest value is -5
finished

将在 publisher 完结后,才会宣布最小值再完结。

Combine 知道这些数字中的哪一个是最小值,要归功于 Int 符合 Comparable 协议。咱们以在宣布 Comparable 类型的 Publisher 上直接运用 min() ,无需任何参数。

假如咱们的值不符合 Comparable ,咱们能够运用 min(by:) Operator 供给自己的比较闭包:

考虑以下示例,你的发布者宣布许多数据,而你期望找到最小的数据。

在以下代码示例中,咱们比较了 Data 的长度:

example("min non-Comparable") {
    let publisher =  ["12345",
                     "ab",
                     "!!@@##$$"]
        .map { Data($0.utf8) }
        .publisher
        .print("publisher")
        .min(by: { $0.count < $1.count })
        .sink(receiveCompletion: { print($0) }, 
              receiveValue: { data in
            let string = String(data: data, encoding: .utf8)!
            print("Smallest data is \(string), \(data.count) bytes")
        })
        .store(in: &subscriptions)
}

运转 Playground 将输出:

--- min non-Comparable ---
publisher: receive subscription: ([5 bytes, 2 bytes, 8 bytes])
publisher: request unlimited
publisher: receive value: (5 bytes)
publisher: receive value: (2 bytes)
publisher: receive value: (8 bytes)
publisher: receive finished
Smallest data is ab, 2 bytes
finished

max Operator 同理,不在进行赘述。

first() last()

func first() -> Publishers.First<Self>
func last() -> Publishers.Last<Self>

first 让第一个宣布的值经过然后完结。它是 lazy 的,这意味着它不会等候上游发布者完结,而是会在接纳到宣布的第一个值时撤销订阅。而 last 是贪婪的,需求在上游宣布完结事情后才会完结:

firstAndLast.png

上述弹珠图用代码表明为:

example("first and last") {
    let publisher = [1, 2, 3, 4].publisher
    publisher
        .print("first: ")
        .first()
        .sink(receiveCompletion: { print($0) }, 
            receiveValue: { print("First value is \($0)") })
        .store(in: &subscriptions)
    publisher
        .print("last: ")
        .last()
        .sink(receiveCompletion: { print($0) }, 
            receiveValue: { print("Last value is \($0)") })
        .store(in: &subscriptions)
}

运转 Playground 将输出:

--- first and last ---
first: : receive subscription: ([1, 2, 3, 4])
first: : request unlimited
first: : receive value: (1)
first: : receive cancel
First value is 1
finished
last: : receive subscription: ([1, 2, 3, 4])
last: : request unlimited
last: : receive value: (1)
last: : receive value: (2)
last: : receive value: (3)
last: : receive value: (4)
last: : receive finished
Last value is 4
finished

假如咱们需求更精细的操控,能够运用 first(where:)last(where:)。 假如有的话,它将宣布与供给的条件匹配的第一个、最终一个值:

修正上述代码:

example("first and last") {
    let publisher = [1, 2, 3, 4].publisher
    publisher
        .print("first: ")
        .first(where: { $0 % 2 == 0 })
        .sink(receiveCompletion: { print($0) }, 
            receiveValue: { print("First value is \($0)") })
        .store(in: &subscriptions)
    publisher
        .print("last: ")
        .last(where: { $0 % 3 == 0 })
        .sink(receiveCompletion: { print($0) }, 
            receiveValue: { print("Last value is \($0)") })
        .store(in: &subscriptions)
}

运转 Playground 将输出:

--- first and last ---
first: : receive subscription: ([1, 2, 3, 4])
first: : request unlimited
first: : receive value: (1)
first: : receive value: (2)
first: : receive cancel
First value is 2
finished
last: : receive subscription: ([1, 2, 3, 4])
last: : request unlimited
last: : receive value: (1)
last: : receive value: (2)
last: : receive value: (3)
last: : receive value: (4)
last: : receive finished
Last value is 3
finished

假如没有满意条件的值,下流也将直接纳到完结事情,不会有值宣布。

output(at:) output(in:)

output Operator 将查找上游发布者在指定索引处宣布的值:

output.png

上述弹珠图用代码表明为:

example("output") {
    let publisher = [0, 1, 2, 3, 4].publisher
    publisher
        .print("output at: ")
        .output(at: 1)
        .sink(receiveCompletion: { print($0) }, 
            receiveValue: { print($0) })
        .store(in: &subscriptions)
    publisher
        .print("output in: ")
        .output(in: 1...3)
        .sink(receiveCompletion: { print($0) }, 
            receiveValue: { print($0) })
        .store(in: &subscriptions)
}

运转 Playground 将输出:

--- output ---
output at: : receive subscription: ([0, 1, 2, 3, 4])
output at: : request unlimited
output at: : receive value: (0)
output at: : request max: (1) (synchronous)
output at: : receive value: (1)
1
output at: : receive cancel
finished
output in: : receive subscription: ([0, 1, 2, 3, 4])
output in: : request unlimited
output in: : receive value: (0)
output in: : request max: (1) (synchronous)
output in: : receive value: (1)
1
output in: : receive value: (2)
2
output in: : receive value: (3)
3
output in: : receive cancel
finished

该 Operator 会在收到所供给范围内的所有值后当即撤销订阅。

查询值

count()

func count() -> Publishers.Count<Self>

count Operator 宣布单个值, 一旦上游 Publisher 发布完结事情,Operator 将宣布接纳到的值的数量:

count.png

上述弹珠图用代码表明为:

example("count") {
    let publisher = [1, 2, 3, 4].publisher
    publisher
        .print("publisher")
        .count()
        .sink(receiveCompletion: { print($0) }, 
            receiveValue: { print($0) })
        .store(in: &subscriptions)
}

运转 Playground 将输出:

--- count ---
publisher: receive subscription: ([1, 2, 3, 4])
publisher: request unlimited
publisher: receive value: (1)
publisher: receive value: (2)
publisher: receive value: (3)
publisher: receive value: (4)
publisher: receive finished
4
finished

正如预期的那样,只要在上游 Publisher 发送完结事情后,才会打印出值 4。

contains(_: contains(where:)

func contains(_ output: Self.Output) -> Publishers.Contains<Self>
func contains(where predicate: @escaping (Self.Output) -> Bool) -> Publishers.ContainsWhere<Self>

假如上游 Publisher 宣布指定的值,则 contains 操作符将宣布 true 并当即撤销订阅,假如宣布的值都不等于指定的值,则回来 false:

example("contains") {
    let publisher = [1, 2, 3, 4].publisher
    publisher
        .print("publisher")
        .contains(3)
        .sink(receiveCompletion: { print($0) }, 
            receiveValue: { print($0) })
        .store(in: &subscriptions)
    publisher
        .print("publisher")
        .contains(where: { $0 % 5 == 0 })
        .sink(receiveCompletion: { print($0) }, 
            receiveValue: { print($0) })
        .store(in: &subscriptions)
}

运转 Playground 将输出:

--- contains ---
publisher: receive subscription: ([1, 2, 3, 4])
publisher: request unlimited
publisher: receive value: (1)
publisher: receive value: (2)
publisher: receive value: (3)
publisher: receive cancel
true
finished
publisher: receive subscription: ([1, 2, 3, 4])
publisher: request unlimited
publisher: receive value: (1)
publisher: receive value: (2)
publisher: receive value: (3)
publisher: receive value: (4)
publisher: receive finished
false
finished

allSatisfy(_:)

allSatisfy 承受一个闭包,宣布一个布尔值,指示上游 Publisher 宣布的所有值是否与条件匹配。它是贪婪的,若每个值都满意,会比及上游 Publisher 宣布完结完结事情,否则撤销订阅:

example("allSatisfy") {
    let publisher = stride(from: 0, to: 5, by: 2).publisher
    publisher
        .print("publisher")
        .allSatisfy { $0 % 2 == 0 }
        .sink(receiveCompletion: { print($0) }, 
            receiveValue: { allEven in
            print(allEven ? "All numbers are even" : "Something is odd...")
        })
        .store(in: &subscriptions)
    publisher
        .print("publisher")
        .allSatisfy { $0 % 3 == 0 }
        .sink(receiveCompletion: { print($0) }, 
            receiveValue: { print($0) })
        .store(in: &subscriptions)
}

运转 Playground 将输出:

--- allSatisfy ---
publisher: receive subscription: (Sequence)
publisher: request unlimited
publisher: receive value: (0)
publisher: receive value: (2)
publisher: receive value: (4)
publisher: receive finished
All numbers are even
finished
publisher: receive subscription: (Sequence)
publisher: request unlimited
publisher: receive value: (0)
publisher: receive value: (2)
publisher: receive cancel
false
finished

reduce(_:_:)

func reduce<T>(
    _ initialResult: T,
    _ nextPartialResult: @escaping (T, Self.Output) -> T
) -> Publishers.Reduce<Self, T>

reduce Operator 它不查找特定值或查询整个 Publisher。 它答应咱们依据上游 Publisher 的值,迭代累积一个新值:

reduce.png

咱们用代码描绘上述弹珠图:

example("reduce") {
    let publisher = ["He", "llo", " ", "Wo", "rld", "!"].publisher
    publisher
        .print("publisher")
        .reduce("") { accumulator, value in
            accumulator + value
        }
        .sink(receiveValue: { print("Reduced into: \($0)") })
        .store(in: &subscriptions)
}

在此代码中,咱们创立一个宣布六个字符串的 publisher。将 reduce 与空字符串一同运用,将宣布的值附加到它上面,创立最终的字符串结果:

--- reduce ---
publisher: receive subscription: (["He", "llo", " ", "Wo", "rld", "!"])
publisher: request unlimited
publisher: receive value: (He)
publisher: receive value: (llo)
publisher: receive value: ( )
publisher: receive value: (Wo)
publisher: receive value: (rld)
publisher: receive value: (!)
publisher: receive finished
Reduced into: Hello World!

reduce 的第二个参数是一个闭包,它承受两个某种类型的值并回来一个相同类型的值。在 Swift 中,+ 也是一个匹配该签名的函数,咱们完全能够改写代码:

example("reduce") {
    let publisher = ["He", "llo", " ", "Wo", "rld", "!"].publisher
    publisher
        .print("publisher")
        .reduce("", +)
        .sink(receiveValue: { print("Reduced into: \($0)") })
        .store(in: &subscriptions)
}

内容参考

  • Combine | Apple Developer Documentation;
  • 来自 Kodeco 的书本《Combine: Asynchronous Programming with Swift》;
  • 对上述 Kodeco 书本的汉语自译版 《Combine: Asynchronous Programming with Swift》 的收拾、修正、弥补更新。