RxJS - 将发出的值映射到多个 Http 请求的组合输出



我有这样的东西:

// getNums returns of Obsevable<number[]>
nums : Observable<number[]> = getNums();
// requestNum is a server call that takes a number and returns Observable<number>
serverNums : Observable<Observable<number>[]> = x.pipe(
    map(num_array => num_array.map(n => requestNum(n)))
);
// Part of an external API that I don't own.
function requestNum(num : number) : Observable<number> {
  // make a server call
}

问题是serverNums属于 Observable<Observable<number>[]> 型。我需要serverNums才能Observable<number[]>这样我才能做到

this.result = combineLatest(nums, serverNums).pipe(
    [nums : number[], serverNums : number[]] => {
      for (i=0; i<nums.length; i++) {
        console.log(nums[i], serverNums[i]);
      }
    }
);

因为this.result必须在我的构造函数中实例化为类型 Observable<Result[]>,而不是在订阅中设置。

所以我不能做subscribe(() => {this.result = result}).

使用 forkJoin 执行并组合所有 http 请求,并将从源发出的数组合并映射到 forkJoin 返回的 Observable。

import { forkJoin } from 'rxjs'; 
import { mergeMap } from 'rxjs/operators';
serverNums : Observable<number[]> = x.pipe(
    mergeMap(num_array => forkJoin(num_array.map(n => requestNum(n))))
);

forkJoin 会将所有 http 请求中最后(也是唯一)发出的值合并到一个数组中,在所有请求完成后。请注意,requestNum必须在发出值后完成,forkJoin才能正常工作。(Angular HttpClient发出的 Http 请求在发出值后完成)

根据您的可观察x和所需的行为,您可以使用 switchMap 而不是 mergeMap

您没有解释数据接口的结构是什么,所以我可以从数组中假设:

[1,2,3]

您希望创建对象:

{1:1, 2:2, 3:3}.

(如果您发布数据接口,我将相应地更新答案)。


要从数组创建对象,您可以使用 reduce:

x.pipe(reduce((pre,curr)=>{pre[curr]=curr; return pre;}, {})

点从一个空对象 ( {} ) 开始,并在每次迭代中将 pre[curr] 属性添加到该对象并将其设置为 curr。

  • pre 是上次迭代中的上一个值
  • curr 是当前迭代的当前值(在上面示例中为 1、2 或 3)

编辑:

你需要的是使用mergeMap(AKA flatMap)将Observable<Observable<number>>展平为Observable<number>

serverNums : Observable<Observable<number>[]> = x.pipe(
    map(num_array => num_array.map(n => requestNum(n)),
    mergeMap(num=>num)
)

这将在单个可观察量类型的可观察量中发出 requestNum(n) 的所有值。

最新更新