我不是rxJS管道专家,我有一个问题,我认为很简单,我的代码归结为:
getThings = (): Observable<Thing[]> =>
this.getThingIds().pipe(
mergeMap((thingIds: number[]) =>
thingIds.map((id: number) => this.http.get<Thing>(`url${id}`))
)
);
问题是这返回一个Observable<Observable<Thing>>
。是否有一种方法将其转换为所需的Observable<Thing[]>
与一个算子?还是我从一开始就完全错了?
基本上,我需要为每个从getThingIds
接收到的id做一个请求,并将所有这些结果转换为数组
我不认为你说得太远了。您可以将它们与combineLatest
或forkJoin
组合—例如:
getThings = (): Observable<Thing[]> =>
this.getThingIds().pipe(
mergeMap((thingIds: number[]) => {
const things$ = [];
thingIds.forEach((id: number) => things$.push(this.http.get<Thing>(`url${id}`)));
return forkJoin(things$);
})
);
想给出一个性能更好的解决方案。虽然forkJoin()
可以完成此操作,但订阅者必须等到所有Thing
都被获取后才能发送数据。
下面的解决方案完成了同样的事情,但是给出了一个随着每个HTTP请求完成而增长的数组。
getThings$ = this.getThingIds$.pipe(
switchMap((ids:number[])=>
from(ids).pipe(
mergeMap(id=>this.http.get<Thing>(`url${id}`)),
scan((things, thing)=>[...things, thing], [] as Thing[])
)
)
);
这是逐行
- 我们从
switchMap()
开始而不是mergeMap()
。合并映射创建一个队列,而切换映射将中断任何正在进行的http请求。如果我们得到一个新的id数组,我们不希望等待所有旧的id先解析。 接下来我们使用 - 现在我们使用
mergeMap()
设置http队列。 - 接下来是
scan()
操作符,它类似于Array.reduce()
,但会为前一行(HTTP队列)发出的每个值触发内部函数。我们使用扩展运算符将每个新的Thing
添加到累积响应中。
from()
,它将创建一个可观察对象,发出id
数组的每个值。注意:我将方法更改为变量,因为它们都不需要任何参数。它只是删除了最微小的样板文件。