我有一个问题,为什么写这个不是为了安慰数字2,4,6? 解释是什么?
Observable.range(1, 6)
.groupBy(n => n % 2 === 0)
.concatMap(obs => obs)
.subscribe((n) => console.log(n), null, () => console.log('complete concatMap'))
// this is the output
1 -
3 -
5 -
complete concatMap
基本问题是您正在使用仅在前一个完成时才订阅下一个可观察concatMap
。groupBy
发出两个GroupedObservable
,所以它订阅了第一个,我认为在它可以订阅第二个之前,链就完成了。这意味着观察者从第一个GroupedObservable
接收complete
通知,因此您永远不会看到第二个GroupedObservable
的值(老实说,我不是 100% 确定它真的像这样发生,但这是有意义的,无需进一步调查您的示例)。
因此,如果您只想要第二组,您可以执行以下操作:
import { Observable } from 'rxjs';
Observable.range(1, 6)
.groupBy(n => n % 2 === 0)
.filter(o => o.key === true)
.concatMap(obs => obs)
.subscribe((n) => console.log(n), null, () => console.log('complete concatMap'))
查看现场演示(打开控制台):https://stackblitz.com/edit/rxjs5-sfused
我检查了源代码,groupBy
收到complete
通知后完成了所有组(它在收到来自range
的所有值后完成),因此concatMap
永远没有空间订阅第二个可观察的。
看到这个: https://github.com/ReactiveX/rxjs/blob/master/src/internal/operators/groupBy.ts#L200-L210
问题是groupBy
运算符为每个键发出Subjects
。
Concat map 仅在第一个完成后订阅后续Subject
,即它错过了从后续Subject
中捕获项目的机会,因为所有子流都在同一时间发出值。
致敬: https://blog.angularindepth.com/those-hidden-gotchas-within-rxjs-7d5c57406041
TL;博士:
GroupBy 接收subjectSelector
作为第 4 个参数。您可以使用它来强制使用ReplaySubject
而不是Subject
(默认)。
Observable.range(1, 6)
.groupBy(
n => n % 2 === 0,
null,
null,
() => new ReplaySubject() // <-- Here we go
)
.concatMap(obs => obs)
.subscribe((n) => console.log(n))
RxViz 上的演示