用promise构建请求队列



我的目标是针对REST端点运行数据导入。

我不想等到一个请求得到解决后再解雇一个新的。我想"模拟"并行连接。

我不确定我是否有基本的知识问题。

此代码创建子进程:

const numchild = require('os').cpus().length;
const SIZE = 1000;
const SIZE_PER_CHILD = SIZE / numchild;
for (let i = 0; i < numchild; i++) {
const child = child_process.fork('./child.js');
child.send({ start: i * SIZE_PER_CHILD, end: (i + 1) * SIZE_PER_CHILD });
// Some more code...
}

然后,对于每个子进程,我想生成一个用于导入的随机负载,并针对REST端点启动它:

process.on('message', async function({ start, end }) {
const count = start;
while (count < end) {
const generatedData = 'I was generated! Yay!' + count;
await axios.put('/api/v1/import', generatedData);
count++;
}
});

上述方法将等待每个导入请求完成,然后激发下一个请求,直到所有子导入完成。不是我想要的。

现在,我所攻击的端点应该能够处理比我所能生成的请求更多的请求。

我可以这样重写:

process.on('message', async function({ start, end }) {
const count = start;
while (count < end) {
const generatedData = 'I was generated! Yay!' + count;
axios.put('/api/v1/import', generatedData).then(() => console.log('I am done with this one'));
count++;
}
});

当然,这种方法的问题是,所有请求都会在几秒钟内生成,并针对端点触发。我想这更像DOS风格。也不是我想要的。

我希望实现的是:每个孩子过程有15个开放的连接。如果一个请求已完成,请对下一个请求进行排队,直到再次有15个请求挂起为止。

所以我尝试了这个:

process.on('message', async function({ start, end }) {
const count = start;
let queue = [];
while (count < end) {
if (queue.length === 15) {
queue = queue.filter(async (promise) => {
const state = await promiseState(promise);
return state !== 'fulfilled';
});
} else {
const generatedData = 'I was generated! Yay!' + count;
queue.push(axios.put('/api/v1/import', generatedData).then(() => console.log('I am done with this one')));
count++;
}
}
});
function promiseState(p) {
const t = {};
return Promise.race([p, t])
.then(v => (v === t) ? "pending" : "fulfilled", () => "rejected");
}

也不起作用,也没有意义,对吧?filter函数返回promise,因此我试图做的事情不起作用。

我有办法做到这一点吗?

尝试p-queue。下面的并发设置为3,这意味着在这个队列中最多同时执行3个调用:

import PQueue from 'p-queue';
const queue = new PQueue({
concurrency: 3,
});
process.on('message', async function ({ start, end }) {
var calls = [];
var count = start;
while (count < end) {
const generatedData = 'I was generated! Yay!' + count;
calls.push(
queue.add(() => {
return axios
.put('/api/v1/import', generatedData)
.then(() => console.log('I am done with this one'));
})
);
count++;
}
var results = await Promise.all(calls);
});

最新更新