Concatenating two (or n) streams


  • Two streams:

    Given readable streams stream1 and stream2, what is an idiomatic (concise) way to get a stream containing stream1 and stream2 concatenated?

    I can't do stream1.pipe(outStream); stream2.pipe(outStream), because then the stream contents get jumbled together.

  • n streams:

    Given an EventEmitter that emits an indeterminate number of streams, e.g.

    eventEmitter.emit('stream', stream1)
    eventEmitter.emit('stream', stream2)
    eventEmitter.emit('stream', stream3)
    ...
    eventEmitter.emit('end')
    

    What is an idiomatic (concise) way to get a stream with all the streams concatenated together?

This can be done with vanilla Node.js:

import { PassThrough } from 'stream'
const merge = (...streams) => {
    let pass = new PassThrough()
    for (let stream of streams) {
        const end = stream == streams.at(-1);
        pass = stream.pipe(pass, { end })
    }
    return pass
}

If your version of Node.js doesn't have .at(), use streams.slice(-1)[0] instead.

The combined-stream package concatenates streams. Example from the README:

var CombinedStream = require('combined-stream');
var fs = require('fs');
var combinedStream = CombinedStream.create();
combinedStream.append(fs.createReadStream('file1.txt'));
combinedStream.append(fs.createReadStream('file2.txt'));
combinedStream.pipe(fs.createWriteStream('combined.txt'));

I believe you have to append all the streams at once. If the queue runs empty, combinedStream ends automatically. See issue #5.

The stream-stream library is an alternative with an explicit .end, but it is much less popular and presumably not as well tested. It uses the streams2 API of Node 0.10 (see this discussion).

Nowadays this can easily be done using async iterators:

async function* concatStreams(readables) {
  for (const readable of readables) {
    for await (const chunk of readable) { yield chunk }
  }
} 

You can use it like this:

const fs = require('fs')
const stream = require('stream')
const files = ['file1.txt', 'file2.txt', 'file3.txt'] 
// no await needed: calling an async generator returns an async iterable directly
const iterable = concatStreams(files.map(f => fs.createReadStream(f)))
// convert the async iterable to a readable stream
const mergedStream = stream.Readable.from(iterable)

More on async iterators: https://2ality.com/2019/11/nodejs-streams-async-iteration.html

If you don't care about the ordering of data in the streams, a simple reduce operation in Node.js should be fine!

const {PassThrough} = require('stream')
let joined = [s0, s1, s2, ...sN].reduce((pt, s, i, a) => {
  s.pipe(pt, {end: false})
  // readableEnded is true once a stream has emitted 'end' (Node 12.9+)
  s.once('end', () => a.every(s => s.readableEnded) && pt.end())
  return pt
}, new PassThrough())

Cheers ;)

Using ES2015+ in vanilla Node.js, and combining the good answers of Ivo and Feng.

The PassThrough class is a trivial Transform stream that does not modify the stream in any way.

const { PassThrough } = require('stream');
const concatStreams = (streamArray, streamCounter = streamArray.length) => streamArray
  .reduce((mergedStream, stream) => {
    // pipe each stream of the array into the merged stream
    // prevent the automated 'end' event from firing
    mergedStream = stream.pipe(mergedStream, { end: false });
    // rewrite the 'end' event handler
    // Every time one of the stream ends, the counter is decremented.
    // Once the counter reaches 0, the mergedstream can emit its 'end' event.
    stream.once('end', () => --streamCounter === 0 && mergedStream.emit('end'));
    return mergedStream;
  }, new PassThrough());

It can be used like this:

const mergedStreams = concatStreams([stream1, stream2, stream3]);

You could make it more concise, but here's a working approach:

var util = require('util');
var EventEmitter = require('events').EventEmitter;
function ConcatStream(streamStream) {
  EventEmitter.call(this);
  var isStreaming = false,
    streamsEnded = false,
    that = this;
  var streams = [];
  streamStream.on('stream', function(stream){
    stream.pause();
    streams.push(stream);
    ensureState();
  });
  streamStream.on('end', function() {
    streamsEnded = true;
    ensureState();
  });
  var ensureState = function() {
    if(isStreaming) return;
    if(streams.length == 0) {
      if(streamsEnded)
        that.emit('end');
      return;
    }
    isStreaming = true;
    streams[0].on('data', onData);
    streams[0].on('end', onEnd);
    streams[0].resume();
  };
  var onData = function(data) {
    that.emit('data', data);
  };
  var onEnd = function() {
    isStreaming = false;
    streams[0].removeAllListeners('data');
    streams[0].removeAllListeners('end');
    streams.shift();
    ensureState();
  };
}
util.inherits(ConcatStream, EventEmitter);

We keep track of state with streams (a queue of streams: push at the back, shift from the front), isStreaming and streamsEnded. When we get a new stream, we push it, and when a stream finishes, we stop listening and shift it. When the stream of streams finishes, we set streamsEnded.

On every event, we check what state we're in. If we're already streaming (piping a stream), we do nothing. If the queue is empty and streamsEnded is set, we emit the end event. If there is something in the queue, we resume it and listen to its events.

*Note that pause and resume are advisory, so some streams may not behave correctly and would need buffering. That exercise is left to the reader.

Having done all that, I would handle the n=2 case by constructing an EventEmitter, creating a ConcatStream with it, and then emitting two stream events followed by an end event. It can surely be done more concisely, but we may as well use what we've got.

https://github.com/joepie91/node-combined-stream2 is a Streams2-compatible drop-in replacement for the combined-stream module described above. It automatically wraps Streams1 streams.

Example code for combined-stream2:

var CombinedStream = require('combined-stream2');
var fs = require('fs');
var combinedStream = CombinedStream.create();
combinedStream.append(fs.createReadStream('file1.txt'));
combinedStream.append(fs.createReadStream('file2.txt'));
combinedStream.pipe(fs.createWriteStream('combined.txt'));

Neither of the two most upvoted answers here works with asynchronous streams, because they pipe things on regardless of whether the source stream is ready to produce. I had to combine in-memory string streams with a data feed from a database, and the database contents always ended up at the tail of the resulting stream, because it takes a second to get a database response. Here is what I ended up writing for my own purposes.

import { PassThrough, Readable } from 'stream';

export function joinedStream(...streams: Readable[]): Readable {
  function pipeNext(): void {
    const nextStream = streams.shift();
    if (nextStream) {
      nextStream.pipe(out, { end: false });
      nextStream.on('end', function() {
        pipeNext();
      });
    } else {
      out.end();
    }
  }
  const out = new PassThrough();
  pipeNext();
  return out;
}

streamee.js is a set of stream transformers and composers based on Node 1.0+ streams, and it includes a concatenate method:

var stream1ThenStream2 = streamee.concatenate([stream1, stream2]);

The following code worked for me :). It takes input from all the answers given earlier.

const pipeStreams = (streams) => {
  const out = new PassThrough()
  // Piping the first stream to the out stream
  // Also prevent the automated 'end' event of out stream from firing
  streams[0].pipe(out, { end: false })
  for (let i = 0; i < streams.length - 2; i++) {
    // On the end of each stream (until the second last) pipe the next stream to the out stream
    // Prevent the automated 'end' event of out stream from firing
    streams[i].on('end', () => {
      streams[i + 1].pipe(out, { end: false })
    })
  }
  // On the end of second last stream pipe the last stream to the out stream.
  // Don't prevent the 'end' event from firing
  streams[streams.length - 2].on('end', () => {
    streams[streams.length - 1].pipe(out)
  })
  return out
} 

Nisha provided my favourite solution to this problem. Some of the solutions didn't remove the end event, which caused issues when merging audio streams. However, he forgot to handle the obvious case where there is only one stream. Many thanks for the well-thought-out solution, Nisha!

import { Stream, PassThrough } from 'stream';

const pipeStreams = (streams: Stream[]): Stream => {
    //If there is only one stream, return that stream
    if (streams.length == 1) return streams[0];
    const out = new PassThrough()
    // Piping the first stream to the out stream
    // Also prevent the automated 'end' event of out stream from firing
    streams[0].pipe(out, { end: false })
    for (let i = 0; i < streams.length - 2; i++) {
        // On the end of each stream (until the second last) pipe the next stream to the out stream
        // Prevent the automated 'end' event of out stream from firing
        streams[i].on('end', () => {
            streams[i + 1].pipe(out, { end: false })
        })
    }
    // On the end of second last stream pipe the last stream to the out stream.
    // Don't prevent the 'end' event from firing
    streams[streams.length - 2].on('end', () => {
        streams[streams.length - 1].pipe(out)
    })
    return out
}
