Nodejs 异步承诺队列



我需要使用速率受限的API。例如,我一秒钟只能进行 10 次 API 调用,因此我需要等待当前秒结束才能进行另一次 API 调用。

为了实现这一点,我想创建一个可以自行管理的异步队列。它的主要功能是让我向队列添加新的承诺,当承诺解析时,应用程序会收到通知:

let queue = new Queue()
queue.add(api.get('/somepath')).then(res => { // handle response });

如何使用普通承诺来实现这一点?

export class AsyncQueue {
private queue: Array<Promise<any>>;

add(promise, fct) {
this.queue.push(promise);
}
resolveNext() {
this.queue.pop().then({
// how to resolve the waiting promise in my application
})
}
get length() {
return this.queue.length
}
}

使用当前实现,add队列时将立即调用api.get()。您应该改为add路径(或者可能同时api.getpath(,并在可能的情况下AsyncQueue初始化 Promise。确保add返回一个承诺,该承诺在 API 调用完成后解析。

例如,在原版 JS 中,它可能看起来像这样:

const apiGet = () => new Promise(resolve => setTimeout(resolve, 1000));
class AsyncQueue {
queue = [];
constructor() {
setInterval(this.resolveNext.bind(this), 2000);
}
add(fn, param) {
return new Promise(resolve => {
this.queue.unshift({ fn, param, resolve });
});
}
resolveNext() {
if (!this.queue.length) return;
const { fn, param, resolve } = this.queue.pop();
fn(param).then(resolve);
}
}
const queue = new AsyncQueue()
console.log('start');
// Will resolve after 2000 + 1000 seconds:
queue.add(apiGet, '/somepath').then(res => {
console.log('handling response 1');
});
// Will resolve after 4000 + 1000 seconds:
queue.add(apiGet, '/somepath').then(res => {
console.log('handling response 2');
});

为了避免永久调用 resolveNext((,是否可以像这样实现?

class AsyncQueue {

/* delayBetween: delay (ms) before calling next item
*/
constructor( delayBetween) {
this.queue = [];
this.id = 0;
if (delayBetween < 1) {
delayBetween = 1;
}
this.delayBetween = delayBetween;
this.timer = null;
// setInterval( this.resolveNext.bind(this), this.delayBetween);
}

add(fn, param) {
return new Promise( resolve => {
// liste inversée : le dernier élément ajouté est au début du tableau
this.id ++;
param.queueId = this.id;
// console.log( `${new Date().yyyymmddhhmmsslll()} > push request: ${JSON.stringify(param)}`);
this.queue.unshift( { fn, param, resolve } );
// console.log( `${new Date().yyyymmddhhmmsslll()} > add() > setTimeout...`);
if (this.timer == null) {
this.timer = setTimeout( this.resolveNext.bind(this), this.delayBetween);
}
});
}

resolveNext() {
this.timer = null;
// console.log( `${new Date().yyyymmddhhmmsslll()} > resolveNext() > called, len: ${this.queue.length}...`);
if ( ! this.queue.length) return;       
const { fn, param, resolve } = this.queue.pop();
// console.log( `${new Date().yyyymmddhhmmsslll()} > pop request: ${JSON.stringify(param)}`);
// execute fn, and call resolve only when finished
// fn(param).then(resolve);
fn(param).then((result) => {
// console.log( `${new Date().yyyymmddhhmmsslll()} > fn resolved: ${JSON.stringify(result)}`);
if (this.timer == null) {
this.timer = setTimeout( this.resolveNext.bind(this), this.delayBetween);
}
});
}
}

最新更新