RxJS在并行批量/队列中运行异步任务Array.prototype.map



假设我有一个变量数组,比如:[SashaMishaCaitlyn...String](string[](等等。它有一个大的.length,大约有10k个元素。

我想用它们运行一个异步并行任务,但不是像Promise.all那样一次全部运行,而是像这样批量运行:

0 <= return await result
1 <= return await result, then next 2 (or N)
2
3
4
5
6
7

当然,我可以用各种方式来做,比如用for-loop迭代原语,并在其中做出承诺,然后运行它们,或者使用p-limit,但我听说RxJS及其运算符可以帮助我做到这一点。

根据RxJS bufferCount,它看起来就像我正在寻找的东西,但我仍然找不到必要的例子。

p.S.如果可能的话,我不喜欢在另一个变量中重新创建我的基元数组,并且有两个不同的数组,分别是约20k基元和约20k承诺。我更喜欢通过bulk(N(迭代原语,然后形成promise,等待它们响应,然后迭代到下一个bulk(N(

这取决于您想要阻止异步调用的确切方式。

假设您有N个输入值,并且您希望按大小分组处理它们M.

是否要阻止处理下一个组,直到当前组完成了吗?(这与您的"返回等待结果"更接近,则下一个N〃;要求(。

如果是,那么bufferCount就是方法。

from(members).pipe(
bufferCount(N),
// concatMap completes an async operation for each group in sequence. Nothing
// happens with the next group until the current group is done.
concatMap(async (groupOfN) => {
// process group
});
)

据我所知,bufferCount也是一个选项,但使用MergeMap和第二个arg似乎可以更容易地实现。

import { from } from 'rxjs';
import { mergeMap } from 'rxjs/operators';
async function t(members: string[]) {
await from(members).pipe(
// try / catch block is optional
mergeMap(async obj => { // remember to use `async` for awaiting result
console.log(obj + '1') // any async action you do, http requests, or DB updates
}, 2), // where 2 represent parallel
).toPromise()
};
t(['a', 'b', 'c', 'e', 'd', 'e', 'f', 'g', 'h', 'i']);

最新更新