使用RabbitMQ时,为每个远程方法创建一个队列



让我们暂时接受在消息队列(如RabbitMQ)上实现RPC并不是一个可怕的想法——有时在与遗留系统接口时可能是必要的。

在RabbitMQ上使用RPC的情况下,客户端发送消息给代理,代理将消息路由给worker, worker通过代理将结果返回给客户端。但是,如果一个工作线程实现了多个远程方法,那么不同的调用需要以某种方式路由到不同的侦听器。

在这种情况下一般的做法是什么?所有RPC over MQ示例只显示了一种远程方法。将方法名称设置为路由规则/队列名称将是很好的和容易的,但我不知道这是否是正确的方法。

让我们暂时接受在消息队列(如RabbitMQ)上实现RPC并不是一个可怕的想法

一点都不可怕!它很常见,在很多情况下都被推荐使用——不仅仅是遗留集成。

…好了,现在回到你的实际问题:)


从一个非常高层的角度来看,这是你需要做的。

您的请求和响应需要包含两个关键信息:

  • a correlation-id
  • a reply-to队列

这些信息将允许您将原始请求和响应关联起来。

发送请求前

让请求代码为自己创建一个独占队列。此队列将用于接收应答。

创建一个新的关联id -通常是GUID或UUID以保证唯一性。

发送请求时

将生成的关联id附加到消息属性。有一个correlationId属性,你应该使用它。

将关联id与请求的关联回调函数(应答处理程序)存储在发出请求的代码中的某个位置。当收到回复时,您需要这样做。

还将您创建的独占队列的名称附加到消息的replyTo属性。

完成所有这些后,您可以通过rabbitmq发送消息

回信时

回复码需要同时使用原始消息中的correlationIdreplyTo字段。所以一定要获取这些

应答应该直接发送到replyTo队列。不要通过交换使用标准出版。相反,使用您正在使用的任何库的"发送到队列"特性将回复消息直接发送到队列,并将响应直接发送到replyTo队列。

也要确保在响应中包含correlationId。这是回答你问题的关键部分

处理回复

发出原始请求的代码将从replyTo队列接收消息。然后将correlationId从消息属性中拉出。

使用关联id查找请求的回调方法…处理响应的代码。把消息传递给这个回调方法,你就差不多完成了。

实现细节

从高层次的角度来看,

是有效的。当你深入研究代码时,实现细节将根据你使用的语言和驱动程序/库而有所不同。

对于任何给定语言,大多数优秀的RabbitMQ库都会内置请求/响应。如果你的库没有,你可能需要寻找一个不同的库。除非您是在AMQP协议之上编写基于模式的库,否则您应该寻找具有为您实现的通用模式的库。

如果您需要更多关于请求/应答模式的信息,包括我在这里提供的所有细节(以及更多),请查看这些资源:

  • 我自己的RabbitMQ模式电子邮件课程/电子书
  • <
  • RabbitMQ教程/gh>
  • 企业集成模式——一定要买这本书,以获得完整的描述/实现模式。这本书值得拥有

如果你在Node.js中工作,我建议你使用wascally库,它包含了你需要的请求/回复功能。关于Ruby,请查看bunny。对于Java或. net,请查看周围的一些服务总线实现。在。net中,我推荐使用NServiceBus或MassTransit。

我发现对每个请求使用一个新的reply-to队列会变得非常低效,特别是在集群上运行RabbitMQ时。

正如评论中建议的那样,直接回复似乎是可行的方法。我在这里记录了所有我尝试过的选项,然后才决定使用那个。

我写了一个npm包amq.rabbitmq.reply-to.js:

  • 使用直接回复-一个允许RPC(请求/回复)客户端使用类似于教程6 (https://www.rabbitmq.com/direct-reply-to.html)中演示的设计的特性,以避免为每个请求声明响应队列。

  • 创建一个事件发射器,其中rpc响应将通过correlationId发布https://github.com/squaremo/amqp.node/issues/259#issuecomment-230165144

<标题>用法:
const rabbitmqreplyto = require('amq.rabbitmq.reply-to.js');
const serverCallbackTimesTen = (message, rpcServer) => {
    const n = parseInt(message);
    return Promise.resolve(`${n * 10}`);
};
let rpcServer;
let rpcClient;
Promise.resolve().then(() => {
    const serverOptions = new rabbitmqreplyto.RpcServerOptions(
    /* url */ undefined, 
    /* serverId */ undefined, 
    /* callback */ serverCallbackTimesTen);
    return rabbitmqreplyto.RpcServer.Create(serverOptions);
}).then((rpcServerP) => {
    rpcServer = rpcServerP;
    return rabbitmqreplyto.RpcClient.Create();
}).then((rpcClientP) => {
    rpcClient = rpcClientP;
    const promises = [];
    for (let i = 1; i <= 20; i++) {
        promises.push(rpcClient.sendRPCMessage(`${i}`));
    }
    return Promise.all(promises);
}).then((replies) => {
    console.log(replies);
    return Promise.all([rpcServer.Close(), rpcClient.Close()]);
});
//['10',
//  '20',
//  '30',
//  '40',
//  '50',
//  '60',
//  '70',
//  '80',
//  '90',
//  '100',
//  '110',
//  '120',
//  '130',
//  '140',
//  '150',
//  '160',
//  '170',
//  '180',
//  '190',
//  '200']

相关内容

  • 没有找到相关文章

最新更新