当将一个子进程的标准输出用作另一个子进程的标准输出时,数据有时不会传递给第二个子进程



当使用一个子进程的标准输出作为另一个子进程的标准输出时,似乎有时数据不会传递给下一个子进程:

var spawn = require('child_process').spawn;
var pipeId = 0;
var launchProcess = function(cmd, args, stdin){
  return spawn(cmd, args, {
    stdio: [stdin ? stdin : 'ignore', 'pipe', 'pipe']
  });
};
var launch = function(){
  var task0 = launchProcess('echo', ['hownarenyoundearnstranger']);
  var task1 = launchProcess('tee', ['/tmp/body-pipeline-' + pipeId], task0.stdout);
  pipeId++;
  task1.on('exit', launch);
};
launch();

部分文件为空:

ls -lhS /tmp/body-pipeline-*

我还尝试通过访问task0.stdout._handle.fd将文件描述符作为正整数传递,但问题仍然存在。

据我所知,这就是shell管道的工作方式:一个进程的标准输出的相同文件描述符被用作另一个进程的标准输出。我试图避免通过NodeJS进程传递所有数据,因为当子进程输出大量数据时,它会导致高CPU负载。

更新:当管道用于stdin和stdout时,一切都如预期的那样工作(在这里使用cat来测试更长的文本):

var spawn = require('child_process').spawn;
var pipeId = 0;
var launchProcess = function(cmd, args, stdin){
  return spawn(cmd, args, {
    stdio: [stdin ? stdin : 'pipe', 'pipe', 'pipe']
  });
};
var launch = function(){
  var task0 = launchProcess('cat');
  var task1 = launchProcess('tee', ['/tmp/body-pipeline-' + pipeId]);
  task0.stdout.pipe(task1.stdin)
  task0.stdin.write(JSON.stringify(process.env).split(',').join('n'))
  task0.stdin.end();
  pipeId++;
  task1.on('exit', launch);
};
launch();

Update2:当使用task0.stdout.pipe(task1.stdin)时,脚本使用50%的CPU(相比之下,当将task0的stdout传递为task1的stdin时,该脚本使用0%的CPU):

var spawn = require('child_process').spawn;
var pipeId = 0;
var launchProcess = function(cmd, args, stdin, stdout, stderr){
  return spawn(cmd, args, {
    stdio: [stdin, stdout, stderr]
  });
};
var launch = function(){
  var task0 = launchProcess('yes', ['lala'], 'ignore', 'pipe', 'ignore');
  var task1 = launchProcess('tee', ['/tmp/body-pipeline-' + pipeId], 'pipe', 'ignore', 'ignore');
  // var task1 = launchProcess('tee', ['/tmp/body-pipeline-' + pipeId], task0.stdout, 'ignore', 'ignore');

  task0.stdout.pipe(task1.stdin);
  pipeId++;
  task1.on('exit', launch);
};
launch();

Update3:这更好地说明了我的问题。我试图在原始代码中简化它,但我认为它太简化了。Larry Turtis为简化的情况提供了一个解决方案,但这并不适用于我的:

var spawn = require('child_process').spawn;
var pipeId = 0;
var pipeSlots = 6;
var launchProcess = function(cmd, args, stdin, stdout){
  return spawn(cmd, args, {
    stdio: [stdin, stdout, 'ignore']
  });
};
var launch = function(){
  var task0 = launchProcess('echo', ['hownarenyoundearnstranger'], 'ignore', 'pipe');
  var task1 = launchProcess('tee', ['/tmp/body-pipeline-' + pipeId], task0.stdout, 'ignore');
  task0.on('error', function(err){
    console.log('Error while processing task0:' + err.stack);
  });
  task1.on('error', function(err){
    console.log('Error while processing task1:' + err.stack);
  });
  pipeId++;
};
// Simulating message queue
setInterval(function(){
  // Simulating how many messages we get from the messaging queue
  var mqMessageCount = Math.floor(Math.random() * (pipeSlots + 1));
  for(var i = 0; i < mqMessageCount; i++){
    launch();
  }
}, 250); // For this test we assume that pipes finish under 250ms

如果您不等待第二个进程退出,那么您的原始代码可以正常工作。

var launch = function(){
  var task0 = launchProcess('echo', ['hownarenyoundearnstranger']);
  var task1 = launchProcess('tee', ['/tmp/body-pipeline-' + pipeId], task0.stdout);
  pipeId++;
  launch();
};

可能发生的情况是task1完成了,但task0没有。我并不完全清楚为什么这很重要,但它显然很重要。可能与在Node文档中我们注意到的事实有关:

..when the 'exit' event is triggered, child process stdio streams might still be open.

确保两个任务都完成可以解决问题。

var spawn = require('child_process').spawn;
var q = require("q");
var pipeId = 0;
var launchProcess = function(cmd, args, stdin) {
    return spawn(cmd, args, {
        stdio: [stdin ? stdin : 'ignore', 'pipe', 'pipe']
    });
};
var launch = function() {
    var task0 = launchProcess('echo', ['hownarenyoundearnstranger']);
    var task1 = launchProcess('tee', ['/tmp/body-pipeline-' + pipeId], task0.stdout);
    var p0 = q.defer();
    var p1 = q.defer();
    task0.on('exit', p0.resolve);
    task1.on('exit',p1.resolve);
    q.all(p0, p1).then(launch)
    pipeId++;
};
launch();

这是一个已知的NodeJS问题:https://github.com/nodejs/node/issues/9413

TLDR;我的一个同事有一个好主意,解决了这个问题:

var task1 = launchProcess('tee', ['/tmp/body-pipeline-' + pipeId], 'pipe', 'ignore');
var task0 = launchProcess('echo', ['hownarenyoundearnstranger'], 'ignore', task1.stdin);

这个想法是在启动发送任务之前启动接收任务!

相关内容

  • 没有找到相关文章

最新更新