使用 async/await 等待数据到达数组



我想在现代nodeJS工具中有两个并发循环。

一个循环扫描有趣的东西并将它们推送到数组中。

第二个循环将内容从数组中移出并处理它们。

这个想法是两个循环不会相互阻塞。每当第一个循环执行大量扫描但没有找到太多时,第二个循环将能够赶上处理积压工作。只有当没有积压和新数据传入时,处理循环才必须实际等待。

我以多种方式实现了扫描循环。这是简单的部分。 (在这种情况下,我递归扫描 fs 以查找某种类型的文件。

对于处理循环,我让它通过轮询工作,但觉得我应该能够让它使用纯异步/等待来工作。

但我无法理解它。从概念上讲,扫描循环应该履行一个承诺,提醒第二个循环数组中有某些东西,或者应该是一个生成每个新值的生成器,而不是使用数组。

但是我看不出如何一遍又一遍地做一个承诺,或者直接等待数组活动,或者作为生成器来做,而不会在两个循环之间造成阻塞。

我一定是想多了!我错过了什么?


通过轮询工作的代码,带有注释掉的位,其中异步/等待实现可能属于:

"use strict";
const { basename, join } = require('path')
const { promisify } = require('util')
const fs = require('fs')
const readdir = promisify(fs.readdir)
const lstat = promisify(fs.lstat)
async function* scanpaths(paths) {
for (const path of paths) {
yield* scanonepath(path)
}
}
async function* scanonepath(path) {
try {
const s = await lstat(path)
if (s.isDirectory()) {
for (const entry of await readdir(path)) {
yield* scanonepath(join(path, entry))
}
} else if (/.[mM][pP]3$/.test(path)) {
yield { pathname:path, basename:basename(path), stat:s }
}
} catch (e) {
// special file, deleted file, etc
}
}
async function* checkqueue(buf) {
if (buf.length) {
yield buf.shift()
} else {
// TODO await something to arrive in the buf - HOW?
}
}
async function processmp3(fullname, name, stat) {
try {
console.log(fullname)
// TODO tricky processing goes here
} catch (e) {
console.log(name, e)
}
}
(async () => {
let int = null
let globaldone = false
let globalprocessing = []
let buf = []
async function poll() {
if (buf.length) {
let clone = Array.from(buf)
buf.length = 0  // NOT buf = [] as that doesn't change other refs to buf
for (let e of clone) {
globalprocessing.push(processmp3(e.pathname, e.basename, e.stat))
}
}
if (globaldone) {
await Promise.all(globalprocessing)
console.warn("*** finished processing")
} else {
setTimeout(poll, 125)
}
}
// start polling for scanned files ready to process
console.log("** start polling")
setTimeout(poll, 0)
//console.log("** start scanning")
//await enqueue(buf)
globaldone = true
console.warn("*** finished scanning")
})()
// TODO how?
async function enqueue(buf) {
// start scanning by iterating over our generator which does the recursive directory stuff
for await (const file of scanpaths(process.argv.slice(2))) {
buf.push(file)
// TODO resolve a promise to notify dequeue? or also yield this file
}
}

让 scanner 循环,当它找到某些东西并推送它时,调用一个函数,告诉处理队列某些内容已添加到数组中。如果在处理队列主动执行某些操作时调用该函数,则不执行任何操作 - 否则,如果处理队列处于空闲状态,则会告诉处理队列从扫描数组中移动项目并对其进行处理。

当然,当处理队列处理完一个项目时,让它检查是否有其他项目要处理,如果有,请立即处理下一个项目。这可确保处理队列仅在没有更多要处理的项目时才处于空闲状态,并且在推送新项目后立即开始运行。

例如,对于您的代码,您可以改为执行以下操作:

let processing = false;
async function processNext() {
if (processing) {
return;
}
processing = true;
const e = buf.shift();
const mp3 = await processmp3(e.pathname, e.basename, e.stat);
// do something with parsed mp3?
processing = false;
processNext();
}

async function enqueue(buf) {
for await (const file of scanpaths(process.argv.slice(2))) {
buf.push(file);
processNext();
}
}

最新更新