前言

上一篇介绍了什么是ReactiveX,本篇首要介绍Observable。笔者以为把这些概念梳理清楚了,对于后边的运用会比较好上手,不至于一向处于懵懵的开发状况。

Observable

在ReactiveX中,调查者订阅Observable。然后调查者对Observable宣布的任何项目或项目序列做出反应。这种办法有助于兵书操作,因为它不需求阻塞等候Observable发射对象。而是以调查者的办法创建一个哨兵,不管在未来任何时候,这个哨兵随时预备着恰当的操作来呼应Observable。

这个页面解说了什么是呼应办法,什么是Observables和Observers(以及Observers是如何订阅Observables的)。其他页面显示了如何运用各种Observable操作符将Observable链接在一起并更改它们的行为。

本文档伴随着“大理石图”进行解说。以下是大理石图如何表明Observable和Observable的转化:

第二篇 ReactiveX之Observable

布景

在许多软件编程使命中,您或多或少地期望您编写的指令将按照您编写的次序以递加的办法履行和完结,一次一个。ReactiveX中,许多指令可能会并行履行,它们的成果随后会被“调查者”以任意次序捕获。与其调用办法,不如以“可调查”的办法界说一种检索和转化数据的机制,然后调查者订阅,在这一点上,先前界说的机制发动,调查者放哨,随时捕捉并呼应其排放物。
这种办法的一个优点是,当你有一堆彼此不依赖的使命时,你能够同时开端所有使命,而不是等候每一个使命完结后再开端下一个使命——这样,你的整个使命包只需求和使命包中最长的使命相同长的时刻就能够完结。
有许多术语用来描绘这种异步编程和规划的模型。本文档将运用以下术语:调查员订阅Observable。Observable经过调用调查者的办法来向其调查者宣布项目或发送通知。
在其他文档和其他上下文中,咱们称之为“调查者”的东西有时被称为“订阅者”、“调查器”或“反应器”。这种模型一般被称为”反应器办法“。

树立调查者

本页运用相似Groovy的伪代码作为示例,但有许多言语的ReactiveX完成。

在一个普通的办法调用中——也就是说,不是ReactiveX中典型的异步、并行调用——流是这样的:

  1. 调用一个办法。
  2. 将该办法的回来值存储在变量中。
  3. 运用该变量及其新值来做一些有用的工作。 或许,相似这样的工作:
// make the call, assign its return value to `returnVal`
returnVal = someMethod(itsParameters);
// do something useful with returnVal

在异步模型中,流程更像这样:

  1. 界说一个办法,用异步调用的回来值做一些有用的工作;这种办法是观测者的一部分。

  2. 将异步调用自身界说为Observable。

  3. 经过订阅Observable将调查者连接到该Observable(这也会发动Observable的操作)。

  4. 继续你的事业;每当调用回来时,调查者的办法将开端对其回来值进行操作,即Observable宣布的项。

它看起来像这样:

// defines, but does not invoke, the Subscriber's onNext handler
// (in this example, the observer is very simple and has only an onNext handler)
def myOnNext = { it -> do something useful with it };
// defines, but does not invoke, the Observable
def myObservable = someObservable(itsParameters);
// subscribes the Subscriber to the Observable, and invokes the Observable
myObservable.subscribe(myOnNext);
// go on about my business

onNext, onCompleted, and onError

Subscribe是将调查者连接到Observable的办法。您的调查者完成了以下办法的一些子集:
onNext
每当Observable宣布项目时,Observable都会调用此办法。此办法将Observable宣布的项作为参数。
onError
Observable调用此办法以指示它未能生成预期的数据或遇到了其他过错。它不会再调用onNext或onCompleted。onError办法将导致过错的原因作为其参数。
onCompleted
Observable在最后一次调用onNext之后调用这个办法,假如它没有遇到任何过错的话。

根据Observable合约的条款,它能够调用onNext零次或多次,然后能够在这些调用之后调用onCompleted或onError,但不能同时调用两者,这将是它的最后一次调用。按照惯例,在本文档中,对onNext的调用一般被称为项目的“排放”,而对onCompleted或onError的调用被称为“通知”。
一个更完好的订阅调用示例如下所示:

def myOnNext     = { item -> /* do something useful with item */ };
def myError      = { throwable -> /* react sensibly to a failed call */ };
def myComplete   = { /* clean up after the final response */ };
def myObservable = someMethod(itsParameters);
myObservable.subscribe(myOnNext, myError, myComplete);
// go on about my business

撤销订阅

在一些ReactiveX完成中,有一个专门的调查者接口Subscriber,它完成了一个撤销订阅办法。您能够调用此办法来指示订阅服务器不再对其当前订阅的任何Observable感兴趣。然后,这些Observable能够(假如它们没有其他感兴趣的调查者)挑选停止生成要宣布的新项目。

撤销订阅的成果将经过运用于观测者订阅的Observable的运算符链级联回来,这将导致链中的每个链接停止发射项目。但是,这并不能保证立即发生,即便在没有调查员留下来调查这些排放物之后,Observable也有可能在一段时刻内发生并试图排放这些物品。

关于命名约好的几点注记

ReactiveX的每种特定于言语的完成都有自己的命名古怪。虽然完成之间有许多共性,但没有标准的命名标准。

此外,其间一些称号在其他上下文中有不同的含义,或许在特定完成言语的习语中显得很为难。

例如,有onEvent命名办法(例如onNext、onCompleted、onError)。在某些上下文中,这样的称号将指示用于注册事情处理程序的办法。但是,在ReactiveX中,它们自己命名事情处理程序。

“热”和“冷”Observables

“热”和“冷”Observable何时开端发射其项目序列?这取决于Observable。“热门”Observable可能会在创建后立即开端发射项目,因而任何后来订阅该Observable的调查者都可能会在在中心的某个位置开端调查序列。另一方面,一个“冷”的Observable在开端发射项目之前要等候,直到调查者订阅它,因而这样的调查者能够保证从一开端就看到整个序列。

在ReactiveX的一些完成中,还有一种称为“可连接”的Observable。这样的Observable在调用其Connect办法之前不会开端宣布项,无论是否有任何调查者订阅了它。

经过Observable操作符进行组合

Observables和observers仅仅ReactiveX的开端。就其自身而言,它们只不过是标准调查器办法的一个轻微扩展,更适合处理一系列事情,而不是单个回调。

真实的力量来自“呼应式扩展”(因而是“ReactiveX”),这些操作符答应您转化、组合、操纵和处理Observables发射的项目序列。

这些Rx运算符答应您以声明的办法将异步序列组合在一起,具有回调的所有功率优势,没有一般与异步系统相关联的嵌套回调处理程序的缺点。

本文档将有关各种运算符及其用法示例的信息分组到以下页面中:

  • Creating Observables Create,Defer,Empty/Never/Throw,From,Interval,Just,Range,Repeat,Start, andTimer

  • Transforming Observable Items Buffer,FlatMap,GroupBy,Map,Scan, andWindow

  • Filtering Observables Debounce,Distinct,ElementAt,Filter,First,IgnoreElements,Last,Sample,Skip,SkipLast,Take, andTakeLast

  • Combining Observables And/Then/When,CombineLatest,Join,Merge,StartWith,Switch, andZip

  • Error Handling Operators
    CatchandRetry

  • Utility Operators Delay,Do,Materialize/Dematerialize,ObserveOn,Serialize,Subscribe,SubscribeOn,TimeInterval,Timeout,Timestamp, andUsing

  • Conditional and Boolean Operators All,Amb,Contains,DefaultIfEmpty,SequenceEqual,SkipUntil,SkipWhile,TakeUntil, andTakeWhile

  • Mathematical and Aggregate Operators Average,Concat,Count,Max,Min,Reduce, andSum

  • Converting Observables To

  • Connectable Observable Operators Connect,Publish,RefCount, andReplay

  • Backpressure Operators
    履行特定流量操控策略的各种运算符

这些页面包括一些运算符的信息,这些运算符不是ReactiveX中心的一部分,但在一个或多个特定于言语的完成和/或可选模块中完成。

链式运算符(操作符)

大多数运算符对Observable进行操作并回来Observable。这答应您在链中一个接一个地运用这些运算符。链中的每个操作符都会修改上一个操作符的操作所发生的Observable。

还有其他办法,如Builder办法,其间特定类的各种办法经过对该办法的操作来修改该对象,从而对同一类的项进行操作。这些办法还答应您以相似的办法链接办法。但是,在Builder办法中,办法在链中出现的次序一般并不重要,而Observable运算符的次序很重要。

Observable运算符链并不独立于建议该链的原始Observable进行操作,但它们顺次进行操作,每个运算符都对链中前一个运算符生成的Observable履行操作。

总结

读完本篇,信任小伙伴们应该对Observable和observers的概念比较清楚了,一个是能够被调查事物,一个是对该事物进行了订阅(即调查),订阅的时候需求提供一些呼应的办法。

有关于Operators的东西,咱们在rxwift的运用中再介绍,因为内容太多了,在reactiveX里边无法介绍。