使用RxJS扩展运算符递归调用API



我有一个api,它一次搜索100条记录。如果有进一步的结果,它会在响应中返回一个光标键。

例如:第一次请求有效载荷

{
cursor: "",
query: "abc"
}

响应:

{
results: [...],
totalCount: 525,
cursor: "1234"
}

第二个请求有效载荷:

{
cursor: "1234",
query: "abc"
}

这将产生下100张唱片。

所以,总的来说,我将为525张唱片打6个电话。

由于这个API依赖于游标,所以它必须是递归的API调用。我想使用RxJS的expand操作符和pull操作符来提取每个结果对象的id。

结果对象结构:{id:";123〃;}

我的目标是整理525个记录id,并将其作为响应发送给这个API调用的订阅者。我在下面试了一下,但无法以具体的方式进行。

private searchRecordsByQueryWithCursor(
query: string,
cursor = ''
): Observable<RecordIds> {
const payload = {
limit: 100,
query: query,
returnedFields: ['id'],
cursor: cursor
};
return this.coreService.fetchSearchResultsByCursor(payload).pipe(
expand((response) => {
if (response.cursor) {
return this.coreService
.fetchSearchResultsByCursor({
...payload,
cursor: response.cursor
})
.pipe(
catchError(() => {
return of({
results: []
});
})
);
}
return EMPTY;
}),
pluck('results'),
reduce((acc: RecordIds, val) => {
return [...acc, ...val];
})
);
}

有人能为rxjs管道的正确结构提供指导吗?

所需输出:["123"、"456"、"789"。。。]

全部525个id。

让我们为这个例子使用一个简化的界面:

interface Response {
data: number[];
cursor: number;
}
function fetch(cursor: number): Observable<Response> { ... }

只要定义了cursor,就可以递归地调用fetch(),基本流程如下所示:

const items$ = fetch().pipe(
expand(response => fetch(response.cursor)),
takeWhile(response => !!response.cursor, true),
reduce((all, {data}) => all.concat(data), [])
);
  • expand将接受先前的发射,并通过先前响应的光标继续调用fetch()
  • CCD_ 5将结束";扩展流";当它接收到没有cursor的响应时
    • 我们通过CCD_ 7;包括";paramater,以免丢弃未定义光标时返回的数据
  • reduce将把所有结果累加到一个数组中

输出:

> fetch(0) { data: Array[3], cursor: 1 }
> fetch(1) { data: Array[3], cursor: 2 }
> fetch(2) { data: Array[3], cursor: 3 }
> fetch(3) { data: Array[3], cursor: undefined }
items$ [ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 ]

这是一个正在工作的StackBlitz示例。

作为通过takeWhile()终止的替代方案(如BizzyBob的示例(,另一种选择是直接在expand()内部进行测试,类似于递归函数中的基本情况:

const items$ = fetch().pipe(
expand((response) => {
if (!response.cursor) {
return EMPTY;
}
return fetch(response.cursor);
}),
reduce((all, { data }) => all.concat(data), [])
);

这是因为reduce()在调用expand()询问同一响应的游标部分之前消耗了该响应的数据部分。返回EMPTY完成流(并终止递归(。

这是一个正在工作的StackBlitz示例。

当服务器没有返回数据结束指示时,也可以在管道中抛出take(1000)来处理病理情况,这不是一个坏主意。一些大但有限的值就可以了。

修改reduce()中的累积值并返回修改后的值也是安全的,因为它是一个按引用类型;这避免了必须创建多个中间数组值。

最新更新