我正在从事的工作
我正在尝试编写一个转换流,该流将用作读取HTTP API(Salesforce(数据的过程的一部分,将每个资源转换为标准的内部数据结构,然后将转换的数据发送为由Elasticsearch索引。
我的问题
我已经写了一个简单的变换流,但是它似乎是在吞咽数据,而不是通过它。
这是我的类AdapterStream
const { Transform } = require('stream')
class AdapterStream extends Transform {
constructor (adapter) {
super({objectMode: true})
// adapter is a function that contains the transform logic
this.adapter = adapter
}
_transform (chunk, _encoding, callback) {
this.push(this.adapter(chunk))
callback()
}
}
module.exports = AdapterStream
这是我的使用方式
// query is an EventEmitter (https://jsforce.github.io/document/#query)
query.run({autoFetch: true, maxFetch: 10})
.pipe(transform)
.pipe(process.stdout)
我期望发生的事情
我希望在我的终端窗口中看到转换的数据。
实际发生了什么
我的终端窗口中没有打印任何东西
我尝试过的东西
写入文件
query.run({autoFetch: true, maxFetch: 10})
.pipe(transform)
.pipe(fs.createWriteStream('./test.json'))
这会如预期的那样创建文件./test.json
,但它是空的。
订阅事件
query.run({autoFetch: true, maxFetch: 10})
.pipe(transform)
transform.on('readable', () => { console.log(transform.read()) })
这做了我期望.pipe(process.stdout)
做的事情:将转换后的记录打印到控制台。
我的问题是,为什么pipe()
不做我期望的事情?我缺少一些简单的东西。
您想将数据作为第二个参数返回到回调。如下。
const { Transform } = require('stream')
class AdapterStream extends Transform {
constructor (adapter) {
super({objectMode: true})
// adapter is a function that contains the transform logic
this.adapter = adapter
}
transform (chunk, _encoding, callback) {
callback(null, this.adapter(chunk))
}
}