如何使用javascript rxjs来运行批量计算



我是rxjs的新手,但需要使用这种异步机制来完成这样的任务:我有很多计算请求,比如10K,我想批量执行它们:每批1K,批量是预先确定的。只有当当前批次完成时,我才会转到下一个批次。功能将是:calcBatch(数据集(输入日期集将来自一组数据集:datasets=[…]同步环路看起来像:

datasets.foreach(dataset=> {
while (!calculation_is_done) {
wait();
}
calcBatch(dataset);
});
calcBatch(dataset) {
calculation_is_done = false;
/* calculation */
calculation_is_done = true;
}

现在切换到异步机制,我应该如何构建流?我想的是,在calcBatch中,当工作完成时,将返回一个promise或observable。然后在循环中,订阅者将监听此promise或observable,一旦捕获,将为下一批调用calcBatch。

由于后端(HTTP(无法处理一整套计算,因此需要批量处理计算。

我认为bufferCountconcatMap可能是您所需要的:

function calcBatch(dataset) {
console.log('Processing batch:', dataset)
// Fake async request
return new Promise(res => {
setTimeout(() => res(dataset), 2000)
})
}
from(datasets)
.pipe(
bufferCount(1000),
concatMap(dataset => calcBatch(dataset))
)
.subscribe(dataset => {
console.log('Batch done:', dataset)
})

注意:calcBatch需要返回一个promise或observable。

最新更新