我可以做一个简单的HTTP请求并得到一个流。
但是，如果我必须先发出一个HTTP请求，然后轮询以确定数据是否已经准备好，最后再发出另一个请求来获取数据，该怎么办呢？
我想在一个返回流的方法中完成所有这些步骤，这样我就可以这样写：
multiStepMethod(options).pipe(wherever);
而不是：
multiStepMethod(options, (err, stream) => {
stream.pipe(wherever);
})
我需要multiStepMethod返回一个代理可读流：它先等待某些事件发生，然后包装（此时已经可用的）真实流，并开始把该流的数据发送到管道中。
#!/usr/bin/env node
'use strict';
const stream = require('stream');
// Example of a "readable" stream whose real source only becomes available
// after an asynchronous, multi-step process, so that callers can write
// `new ReadableWrappingTransform().pipe(dest)` immediately.
//
// It is implemented as a Transform rather than a Readable because that lets
// the real source, once obtained, simply be piped into this instance;
// _transform passes every chunk through unchanged.
class ReadableWrappingTransform extends stream.Transform {
  /**
   * Creates the wrapper and, on the next tick, either reports a simulated
   * failure or connects the module-level `readable` source to this stream.
   */
  constructor() {
    super({
      objectMode: true,
      // _transform is a pure pass-through, so buffer as little as possible.
      highWaterMark: 1
    });
    process.nextTick(() => {
      if (Date.now() % 5 === 1) {
        // Simulated failure somewhere in the multi-step process of obtaining
        // the real stream. Only 'error' is emitted: 'end' and 'close' are
        // intentionally not emitted, so that a consumer observes either an
        // error or a normal completion, never both.
        this.emit('error', new Error('Could not get the stream.'));
        return;
      }
      // .pipe() does not carry the source's 'error' events over to the
      // destination, so re-emit them on this wrapper. Forwarding (instead of
      // copying the listeners registered so far onto the source) also reaches
      // listeners attached later and keeps once() listeners one-shot.
      readable.on('error', (err) => {
        this.emit('error', err);
      });
      // Data, end-of-stream and backpressure are all handled by pipe().
      readable.pipe(this);
    });
  }

  /**
   * Pass-through transform: hands each chunk on unchanged.
   *
   * @param {*} data - Chunk received from the wrapped source.
   * @param {string} encoding - Chunk encoding; not used here.
   * @param {Function} callback - Called with (null, data) to push the chunk.
   */
  _transform(data, encoding, callback) {
    callback(null, data);
  }
}
// Deliberately unreliable object-mode test source. A single read() call
// tries to push ten `{ Name: 'Mikey <i>' }` records followed by
// end-of-stream, but fails whenever the current millisecond timestamp
// modulo 13 equals 1.
const readable = new stream.Readable({
  objectMode: true,
  read() {
    for (let i = 0; i < 10; i++) {
      if (Date.now() % 13 === 1) {
        // Keep the error on the instance as before, for anyone inspecting it.
        this.__err = new Error('Sorry, error reading data.');
        // Report read failures through destroy(err) rather than emitting
        // 'error' by hand from inside read(); the stream still emits 'error'
        // with this error object.
        this.destroy(this.__err);
        return;
      }
      this.push({
        Name: `Mikey ${i}`
      });
    }
    // All ten records were pushed: signal end-of-stream.
    this.push(null);
  }
});
// Minimal object-mode sink used as the pipe destination: it logs every
// chunk together with its encoding and acknowledges it immediately.
const writable = new stream.Writable({
  objectMode: true,
  write: (item, enc, done) => {
    console.log(item, enc);
    done();
  }
});
// Demo wiring: create the wrapping stream, pipe it into the writable sink,
// and log the lifecycle events seen on each side to stderr.
{
  const logError = (error) => {
    console.error('ERROR', error);
  };

  const source = new ReadableWrappingTransform();

  // Readable-side events. 'finish' is a writable event, so it is not
  // listened for on the source.
  source.on('close', () => {
    console.error('CLOSE');
  });
  source.on('end', () => {
    console.error('END');
  });
  // 'error' has to be handled on both ends of the pipe.
  source.on('error', logError);

  const sink = source.pipe(writable);

  // Writable-side events. 'close' and 'end' are readable events and are not
  // propagated to the writable, so only 'error' and 'finish' are handled.
  sink.on('error', logError);
  // Either 'finish' or 'error' should always be reported; seeing neither
  // means something was wired up incorrectly.
  sink.on('finish', () => {
    console.error('FINISH');
  });
}