我正在尝试在Nodejs中构建一个API,该API将流式输出针对垂直数据库执行的查询。
Nodejs中的vertica db驱动程序暴露了我正在使用的未缓冲查询接口。欲了解更多详情,请参阅:https://github.com/wvanbergen/node-vertica
下面是我的代码:var vertica = require('vertica');
var Readable = require('stream').Readable;
var rs = new Readable;
var conn = vertica.connect( {
host: 'hostname',
user: 'user',
password: 'password',
database: 'verticadb'
});
var q = conn.query('select * from table');
q.on('row', function(row) {
rs.push(row.join(',') + "n");
});
q.on('end', function(status) {
rs.push(null);
rs.pipe(process.stdout);
conn.disconnect();
});
q.on('error', function(err) {
conn.disconnect();
});
它确实返回适当的输出,但我的理解是,它实际上缓冲了row.join(',') + "n"
的输出,并且只有在读取所有行后才将其管道输出到stdout。我的目标是在每一行被读取后立即输出。我应该如何修改我的代码使其工作?您可以替换vertica "row"
附录
我已经设法使它工作使用所谓的"经典可读流";基于在https://github.com/substack/stream-handbook.
找到的文档。这个的代码:
var vertica = require('vertica');
var Stream = require('stream');
var stream = new Stream;
stream.readable = true;
var conn = vertica.connect( {
host: 'hostname',
user: 'user',
password: 'password',
database: 'verticadb'
});
var q = conn.query('select * from affiliate_manager_2');
q.on('row', function(row) {
stream.emit('data', row.join(',') + "n");
});
q.on('end', function(status) {
stream.emit('end');
conn.disconnect();
});
q.on('error', function(err) {
conn.disconnect();
});
stream.pipe(process.stdout);
然而,它是"旧"我想知道如何使用"新方法"。
Readable
为" abstract "。它正在寻找一个名为_read
的函数,该函数未在默认实现上定义。没有它,它只是缓冲每个push(chunk)
,直到它看到push(null)
。这就是你在例子中看到的行为。
要获得您想要的行为,只需添加一个_read
函数!
这里有一个例子,你可以适应你的数据库:
var Readable = require('stream').Readable;
var stream = new Readable;
stream._read = function () {
var query = …;
query.on('row', function (row) {
stream.push(JSON.stringify(row) + 'n');
});
query.on('end', function () {
stream.push(null);
});
stream._read = function () {};
};
stream.pipe(process.stdout);
进一步阅读:
- http://nodejs.org/api/stream.html stream_class_stream_readable_1
- https://github.com/substack/stream-handbook creating-a-readable-stream