节点.js "request"库是否支持异步可迭代响应流?



我对 Node.js 库有点陌生,我正在尝试弄清楚如何在 HTTP 响应流上使用异步迭代。 我的总体目标是读取一个大型响应流,并在块到达时对其进行处理,目前通过生成器函数。 我无法将整个响应存储在内存中进行处理。

我正在使用request库来执行HTTP请求,如下所示。

const request = require("request");
// contrived chunk-by-chunk stream processing 
async function* getChunks(stream) {
for await (const chunk of stream) {
yield chunk[0];
}
}
async function doWork() {
var response = request.get("https://pastebin.com/raw/x4Nn0Tby");
for await (c of getChunks(response)) {
console.log(c);
}
}

当我运行doWork()时,我收到一个错误,指出getChunks()stream变量不是异步迭代的。

类型错误:流不是异步可迭代的

这很令人惊讶,因为我认为所有可读流通常是异步迭代的,并且在没有提供回调时请求库会返回一个流。 当我将request.get(...)替换为fs.createReadStream(...)到某个本地文件时,一切都按预期工作。

也许request库不支持这一点。 如果是这样,我需要做什么才能通过异步迭代处理 HTTP 响应流?

使用 Node.js 11.13 和request2.88.0。

我对requestrequest-promise-native库进行了更多的实验,并且认为在当前的实现下这是不可能的。 生成的流似乎根本不是异步可迭代的。 此外,在处理流之前,需要await正确的实现才能返回响应(如@JBone的答案所示)。 但是,如果调用await request.get(...),则会检索响应的全部内容,这对于大型响应来说是不可取的。

const r = require("request");
const rpn = require("request-promise-native");
// contrived chunk-by-chunk stream processing 
async function* getChunks(stream) {
for await (const chunk of stream) {
yield chunk[0];
}
}
async function doWork() {
const url = "https://pastebin.com/raw/x4Nn0Tby";
const response = r.get(url);         // returns a non-async-iterable object.
const response2 = await rp.get(url); // returns the contents of url
for await (c of getChunks(response)) {  // yields response not async-iterable error.
console.log(c);
}
}

我对这个问题的解决方案是用axios库替换requestrequest-promise-native的使用。 这些库在功能上相似,但axios允许您指定请求应解析为流;正如预期的那样,流是异步迭代的。

const axios = require("axios");
async function doWork() {
var response = await axios.request({
method: "GET",
url: "https://pastebin.com/raw/x4Nn0Tby",
responseType: "stream",
});
for await (c of getChunks(response.data)) {  // async-iteration over response works as expected.
console.log(c);
}
}

简单的答案:不,它没有。您可能希望在request周围使用基于 promise 的包装器,例如请求-承诺,它也适用于async/await

详细信息:请注意,request已被其创建者弃用,因此将停止使用。这意味着,迟早,您很可能需要切换到另一种解决方案,例如公理,超级代理或针,仅举几例。

当然,由您来评估这些模块并找出最适合您需求的模块,但我个人的建议是从axios开始,因为我过去对它有很好的经验,但是,YMMV。

似乎您将不得不使用其他替代方案,就像您可以在此处找到的request模块文档中提到的一样 https://www.npmjs.com/package/request

request supports both streaming and callback interfaces natively. If you'd like 
request to return a Promise instead, you can use an alternative interface wrapper for 
request. These wrappers can be useful if you prefer to work with Promises, or if 
you'd like to use async/await in ES2017.
Several alternative interfaces are provided by the request team, including:
request-promise (uses Bluebird Promises)
request-promise-native (uses native Promises)
request-promise-any (uses any-promise Promises)`

我基于以下问题的回答:

我认为您可以创建async await自定义方法来执行此操作。

async function doMyWork() {
try {
const response = await myOwnRequest(url); 
} catch (e) {
console.log ('the error', e);
}  
}
function myOwnRequest(url) {
return new Promise(function (resolve, reject) {
const resp = request.get(url);
if(resp) {
resolve();
} else {
reject();
}
});
}

axios 的流选项在 axios 0.19.0 上使用上述答案中的示例代码对我不起作用。 可能是椅子和键盘之间的问题,但无论如何......这是使用request的替代方法。

我最终将请求流调整为异步生成器(当然,中间有一个缓冲区)。 这允许一个"流"类型的接口,其中数据的读取和写入可以交错...它不保证低内存消耗。 请求管道("推送")尽可能快地发送到我们的 Writable,我们没有办法暂停它或将其翻转为"拉"类型的接口(据我所知)。 因此,如果我们从缓冲区读取数据的速度比写入数据的速度慢:缓冲区将变得非常大,内存使用率将很高。

因此,如果降低内存使用量至关重要,并且您解析来自 http 源的大文件......然后可能会在"流式传输"时对缓冲区大小进行一些监视/报告,以查看您使用的代码是比流快还是慢,以便您知道缓冲区是变大还是变小。 当然,如果您使用非常慢的http服务器进行测试...然后所有的赌注都关闭了。

这可以通过设置固定缓冲区大小并创建块_write直到发生更多读取(在缓冲区中腾出空间)来解决......即请求必须等待将更多数据写入管道。 但是请求可能会在内部缓冲...因此,如果数据无论如何都在请求端堆积,这将无助于内存消耗。必须检查。

示例代码:

const request = require('request'),
Writable = require('stream').Writable,
EventEmitter = require('events');
module.exports = function (url, MAX_BYTES=1024) {
var response = new ResponseBuffer(MAX_BYTES);
request
.get(url)
.on('error', function(err) { throw err; })
.pipe(response)
.on('error', function(err) { throw err; });
return response.reader();
};
class ResponseBuffer extends Writable {
constructor (MAX_BYTES=1024) {
super();
this.buffer = '';
this.open = true;
this.done = null;  // callback to call when done reading.
this.MAX_BYTES = MAX_BYTES;
this.events = new EventEmitter();
}
_write(chunk, enc, next) {
this.buffer += chunk;
this.events.emit('data');
next();
}
_final(done) {
this.open = false; // signal to reader to return after buffer empty.
return done();
}
async * reader () {
while (true) {
if (this.buffer.length == 0) {
// buffer empty and Writable !open. return.
if (!this.open) { return; }
else { // buffer empty.  wait for data.
await new Promise(resolve => this.events.once('data', resolve));
}
}
let read_bytes = this.buffer.length < this.MAX_BYTES ? this.buffer.length : this.MAX_BYTES;
yield this.buffer.slice(0, read_bytes);
this.buffer = this.buffer.slice(read_bytes);
}
}
}

然后像这样使用它:


const httpModule = require('./path/to/above/module');
var httpGen = httpModule('https://www.google.com'),
chunk;
for await (chunk of httpGen) {
// do something with chunk.
}

另一种方法(如果您特别担心内存使用情况)是仅下载到磁盘(流式传输到文件编写器),然后从磁盘上增量读取(您可以异步迭代fs.createReadStream(...))

最新更新