如何让master在nodejs集群中等待worker完成



我在 Node.js 集群(cluster)上部署了一个应用程序,其中 master 进程派生出若干 worker。每个 worker 执行一些数据库操作(可能需要一段时间),然后必须把结果发送回 master。下面是我的代码骨架。运行这个程序时,worker 发出的部分消息没有被 master 接收到。我怎样才能让 master 一直等到收到所有 worker 的消息?

// Skeleton of a cluster program: the master forks `nodes` workers and listens
// for their results; each worker does its database work and reports back.
if (isMaster) {
  // Fork one worker process per requested node.
  for (let k = 0; k < nodes; k++) {
    cluster.fork();
    console.log(`Started Node-${k}`);
  }
  // Attach the exit/message handlers to every worker that was forked above.
  for (const id in cluster.workers) {
    const worker = cluster.workers[id];
    worker.on('exit', () => {
      console.log('worker', id + ' Exited');
    });
    worker.on('message', (msg) => {
      console.log("msg recvd by id:", id, 'msg:', msg);
      consumeMsg(msg);
    });
  }
} else { // isWorker
  // do some database work, potentially long running (in tens of seconds)
  // ... (elided in the original skeleton; expected to produce `dbResults`)
  process.send({ results: dbResults, ID: cluster.worker.id });
}

下面是一个能满足您需求的简单可运行示例(使用 worker_threads):

const { Worker, isMainThread, parentPort } = require('worker_threads');
// Number of work messages the main thread posts before the 'exit' request.
const MESSAGE_COUNT = 10;

/**
 * Entry point executed by both threads of this file.
 *
 * On the main thread it starts a worker from this same file, logs every
 * response the worker posts back, sends MESSAGE_COUNT messages and finally
 * an 'exit' request. On the worker thread it answers each incoming message.
 */
function main() {
  if (isMainThread) {
    // instantiate the worker from this same file
    const worker = new Worker(__filename);
    // receive worker responses
    worker.on('message', (message) => {
      console.log('worker response', message);
    });
    // send some messages
    for (let i = 0; i < MESSAGE_COUNT; i++) {
      worker.postMessage(`message ${i}`);
    }
    // ask the worker to stop
    worker.postMessage('exit');
  } else {
    // receive parent message
    parentPort.on('message', (message) => {
      // handle exit case: stop the port from keeping this thread alive
      if (message === 'exit') {
        parentPort.unref();
      }
      // do some work (note: the 'exit' request itself is also processed here
      // and answered like any other message)
      console.log('worker: message from main', message);
      // send result back
      parentPort.postMessage(`response to:${message}`);
    });
    // NOTE(review): parentPort is a MessagePort, which documents 'close' and
    // 'message' events; this 'exit' listener likely never fires — confirm and
    // move the closing actions elsewhere if they are needed.
    parentPort.on('exit', () => {
      // do some closing actions
      console.log('worker closing');
    });
  }
}
main();

输出:

worker: message from main message 0
worker response response to:message 0
worker response response to:message 1
worker response response to:message 2
worker response response to:message 3
worker response response to:message 4
worker response response to:message 5
worker response response to:message 6
worker response response to:message 7
worker response response to:message 8
worker response response to:message 9
worker response response to:exit
worker: message from main message 1
worker: message from main message 2
worker: message from main message 3
worker: message from main message 4
worker: message from main message 5
worker: message from main message 6
worker: message from main message 7
worker: message from main message 8
worker: message from main message 9
worker: message from main exit

对于多个 worker,可以采用同样的做法:

const { Worker, isMainThread, parentPort } = require('worker_threads');
// Number of work messages posted to each worker before the 'exit' request.
const MESSAGE_COUNT = 10;

/**
 * Starts a worker thread from this same file and feeds it messages.
 *
 * Logs every response the worker posts back (prefixed with `name`), sends
 * MESSAGE_COUNT messages tagged with `name`, then sends an 'exit' request.
 *
 * @param {string} name - Label used in the posted messages and in the log output.
 */
function createWorker(name) {
  // instantiate the worker from this same file
  const worker = new Worker(__filename);
  // receive worker responses
  worker.on('message', (message) => {
    console.log(`worker ${name} response`, message);
  });
  // send some messages
  for (let i = 0; i < MESSAGE_COUNT; i++) {
    worker.postMessage(`${name} message ${i}`);
  }
  // ask the worker to stop
  worker.postMessage('exit');
}
/**
 * Entry point executed by every thread of this file.
 *
 * The main thread creates two workers ('one' and 'two'); each worker thread
 * answers every message it receives from the main thread.
 */
function main() {
  if (isMainThread) {
    createWorker('one');
    createWorker('two');
  } else {
    // receive parent message
    parentPort.on('message', (message) => {
      // handle exit case: stop the port from keeping this thread alive
      if (message === 'exit') {
        parentPort.unref();
      }
      // do some work (note: the 'exit' request itself is also processed here
      // and answered like any other message)
      console.log('worker: message from main', message);
      // send result back
      parentPort.postMessage(`response to:${message}`);
    });
    // NOTE(review): parentPort is a MessagePort, which documents 'close' and
    // 'message' events; this 'exit' listener likely never fires — confirm and
    // move the closing actions elsewhere if they are needed.
    parentPort.on('exit', () => {
      // do some closing actions
      console.log('worker closing');
    });
  }
}
main();

输出:

worker: message from main one message 0
worker one response response to:one message 0
worker one response response to:one message 1
worker one response response to:one message 2
worker one response response to:one message 3
worker one response response to:one message 4
worker one response response to:one message 5
worker one response response to:one message 6
worker one response response to:one message 7
worker one response response to:one message 8
worker one response response to:one message 9
worker one response response to:exit
worker: message from main two message 0
worker two response response to:two message 0
worker two response response to:two message 1
worker two response response to:two message 2
worker two response response to:two message 3
worker two response response to:two message 4
worker two response response to:two message 5
worker two response response to:two message 6
worker two response response to:two message 7
worker two response response to:two message 8
worker two response response to:two message 9
worker two response response to:exit
worker: message from main one message 1
worker: message from main one message 2
worker: message from main one message 3
worker: message from main one message 4
worker: message from main one message 5
worker: message from main one message 6
worker: message from main one message 7
worker: message from main one message 8
worker: message from main one message 9
worker: message from main exit
worker: message from main two message 1
worker: message from main two message 2
worker: message from main two message 3
worker: message from main two message 4
worker: message from main two message 5
worker: message from main two message 6
worker: message from main two message 7
worker: message from main two message 8
worker: message from main two message 9
worker: message from main exit

最新更新