所以我有一个基于单客户端-服务器的生态系统,在那里我使用RabbitMQ作为持久中间件。
现在,单个消息的流程是这样的。
- 步骤1:客户端A向具有目标的服务器发送消息在该消息的元数据中被设置为客户端B
- 步骤2:服务器收到消息后将消息推送到RabbitMQ并向客户端B发送一个通知,表示他有一些消息以获取
- 步骤3:客户端B在收到通知后调用获取消息API从服务器获取消息
- 步骤4:在服务器上,从客户端B调用后拉使用基于拉的方法从RabbitMQ发送消息(
channel.basicGet(queueName, false)
(并移交消息
现在在上面的流程中,有一些事情我有一些疑问。
首先,如果我的客户端收到两个通知,并两次调用pull消息API,那么可能会出现并发问题。
假设我在收到消息时没有发送消息Acknowledgement,但我在之后发送,那么同一条消息是否可能被发送到两个pull API?如果是这样,有什么办法可以防止这种情况发生吗?
从MQ获取消息的示例代码:
long currentMessageCount = channel.messageCount(QUEUE_NAME);
while (currentMessageCount-- > 0) {
GetResponse getResponse = channel.basicGet(QUEUE_NAME, false);
if (getResponse == null) {
break;
}
AMQP.BasicProperties props = getResponse.getProps();
Envelope envelope = getResponse.getEnvelope();
int messageCount = getResponse.getMessageCount();
byte[] body = getResponse.getBody();
/*
Do some logic
*/
channel.basicAck(envelope.getDeliveryTag(), false);
}
TIA-
basicGet
很少是正确的解决方案。在步骤2中,客户端应该使用RabbitMQ。无需通知消息已准备就绪。RabbitMQ将在消息进入队列后立即将其发送到客户端B。步骤3和步骤4变得不必要。
注意:RabbitMQ团队监控rabbitmq-users
邮件列表,有时只回答StackOverflow上的问题