上次介绍了redux-saga 可以在React中管理我们的异步action, 这次主要介绍RxJS来管理我们的所有异步操作。

RxJS起源于微软最早提出的ReactiveX范式,是一套异步框架处理规范(相当于promise是javascript对异步的统一规范), 也是作为promise超集的存在。 使用RxJS主要是抽象了异步处理逻辑,从局部管理异步状态中解耦,让代码的可读性可维护性更高。

RxJS将时间轴上的事件作为一个流处理,可能来源于点击事件,或者是WebSocketmessage事件, 或者动画响应等等。 RxJS用Observable表示时间轴上的事件流, Observerable提供了可以让我们执行过滤/变换/拼接一系列的操作, 最后再被真实的事件接收者所捕捉。让异步处理更加Smooth.

用RxJS我们写的代码就像下面这样

注: 本文使用的是RxJS5

import Rx from 'rxjs'

const identity = a => a;

const myObserable$ = Rx.Observable.fromEvent(dom, 'myevent')
  .filter(identity)

myObserable$.subscribe((item) => {
  console.log(item)
})

这个例子中我们简单地使用了Rxjs来帮我们过滤掉自定义事件的空事件。 在RxJS中常常约定observer对象的变量名尾部用$, 以此来标识此对象为Observable对象。

上面例子中Rx.Observable.fromEvent 为RxJS的工厂函数,用来从DOM事件创建Observable, 常用的还有fromPromise, fromObservable. 上面代码的filter是RxJS的operator, 用来操作流, RxJS提供了非常多的operator, 抽象了各种我们常规的逻辑操作, 基本上如果你的程序没有特别逆天的操作使用RxJS内置的operator都能完成。 如果不能使用RxJS来解决,一: 有可能你还不会用RxJS, 二: 设计不合理。

map

第一个operator 介绍map, map是我们最常见的操作符了,用法也相当的简单,执行输入到输出的映射。

const myObservable$ = Rx.Observable.fromEvent(dom, 'click')
    .map((e) => ({event: e}))

myObservable$.subscribe((e) => {
  console.log(e)
})

使用事件流图来表示如下.

e-->-----------e -->----------e--->--------->

        map(e => ({event: e}))

({event: e})-->({event: e})->({event: e})--->

这里需要预先说明,RxJS的事件流为冷模式的, 也就如果一个observable对象没有subscribe, 那么他将不会处理事件流,只有subscribe后才会真正的开始工作。 可能乍一想没什么问题,不过如果你想借助一些其他operator执行一些副作用函数来解决你的问题,可能你的subscribe压根就没用,这时候如果你懒得subscribe,那么你的事件流将不会工作。

switchMap

switchMap 基本功能和map一样, 如果我们的变换函数是个同步函数他们的功能完全一样,switchMap是为了解决异步变换而出现的。

假设我们的界面上有个刷新按钮,点一下就会重新获取一个大表格的数据,这些数据请求由于网络原因可能比较慢,所以时间都比较久。有个问题是如果我们点击了刷新按钮,在数据还没有返回之前又点击了一次, 这时候传统的方法直接渲染请求结果可能导致不正确的现象,因为可能数据实时变化导致历史数据显示到了屏幕上。

我们借助RxJS来帮我们实现这个功能。 用到的operator为switchMap.

const Refresh = () => { ... return some heavily ajax request ... }
const myObservable$ = Rx.Observable.fromEvent(dom, 'click')
    .switchMap(() => Rx.Observable.fromPromise(Refresh()))

myObservable.subscribe((newContent) => {
    // trigger rerender
})

当我们点击事件触发后,执行switchMap的函数体,这里我们创建了一个Rx.Observable.fromPromise将我们的ajax的promise转换成Observable, switchMap会将此事件流加入到这个流处理中进行等待(在ajax返回之前不会触发subscribe调用)。 此时我们又点击了一次按钮之后switchMap发现有新的事件到来,而现在还有一个流没执行结束,switchMap会放弃旧的流转而执行新的操作。由此来达到我们只处理最新的需求。

事件流图如下所示:

switch-map

exhaustMap

map的变体2, 也是主要解决异步map的问题。

场景: 假设我们有一个上传文件的按钮,嗯,又是个按钮,而且操作比较重,这个场景是在一个上传操作执行的时候我们是不响应其他上传操作的,防止误点导致重复上传。

const Upload = () => { ... return some heavily ajax request ...}
const myObservable$ = Rx.Observable.fromEvent(dom, 'click')
    .exhaustMap(e => Rx.Observable.fromPromise(Upload()))

myObserable.subscribe((result) => {
  console.log(result)
})

exhaustMap 的作用是加入一个Observable到本流的处理单元中,并且在处理结束之前忽略其他到来的事件流。

exhaustMap 让我们防并发请求的逻辑中解耦了,可以非常优雅地解决这个问题。

事件流图如下所示:

exhaustMap

buffer

buffer 是一类缓冲区operator, 相应的变体有bufferTime, bufferCount, bufferToggle.

RxJS也提供了debouncethrottle两个操作,如果之前你使用过这两个函数也可以直接在RxJS中使用。但是debouncethrottle分别是处理一段时间内最后一个与第一个请求,他们会丢掉非截止时间的其他所有请求。 如果每个事件都是有效的,那么就需要借助buffer来处理了。

buffer可以收集一段时间内的所有事件,并且将它传入下一个operator, 参数是事件的数组。

相关代码暂时不写 ,比较容易理解。

事件流图如下所示:

buffer

scan

scan 操作类似fp中的reduce, 可以累计计算某个值。

scan((acc, item) => newacc)

acc 为上次scan结束时的值,newacc是本次计算的值,并且传入下一个operator.

如果想像reduce一样设置一个起始值,可以在scan之前设置一个startWith(0)

例如我有个事件会输出1,2,3

 myobservable$.scan((acc, item) => acc * item)
     .startWith(1)
     .scan((newacc) => {result: newacc})
     .subscribe(console.log)

组合应用

说operator有点累了,RxJS提供了很多抽象的operator, 使用的时候基本上都会再去看一遍,根据2.8原则 常用的就那几个。

开始说过,如果一个异步行为无法用RxJS描述,那么有可能是你还不会用RxJS

下面来点高级点的组合应用:

场景1:

收集用户所有的点击事件,直到出现一个200ms的间隔。

想象一下你设计的一个游戏,触摸屏幕上不同位置即可出现一个魔法轮,你点击的越快出现的越多,为了不让自己受伤你不能无限地触发魔法,因为可能敌方会对你进行攻击。这时候你突然止手就认为这是一个发射魔法的事件,此时魔法轮开始向目标飞去。然后不断重复此循环。

代码:

  const mahouObservable$ = Rx.Observable.fromEvent(dom, 'touch')
  const debounce$ = mahouObservable$.debounceTime(200)
  const collections = mahouObservable$.buffer(debounce$)
  collections.subscribe(collections => {
    collections.map(shoot)
  })

流图如下

-> -> mahouObservable$ ->e1 ->e2 ->e3 ->e4 ->e5 --(200ms)--->e ->e ->e ->
             debounce$ -> ---------------- -------->e5--->-------------->
          collections$ ->-------------------------[e1...e5]------------->
                        (buffer收集中,等待触发结束事件)

结果是collections收集到了[e1,…e5],来进行统一处理

(所以说不要让我造例子啊喂)

** 生成reactive diagram可以从这个网站: http://rxmarbles.com/