让我们暂时接受在消息队列(如RabbitMQ)上实现RPC并不是一个可怕的想法——有时在与遗留系统接口时可能是必要的。
在RabbitMQ上使用RPC的情况下,客户端发送消息给代理,代理将消息路由给worker, worker通过代理将结果返回给客户端。但是,如果一个工作线程实现了多个远程方法,那么不同的调用需要以某种方式路由到不同的侦听器。
在这种情况下一般的做法是什么?所有RPC over MQ示例只显示了一种远程方法。将方法名称设置为路由规则/队列名称将是很好的和容易的,但我不知道这是否是正确的方法。
让我们暂时接受在消息队列(如RabbitMQ)上实现RPC并不是一个可怕的想法
一点都不可怕!它很常见,在很多情况下都被推荐使用——不仅仅是遗留集成。
…好了,现在回到你的实际问题:)
从一个非常高层的角度来看,这是你需要做的。
您的请求和响应需要包含两个关键信息:
- a
correlation-id
a
reply-to
队列这些信息将允许您将原始请求和响应关联起来。
发送请求前
让请求代码为自己创建一个独占队列。此队列将用于接收应答。
创建一个新的关联id -通常是GUID或UUID以保证唯一性。
发送请求时
将生成的关联id附加到消息属性。有一个correlationId
属性,你应该使用它。
将关联id与请求的关联回调函数(应答处理程序)存储在发出请求的代码中的某个位置。当收到回复时,您需要这样做。
还将您创建的独占队列的名称附加到消息的replyTo
属性。
完成所有这些后,您可以通过rabbitmq发送消息
回信时回复码需要同时使用原始消息中的correlationId
和replyTo
字段。所以一定要获取这些
应答应该直接发送到replyTo
队列。不要通过交换使用标准出版。相反,使用您正在使用的任何库的"发送到队列"特性将回复消息直接发送到队列,并将响应直接发送到replyTo
队列。
也要确保在响应中包含correlationId
。这是回答你问题的关键部分
处理回复
发出原始请求的代码将从replyTo
队列接收消息。然后将correlationId
从消息属性中拉出。
使用关联id查找请求的回调方法…处理响应的代码。把消息传递给这个回调方法,你就差不多完成了。
实现细节
从高层次的角度来看,是有效的。当你深入研究代码时,实现细节将根据你使用的语言和驱动程序/库而有所不同。
对于任何给定语言,大多数优秀的RabbitMQ库都会内置请求/响应。如果你的库没有,你可能需要寻找一个不同的库。除非您是在AMQP协议之上编写基于模式的库,否则您应该寻找具有为您实现的通用模式的库。如果您需要更多关于请求/应答模式的信息,包括我在这里提供的所有细节(以及更多),请查看这些资源:
- 我自己的RabbitMQ模式电子邮件课程/电子书 <
- RabbitMQ教程/gh>
- 企业集成模式——一定要买这本书,以获得完整的描述/实现模式。这本书值得拥有
如果你在Node.js中工作,我建议你使用wascally库,它包含了你需要的请求/回复功能。关于Ruby,请查看bunny。对于Java或. net,请查看周围的一些服务总线实现。在。net中,我推荐使用NServiceBus或MassTransit。
我发现对每个请求使用一个新的reply-to队列会变得非常低效,特别是在集群上运行RabbitMQ时。
正如评论中建议的那样,直接回复似乎是可行的方法。我在这里记录了所有我尝试过的选项,然后才决定使用那个。
我写了一个npm包amq.rabbitmq.reply-to.js:
使用直接回复-一个允许RPC(请求/回复)客户端使用类似于教程6 (https://www.rabbitmq.com/direct-reply-to.html)中演示的设计的特性,以避免为每个请求声明响应队列。
创建一个事件发射器,其中rpc响应将通过correlationId发布https://github.com/squaremo/amqp.node/issues/259#issuecomment-230165144
const rabbitmqreplyto = require('amq.rabbitmq.reply-to.js');
const serverCallbackTimesTen = (message, rpcServer) => {
const n = parseInt(message);
return Promise.resolve(`${n * 10}`);
};
let rpcServer;
let rpcClient;
Promise.resolve().then(() => {
const serverOptions = new rabbitmqreplyto.RpcServerOptions(
/* url */ undefined,
/* serverId */ undefined,
/* callback */ serverCallbackTimesTen);
return rabbitmqreplyto.RpcServer.Create(serverOptions);
}).then((rpcServerP) => {
rpcServer = rpcServerP;
return rabbitmqreplyto.RpcClient.Create();
}).then((rpcClientP) => {
rpcClient = rpcClientP;
const promises = [];
for (let i = 1; i <= 20; i++) {
promises.push(rpcClient.sendRPCMessage(`${i}`));
}
return Promise.all(promises);
}).then((replies) => {
console.log(replies);
return Promise.all([rpcServer.Close(), rpcClient.Close()]);
});
//['10',
// '20',
// '30',
// '40',
// '50',
// '60',
// '70',
// '80',
// '90',
// '100',
// '110',
// '120',
// '130',
// '140',
// '150',
// '160',
// '170',
// '180',
// '190',
// '200']