节点变换流不将输出发送到管道



我正在从事的工作

我正在尝试编写一个转换流,该流将用作读取HTTP API(Salesforce(数据的过程的一部分,将每个资源转换为标准的内部数据结构,然后将转换的数据发送为由Elasticsearch索引。

我的问题

我已经写了一个简单的变换流,但是它似乎是在吞咽数据,而不是通过它。

这是我的类AdapterStream

const { Transform } = require('stream')
class AdapterStream extends Transform {
  constructor (adapter) {
    super({objectMode: true})
    // adapter is a function that contains the transform logic
    this.adapter = adapter
  }
  _transform (chunk, _encoding, callback) {
    this.push(this.adapter(chunk))
    callback()
  }
}
module.exports = AdapterStream

这是我的使用方式

// query is an EventEmitter (https://jsforce.github.io/document/#query)
query.run({autoFetch: true, maxFetch: 10})
     .pipe(transform)
     .pipe(process.stdout)

我期望发生的事情

我希望在我的终端窗口中看到转换的数据。

实际发生了什么

我的终端窗口中没有打印任何东西

我尝试过的东西

写入文件

query.run({autoFetch: true, maxFetch: 10})
     .pipe(transform)
     .pipe(fs.createWriteStream('./test.json'))

这会如预期的那样创建文件./test.json,但它是空的。

订阅事件

query.run({autoFetch: true, maxFetch: 10})
     .pipe(transform)
transform.on('readable', () => { console.log(transform.read()) })

这做了我期望.pipe(process.stdout)做的事情:将转换后的记录打印到控制台。

我的问题是,为什么pipe()不做我期望的事情?我缺少一些简单的东西。

您想将数据作为第二个参数返回到回调。如下。

const { Transform } = require('stream')
class AdapterStream extends Transform {
  constructor (adapter) {
    super({objectMode: true})
    // adapter is a function that contains the transform logic
    this.adapter = adapter
  }
  transform (chunk, _encoding, callback) {
    callback(null, this.adapter(chunk))
  }
}

最新更新