RxJS : 如何将可观察<可观察<Thing>>转换为可观察<事物[]>



我不是rxJS管道专家,我有一个问题,我认为很简单,我的代码归结为:

getThings = (): Observable<Thing[]> => 
this.getThingIds().pipe(
mergeMap((thingIds: number[]) => 
thingIds.map((id: number) => this.http.get<Thing>(`url${id}`))
)
);

问题是这返回一个Observable<Observable<Thing>>。是否有一种方法将其转换为所需的Observable<Thing[]>与一个算子?还是我从一开始就完全错了?

基本上,我需要为每个从getThingIds接收到的id做一个请求,并将所有这些结果转换为数组

我不认为你说得太远了。您可以将它们与combineLatestforkJoin组合—例如:

getThings = (): Observable<Thing[]> => 
this.getThingIds().pipe(
mergeMap((thingIds: number[]) => {
const things$ = [];
thingIds.forEach((id: number) => things$.push(this.http.get<Thing>(`url${id}`)));
return forkJoin(things$);
})
);

想给出一个性能更好的解决方案。虽然forkJoin()可以完成此操作,但订阅者必须等到所有Thing都被获取后才能发送数据。

下面的解决方案完成了同样的事情,但是给出了一个随着每个HTTP请求完成而增长的数组。

getThings$ = this.getThingIds$.pipe(
switchMap((ids:number[])=>
from(ids).pipe(
mergeMap(id=>this.http.get<Thing>(`url${id}`)),
scan((things, thing)=>[...things, thing], [] as Thing[])
)
)
);

这是逐行

  • 我们从switchMap()开始而不是mergeMap()。合并映射创建一个队列,而切换映射将中断任何正在进行的http请求。如果我们得到一个新的id数组,我们不希望等待所有旧的id先解析。
  • 接下来我们使用from(),它将创建一个可观察对象,发出id数组的每个值。
  • 现在我们使用mergeMap()设置http队列。
  • 接下来是scan()操作符,它类似于Array.reduce(),但会为前一行(HTTP队列)发出的每个值触发内部函数。我们使用扩展运算符将每个新的Thing添加到累积响应中。

注意:我将方法更改为变量,因为它们都不需要任何参数。它只是删除了最微小的样板文件。

最新更新