我的NodeJs代码中有这种情况,它计算排列(这里的代码(,但无论如何,我都不会从setInterval
获得任何输出。
const { Readable } = require('stream');
const { intervalToDuration, formatDuration, format } = require('date-fns');
const { subsetPerm } = require('./permutation');
function formatLogs(counter, permStart) {
const newLocal = new Date();
const streamTime = formatDuration(intervalToDuration({
end: newLocal.getTime(),
start: permStart.getTime()
}));
const formattedLogs = `wrote ${counter.toLocaleString()} patterns, after ${streamTime}`;
return formattedLogs;
}
const ONE_MINUTES_IN_MS = 1 * 60 * 1000;
let progress = 0;
let timerCallCount = 1;
let start = new Date();
const interval = setInterval(() => {
console.log(formatLogs(progress, start));
}, ONE_MINUTES_IN_MS);
const iterStream = Readable.from(subsetPerm(Object.keys(Array.from({ length: 200 })), 5));
console.log(`Stream started on: ${format(start, 'PPPPpppp')}`)
iterStream.on('data', () => {
progress++;
if (new Date().getTime() - start.getTime() >= (ONE_MINUTES_IN_MS * timerCallCount)) {
console.log(`manual timer: ${formatLogs(progress, start)}`)
timerCallCount++;
if (timerCallCount >= 3) iterStream.destroy();
}
});
iterStream.on('error', err => {
console.log(err);
clearInterval(interval);
});
iterStream.on('close', () => {
console.log(`closed: ${formatLogs(progress, start)}`);
clearInterval(interval);
})
console.log('done!');
但我发现它印的是"完成了!"(预期(,然后脚本似乎结束了,即使我在on('data')
回调中放入console.log,我也会将数据打印到终端。但即使几个小时后,setInterval
中的console.log也不会运行,因为除了on('close',...)
的输出之外,文件中什么都没有。
输出日志如下所示:
> node demo.js
Stream started on: Sunday, January 30th, 2022 at 5:40:50 PM GMT+00:00
done!
manual timer: wrote 24,722,912 patterns, after 1 minute
manual timer: wrote 49,503,623 patterns, after 2 minutes
closed: wrote 49,503,624 patterns, after 2 minutes
节点指南中的计时器有一个名为"留下超时"的部分,看起来很相关。但是,尽管我使用了interval.ref();
,但在第二次读取时,我告诉脚本在对同一超时对象调用.unref()
之前不要垃圾收集对象,这并不完全正确,也没有什么区别。
我使用类似于npm run noodle
的npm来运行它,它只指向文件。
生成器是同步的,并阻止事件循环
Readable.from一次性处理整个生成器,因此如果生成器是同步的并且长时间运行,它会阻塞事件循环。
以下是它运行的注释代码:
async function next() {
for (;;) {
try {
const { value, done } = isAsync ?
await iterator.next() : // our generator is not asynchronous
iterator.next();
if (done) {
readable.push(null); // generator not done
} else {
const res = (value &&
typeof value.then === 'function') ?
await value :
value; // not a thenable
if (res === null) {
reading = false;
throw new ERR_STREAM_NULL_VALUES();
} else if (readable.push(res)) { // readable.push returns false if it's been paused, or some other irrelevant cases.
continue; // we continue to the next item in the iterator
} else {
reading = false;
}
}
} catch (err) {
readable.destroy(err);
}
break;
}
}
下面是readable.push的api,它解释了如何保持生成器运行:
如果可以继续推送额外的数据块,则返回:true;否则为false。
没有任何东西告诉NodeJ不要继续推送数据,所以它继续。
在每次运行事件循环之间,Node.js会检查它是否在等待任何异步I/O或定时器,如果没有,则会干净地关闭。
我将此作为NodeJs Github问题提出,并最终讨论了这个解决方案:
cosnt yieldEvery = 1e5;
function setImmediatePromise() {
return new Promise(resolve => setImmediate(resolve));
}
const iterStream = Readable.from(async function* () {
let i = 0
for await (const item of baseGenerator) {
yield item;
i++;
if (i % yieldEvery === 0) await setImmediatePromise();
}
}());
这在一定程度上受到了snyk.io博客的启发,该博客对这个问题进行了更详细的介绍。