RxJS 使用之 pipe 管道
前言
之前我们写了 RxJS 的基本的使用方法
接下来写一写 RxJS 的,我个人觉得最重要的,也是最核心的一个概念:管道
正文
如果把 Observable 、 Observer 比喻成一部房子的地基
那么 pipe 就是建在地基上的房子,展现在大家的眼前
RxJS 封装了许多不同的 pipe ,组合这些函数,使得房子可以变化出不同的样子
使用 pipe
在新建 Observable 对象之后,我们可以调用 .pipe(p1(), p2(), ... , pN()) 来组合各式各样的管道
1 | import { Observable } from "rxjs"; |
需要注意的时,pipe 函数返回了一个 Observable 对象
如果管道为空,即调用了 pipe() ,那么返回的 Observable 就等于原对象,即 newObservable === observable
如果管道不为空,比如调用 pipe(map(x => x * 2)) ,那么返回的 Observable 就不等于原来的原来的 Observable 对象
在内部实现中,我们发现 pipe 依赖于 pipeFromArray

从调用上,可以发现 pipeFromArray 是一个返回函数的函数
看下它的实现

在参数为 0 个时返回了 identity 这个函数
而 identity 的实现就是返回自身

所以当参数为 0 个时, pipe 的实现等价于如下:
1 | class Observable<T> implements Subscribable<T> { |
常见的 pipe 函数
map
map 很像 js 中数组的 map ,都是对原值映射成新值的一个函数
1 | import { map, Observable } from "rxjs"; |
以上例子会输出 0 2 4,即对 0 1 2 的每一个数字都乘以 2
filter
与数组的 filter 方法同理
1 | import { filter, Observable } from "rxjs"; |
以上就会输出 0 2 ,即过滤了不是偶数的数字,在原输出 0 1 2 中非偶数的数字为 1
tap
tap 和调用 subscribe 方法很相像,都是传入一个 Observer 对象,但是 tap 可以在 pipe 的各个阶段织入,获取当时对应的值
而 subscribe 只能获取 pipe 处理完之后的值,比如:
1 | import { filter, map, Observable, tap } from "rxjs"; |
运行上面的例子,会打印出
1 | tap1: 0 |
这里需要注意的时,我们必须调用 subscribe ,管道内的函数才会被执行
一般情况下,tap 都是被用来 debugger 程序的,就像上面的例子,由于组合了许多 pipe 函数
但是我们需要定位是在哪个 pipe 函数出了问题,那么这时就可以在这个 pipe 前后加一个 tap 来打印值
同时 tap 也是一个执行副作用的地方,对于管道函数,它最好是纯函数,即相同的输入一定会有相同的输出,比如上面的 map , filter
tap 提供了一个“逃生舱”,使得程序可以在特定的 pipe 位置执行一些副作用(比如发起网络请求,或者修改全局的某些变量),但是不会影响整个 pipe 的执行结果
max 和 min
统计最大值和最小值
1 | import { max, min, Observable } from "rxjs"; |
take 和 skip
take 接受一个数字,表示只取前几个值
skip 接受一个数字,表示跳过前几个值,和 take 相反
1 | import { Observable, skip, take } from "rxjs"; |
以上会输出 0 1 2 以及 6 7 8 9
first 和 last
first 就是 take(1) ,即取第一个元素
而 last 就是取最后一个元素
1 | import { first, last, Observable } from "rxjs"; |
以上例子输出 0 和 9
当然,也可以对这个值进行限制,比如我们像找第一个大于 5 的值,可以往 first 里面传入一个验证函数
这样就可以拿到第一个使得验证函数为 true 的值了
1 | import { first, Observable } from "rxjs"; |
以上例子输出 6
debounceTime 和 throttleTime
防抖和节流函数 RxJS 也给我们封装好了
传入时间即可实现对应的功能
防抖例子
1 | import { debounceTime, Observable } from "rxjs"; |
以上例子只输出 5 ,当输出 1 的时候,在等待 1s 之后才会输出,而等待 500ms 之后又输出了 2 ,此时 1 就不该输出了,重新对 2 进行计时,数字 3 , 4 同理
节流例子
1 | import { Observable, throttleTime } from "rxjs"; |
以上例子输出 1 5 ,节流使得一段时间内只拿到一个值,忽略这段时间产生的其他值
在例子中,不使用节流的话,输出为 1(0) 2(300) 3(600) 4(900) 5(1200)
在 1000ms 内只拿到一个值,所以拿到了 1 和 5
当然,这是针对 throttleTime 的默认 ThrottleConfig 参数({ leading: true, trailing: false })的情况下,即先拿值再计时
如果 ThrottleConfig 参数改成 { leading: false, trailing: true } 的话,那么输出就是 4 和 5 了
创建自定义 pipe 函数
上面我们讲了很多官方的提供的 pipe 函数,但有时我们需要的功能官方提供的函数无法实现
这时候就可以通过创建自定义 pipe 函数
在这之前,我们需要明白 pipe 函数究竟是个啥
这里我们以 map 的源码为例子

可以看到返回了执行 operate 函数,传入了一个函数参数,查看 operate 的源码

通过实现我们可以发现,管道的最外层就是一个传入上一个 Observable ,然后返回一个新的 Observable 的一个函数
可能由于类型看的会比较乱,我们把类型和错误捕获删了,只留下核心逻辑
1 | function operate(init) { |
这里发现它调用了 Observable 的 lift 方法,源码如下

发现它创建了一个新的 Observable 对象,挂载了一些属性
可能到这里你会无法理解,发出“这个 source 属性是干嘛的,这个 operator 属性又是干嘛的”的疑问
但是不要急,我们发现这个方法它是 deprecated 的,这意味着它不被官方推荐使用
我们看下 deprecated 注释的内容
Internal implementation detail, do not use directly. Will be made internal in v8. If you have implemented an operator using
lift, it is recommended that you create an operator by simply returningnew Observable()directly. See “Creating new operators from scratch” section here: https://rxjs.dev/guide/operators
翻译过来就是:这个一个内部实现,请不要直接使用,它会在版本 8 上转为内部(目前暴露在了 Observable 的属性上),如果你需要通过 lift 方法实现一个操作,推荐直接通过 new Observable() 返回一个新的对象
现在就好办了,直接返回一个新的 Observable 就完事了,于是乎我们可以写出下面的框架代码:
1 | const myPipe = () => { |
回过头来,我们来看 map 的实现
1 | export function map<T, R>(project: (value: T, index: number) => R, thisArg?: any): OperatorFunction<T, R> { |
通过对源 Observable 调用 subscribe ,传入了一个经过 createOperatorSubscriber 包装过后的 Observer ( Subscriber 是 Observer 的实现)
然后通过新的 Observable 的 next 来发送映射之后的值,即 project.call(thisArg, value, index++)
上面的代码可能比较晦涩,换成下面这样写,就非常容易理解了
1 | const myPipe = (mapFn, context) => { |
然后我们写个例子测试一下:
1 | import { Observable, map } from "rxjs"; |
输出如下:

后记
当然,出了本文提到的 pipe ,RxJS 还内置了非常多的管道
可以点击 RxJS Operators 查看
全记住我觉得还是很难的,如果有需要自定义管道时,可以先翻一翻文档,如果有那就直接用,如果可以组合已有管道,那就不要自己再写了
在完全没有的情况下,再考虑自己写管道