RabbitMQ RPC using Node.js



我使用 RabbitMQ 在 Node.js 中实现 RPC。

我按照教程进行操作,我为每个客户端断言队列名称与"rpc_client"相同,以便成为高权限,以下是在服务器中调用函数client.js

const amqp = require('amqplib');
async function client(){
let args = process.argv.slice(2);
let corr = generateUuid();
let num = parseInt(args[0]);
try {
let conn = await amqp.connect('amqp://127.0.0.1');
let ch = await conn.createChannel();
let q = await ch.assertQueue('rpc_client');
console.log(' [x] Requesting fib(%d)', num);
console.log(q.queue)
await ch.consume(q.queue,msg=>{
if (msg.properties.correlationId == corr) {
console.log(' [.] Got %s', msg.content.toString());
ch.ack(msg)
setTimeout(function() { conn.close(); process.exit(0) }, 5500);
}
},{noAck:false});
ch.sendToQueue('rpc_server',
new Buffer(num.toString()),
{ correlationId: corr, replyTo: q.queue });
} catch(err){
console.error(err);
}
}
function generateUuid() {
return Math.random().toString() +
Math.random().toString() +
Math.random().toString();
}
client();

但是我发现当我一次运行多个客户端时,后一个客户端不会运行消费回调(从服务器获取答案并打印出来),直到前一个客户端的连接关闭。 例如,第二个客户端将获得答案并打印它,其连接将在 5500 毫秒内关闭,第二个客户端必须等待第一个客户端关闭,它将打印答案然后等待另外 5500 毫秒关闭。

那么为什么要这样做呢? 因为队列可以同时消耗两个工人中的两个按摩。

这是server.js

async function server(){
try {
let conn = await amqp.connect('amqp://127.0.0.1');
let ch = await conn.createChannel();
process.once('SIGINT',()=>conn.close());
let q = await ch.assertQueue('rpc_server');
ch.prefetch(1);
console.log(' [x] Awaiting RPC requests');
await ch.consume(q.queue,msg=>{
let n = parseInt(msg.content.toString());
console.log(" [.] fib(%d)", n);
let r = fibonacci(n);
ch.sendToQueue(msg.properties.replyTo,
new Buffer(r.toString()),
{correlationId: msg.properties.correlationId});
ch.ack(msg);
},{noAck:false});

} catch(err) {
console.error(err);
}
}
server();
function fibonacci (n , ac1 = 1 , ac2 = 1) {
if( n <= 1 ) {return ac2};
return fibonacci (n - 1, ac2, ac1 + ac2);
}

如果队列名称相同,则它是相同的队列。不是两个同名队列。在这种情况下,客户端按顺序而不是并行方式获取消息是有意义的。

因此,请尝试使用不同的队列名称,它应该可以工作。

据我所知,您必须将预取更改为大于 1 的数字。

basic.qos(预取)方法,允许您限制未确认的数量 使用时通道(或连接)上的消息。

所以问题出在这个参数上。 您的频道将等待您的第一个请求得到处理和确认。

最新更新