正在从事件处理程序写入WriteStream



我有一个EventEmitter对象,我被设置为侦听事件。当事件发出时,我想将信息写入文件。我有一个通过fs.createWriteStream(path, { flags: 'a'});打开的FileStream。目前,我的问题是,如果我以极快的速度频繁地发出事件,我就会开始"备份"。IE .write返回false,要求我暂时停止写作。由于我在事件处理程序中进行写入,因此附近没有回调函数可用于指示写入过程的结束。我可以从处理端或发射端做些什么来防止备份?

归根结底,这似乎并不重要;所有的数据都会写入到文件中。但我想尽我所能遵守"规则"。

我知道我可以监听drain事件,然后再开始写,但我如何防止其他事件进入处理程序?我注意到,如果我在每次发射前延迟50毫秒,备份似乎不会发生,但这似乎有点像黑客攻击。另外,如果你有一个较慢的硬盘怎么办?

以下是我的情况示例:

var ee = new EventEmitter();
var stream = fs.createWriteStream('./file/log.txt', { flags:'a'} );
ee.on('report', function (i) {
  stream.write('new file data ' + i + ' --- '  + Date.now + 'n');
});
for (var i = 0; i < 10000; ++i) {
  ee.emit('report', i)
}

这不是确切的代码,但这是它的要点。完整的代码发生在从运行的HTTP服务器发送响应时,但如果我通过for循环排队等待1000个请求,我就会陷入上述情况。

实际上,我最终找到了一个使用读写流的更简单的解决方案。有关示例,请参阅下面的代码

var stream = require('stream');
var fs = require('fs');
var EventEmitter = require('events').EventEmitter;
var ee = new EventEmitter();
var writeStream = fs.createWriteStream('./file/log.txt', { flags: 'a', end: false } );
var readStream = new stream.Readable();
// This needs to be here for compatibility reasons, but is intentionally a no-op
readStream._read = function() {};
ee.on('report', function (i) {
  readStream.push(i.toString());
});
readStream.pipe(writeStream);
for (var i = 0; i < 10000; ++i) {
  ee.emit('report', i);
}

这将允许节点管道和流系统与操作系统协调处理背压。这是解决这个问题的首选方法IMO.

处理此问题的理想方法是pause()传入事件,如果事件来自流或以其他方式可以暂停,则可以这样做,但这并不总是可能的。

如果您不能以某种方式暂停传入事件,那么我通常处理此问题的方法是使用async模块的queue函数。当然还有很多其他方法可以做到这一点,但使用队列是我找到的最简单的方法,async模块(非常适合许多异步操作)提供了一个很好的方法。

基本思想是将所有write调用放入一个队列中,该队列被配置为一次只处理一个任务。如果您从stream.write呼叫中得到false,则您将pause()作为queue。从stream获得drain事件后,您将再次resume()队列。这样,您就不会在stream饱和时向其写入,但您仍然可以接收事件,并在stream准备就绪时对其进行排队。

使用您的示例代码这样做看起来像这样:

var async = require('async');
var ee = new EventEmitter();
var stream = fs.createWriteStream('./file/log.txt', { flags:'a'} );
// Create a queue with a concurrency of 1
var writeQueue = async.queue(function(data, callback) {
    if (!stream.write(data)) {
        // if write() returns false, it's saturated; pause the queue
        writeQueue.pause();
    }
    callback();
}, 1); // <-- concurrency argument here; it's easy to miss ;)
stream.on('drain', function() {
    // the stream isn't saturated anymore; resume the queue
    writeQueue.resume();
})
ee.on('report', function (i) {
    // instead of writing directly to the stream, push data to the writeQueue
    writeQueue.push('new file data ' + i + ' --- '  + Date.now() + 'n');
});
for (var i = 0; i < 10000; ++i) {
  ee.emit('report', i)
}

注意:这与让流在内部缓冲没有什么不同。你仍然在缓冲数据,你只是自己做,这会让你对情况有更多的控制权

相关内容

  • 没有找到相关文章

最新更新