0%

The Basic Concepts of RxJava

RxJava 到底是什么? 一个词:异步。

RxJava 在 GitHub 主页上的自我介绍是 “a library for composing asynchronous and event-based programs using observable sequences for the Java VM”(一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库)。这就是 RxJava ,概括得非常精准。

RxJava 好在哪? 一个词:简洁。

给 Android 开发者的 RxJava 详解

RxJava 的本质可以压缩为异步这一个词。说到根上,它就是一个实现异步操作的库,而别的定语都是基于这之上的。

异步操作很关键的一点是程序的简洁性,因为在调度过程比较复杂的情况下,异步代码经常会既难写也难被读懂。 Android 创造的 AsyncTask 和Handler ,其实都是为了让异步代码更加简洁。RxJava 的优势也是简洁,但它的简洁的与众不同之处在于,随着程序逻辑变得越来越复杂,它依然能够保持简洁。

RxJava 的异步实现,是通过一种扩展的观察者模式来实现的。

观察者模式

观察者模式面向的需求是:A 对象(观察者)对 B 对象(被观察者)的某种变化高度敏感,需要在 B 变化的一瞬间做出反应。举个例子,新闻里喜闻乐见的警察抓小偷,警察需要在小偷伸手作案的时候实施抓捕。在这个例子里,警察是观察者,小偷是被观察者,警察需要时刻盯着小偷的一举一动,才能保证不会漏过任何瞬间。程序的观察者模式和这种真正的『观察』略有不同,观察者不需要时刻盯着被观察者(例如 A 不需要每过 2ms 就检查一次 B 的状态),而是采用注册(Register)或者称为订阅(Subscribe)的方式,告诉被观察者:我需要你的某某状态,你要在它变化的时候通知我。 Android 开发中一个比较典型的例子是点击监听器 OnClickListener 。对设置 OnClickListener 来说, View 是被观察者, OnClickListener 是观察者,二者通过 setOnClickListener() 方法达成订阅关系。订阅之后用户点击按钮的瞬间,Android Framework 就会将点击事件发送给已经注册的 OnClickListener 。采取这样被动的观察方式,既省去了反复检索状态的资源消耗,也能够得到最高的反馈速度。当然,这也得益于我们可以随意定制自己程序中的观察者和被观察者,而警察叔叔明显无法要求小偷『你在作案的时候务必通知我』。

OnClickListener 的模式大致如下图:

如图所示,通过 setOnClickListener() 方法,Button 持有 OnClickListener 的引用(这一过程没有在图上画出);当用户点击时,Button 自动调用 OnClickListener 的 onClick() 方法。另外,如果把这张图中的概念抽象出来(Button -> 被观察者、OnClickListener -> 观察者、setOnClickListener() -> 订阅,onClick() -> 事件),就由专用的观察者模式(例如只用于监听控件点击)转变成了通用的观察者模式。如下图:

RxJava 的观察者模式

RxJava 有四个基本概念:

  1. Observable (可观察者,即被观察者)
  2. Observer (观察者)
  3. subscribe (订阅)
  4. 事件

Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer。

与传统观察者模式不同, RxJava 的事件回调方法除了普通事件 onNext() (相当于 onClick() / onEvent())之外,还定义了两个特殊的事件:onCompleted() 和 onError()。

  1. onCompleted(): 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。

  2. onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。

在一个正确运行的事件序列中, onCompleted() 和 onError() 有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。

RxJava 的观察者模式大致如下图:

基本实现

基于以上的概念, RxJava 的基本实现主要有三点:

1) 创建 Observer

Observer 即观察者,它决定事件触发的时候将有怎样的行为。 RxJava 中的 Observer 接口的实现方式:

除了 Observer 接口之外,RxJava 还内置了一个实现了 Observer 的抽象类:Subscriber。 Subscriber 对 Observer 接口进行了一些扩展,但他们的基本使用方式是完全一样的:

不仅基本使用方式一样,实质上,在 RxJava 的 subscribe 过程中,Observer 也总是会先被转换成一个 Subscriber 再使用。所以如果你只想使用基本功能,选择 Observer 和 Subscriber 是完全一样的。它们的区别对于使用者来说主要有两点:

  1. onStart(): 这是 Subscriber 增加的方法。它会在 subscribe 刚开始,而事件还未发送之前被调用,可以用于做一些准备工作,例如数据的清零或重置。这是一个可选方法,默认情况下它的实现为空。需要注意的是,如果对准备工作的线程有要求(例如弹出一个显示进度的对话框,这必须在主线程执行), onStart() 就不适用了,因为它总是在 subscribe 所发生的线程被调用,而不能指定线程。要在指定的线程来做准备工作,可以使用 doOnSubscribe() 方法,具体可以在后面的文中看到。

  2. unsubscribe(): 这是 Subscriber 所实现的另一个接口 Subscription 的方法,用于取消订阅。在这个方法被调用后,Subscriber 将不再接收事件。一般在这个方法调用前,可以使用 isUnsubscribed() 先判断一下状态。 unsubscribe() 这个方法很重要,因为在 subscribe() 之后, Observable 会持有 Subscriber 的引用,这个引用如果不能及时被释放,将有内存泄露的风险。所以最好保持一个原则:要在不再使用的时候尽快在合适的地方(例如 onPause() onStop() 等方法中)调用 unsubscribe() 来解除引用关系,以避免内存泄露的发生。

2) 创建 Observable

Observable 即被观察者,它决定什么时候触发事件以及触发怎样的事件。 RxJava 使用 create() 方法来创建一个 Observable ,并为它定义事件触发规则:

深入浅出RxJava

RxJava最核心的两个东西是Observables(被观察者,事件源)和Subscribers(观察者)。Observables发出一系列事件,Subscribers处理这些事件。这里的事件可以是任何你感兴趣的东西(触摸事件,web接口调用返回的数据…)一个Observable可以发出零个或者多个事件,知道结束或者出错。每发出一个事件,就会调用它的Subscriber的onNext方法,最后调用Subscriber.onNext()或者Subscriber.onError()结束。
Rxjava的看起来很想设计模式中的观察者模式,但是有一点明显不同,那就是如果一个Observerble没有任何的的Subscriber,那么这个Observable是不会发出任何事件的。

Learning RxJava 2 for Android by example

RxJava 2.0 has been completely rewritten from scratch on top of the Reactive-Streams specification. The specification itself has evolved out of RxJava 1.x and provides a common baseline for reactive systems and libraries.

Because Reactive-Streams has a different architecture, it mandates changes to some well known RxJava types.

RxJava1 -> RxJava2

Quick Look on few changes done in RxJava2 over RxJava1

  1. onCompleted -> onComplete - without the trailing d
  2. Func1 -> Function
  3. Func2 -> BiFunction
  4. CompositeSubscription -> CompositeDisposable
  5. limit operator has been removed - Use take in RxJava2
    and much more.

Operators :

Map -> transform the items emitted by an Observable by applying a function to each item

Zip -> combine the emissions of multiple Observables together via a specified function and emit single items for each combination based on the results of this function

Filter -> emit only those items from an Observable that pass a predicate test

FlatMap -> transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable

Take -> emit only the first n items emitted by an Observable

Reduce -> apply a function to each item emitted by an Observable, sequentially, and emit the final value
Skip -> suppress the first n items emitted by an Observable

Buffer -> periodically gather items emitted by an Observable into bundles and emit these bundles rather than emitting the items one at a time

Concat -> emit the emissions from two or more Observables without interleaving them

Replay -> ensure that all observers see the same sequence of emitted items, even if they subscribe after the Observable has begun emitting items

Merge -> combine multiple Observables into one by merging their emissions

Reactive Programming

Rx概念介绍 Reactive是一种编程思想 Reactive编程就是异步数据流的编程,基于事件的编程

一切皆流

一个流是将要发生的有序序列事件的一部分。
你可以创建任何事物的数据流。任何事物都可以是流:变量,用户输入,属性 ,缓存,数据结构等等

它可以发出三种不同的事件:value,error或者completed

流(Observable)是被观察的对象。对于流的监听被称作订阅。我们定义的函数被称作观察者。

Rx扩展(Rxjava,RxJs,RxScala)提供了一个用于创建,变换,连接,过滤任何流的函数库。不仅某个流可以用于另一个流的输入,多个流同样可以作为其它流的输入。你也可以合并两个流。如果你对某些事件感兴趣,也可以通过对一个流的过滤获得另一个目标流。也可以将一个流中的数据映射到一个新的数据流。

优点

简洁
抽象层次高,你可以聚焦于定义业务逻辑的事件依赖,而不是大量的实现细节
有效避免callback hell,更少的中间状态变量

###缺点

代码抽象层次高,真正使用Rx思想解决问题需要一个过程
对于android来说,包比较大,方法数也不少

适用场景

异步 ? 线程切换 ? 事件组合?(多个请求和UI操作组合)

EveryWhere

Rx是一种思想,这里的一切都是流,你可以定义任何事物的流,可以是事件,可以是数据结构,任意发挥你的想象,通过Rx的方式来解决问题。(Twitter suggestion的实现)

Callback Hell

比如有一个链式请求调用,你首先需要根据第一个请求的结果去判断下一步的操作。那么就要处理多个请求的回调。不管是正确还是错误,你总需要通过callback处理。无形中多了不少代码量,创建了变量,浪费了内存,同时增加了错误的可能性。

Reference

[1]给 Android 开发者的 RxJava 详解
[2]深入浅出RxJava(一:基础篇)
[3]RxJava2-Android-Samples
[4]Reactive Programming

欢迎关注我的其它发布渠道