在NodeJS中使用流

  • 本文关键字:NodeJS node.js stream
  • 更新时间 :
  • 英文 :


我正在尝试在Nodejs中构建一个API,该API将流式输出针对垂直数据库执行的查询。

Nodejs中的vertica db驱动程序暴露了我正在使用的未缓冲查询接口。欲了解更多详情,请参阅:https://github.com/wvanbergen/node-vertica

下面是我的代码:
var vertica = require('vertica');
var Readable = require('stream').Readable;
var rs = new Readable;
var conn = vertica.connect( {
    host: 'hostname',
    user: 'user',
    password: 'password',
    database: 'verticadb'    
});

var q = conn.query('select * from table');
q.on('row', function(row) {
    rs.push(row.join(',') + "n");
});
q.on('end', function(status) {
    rs.push(null);
    rs.pipe(process.stdout);
    conn.disconnect();
});
q.on('error', function(err) {
    conn.disconnect();
});

它确实返回适当的输出,但我的理解是,它实际上缓冲了row.join(',') + "n"的输出,并且只有在读取所有行后才将其管道输出到stdout。我的目标是在每一行被读取后立即输出。我应该如何修改我的代码使其工作?您可以替换vertica "row"

附录

我已经设法使它工作使用所谓的"经典可读流";基于在https://github.com/substack/stream-handbook.

找到的文档。

这个的代码:

var vertica = require('vertica');
var Stream = require('stream');
var stream = new Stream;
stream.readable = true;
var conn = vertica.connect( {
    host: 'hostname',
    user: 'user',
    password: 'password',
    database: 'verticadb'
});

var q = conn.query('select * from affiliate_manager_2');
q.on('row', function(row) {
    stream.emit('data', row.join(',') + "n");
});
q.on('end', function(status) {
    stream.emit('end'); 
    conn.disconnect();
});
q.on('error', function(err) {
    conn.disconnect();
});
stream.pipe(process.stdout);

然而,它是"旧"我想知道如何使用"新方法"。

Readable为" abstract "。它正在寻找一个名为_read的函数,该函数未在默认实现上定义。没有它,它只是缓冲每个push(chunk),直到它看到push(null)。这就是你在例子中看到的行为。

要获得您想要的行为,只需添加一个_read函数!

这里有一个例子,你可以适应你的数据库:

var Readable = require('stream').Readable;
var stream = new Readable;
stream._read = function () {
  var query = …;
  query.on('row', function (row) {
    stream.push(JSON.stringify(row) + 'n');
  });
  query.on('end', function () {
    stream.push(null);
  });
  stream._read = function () {};
};
stream.pipe(process.stdout);

进一步阅读:

  • http://nodejs.org/api/stream.html stream_class_stream_readable_1
  • https://github.com/substack/stream-handbook creating-a-readable-stream

相关内容

  • 没有找到相关文章

最新更新