我有一个api,它一次搜索100条记录。如果有进一步的结果,它会在响应中返回一个光标键。
例如:第一次请求有效载荷
{
cursor: "",
query: "abc"
}
响应:
{
results: [...],
totalCount: 525,
cursor: "1234"
}
第二个请求有效载荷:
{
cursor: "1234",
query: "abc"
}
这将产生下100张唱片。
所以,总的来说,我将为525张唱片打6个电话。
由于这个API依赖于游标,所以它必须是递归的API调用。我想使用RxJS的expand操作符和pull操作符来提取每个结果对象的id。
结果对象结构:{id:";123〃;}
我的目标是整理525个记录id,并将其作为响应发送给这个API调用的订阅者。我在下面试了一下,但无法以具体的方式进行。
private searchRecordsByQueryWithCursor(
query: string,
cursor = ''
): Observable<RecordIds> {
const payload = {
limit: 100,
query: query,
returnedFields: ['id'],
cursor: cursor
};
return this.coreService.fetchSearchResultsByCursor(payload).pipe(
expand((response) => {
if (response.cursor) {
return this.coreService
.fetchSearchResultsByCursor({
...payload,
cursor: response.cursor
})
.pipe(
catchError(() => {
return of({
results: []
});
})
);
}
return EMPTY;
}),
pluck('results'),
reduce((acc: RecordIds, val) => {
return [...acc, ...val];
})
);
}
有人能为rxjs管道的正确结构提供指导吗?
所需输出:["123"、"456"、"789"。。。]
全部525个id。
让我们为这个例子使用一个简化的界面:
interface Response {
data: number[];
cursor: number;
}
function fetch(cursor: number): Observable<Response> { ... }
只要定义了cursor
,就可以递归地调用fetch()
,基本流程如下所示:
const items$ = fetch().pipe(
expand(response => fetch(response.cursor)),
takeWhile(response => !!response.cursor, true),
reduce((all, {data}) => all.concat(data), [])
);
expand
将接受先前的发射,并通过先前响应的光标继续调用fetch()
- CCD_ 5将结束";扩展流";当它接收到没有
cursor
的响应时- 我们通过CCD_ 7;包括";paramater,以免丢弃未定义光标时返回的数据
reduce
将把所有结果累加到一个数组中
输出:
> fetch(0) { data: Array[3], cursor: 1 }
> fetch(1) { data: Array[3], cursor: 2 }
> fetch(2) { data: Array[3], cursor: 3 }
> fetch(3) { data: Array[3], cursor: undefined }
items$ [ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 ]
这是一个正在工作的StackBlitz示例。
作为通过takeWhile()
终止的替代方案(如BizzyBob的示例(,另一种选择是直接在expand()
内部进行测试,类似于递归函数中的基本情况:
const items$ = fetch().pipe(
expand((response) => {
if (!response.cursor) {
return EMPTY;
}
return fetch(response.cursor);
}),
reduce((all, { data }) => all.concat(data), [])
);
这是因为reduce()
在调用expand()
询问同一响应的游标部分之前消耗了该响应的数据部分。返回EMPTY
完成流(并终止递归(。
这是一个正在工作的StackBlitz示例。
当服务器没有返回数据结束指示时,也可以在管道中抛出take(1000)
来处理病理情况,这不是一个坏主意。一些大但有限的值就可以了。
修改reduce()
中的累积值并返回修改后的值也是安全的,因为它是一个按引用类型;这避免了必须创建多个中间数组值。