【程序系列翻译】ReactiveX

【程序系列翻译】ReactiveX - Observable

ReactiveX系列

ReactiveX 是http://reactivex.io提供的响应式框架,采用观察者模式+链式api构成,目前支持语言如下:

【程序系列翻译】ReactiveX - Observable

ReactiveX系列翻译,仅翻译通用篇,共计5篇内容:

  1. Observable
  2. Operators
  3. Single
  4. Subject
  5. Scheduler

只有理解了这五大内容,学习起来才会有目标和场景。以后不管你去使用RxJava,还是RxSwift,中心思想不会跳出这些内容。

本文是第一篇:可观察

简述了可观察的内容是什么,以及观察者模式中的流程

正文

在ReactiveX中,一个观察者对象订阅在可观察对象。然后,可观察对象发出任何的项目或者项目序列,观察者对象都会做出响应。因为在等待

可观察对象发出消息的时候不会阻塞当前任何操作,所以这个模式适合并发操作,且观察者对象会创建一个类似哨兵的监听,随时对可观察对象所做的任何事情做出相应。

本文解释了反应模式是什么以及Observables和观察者是什么(观察者如何订阅Observables)。

下面这张图很直观的说明了反应模式,如何表示Observables的Observables和转换:

【程序系列翻译】ReactiveX - Observable


后台支持

在你编写程序的任务时,你或多或少希望你的任务是逐渐递增的,同一时间仅执行一个。但是在ReactiveX中,许多任务是可以并行的,他们没有任何执行顺序,而是通过观察者获取结果而不是一个方法。您定义了一种检索和转换数据的机制,以“Observable”的形式,然后订阅观察者。此时,先前定义的机制开始发挥作用,观察员站在岗哨,以便在准备就绪时捕获并响应其排放。

这种方法的一个优点是,当你有一堆不依赖于彼此的任务时,你可以同时启动所有任务,而不是等到每个任务完成后再开始下一个。这样,你的整个捆绑任务只需要与捆绑中最长的任务一样长。

有许多术语用于描述这种异步编程和设计模型。本文将使用以下术语:观察者订阅Observable。Observable通过调用观察者的方法发出项目或向其观察者发送通知。

在其他文件和其他背景下,我们所谓的“观察者”有时被称为“用户”,“观察者”或“反应堆”。这种模型通常被称为“反应堆模式”。

示例:建立观察者

此页面使用类似Groovy的伪代码作为示例,但在许多语言中都有ReactiveX实现。

在普通的方法调用中 - 也就是说,不是ReactiveX中典型的异步并行调用 - 流程就是这样的:

  1. 调用一个方法
  2. 将该方法的返回值存储在变量中
  3. 使用该变量及其新值来做一些有用的事情

比如:

【程序系列翻译】ReactiveX - Observable

而在异步模型中,流程更像是这样:

  1. 定义一个方法,该方法对异步调用的返回值执行一些有用的操作;这种方法是观察者的一部分
  2. 将异步调用本身定义为Observable
  3. 通过订阅将观察者附加到该Observable(这也启动了Observable的操作)
  4. 继续你的事业,每当调用返回时,观察者的方法将开始对其返回值或值进行操作 - Observable发出的结果

比如:

【程序系列翻译】ReactiveX - Observable

onNext, onCompleted, onError

这是3个订阅反馈方法。Subscribe方法是将观察者连接到Observable的方法。

  • onNext:只要Observable发出一个项目,Observable就会调用此方法。此方法将Observable发出的项作为参数
  • onError:Observable调用此方法以指示它无法生成预期数据或遇到其他一些错误。它不会进一步调用onNext或onCompleted。onError方法将参数指示导致错误的原因。
  • onCompleted:如果Observable在最后一次调用onNext,如果它没有遇到任何错误,则调用此方法

按照Observable的接口规范,onNext可以调用任意次数(包括0次),在最后一次调用(结束的时候),onCompleted及onError仅可调用其一。按照文档的定义,我们将onNext称作“排放”,onCompleted及onError的调用称作“通知”。

更完整的订阅调用示例如下所示:

【程序系列翻译】ReactiveX - Observable

退订

在很多语言实现中,存在一个特殊的观察者接口Subscriber,他实现了unsubscribe方法。你可以调用这个方法来指示订阅服务器不再对它当前订阅的任何Observable感兴趣。那些Observable可以(如果他们没有其他感兴趣的观察者)选择停止生成要发出的新项目。

此取消订阅的结果将通过适用于观察者订阅的Observable的运算符链级联回来,这将导致链中的每个链接停止发射项目,这不能保证立即发生,然而,即使在没有观察者观察这些排放之后,Observable也有可能产生并尝试发射物品一段时间。

命名约束

每种语言实现ReactiveX,都有自己的命名喜好。虽然实现之间存在许多共性,但没有规范的命名标准。

此外,其中一些名称在其他情况下具有不同的含义,或者在特定实现语言的习语中看起来很尴尬。

比如,onEvent命名(通用: onNext, onCompleted, onError

)。在某些上下文中,这些名称将指示通过哪些方法注册事件处理程序。但是,在ReactiveX中,它们自己命名事件处理程序。

“热”和“冷”观察者

Observable何时开始发出任务序列?这取决于Observable。

“热”Observable可以在创建项目后立即开始发出项目,因此任何后来订阅该Observable的观察者都可以开始在中间某处观察序列。

另一方面,“冷”Observable等待观察者在开始发射物品之前订阅它,因此这样的观察者保证从一开始就看到整个序列。

在ReactiveX的一些实现中,还存在称为“可连接”可观察的东西。

这样的Observable在调用Connect方法之前不会开始发出项目,无论是否有任何观察者都订阅了它。

Observable运算符

Observable和观察者只是ReactiveX的开始。它们本身只不过是标准观察者模式的轻微扩展,更适合处理一系列事件而不是单个回调。

真正的力量来自“反应式扩展”(因此是“ReactiveX”) - 允许您转换,组合,操作和处理Observables发出的项目序列的运算符。

这些Rx运算符允许您以声明方式组合异步序列,同时具有回调的所有效率优势,但没有嵌套回调处理程序的缺点,这些回调处理程序通常与异步系统相关联。

ReactiveX包含的运算符如下:

  • 创建Observable:
  • 转换Observable项:
  • 过滤Observable:
  • 合并Observable:
  • 错误处理:
  • 公用工具:
  • 条件和布尔:
  • 数学和集合:
  • 转化Observable:
  • 连接Observable:
  • 背压:

这些运算符不属于ReactiveX的核心,而是在一个或多个特定于语言的实现和/或可选模块中实现的。

链式调用

大多数运算符都在Observable上运行并返回一个Observable。这允许您在链中一个接一个地应用这些运算符。链中的每个运算符都会修改由前一个运算符的运算产生的Observable。

当然,还有其他模式,如Builder模式,其中特定类的各种方法通过方法的操作修改该对象,对同一类的项进行操作。这些模式还允许您以类似的方式链接方法。但是在Builder模式中,方法在链中出现的顺序通常并不重要,Observable运算符命令事项。

链式Observable运算符不能在原始的Observable上独立运行,它发起链,但它们依次运行,每个运算符都由运算符在链中的前一个运算符生成。


分享到:


相關文章: