我正在尝试下载一个内部处理系统通过nodejs中的HTTPget方法生成的文件列表。对于单个文件或几个文件来说,它工作得很好,stackoverflow上已经有了答案。然而,当你试图下载一个包含asyn请求的庞大文件列表时,问题就会出现,系统只会超时并抛出错误。
因此,这更多的是一个可扩展性问题。最好的方法是一个接一个地下载文件,或者一次下载几个文件,然后转移到下一批,但我不知道如何做到这一点。这是我到目前为止的代码,它适用于一些文件,但在这种情况下,我有大约850个文件(每个文件有几个MB(,它不起作用-
const https = require("http");
var fs = require('fs');
//list of files
var file_list = [];
file_list.push('http://www.sample.com/file1');
file_list.push('http://www.sample.com/file2');
file_list.push('http://www.sample.com/file3');
.
.
.
file_list.push('http://www.sample.com/file850');
file_list.forEach(single_file => {
const file = fs.createWriteStream('files/'+single_file ); //saving under files folder
https.get(single_file, response => {
var stream = response.pipe(single_file);
stream.on("finish", function() {
console.log("done");
});
});
});
它对一些文件运行良好,并在files
文件夹中创建了许多空文件,然后抛出此错误
events.js:288
throw er; // Unhandled 'error' event
^
Error: connect ETIMEDOUT 192.168.76.86:80
at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1137:16)
Emitted 'error' event on ClientRequest instance at:
at Socket.socketErrorListener (_http_client.js:426:9)
at Socket.emit (events.js:311:20)
at emitErrorNT (internal/streams/destroy.js:92:8)
at emitErrorAndCloseNT (internal/streams/destroy.js:60:3)
at processTicksAndRejections (internal/process/task_queues.js:84:21) {
errno: 'ETIMEDOUT',
code: 'ETIMEDOUT',
syscall: 'connect',
address: '192.168.76.86',
port: 80
}
看起来它给网络带来了巨大的负载,可能一个接一个地下载这些也可以。如果可能的话,请建议最佳的可扩展解决方案。谢谢
问题是同时加载它们,本质上是DDoSing服务器。您需要限制线程并使用堆栈来处理
下面是一个简化的例子,说明它可能是什么样子(未经测试(。
const MAX_THREADS = 3;
const https = require("http");
const fs = require("fs");
const threads = [];
//list of files
const file_list = [];
file_list.push("http://www.sample.com/file1");
file_list.push("http://www.sample.com/file2");
file_list.push("http://www.sample.com/file3");
// ...
file_list.push("http://www.sample.com/file850");
const getFile = (single_file, callback) => {
const file = fs.createWriteStream("files/" + single_file); //saving under files folder
https.get(single_file, (response) => {
var stream = response.pipe(single_file);
stream.on("finish", function () {
console.log("done");
callback(single_file);
});
});
};
const process = () => {
if (!file_list.length) return;
let file = file_list.unshift();
getFile(file, process); // the loop
};
while (threads.length < MAX_THREADS) {
const thread = "w" + threads.length;
threads.push(thread);
process();
}
您甚至不需要使用worker数组,只需要循环来启动它们就足够了,但您可以将一个对象添加到线程池中,并使用它来保存统计信息和处理重试或节流等高级功能。
您一次向目标服务器发送了无数个请求。这将大量加载目标服务器,并在您尝试处理所有响应时消耗大量本地资源。
最简单的方案是发送一个请求,当你得到响应时,发送下一个,依此类推。这将在同一时间只有一个请求。
您通常可以通过同时管理飞行中的少量请求(可能是3-5个(来提高吞吐量。
而且,如果目标服务器实现了速率限制,那么您可能必须放慢向其发送请求的速度(每60秒不超过N(。
有很多方法可以做到这一点。以下是一些函数的指针,这些函数实现了各种方法。
这里是mapConcurrent()
,这里是pMap()
:它们允许您迭代一个数组,向主机发送请求,但可以进行管理,使您在决定N的值的同时只有N个请求
这里的rateLimitMap()
:让我们来管理每秒发送的请求数。
我个人会做这样的事情:
// currentIndex is the index of the next file to fetch
const currentIndex = 0;
// numWorkers is the maximum number of simultaneous downloads
const numWorkers = 10;
// promises holds each of our workers promises
const promises = [];
// getNextFile will download the next file, and after finishing, will
// then download the next file in the list, until all files have been
// downloaded
const getNextFile = (resolve) => {
if (currentIndex >= file_list.length) resolve();
const currentFile = file_list[currentIndex];
// increment index so any other worker will not get the same file.
currentIndex++;
const file = fs.createWriteStream('files/' + currentFile );
https.get(single_file, response => {
var stream = response.pipe(single_file);
stream.on("finish", function() {
if (currentIndex === file_list.length) {
resolve();
} else {
getNextFile(resolve);
}
});
});
}
for (let i = 0; i < numWorkers; i++) {
promises.push(new Promise((resolve, reject) => {
getNextFile(resolve);
}));
}
Promise.all(promises).then(() => console.log('All files complete'));