我正在尝试编写一个小节点应用程序,该应用程序将搜索并解析文件系统上的大量文件。为了加快搜索速度,我们尝试使用某种地图缩减。该计划将是以下简化方案:
- Web 请求随搜索查询一起传入
- 启动 3 个进程,每个进程分配 1000(不同)文件
- 一旦一个进程完成,它会将它的结果"返回"回主线程
- 所有进程完成后,主线程将继续将组合结果作为 JSON 结果返回 JSON 结果
我对此的问题是:这在 Node 中可行吗?推荐的方法是什么?
我一直在摆弄,但没有进一步,然后使用Process以下示例:
引发:
function Worker() {
return child_process.fork("myProcess.js");
}
for(var i = 0; i < require('os').cpus().length; i++){
var process = new Worker();
process.send(workItems.slice(i * itemsPerProcess, (i+1) * itemsPerProcess));
}
我的进程.js
process.on('message', function(msg) {
var valuesToReturn = [];
// Do file reading here
//How would I return valuesToReturn?
process.exit(0);
}
几个旁注:
- 我知道进程的数量应该取决于服务器上的CPU数量
- 我还知道文件系统中的速度限制。在我们将其移动到数据库或Lucene实例之前,请将其视为概念证明:-)
应该是可行的。举个简单的例子:
// parent.js
var child_process = require('child_process');
var numchild = require('os').cpus().length;
var done = 0;
for (var i = 0; i < numchild; i++){
var child = child_process.fork('./child');
child.send((i + 1) * 1000);
child.on('message', function(message) {
console.log('[parent] received message from child:', message);
done++;
if (done === numchild) {
console.log('[parent] received all results');
...
}
});
}
// child.js
process.on('message', function(message) {
console.log('[child] received message from server:', message);
setTimeout(function() {
process.send({
child : process.pid,
result : message + 1
});
process.disconnect();
}, (0.5 + Math.random()) * 5000);
});
因此,父进程生成 X 个子进程并传递给它们一条消息。它还安装一个事件处理程序来侦听从子级发回的任何消息(例如,带有结果)。
子进程等待来自父进程的消息,然后开始处理(在这种情况下,它只是启动一个随机超时的计时器来模拟正在完成的一些工作)。完成后,它将结果发送回父进程,并使用process.disconnect()
断开自身与父进程的连接(基本上停止子进程)。
父进程跟踪启动的子进程数,以及已发回结果的子进程数。当这些数字相等时,父进程从子进程接收所有结果,以便它可以合并所有结果并返回 JSON 结果。
对于这样的分布式问题,我使用了 zmq,它运行得非常好。我会给你一个类似的问题,我遇到了这个问题,并试图通过进程解决(但失败了),然后转向zmq。
使用 bcrypt 或昂贵的哈希算法是明智的,但它会阻止节点进程大约 0.5 秒。我们不得不将其卸载到不同的服务器,作为快速修复,我基本上完全使用了您所做的。运行一个子进程并向它发送消息并使其响应。我们发现的唯一问题是,无论出于何种原因,我们的子进程在完全不做任何工作时都会固定整个核心。(我仍然没有弄清楚为什么会发生这种情况,我们进行了跟踪,似乎 epoll 在 stdout/stdin 流上失败了。它也只会发生在我们的Linux机器上,并且在OSX上可以正常工作。
编辑:
核心的固定在 https://github.com/joyent/libuv/commit/12210fe 中修复,并且与 https://github.com/joyent/node/issues/5504 有关,所以如果你遇到问题并且你使用的是 centos + 内核 v2.6.32:更新节点,或者更新你的内核!
不管我在使用 child_process.fork() 时遇到了什么问题,这里有一个我经常使用的漂亮模式
客户:
var child_process = require('child_process');
function FileParser() {
this.__callbackById = [];
this.__callbackIdIncrement = 0;
this.__process = child_process.fork('./child');
this.__process.on('message', this.handleMessage.bind(this));
}
FileParser.prototype.handleMessage = function handleMessage(message) {
var error = message.error;
var result = message.result;
var callbackId = message.callbackId;
var callback = this.__callbackById[callbackId];
if (! callback) {
return;
}
callback(error, result);
delete this.__callbackById[callbackId];
};
FileParser.prototype.parse = function parse(data, callback) {
this.__callbackIdIncrement = (this.__callbackIdIncrement + 1) % 10000000;
this.__callbackById[this.__callbackIdIncrement] = callback;
this.__process.send({
data: data, // optionally you could pass in the path of the file, and open it in the child process.
callbackId: this.__callbackIdIncrement
});
};
module.exports = FileParser;
子进程:
process.on('message', function(message) {
var callbackId = message.callbackId;
var data = message.data;
function respond(error, response) {
process.send({
callbackId: callbackId,
error: error,
result: response
});
}
// parse data..
respond(undefined, "computed data");
});
我们还需要一个模式来同步不同的进程,当每个进程完成其任务时,它会响应我们,我们将为每个完成的进程增加一个计数,然后在达到我们想要的计数时调用信号量的回调。
function Semaphore(wait, callback) {
this.callback = callback;
this.wait = wait;
this.counted = 0;
}
Semaphore.prototype.signal = function signal() {
this.counted++;
if (this.counted >= this.wait) {
this.callback();
}
}
module.exports = Semaphore;
下面是将上述所有模式联系在一起的用例:
var FileParser = require('./FileParser');
var Semaphore = require('./Semaphore');
var arrFileParsers = [];
for(var i = 0; i < require('os').cpus().length; i++){
var fileParser = new FileParser();
arrFileParsers.push(fileParser);
}
function getFiles() {
return ["file", "file"];
}
var arrResults = [];
function onAllFilesParsed() {
console.log('all results completed', JSON.stringify(arrResults));
}
var lock = new Semaphore(arrFileParsers.length, onAllFilesParsed);
arrFileParsers.forEach(function(fileParser) {
var arrFiles = getFiles(); // you need to decide how to split the files into 1k chunks
fileParser.parse(arrFiles, function (error, result) {
arrResults.push(result);
lock.signal();
});
});
最终我使用了 http://zguide.zeromq.org/page:all#The-Load-Balancing-Pattern,其中客户端使用 nodejs zmq 客户端,而 worker/broker 是用 C 编写的。这使我们能够在多台机器上扩展它,而不仅仅是具有子进程的本地机器。