我可以屈服于一个子进程并在Node.js中返回响应吗



简而言之,我遇到了一个问题,对Node.js服务器的多个并行GET请求导致服务器"阻塞"并挂起,从而导致客户端超时(503,服务不可用)。

经过大量的性能分析,我意识到这是一个CPU问题。特定的请求(我们称之为GET /foo)通过HTTP查询来自多个服务的数据,然后进行大量计算,并将结果返回给客户端,如下所示:

  1. 客户端请求GET /foo
  2. /foo控制器通过HTTP从多个其他服务查询数据`
  3. /foo控制器然后对数据进行一系列迭代,为客户端编译一些输出

步骤3大约需要2秒钟才能完成。但是,如果我向/foo并行发送两个请求,每个客户端将在大约4秒内收到它们的响应。当我在使用更多内核的集群中运行应用程序时,请求运行得更快,但不是我想要的。

似乎我在这里有几个选择:

  1. 预先计算响应(理想情况下,目前希望避免这种情况,因为这将需要一个完整的"缓存无效"方案),或者
  2. /foo将CPU阻塞计算异步发送到另一个进程(使用Heroku,所以这将是另一个dyno),然后我可以使用websocket或其他东西将结果推送到客户端(同样,对于我的情况来说非常复杂),或者
  3. 以某种方式屈服于请求中的子进程,并将结果返回给客户端

很想做一些类似选项3的事情。类似以下内容:

get('/foo', function*(request) {
// I/O, so not blocking the event loop (I think)
let data = yield getData(request)
// make this happen in a different process
let response = yield doSomeHeavyProcessing(data)
return response
})

上面我省略了很多实现细节,但如果有必要的话,我使用的是Koa和Node.js 6。

理想情况下,doSomeHeavyProcessing将在一些单独的过程中进行CPU密集型计算,并且在完成后,仍然以"同步"的方式将结果发送回请求客户端。

我一直在试着把我的大脑包裹在儿童进程、网络工作者、纤维等上,并用这些做了一些基本的"你好世界",让他们基本上完成上面的事情,但没有用。如有必要,可以发布更多详细信息。

以下是您可以尝试的一些方法:

1.将块计算拆分为小块,并使用setImmediate将下一块工作放在事件队列的末尾。因此,计算不再被阻塞,其他请求也可以被处理。

2.微软最近发布了napajs。如自述中所述

随着它的发展,我们发现在CPU绑定任务中补充Node.js非常有用,它能够在多个V8隔离中执行JavaScript并在它们之间进行通信。

我还没有尝试过,但它看起来很有前景:

var napa = require('napajs');
var zone1 = napa.zone.create('zone1', { workers: 4 });
get('/foo', function*(request) {
let data = yield getData(request)
let response = yield zone1.execute(doSomeHeavyProcessing, [data])
return response
})

3.如果以上内容都不够,并且您需要在多台机器上分散负载,那么您可能无法避免使用某种消息队列将工作分配到不同的服务器。在这种情况下,请检查ZeroMQ。它非常容易在节点上使用,并且您可以用它实现任何类型的分布式消息传递模式

为了方便起见,您可以使用带有附加包装器的子进程。

worker.js-该模块将在一个单独的过程中运行,并将完成繁重的工作

const crypto = require('crypto');
function doHeavyWork(data) {
return crypto.pbkdf2Sync(data, 'salt', 100000, 64, 'sha512');
}
process.on('message', (message) => {
const result = doHeavyWork(message.data);
process.send({ id: message.id, result });
});

client.js-子进程的方便(但原始)包装器

const cp = require('child_process');
let worker;
const resolves = new Map();
module.exports = {
init(moduleName, errorCallback) {
worker = cp.fork(moduleName);
worker.on('error', errorCallback);
worker.on('message', (message) => {
const resolve = resolves.get(message.id);
resolves.delete(message.id);
if (!resolve) {
errorCallback(new Error(`Got response from worker with unknown id: ${message.id}`));
return;
}
resolve(message.result);
});
console.log(`Service PID: ${process.pid}, Worker PID: ${worker.pid}`);
},
doHeavyWorkRemotly(data) {
const id = `${Date.now()}${Math.random()}`;
return new Promise((resolve) => {
worker.send({ id, data });
resolves.set(id, resolve);
});
}
}

我使用fork()来利用文档中所述的额外通信信道。

此外,我还记录所有提交给工作进程的请求(const resolves = new Map();),并仅在工作进程返回特定请求的响应(const resolve = resolves.get(message.id);)时解析Promises(resolve(message.result);)。

run.js-一个启动模块,它利用co来"执行"生成器。

const co = require('co');
const client = require('./client');
function errorCallback(error) {
console.log('Got an unexpected error!');
console.log(error);
}
client.init('./worker.js', errorCallback);
function* run() {
while(true) {
yield client.doHeavyWorkRemotly('mydata');
}
}
co(run);

要测试它,只需运行node run.js,它就会打印

服务PID:XXXX,工作进程PID:XXXX

然后查看CPU利用率,工作进程可能占用大约100%的CPU,而服务将相当空闲。

最新更新