如何从'on data'事件中'pipe' oracle-db 数据



我已经使用node-oracledb几个月了,到目前为止我已经设法实现了我需要的。

我目前正在开发一个搜索应用程序,该应用程序可能会从单个调用返回大约 2m 行数据。 为了确保我不会与浏览器和服务器断开连接,我想我会尝试 queryStream,以便有源源不断的数据流回到客户端。

我按原样实现了 queryStream 示例,这对于几十万行工作正常。 但是,当返回的行数大于 100 万时,Node 会耗尽内存。通过记录和观察客户端和服务器日志事件,我可以看到客户端在发送和接收的行方面远远落后于服务器。 所以,看起来 Node 正在崩溃,因为它缓冲了太多数据。

值得注意的是,在这一点上,我的选择流实现是在通过 Express 调用的 req/res 函数中。

为了返回数据,我做了类似的事情。

stream.on('data', function (data) {
rowcount++;
let obj = new myObjectConstructor(data);
res.write(JSON.stringify(obj.getJson());
});

我一直在阅读有关流和管道如何帮助流的信息,所以我希望能够做的是能够将查询的结果通过管道传输到 a( 帮助流和 b( 能够在发送回客户端之前将结果管道传输到其他函数。

例如

function getData(req, res){
var stream = myQueryStream(connection, query);
stream
.pipe(toSomeOtherFunction)
.pipe(yetAnotherFunction)
.pipe(res);
}

我花了几个小时试图找到一个允许我通过管道传输结果的解决方案或示例,但我被困住了,需要一些帮助。

抱歉,如果我错过了一些明显的东西,但我仍然掌握了 Node,尤其是流。

提前谢谢。

这里有一点阻抗不匹配。queryStream API 发出 JavaScript 对象行,但要流式传输到客户端的是 JSON 数组。您基本上必须在开头添加一个左括号,在每行后添加一个逗号,并在末尾添加一个右括号。

我将向您展示如何在控制器中执行此操作,该控制器直接使用驱动程序,而不是像我在本系列中提倡的那样使用单独的数据库模块。

const oracledb = require('oracledb');
async function get(req, res, next) {
try {
const conn = await oracledb.getConnection();
const stream = await conn.queryStream('select * from employees', [], {outFormat: oracledb.OBJECT});
res.writeHead(200, {'Content-Type': 'application/json'});
res.write('[');
stream.on('data', (row) => {
res.write(JSON.stringify(row));
res.write(',');
});
stream.on('end', () => {
res.end(']');
});
stream.on('close', async () => {
try {
await conn.close();
} catch (err) {
console.log(err);
}
});
stream.on('error', async (err) => {
next(err);
try {
await conn.close();
} catch (err) {
console.log(err);
}
});
} catch (err) {
next(err);
}
}
module.exports.get = get;

了解概念后,您可以使用可重用的 Transform 类简化一些事情,该类允许您在控制器逻辑中使用管道:

const oracledb = require('oracledb');
const { Transform } = require('stream');
class ToJSONArray extends Transform {
constructor() {
super({objectMode: true});
this.push('[');
}
_transform (row, encoding, callback) {
if (this._prevRow) {
this.push(JSON.stringify(this._prevRow));
this.push(',');
}
this._prevRow = row;
callback(null);
}
_flush (done) {
if (this._prevRow) {
this.push(JSON.stringify(this._prevRow));
}
this.push(']');
delete this._prevRow;
done();
}
}
async function get(req, res, next) {
try {
const toJSONArray = new ToJSONArray();
const conn = await oracledb.getConnection();
const stream = await conn.queryStream('select * from employees', [], {outFormat: oracledb.OBJECT});
res.writeHead(200, {'Content-Type': 'application/json'});
stream.pipe(toJSONArray).pipe(res);
stream.on('close', async () => {
try {
await conn.close();
} catch (err) {
console.log(err);
}
});
stream.on('error', async (err) => {
next(err);
try {
await conn.close();
} catch (err) {
console.log(err);
}
});
} catch (err) {
next(err);
}
}
module.exports.get = get;

与其编写自己的逻辑来创建 JSON 流,不如使用 JSONStream 将对象流转换为(字符串化的(JSON,然后再将其管道传输到目标(resprocess.stdout等( 这节省了处理.on('data',...)事件的需要。

在下面的示例中,我使用了来自节点流模块的管道而不是.pipe方法:效果相似(我认为具有更好的错误处理(。要从oracledb.queryStream获取对象,您可以指定选项{outFormat: oracledb.OUT_FORMAT_OBJECT}(文档(。然后,您可以对生成的对象流进行任意修改。这可以使用转换流来完成,也许使用 through2-map 进行,或者如果您需要删除或拆分行,则使用 through2。在下面,流在字符串化为 JSON 后发送到process.stdout,但您也可以同样向其发送 Express 的res

require('dotenv').config()   // config from .env file
const JSONStream = require('JSONStream')
const oracledb = require('oracledb')
const { pipeline } = require('stream')  
const map = require('through2-map')   // see https://www.npmjs.com/package/through2-map
oracledb.getConnection({
user: process.env.DB_USER,
password: process.env.DB_PASSWORD,
connectString: process.env.CONNECT_STRING
}).then(connection => {
pipeline(
connection.queryStream(`
select dual.*,'test' as col1 from dual 
union select dual.*, :someboundvalue as col1 from dual 
`
,{"someboundvalue":"test5"} // binds
,{
prefetchRows: 150, // for tuning
fetchArraySize: 150, // for tuning
outFormat: oracledb.OUT_FORMAT_OBJECT
}
)
,map.obj((row,index) => {
row.arbitraryModification = index 
return row
})
,JSONStream.stringify() // false gives ndjson
,process.stdout     // or send to express's res
,(err) => { if(err) console.error(err) }
)
})
// [
// {"DUMMY":"X","COL1":"test","arbitraryModification":0}
// ,
// {"DUMMY":"X","COL1":"test5","arbitraryModification":1}
// ]

最新更新