‍介绍

关于 RxJS,多播可能是一个比较高级的概念。

这篇文章会经过一个比方,逐步的介绍多播以及多播的操作符。

Hot & cold Observables

默认情况下,大多数 Observables 都归于 Cold Observables ,每次咱们订阅 Cold Observable 时,都会从头创立它的生产者。那这是什么意思?

首要,咱们必须知道生产者是什么:它能够是咱们 Observable 值的来历,它页能够是 DOM 事件、回调、HTTP 恳求、迭代器等,总之,任何能够产生价值并将其传递给观察者的东西。

现在咱们知道了生产者是什么,就更容易理解前面语句的含义了,它基本上是说咱们的 Observable 的生产者在每次订阅时都会被一遍又一遍地创立。让咱们看一个比方:

import { timer } from "rxjs";
import { tap } from "rxjs/operators";
const cold$ = timer(1000).pipe(tap(() => console.log("副作用")));
cold$.subscribe(val => console.log(`Observer 1: ${val}`));
cold$.subscribe(val => console.log(`Observer 2: ${val}`));
// 自这个 Cold Observable 发动,副作用就履行两次,一次是给 subscription
/* Output:
副作用,
Observer 1: 0,
副作用, 
Observer 2: 0
*/

如你所见,由于咱们的 ObservableCold Observable ,而且它的生产者在每次订阅时都被从头创立,所以每次订阅就履行一次,副作用一共履行了两次

假如 Observable 是热的,不论咱们订阅了多少次,副作用将只履行一次

举个比方来说,假定咱们有一个 Ajax Observable,它能够获取一些数据。由于 Ajax ObservableCold Observable,每次订阅它时,都会宣布一个新的 HTTP 恳求。

20 个订阅 = 20 个 HTTP 恳求。比方下面这样:

import { ajax } from "rxjs/ajax";
import { map, mergeAll, take, tap } from "rxjs/operators";
// 一个 Ajax(cold Observable)
const sharedGhibliFilm$ = ajax
  .getJSON("https://ghibliapi.herokuapp.com/films")
  .pipe(
    tap(() => console.log("我恳求啦")),
    mergeAll(),
    take(1)
  );
// 两次 subscribe
sharedGhibliFilm$
  .pipe(map(({ title }) => title))
  .subscribe(title => console.log(`标题: ${title}`));
sharedGhibliFilm$
  .pipe(map(({ description }) => description))
  .subscribe(description => console.log(`描绘: ${description}`));
// ... 两次恳求
/* Output:
'我恳求啦',
'标题: Castle in the Sky',
'我恳求啦',
'描绘: The orphan Sheeta inherited a mysterious crystal that...'
*/

怎么才能够正确处理 热/冷 Observables 就变得很重要。

咱们不想每次订阅都从头履行一遍,那就得将这个 Cold Observables 变成 Hot Observables。运用 RxJS 中的多播就能够让这件事变得很便利。

☎️多播 (Multicast)

多播 经过运用 Subject 同享源Observable。让咱们看一个运用多播的比方:

import { interval, Subject } from "rxjs";
import { take, tap, multicast } from "rxjs/operators";
const number$ = interval(1000).pipe(take(10));
const multicast$ = number$.pipe(
  tap(() => console.log("履行了")),
  multicast(() => new Subject())
);
multicast$.subscribe(console.log);
// Output:

假如你将这段代码跑起来,你会发现它没有任何输出,这是为什么?

由于 multicast 回来一种特殊的 ObservableConnectableObservable

这种特殊类型的 Observable 有一个connect()办法,当被调用时,它负责运用咱们提供的 Subject 订阅源 Observable

这意味着假如咱们不调用connect(),流永久不会被订阅,也不会不会开端发射值。

因此,让咱们对之前的代码添加并调用connect()

import { ConnectableObservable, interval, Subject } from "rxjs";
import { take, tap, multicast } from "rxjs/operators";
const number$ = interval(1000).pipe(take(2));
const multicast$ = number$.pipe(
  tap(() => console.log("履行了")),
  multicast(() => new Subject())
) as ConnectableObservable<number>;
// number$不会发射值,直到调用 connect
multicast$.connect();
multicast$.subscribe(val => console.log(`Observer 1: ${val}`));
multicast$.subscribe(val => console.log(`Observer 2: ${val}`));
/* Output:
履行了,
Observer 1: 0,
Observer 2: 0
(1s)
履行了,
Observer 1: 1,
Observer 2: 1
*/

现在,能够看到有输出值了。由于multicast同享源 Observable,副作用只会履行一次,即便咱们订阅了很多次。

撤销订阅

与所有 Observable 相同,撤销订阅咱们的多播 Observable 是为了防止内存走漏。在处理回来 ConnectableObservable 的多播操作符时,需求撤销对多播的订阅。

根据上面的代码片段,take(2)履行两次,并撤销订阅:

import { ConnectableObservable, interval, Subject, timer } from "rxjs";
import { tap, multicast } from "rxjs/operators";
const number$ = interval(1000);
const multicast$ = number$.pipe(
  tap(() => console.log("履行了")),
  multicast(() => new Subject())
) as ConnectableObservable<number>;
const connectSubscription = multicast$.connect();
multicast$.subscribe(val => console.log(`Observer 1: ${val}`));
multicast$.subscribe(val => console.log(`Observer 2: ${val}`));
// 2s后撤销订阅
timer(2000)
  .pipe(tap(() => connectSubscription.unsubscribe()))
  .subscribe();

这能够让咱们防止内存走漏!

订阅者不一起回来怎么办?

上面的代码咱们看到订阅后的输出是一起产生的。

然后,现实情况还有有些不同,比方下面这样,一个流一起订阅两次,第二个流会在两秒后产生:

import { ConnectableObservable, interval, Subject, timer } from "rxjs";
import { take, tap, multicast } from "rxjs/operators";
const number$ = interval(1000).pipe(take(2));
// 经过多播,咱们能够共享 number$ Observable
const multicasted$ = number$.pipe(
  tap(_ => console.log("我履行了")),
  multicast(() => new Subject())
) as ConnectableObservable<number>;
multicasted$.connect();
multicasted$.subscribe(val => console.log(`Observer 1: ${val}`));
// 2s后不会接收到值,由于值现已发射过了
timer(2000)
  .pipe(
    tap(() =>
      multicasted$.subscribe(val => console.log(`Late observer: ${val}`))
    )
  )
  .subscribe();
/* Output:
我履行了,
Observer 1: 0,
(1s)
我履行了,
Observer 1: 1,
Late observer: 1
*/

咱们能够看到,第二个流回来的值好像没有从 0 开端,而是直接回来了 1。那么该怎么处理这个问题呢?

咱们所要做的便是将 Subject 替换为 ReplaySubject

由于 ReplaySubjects 会向新订阅者从头发送旧值,就能够处理掉上面那个问题,具体参阅下面这段代码:

import { ConnectableObservable, interval, ReplaySubject, timer } from "rxjs";
import { take, tap, multicast } from "rxjs/operators";
const number$ = interval(1000).pipe(take(2));
const multicasted$ = number$.pipe(
  tap(_ => console.log("我履行了")),
  // 运用 ReplaySubject
  multicast(() => new ReplaySubject())
) as ConnectableObservable<number>;
multicasted$.connect();
multicasted$.subscribe(val => console.log(`Observer 1: ${val}`));
// 由于运用了ReplaySubject,2s后会接收到现已发射的值
timer(2000)
  .pipe(
    tap(() =>
      multicasted$.subscribe(val => console.log(`Late observer: ${val}`))
    )
  )
  .subscribe();
/* Output:
我履行了,
Observer 1: 0,
(1s)
Late observer: 0,
我履行了,
Observer 1: 1,
Late observer: 1
*/

能够看到,第二个流尽管拖延开端,但是也是从 0 开端宣布了值。

publish()

咱们再回到运用 multicast(() => new Subject()) 这块。

RxJS 中能够运用 publish 对其简化,也便是说 publish 相当于 multicast(() => new Subject())

看下面这个比方:

import { ConnectableObservable, interval, timer } from "rxjs";
import { publish, tap, take } from "rxjs/operators";
const number$ = interval(1000).pipe(take(4));
const multicast$ = number$.pipe(
  tap(() => console.log("我履行了")),
  // 运用 publish
  publish()
) as ConnectableObservable<number>;
// 履行 connect
multicast$.connect();
multicast$.subscribe(val => console.log(`Observer 1: ${val}`));
multicast$.subscribe(val => console.log(`Observer 2: ${val}`));
/* Output:
我履行了,
Observer 1: 0, 
Observer 2: 0,
(1s)
我履行了,
Observer 1: 1,
Observer 2: 1...
*/

当然,在这里,咱们假如想要能够正确的输出值,也得 connect() 它。

publishReplay()

既然 pulish()multicast(() => new Subject()) 的简写。

那同理,publishReplay() 便是 multicast(() => new ReplaySubject()) 的简写。

publishReplay() 也便是答应把旧的值发送给新的订阅者。看下面这个比方:

import { ConnectableObservable, interval, timer } from "rxjs";
import { publishReplay, take, tap } from "rxjs/operators";
const number$ = interval(1000).pipe(take(3));
const multicast$ = number$.pipe(
  tap(() => console.log("我履行了")),
  publishReplay()
) as ConnectableObservable<number>;
// 调用connect
multicast$.connect();
multicast$.subscribe(val => console.log(`Observer 1: ${val}`));
timer(2000)
  .pipe(
    tap(() =>
      multicast$.subscribe(val => console.log(`Late observer: ${val}`))
    )
  )
  .subscribe();
/* Output:
  我履行了,
  Observer 1: 0,
  (1s)
  我履行了,
  Observer 1: 1,
  Late observer: 0,
  Late observer: 1,
  (1s) 
  我履行了,
  Observer 1: 2
  Late observer: 2
*/

publishLast()

publishLast相当于multicast(() => new AsyncSubject())

它会比及源 Observable 完结然后宣布最终一个值,参阅下面这个比方:

import { ConnectableObservable, interval } from "rxjs";
import { publishLast, take } from "rxjs/operators";
const number$ = interval(1000).pipe(take(4));
const multicast$ = number$.pipe(publishLast()) as ConnectableObservable<
  number
>;
// 调用 connect
multicast$.connect();
multicast$.subscribe(val => console.log(`Observer 1: ${val}`));
multicast$.subscribe(val => console.log(`Observer 2: ${val}`));
/* Output:
(4s)
Observer 1: 3, 
Observer 2: 3
*/

publishBehavior()

publishBehavior相当于multicast(() => new BehaviorSubject())

由于它运用BehaviorSubject,所以publishBehavior咱们能够指定一个初始值,像下面这样:

import { ConnectableObservable, interval } from "rxjs";
import { publishBehavior, take } from "rxjs/operators";
const number$ = interval(1000).pipe(take(2));
// 提供初始值 -1
const multicast$ = number$.pipe(publishBehavior(-1)) as ConnectableObservable<
  number
>;
// 调用 connect
multicast$.connect();
multicast$.subscribe(val => console.log(`Observer 1: ${val}`));
multicast$.subscribe(val => console.log(`Observer 2: ${val}`));
/* Output:
Observer 1: -1, 
Observer 2: -1,
(1s)
Observer 1: 0,  
Observer 2: 0,
(1s)
Observer 1: 1, 
Observer 2: 1
*/

能够看到输出值从 -1 开端输出。

refCount()

但是到现在为止,还是在用 connect() 办法树立连接,会很容易忘掉调用,有没有更简便的办法?

RxJS 当然有,那便是 refCoun() 办法,refCount 负责在内部统计订阅的数量,它处理了两件很重要的事情:

  • 假如订阅的数量大于等于 1,refCount 会仅订阅一次源,并调用 connect()
  • 假如订阅的数量小于1,也便是没有任何订阅者,refCount 则会从源撤销订阅

咱们应用 refCount 看看:

import { interval, Subject, timer } from "rxjs";
import { take, takeUntil, tap, multicast, refCount } from "rxjs/operators";
const number$ = interval(1000).pipe(take(10));
// 咱们运用 takeUntil + Subject 而不是 unsubscribing
const stop$ = new Subject();
const multicast$ = number$.pipe(
  tap(() => console.log("我调用了")),
  multicast(() => new Subject()),
  // 经过运用 refCount,咱们不再需求手动调用 connect()
  refCount(),
  takeUntil(stop$)
);
// refCount === 1, number$ 开端订阅
multicast$.subscribe(val => console.log(`Observer 1: ${val}`));
// refCount === 2
multicast$.subscribe(val => console.log(`Observer 2: ${val}`));
// refCount === 0, number$ 撤销订阅
timer(2000)
  .pipe(
    tap(() => stop$.next()),
    tap(() => console.log("The end"))
  )
  .subscribe();
/* Output:
  (1s)
  我调用了,
  Observer 1: 0,
  Observer 2: 0,
  (1s) 
  我调用了,
  Observer 1: 1,
  Observer 2: 1,
  The end
 */

所以,refCount它负责调用connect()和撤销订阅源 Observable。

share()

share() 运算符相当于 multicast(() => new Subject())+refCount ,它也是最常用的多播运算符。

咱们运用 share() 来改写上面的示例:

import { interval } from "rxjs";
import { take, tap, share } from "rxjs/operators";
const number$ = interval(1000).pipe(take(2));
const share$ = number$.pipe(
  tap(() => console.log("我调用了")),
  share()
);
share$.subscribe(val => console.log(`Observer 1: ${val}`));
share$.subscribe(val => console.log(`Observer 2: ${val}`));
/* Output:
我调用了,
Observer 1: 0,
Observer 2: 0,
(1s)
我调用了,
Observer 1: 1, 
Observer 2: 1
*/

咱们运用更实际一点的一个比方,示例如下:

import { ajax } from "rxjs/ajax";
import { map, mergeAll, take, tap, share } from "rxjs/operators";
//  源流调用 share 办法让其成为 hot Observable 
const sharedGhibliFilm$ = ajax
  .getJSON("https://ghibliapi.herokuapp.com/films")
  .pipe(
    tap(() => console.log("我调用了")),
    mergeAll(),
    take(1),
    share()
  );
// 两个订阅
sharedGhibliFilm$
  .pipe(map(({ title }) => title))
  .subscribe(title => console.log(`Title: ${title}`));
sharedGhibliFilm$
  .pipe(map(({ description }) => description))
  .subscribe(description => console.log(`Description: ${description}`));
/* Output:
'我调用了',
'Title: Castle in the Sky',
'Description: The orphan Sheeta inherited a mysterious crystal that...'
*/

shareReplay()

很容易猜到,shareReplay 能够记住前史的订阅者,也便是答应把旧的值发送给新的订阅者。

shareReplay 等同于multicast(() => new ReplaySubject())+refCount

这是一个比方:

import { interval, timer } from "rxjs";
import { shareReplay, take, tap } from "rxjs/operators";
const number$ = interval(1000).pipe(take(3));
// 运用 shareReplay,让其成为 hot Observable 
const multicasted$ = number$.pipe(
  tap(() => console.log("我调用了")),
  shareReplay()
);
multicasted$.subscribe(val => console.log(`Observer 1: ${val}`));
// 2s后履行
timer(2000)
  .pipe(
    tap(() =>
      multicasted$.subscribe(val => console.log(`Late observer: ${val}`))
    )
  )
  .subscribe();
// 咱们能够接收到旧的值
/* Output:
  我调用了,
  Observer 1: 0,
  (1s)
  我调用了,
  Observer 1: 1,
  Late observer: 0,
  Late observer: 1,
  (1s) 
  我调用了,
  Observer 1: 2
  Late observer: 2
*/

总结

  • publish相当于multicast(() => new Subject()).
  • publishBehavior相当于multicast(() => new BehaviorSubject()).
  • publishLast相当于multicast(() => new AsyncSubject()).
  • publishReplay相当于multicast(() => new ReplaySubject()).
  • 有了refCount,咱们不再需求手动调用,connect()也不再需求处理退订。
  • share等同于multicast(() => new Subject())refCount()
  • shareReplay等同于multicast(() => new ReplaySubject())refCount()

这便是 RxJS 中的全部内容啦,希望对你有帮助~

转载需注明作者及来历喔~