在 Node.js 中执行并行处理的最佳方式



我正在尝试编写一个小节点应用程序,该应用程序将搜索并解析文件系统上的大量文件。为了加快搜索速度,我们尝试使用某种地图缩减。该计划将是以下简化方案:

  • Web 请求随搜索查询一起传入
  • 启动 3 个进程,每个进程分配 1000(不同)文件
  • 一旦一个进程完成,它会将它的结果"返回"回主线程
  • 所有进程完成后,主线程将继续将组合结果作为 JSON 结果返回 JSON 结果

我对此的问题是:这在 Node 中可行吗?推荐的方法是什么?

我一直在摆弄,但没有进一步,然后使用Process以下示例:

引发:

function Worker() { 
    return child_process.fork("myProcess.js"); 
}
for(var i = 0; i < require('os').cpus().length; i++){
        var process = new Worker();
        process.send(workItems.slice(i * itemsPerProcess, (i+1) * itemsPerProcess));
}

我的进程.js

process.on('message', function(msg) {
    var valuesToReturn = [];
    // Do file reading here
    //How would I return valuesToReturn?
    process.exit(0);
}

几个旁注:

  • 我知道进程的数量应该取决于服务器上的CPU数量
  • 我还知道文件系统中的速度限制。在我们将其移动到数据库或Lucene实例之前,请将其视为概念证明:-)

应该是可行的。举个简单的例子:

// parent.js
var child_process = require('child_process');
var numchild  = require('os').cpus().length;
var done      = 0;
for (var i = 0; i < numchild; i++){
  var child = child_process.fork('./child');
  child.send((i + 1) * 1000);
  child.on('message', function(message) {
    console.log('[parent] received message from child:', message);
    done++;
    if (done === numchild) {
      console.log('[parent] received all results');
      ...
    }
  });
}
// child.js
process.on('message', function(message) {
  console.log('[child] received message from server:', message);
  setTimeout(function() {
    process.send({
      child   : process.pid,
      result  : message + 1
    });
    process.disconnect();
  }, (0.5 + Math.random()) * 5000);
});

因此,父进程生成 X 个子进程并传递给它们一条消息。它还安装一个事件处理程序来侦听从子级发回的任何消息(例如,带有结果)。

子进程等待来自父进程的消息,然后开始处理(在这种情况下,它只是启动一个随机超时的计时器来模拟正在完成的一些工作)。完成后,它将结果发送回父进程,并使用process.disconnect()断开自身与父进程的连接(基本上停止子进程)。

父进程跟踪启动的子进程数,以及已发回结果的子进程数。当这些数字相等时,父进程从子进程接收所有结果,以便它可以合并所有结果并返回 JSON 结果。

对于这样的分布式问题,我使用了 zmq,它运行得非常好。我会给你一个类似的问题,我遇到了这个问题,并试图通过进程解决(但失败了),然后转向zmq。

使用 bcrypt 或昂贵的哈希算法是明智的,但它会阻止节点进程大约 0.5 秒。我们不得不将其卸载到不同的服务器,作为快速修复,我基本上完全使用了您所做的。运行一个子进程并向它发送消息并使其响应。我们发现的唯一问题是,无论出于何种原因,我们的子进程在完全不做任何工作时都会固定整个核心。(我仍然没有弄清楚为什么会发生这种情况,我们进行了跟踪,似乎 epoll 在 stdout/stdin 流上失败了。它也只会发生在我们的Linux机器上,并且在OSX上可以正常工作。

编辑:

核心的固定在 https://github.com/joyent/libuv/commit/12210fe 中修复,并且与 https://github.com/joyent/node/issues/5504 有关,所以如果你遇到问题并且你使用的是 centos + 内核 v2.6.32:更新节点,或者更新你的内核!

不管我在使用 child_process.fork() 时遇到了什么问题,这里有一个我经常使用的漂亮模式

客户:

var child_process = require('child_process');
function FileParser() {
    this.__callbackById = [];
    this.__callbackIdIncrement = 0;
    this.__process = child_process.fork('./child');
    this.__process.on('message', this.handleMessage.bind(this));
}
FileParser.prototype.handleMessage = function handleMessage(message) {
    var error = message.error;
    var result = message.result;
    var callbackId = message.callbackId;
    var callback = this.__callbackById[callbackId];
    if (! callback) {
        return;
    }
    callback(error, result);
    delete this.__callbackById[callbackId];
};
FileParser.prototype.parse = function parse(data, callback) {
    this.__callbackIdIncrement = (this.__callbackIdIncrement + 1) % 10000000;
    this.__callbackById[this.__callbackIdIncrement] = callback;
    this.__process.send({
        data: data, // optionally you could pass in the path of the file, and open it in the child process.
        callbackId: this.__callbackIdIncrement
    });
};
module.exports = FileParser;

子进程:

process.on('message', function(message) {
    var callbackId = message.callbackId;
    var data = message.data;
    function respond(error, response) {
        process.send({
            callbackId: callbackId,
            error: error,
            result: response
        });
    }
    // parse data..
    respond(undefined, "computed data");
});

我们还需要一个模式来同步不同的进程,当每个进程完成其任务时,它会响应我们,我们将为每个完成的进程增加一个计数,然后在达到我们想要的计数时调用信号量的回调。

function Semaphore(wait, callback) {
    this.callback = callback;
    this.wait = wait;
    this.counted = 0;
}
Semaphore.prototype.signal = function signal() {
    this.counted++;
    if (this.counted >= this.wait) {
        this.callback();
    }
}
module.exports = Semaphore;

下面是将上述所有模式联系在一起的用例:

var FileParser = require('./FileParser');
var Semaphore  = require('./Semaphore');
var arrFileParsers = [];
for(var i = 0; i < require('os').cpus().length; i++){
    var fileParser = new FileParser();
    arrFileParsers.push(fileParser);
}
function getFiles() {
    return ["file", "file"];
}
var arrResults = [];
function onAllFilesParsed() {
    console.log('all results completed', JSON.stringify(arrResults));
}
var lock = new Semaphore(arrFileParsers.length, onAllFilesParsed);
arrFileParsers.forEach(function(fileParser) {
    var arrFiles  = getFiles(); // you need to decide how to split the files into 1k chunks
    fileParser.parse(arrFiles, function (error, result) {
        arrResults.push(result);
        lock.signal();
    });
});

最终我使用了 http://zguide.zeromq.org/page:all#The-Load-Balancing-Pattern,其中客户端使用 nodejs zmq 客户端,而 worker/broker 是用 C 编写的。这使我们能够在多台机器上扩展它,而不仅仅是具有子进程的本地机器。

最新更新