通过最近的实际使用学习让自己重新认识了一下RxJS. 写一下最近的收获.

Observable

一个`Observable`是一个可被观察的对象, 往往充当数据源的操作, 数据源可能来自事件(EventEmitter)/WebSocket/HttpResponse等。数据源是不可修改的, 就像Promise, 本身表示的就是一个连续事情本身.

const Rx = require('rxjs');

const Ob$ = new Rx.Observable.create(observer => {
    observer.next(Rx.Observable.of(1));
    observer.complete();
});

Ob$.subscribe(console.log);
ScalarObservable { _isScalar: true, value: 1, scheduler: null }

这里打印了一个`ScalarObservable`表示是一个只有一个简单值的Observable. 同时看到订阅后`Observable`直接打印出来了, 所以不能将它当作一个Monad使用.

Oberver

`Observer`表示观察者, 可被subscribe. 可以用鸭子类型填充.

const Rx = require('rxjs');

const myObserver = {
    next: (value) => {
        console.log('<value> ->', value);
    },
    complete: (value) => {
        console.log('<complete> ->', value);
    },
    error: (value) => {
        console.log('<error> ->', value);
    }
}

const ob$ = Rx.Observable.from([1, 2, 3]);

ob$.subscribe(myObserver);
<value> -> 1
<value> -> 2
<value> -> 3
<complete> -> undefined

Subject

一个Subject即是Observable也是Observer. 通常用于实现广播的操作, 主要是为了下面这个问题而出现

const Rx = require('rxjs');

const ob$ = Rx.Observable.interval(100).take(3)
      .do(() => console.log('side effect'))

ob$.subscribe((value) => {
    console.log('A:', value);
});

ob$.subscribe((value) => {
    console.log('B:', value);
});

side effect A: 0 side effect B: 0 side effect A: 1 side effect B: 1 side effect A: 2 side effect B: 2

这里ob$被订阅了两次,发现`side effect`也跟着每次执行两遍. 在通常情况下这都不是我们想要的广播操作. Observable会每次subscribe后从头到尾执行一遍所有操作.

这种情况下可以使用Subject来解决这个问题.

const Rx = require('rxjs');

const ob$ = Rx.Observable
      .interval(100)
      .take(3)
      .do(() => {
          console.log('side effect');
      });

const subject$ = new Rx.Subject();

ob$.subscribe(subject$);


subject$.subscribe((value) => {
    console.log('A ->', value);
});

subject$.subscribe((value) => {
    console.log('B ->', value);
});
side effect
A -> 0
B -> 0
side effect
A -> 1
B -> 1
side effect
A -> 2
B -> 2

上面的代码原始Observable每emit一次, 两个subscription都会接收到对应的值.

然后我们发现直接操作Subject是不符合设计原则的, 因为如果我们在一个操作中返回一个Subject后,外部代码可以拿着这个Subject手动调用`next`。针对这种情况Subject提供了toObservable来将Subject转换成只读模式

function someAction () {
    ...
        return subject$.toObservable();
}

BehaviorSubject

基本功能与Subject类似, 但是BehaviorSubject会保留最后一次的值给订阅者, 初始化时会有一个默认的初始值.

例如一个连续的1—2—3—|事件流,如果订阅者在2,3之间开始订阅,则订阅者第一次会收到上一次的值,也就是2.

ReplaySubject

会将所有历史值都保存下来, 并在订阅时依次触发.(microtask 同步)

const Rx = require('rxjs');

const ob$ = Rx.Observable
      .interval(10)
      .take(5);

const bufferSize = 10;

const subject$ = new Rx.Subject;
const replaySubject$ = new Rx.ReplaySubject(bufferSize);
const behaviorSubject$ = new Rx.BehaviorSubject;

ob$.subscribe(subject$);
ob$.subscribe(replaySubject$);
ob$.subscribe(behaviorSubject$);

setTimeout(() => {
    subject$.subscribe((value) => {
        console.log('normal subject:', value);
    });

    replaySubject$.subscribe(value => {
        console.log('replay subject:', value);
    });

    behaviorSubject$.subscribe(value => {
        console.log('behavior subject:', value);
    });

    console.log('->subscribe done<-');
}, 40);
replay subject: 0
replay subject: 1
replay subject: 2
behavior subject: 2
->subscribe done<-
normal subject: 3
replay subject: 3
behavior subject: 3
normal subject: 4
replay subject: 4
behavior subject: 4

Scheduler

观察一下下面代码的输出

const Rx = require('rxjs');

const ob$ = Rx.Observable.from([1, 2, 3]);


console.log('subscribe begin');

ob$.subscribe((value) => {
    console.log('output: ', value);
});

console.log('subscribe end');
subscribe begin
output:  1
output:  2
output:  3
subscribe end

结果有点不符合预期, Observable中的操作默认都是同步执行的. 但是在很多情况下这都不满足我们的需要,因为我们想让ob中的操作在当前的执行都完成之后再执行.这种情况下就需要Scheduler来调度流的执行顺序. Scheduler决定了什么时刻再执行下一次操作.

Rxjs默认提供了四种类型的scheduler:

Scheduler 用途
null 默认行为, 同步递归执行
Rx.Scheduler.queue 弹性调度
Rx.Scheduler.asap process.nextTick执行或下一个microtask
Rx.Scheduler.async 异步调度, 类似setInterval(0)

Scheduler.async

async是最好理解的一种, 会在下一个MacroTask中执行. 内部实现为setInterval

const Rx = require('rxjs');

const ob$ = Rx.Observable.from([1, 2, 3]);

const delay = 0;

console.log('subscribe begin');

ob$.subscribeOn(Rx.Scheduler.async, delay)
    .subscribe((value) => {
        console.log('output: ', value);
    });

console.log('subscribe end');

setTimeout(() => {
    console.log('timeout');
}, delay);
subscribe begin
subscribe end
output:  1
output:  2
output:  3
timeout

subscribeOn的第二个参数如果不传默认为0.

另外这里使用`subscribeOn`用来仅仅影响subscribe的触发时机. 可以用`observeOn`来影响操作符的调度时间.

Scheduler.asap

asap类似与async, 不同的是会将当前任务放到MicroTask执行.

const Rx = require('rxjs');

const obA$ = Rx.Observable.of('').startWith('A', Rx.Scheduler.asap);
const obB$ = Rx.Observable.of('').startWith('B', Rx.Scheduler.async);

Rx.Observable.merge(obA$, obB$)
    .filter(x => !!x)
    .subscribe((value) => {
        console.log(value);
    });
A
B

Scheduler.queue

queue如果不传参数也是同步执行的一种,不同的是内部实现使用的是循环实现而不是默认的递归.

默认的方式(null):

const Rx = require('rxjs');

const a$ = Rx.Observable.of('a1','a2').do(console.log);
const b$ = Rx.Observable.of('b1','b2').do(console.log);
const c$ = Rx.Observable.of('c1','c2').do(console.log);

Rx.Observable.combineLatest(a$, b$, c$)
    .subscribe(console.log);
a1
a2
b1
b2
c1
[ 'a2', 'b2', 'c1' ]
c2
[ 'a2', 'b2', 'c2' ]

默认的行为为深度优先,会优先计算深度,直到第一个条件满足,也就是c1才开始满足条件.

const Rx = require('rxjs');

const a$ = Rx.Observable.of('a1', 'a2', Rx.Scheduler.queue).do(console.log);
const b$ = Rx.Observable.of('b1', 'b2', Rx.Scheduler.queue).do(console.log);
const c$ = Rx.Observable.of('c1', 'c2', Rx.Scheduler.queue).do(console.log);

Rx.Observable.combineLatest(a$, b$, c$, Rx.Scheduler.queue)
    .subscribe(console.log);
a1
b1
c1
[ 'a1', 'b1', 'c1' ]
a2
[ 'a2', 'b1', 'c1' ]
b2
[ 'a2', 'b2', 'c1' ]
c2
[ 'a2', 'b2', 'c2' ]

null

默认的行为,同步调度.

此外除了这些Rx还实现了Scheduler.animationFrame等schedule. 可以方便浏览器实现一些动画效果.

Hot-Cold模式

众所周知Rx的Observable为Cold的流,Cold模式的流只有订阅了之后才真正开始执行并且每次订阅实际上是创建了一个和原来一样的流。

Hot模式除了不订阅就可以执行,也可以进行广播, 表现的更像一个Subject.

最简单的方法是使用publish操作符来将一个Observable转换成ConnectableObservable达到转换成Hot的效果

const Rx = require('rxjs');

const proxy$ = Rx.Observable.from([1,2,3])
      .do(() => console.log('side effect'))
      .publish();



proxy$.subscribe((value) => {
    console.log('A: ', value);
})

proxy$.subscribe(value => {
    console.log('B: ', value);
});

proxy$.connect();
side effect
A:  1
B:  1
side effect
A:  2
B:  2
side effect
A:  3
B:  3

ConnectableObservable还有一个有趣的操作符refCount, 将该Hot流转换为不完全Hot的流,只有至少一个订阅者存在才真正执行这个流. 做为对比: connect是手动打开流的开关,refCount是自动打开流的开关.

const Rx = require('rxjs');

const proxy$ = Rx.Observable.interval(100)
      .do(() => console.log('side effect'))
      .take(3)
      .publish()
      .refCount()

setTimeout(() => {
    console.log('timeout');
    proxy$.subscribe((value) => {
        console.log('A: ', value);
    })
}, 500);
timeout
side effect
A:  0
side effect
A:  1
side effect
A:  2

官方文档中有个不错的叙述, 将ConnectableObservable比作一场表演,当没有观众(observer)的时候表演者退场休息,当来了观众之后才开始真正表演.