如何在node.js中发出/pipe数组值作为可读流



从数组和管道值到可写流创建可读流的最佳方法是什么?我已经看到了substack使用setInterval的例子,我可以成功地实现使用0作为间隔值,但是我迭代了很多数据,每次触发gc都会减慢速度。

// Working with the setInterval wrapper
var arr = [1, 5, 3, 6, 8, 9];
function createStream () {
    var t = new stream;
    t.readable = true;
    var times = 0;
    var iv = setInterval(function () {
        t.emit('data', arr[times]);
        if (++times === arr.length) {
            t.emit('end');
            clearInterval(iv);
        }
    }
}, 0);
// Create the writable stream s
// ....
createStream().pipe(s);

我想做的是不使用setInterval发出值。也许可以像这样使用async模块:

async.forEachSeries(arr, function(item, cb) {
    t.emit('data', item);
    cb();
}, function(err) {
 if (err) {
     console.log(err);
 }
 t.emit('end');
});

在本例中,我迭代数组并输出数据,但不输出任何值。我已经看到了shinout的ArrayStream,但我认为它是在v0.10之前创建的,它比我想的要多一点开销。

您可以通过创建一个可读流并将值压入其中来解决这个问题。

流很麻烦,但是直接使用流通常比使用库更容易。

用于流的字符串或缓冲区数组

如果你使用的是字符串或缓冲区数组,这将工作:

'use strict'
const Stream = require('stream')
const readable = new Stream.Readable()
readable.pipe(process.stdout)
const items = ['a', 'b', 'c']
items.forEach(item => readable.push(item))
// no more data
readable.push(null)

指出:

  • readable.pipe(process.stdout)做两件事:将流置于"流动"模式并设置进程。stdout可写流从readable
  • 接收数据
  • Readable#push方法适用于可读流的创建者,而不是流消费者。
  • 你必须做Readable#push(null)来表示没有更多的数据。

流的非字符串数组

要从既不是字符串也不是缓冲区的数组中创建流,需要可读流和可写流都处于"对象模式"。在下面的示例中,我做了以下更改:

  • {objectMode: true}
  • 初始化可读流
  • 不是管道到process.stdout,而是管道到一个处于对象模式的简单可写流。

      'use strict'
      const Stream = require('stream')
      const readable = new Stream.Readable({objectMode: true})
      const writable = new Stream.Writable({objectMode: true})
      writable._write = (object, encoding, done) => {
        console.log(object)
        // ready to process the next chunk
        done()
      }
      readable.pipe(writable)
      const items = [1, 2, 3]
      items.forEach(item => readable.push(item))
      // end the stream
      readable.push(null)
    

性能注意

数据来自哪里?如果它是一个流数据源,最好使用转换流来操作流,而不是从数组转换。

从Node 12.3开始,您可以使用stream.Readable.from(iterable, [options])

const { Readable } = require("stream");
const arr = [1, 5, 3, 6, 8, 9];
const readableStream = Readable.from(arr);
readableStream.on("data", (row) => console.log(row));

也适用于object

const arr = [
  { index: 1, hello: { foo: "bar" } },
  { index: 2, hello: { foo: "bar" } },
  { index: 3, hello: { foo: "bar" } },
];
const readableStream = Readable.from(arr);

br;

这是一个后进先出解决方案。array. prototype.pop()的行为类似于shift,但它应用于数组中的最后一个元素。

const items = [1,2,3]
const stream = new Readable({
  objectMode: true,
  read() {
    const item = items.pop()
    if (!item) {
      this.push(null);
      return;
    }
    this.push(item)
  },
})

我最终使用了ArrayStream。它确实解决了GC被频繁触发的问题。我得到了递归过程的警告。

从节点的nextTick修改了ArrayStream中的nextTick回调到setimmid,并修复了警告,似乎工作得很好。

这是一个老问题,但如果有人偶然发现这个,node-stream-array是一个更简单,更优雅的Node.js实现>= v0.10

var streamify = require('stream-array'),
  os = require('os');
streamify(['1', '2', '3', os.EOL]).pipe(process.stdout);

相关内容

  • 没有找到相关文章

最新更新