为什么 concatMap 没有订阅所有组由 Observables rxjs?



我有一个问题,为什么写这个不是为了安慰数字2,4,6? 解释是什么?

Observable.range(1, 6)
.groupBy(n => n % 2 === 0)
.concatMap(obs => obs)
.subscribe((n) => console.log(n), null, () => console.log('complete concatMap'))
// this is the output
1 -
3 -
5 -
complete concatMap

基本问题是您正在使用仅在前一个完成时才订阅下一个可观察concatMapgroupBy发出两个GroupedObservable,所以它订阅了第一个,我认为在它可以订阅第二个之前,链就完成了。这意味着观察者从第一个GroupedObservable接收complete通知,因此您永远不会看到第二个GroupedObservable的值(老实说,我不是 100% 确定它真的像这样发生,但这是有意义的,无需进一步调查您的示例)。

因此,如果您只想要第二组,您可以执行以下操作:

import { Observable } from 'rxjs';
Observable.range(1, 6)
.groupBy(n => n % 2 === 0)
.filter(o => o.key === true)
.concatMap(obs => obs)
.subscribe((n) => console.log(n), null, () => console.log('complete concatMap'))

查看现场演示(打开控制台):https://stackblitz.com/edit/rxjs5-sfused

我检查了源代码,groupBy收到complete通知后完成了所有组(它在收到来自range的所有值后完成),因此concatMap永远没有空间订阅第二个可观察的。

看到这个: https://github.com/ReactiveX/rxjs/blob/master/src/internal/operators/groupBy.ts#L200-L210

问题是groupBy运算符为每个键发出Subjects

Concat map 仅在第一个完成后订阅后续Subject,即它错过了从后续Subject中捕获项目的机会,因为所有子流都在同一时间发出值。

致敬: https://blog.angularindepth.com/those-hidden-gotchas-within-rxjs-7d5c57406041

TL;博士:

GroupBy 接收subjectSelector作为第 4 个参数。您可以使用它来强制使用ReplaySubject而不是Subject(默认)。

Observable.range(1, 6)
.groupBy(
n => n % 2 === 0,
null, 
null,
() => new ReplaySubject() // <-- Here we go
)
.concatMap(obs => obs)
.subscribe((n) => console.log(n))

RxViz 上的演示

最新更新