Slicing a sequential mapping of a collection using async iterators



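Suppose I have a collection of events. There will be a lot of them, around 10-20k. For each event I need to make an additional request to get its eventDetails. A basic implementation looks like this: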

    const eventsWithDetails = await Promise.all(
        events.map(async (event) => {
            const eventDetails = await event.getEventDetails();
            return {
                ...event,
                ...eventDetails
            };
        })
    );

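This does exactly what I need, with one caveat about the API I am talking to: under a large number of back-to-back requests the endpoint gets overloaded, and the API periodically throws errors for some of them. One brute-force solution is to request the items sequentially in bundles of N (ugly, of course, and not scalable at all, but it works!):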

    const slice1 = await Promise.all(
        events.slice(0, 500).map(mapEventWithTimestamp)
    );
    await sleep(4000);
    const slice2 = await Promise.all(
        events.slice(500, 1000).map(mapEventWithTimestamp)
    );
    await sleep(4000);
    // ...and so on for slice3 through sliceN
    return [...slice1, ...slice2, ...sliceN];
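The hand-written slices above could be folded into a loop. This is only a minimal sketch: mapInBatches is a hypothetical helper name, and sleep is assumed to be a simple Promise-based timer, since the original snippet does not show how it is defined.

```javascript
// Assumed helper: resolve after `ms` milliseconds.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Hypothetical helper: map `items` in sequential batches of `size`,
// pausing `pauseMs` milliseconds between consecutive batches.
async function mapInBatches(items, mapper, size = 500, pauseMs = 4000) {
  const results = [];
  for (let start = 0; start < items.length; start += size) {
    if (start > 0) {
      await sleep(pauseMs); // no pause is needed before the first batch
    }
    const batch = items.slice(start, start + size);
    // Requests inside one batch still run concurrently.
    const mapped = await Promise.all(batch.map((item) => mapper(item)));
    results.push(...mapped);
  }
  return results;
}
```

With that in place, the call site would shrink to something like const eventsWithDetails = await mapInBatches(events, mapEventWithTimestamp); and the batch size and pause become parameters rather than copy-pasted constants.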

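While looking for a more robust solution, I tried implementing this with async iterators: a state object that also exposes the async iteration interface. The main point is to pause for N seconds after every 1000 requests. Something like this: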

    let mapper = {
        start: 0,
        stop: events.length,
        step: 1000,
        result: [],

        async *[Symbol.asyncIterator]() {
            // `<` rather than `<=`, so no empty trailing slice is produced
            for (let current = this.start; current < this.stop; current += this.step) {
                await sleep(4000);
                let slice = await Promise.all(
                    events.slice(current, current + this.step).map(mapEventWithTimestamp)
                );
                this.result = [...this.result, ...slice];
                yield slice;
            }
        }
        // is there a way to return this.result for external usage??
    };

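Is there an elegant, scalable way to accomplish this with async iteration, behind a clear interface such as const eventsWithDetails = await mapEventsWithDetails(events); (which would iterate internally and then return the mapped data)?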

You can try the following with the iter-ops library, which processes everything sequentially, in an iterable fashion:

    import {pipeAsync, map, wait} from 'iter-ops';

    const i = pipeAsync(
        events,
        map(event => event.getEventDetails()), // remap into requests
        wait() // resolve each promise inside the iterable
    );

    // this is where execution starts:
    for await (const a of i) {
        console.log(a); // print whatever data you're getting
    }

You can also add the delay and throttle operators to the pipeline as needed.

If you really want to process the data in batches, you can use the page operator to split the requests into pages:

    import {pipeAsync, map, page, wait} from 'iter-ops';

    const i = pipeAsync(
        events,
        page(500), // split into pages of 500 items each
        map(items => Promise.all(items.map(a => a.getEventDetails()))),
        wait() // resolve each page
    );

    // this will trigger processing one page at a time:
    for await (const p of i) {
        console.log(p); // print a whole page of data
    }
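If you would rather not pull in a library, the same page-at-a-time idea can be written as a plain async generator plus a small wrapper that drains it, which gives the mapEventsWithDetails(events) interface asked for in the question. This is only a sketch under assumptions: detailBatches is a hypothetical name, sleep is assumed to be a Promise-based timer, and the default step and pause simply mirror the numbers from the question.

```javascript
// Assumed helper: resolve after `ms` milliseconds.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Hypothetical generator: yields one resolved batch of merged events at a time.
async function* detailBatches(events, step = 1000, pauseMs = 4000) {
  for (let current = 0; current < events.length; current += step) {
    if (current > 0) {
      await sleep(pauseMs); // pause only between batches
    }
    const slice = events.slice(current, current + step);
    yield await Promise.all(
      slice.map(async (event) => ({
        ...event,
        ...(await event.getEventDetails()),
      }))
    );
  }
}

// Iterates internally and returns one flat array, so callers never
// have to reach into the iterator's state for the accumulated result.
async function mapEventsWithDetails(events, step, pauseMs) {
  const result = [];
  for await (const batch of detailBatches(events, step, pauseMs)) {
    result.push(...batch);
  }
  return result;
}
```

The accumulated result lives in the wrapper rather than on the iterable object, which sidesteps the question of how to return this.result from the iterator itself.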
