如何创建一个 Node.js 代理可读流,该流将包装在创建代理项流时不可用的另一个流?



我可以做一个简单的HTTP请求并得到一个流。

但是,如果我必须发出一个HTTP请求,然后轮询以确定数据是否准备好,然后发出另一个请求来获取数据呢?

我想在一个返回一个流的方法中完成所有这些,所以我可以这样做:

multiStepMethod(options).pipe(wherever);

代替:

multiStepMethod(options, (err, stream) => {
    stream.pipe(wherever);
})

我需要multiStepMethod返回一个代理可读流,它将等待一些事件,然后包装(现在可用)流并开始将它的数据发送到管道

#!/usr/bin/env node
'use strict';
const stream = require('stream');
// This is an example of a 'readable' stream that has to go through a multi-
// step process to finally get the actual readable stream. So we are
// asynchronously wrapping another readable stream.
// The key to simplicity here was to use a transform stream instead of a
// readable stream because it allows us to pipe the stream to ourselves.
class ReadableWrappingTransform extends stream.Transform {
  constructor() {
    super({
      objectMode: true,
      // Our _transform method doesn't really do anything and we don't want to
      // hog up any more additional memory than necessary.
      highWaterMark: 1
    });
    process.nextTick(() => {
      if (new Date().getTime() % 5 === 1) {
        // Here we simulate an error that happened somewhere in the multi-step
        // process to get the final stream. So we just emit 'error' and we're
        // done.
        this.emit('error', new Error('Could not get the stream.'));
        //Assuming based on the node docs that we should not emit
        // 'close' or 'end' on error. If we do emit 'end', it will trigger the
        // writable's 'finish' event, which is probably not desired. You either
        // want an 'error' OR a 'finish'.
        // NODE END EVENT DOCS
        // The 'end' event is emitted when there is no more data to be consumed
        // from the stream.
        // Note: The 'end' event will not be emitted unless the data is
        // completely consumed. This can be accomplished by switching the stream
        // into flowing mode, or by calling stream.read() repeatedly until all
        // data has been consumed.
        // this.emit('end');
        // NODE CLOSE EVENT DOCS
        // The 'close' event is emitted when the stream and any of its
        // underlying resources (a file descriptor, for example) have been
        // closed. The event indicates that no more events will be emitted, and
        // no further computation will occur.
        // Not all Readable streams will emit the 'close' event.
        // this.emit('close');
      } else {
        // We successfully got the stream we wanted after a long, hard, multi-
        // step process, so first we need to copy all our listeners over to it
        // -- NOT.
        // ['close', 'data', 'end', 'error'].forEach((eventName) => {
        //   this.listeners(eventName).forEach((l) => {
        //     readable.on(eventName, l);
        //   });
        // });
        // Turns out that .pipe propagates ALL listeners EXCEPT the 'error'
        // listener. What's up with that !?! If we copy any of the others  we
        // get double the events -- including double the data. So here we just
        // copy over the 'error' listener to make sure we get 'error' events.
        ['error'].forEach((eventName) => {
          this.listeners(eventName).forEach((l) => {
            readable.on(eventName, l);
          });
        });
        // Then just pipe the final readable to ourselves, and we are good.
        readable
          .pipe(this);
      }
    });
  }
  _transform(data, encoding, callback) {
    // Nothing special to do here just pass along the data.
    this.push(data);
    callback();
  }
}
// This is just a very unreliable test readable stream.
const readable = new stream.Readable({
  objectMode: true,
  read() {
    for (let i = 0; i < 10; i++) {
      if (new Date().getTime() % 13 === 1) {
        this.__err = new Error('Sorry, error reading data.');
        this.emit('error', this.__err);
        return;
      }
      this.push({
        Name: `Mikey ${i}`
      });
    }
    this.push(null);
  }
});
// Any old writable that we can pipe to.
const writable = new stream.Writable({
  objectMode: true,
  write(chunk, encoding, callback) {
    console.log(chunk, encoding);
    callback();
  }
});
new ReadableWrappingTransform()
  // if your stream emits close you get close.
  .on('close', () => {
    console.error('CLOSE');
  })
  // if you push null you get end from read.
  .on('end', () => {
    console.error('END');
  })
  // error needs to be both places !?! seriously node?
  .on('error', (error) => {
    console.error('ERROR', error);
  })
  // Finish does no good here. It's a writable event.
  // .on('finish', () => {
  //   console.error('FINISH');
  // })
  .pipe(writable)
  // Close and End do no good here, they are readable events.
  // They are not propagated to the writable.
  //
  // // if your stream emits close you get close.
  // .on('close', () => {
  //   console.error('CLOSE');
  // })
  // // if you push null you get end from read.
  // .on('end', () => {
  //   console.error('END');
  // })
  // error needs to be both places !?! seriously node?
  .on('error', (error) => {
    console.error('ERROR', error);
  })
  // you should always get either finish or error or something was done
  // incorrectly.
  .on('finish', () => {
    console.error('FINISH');
  });

相关内容

  • 没有找到相关文章

最新更新