如何确保将消息发送给队列中的特定消费者



我们在微服务架构中工作,并使用RabbitMQ作为消息代理。我们希望避免出现以下情况:

  1. 实体开始创建,但需要一段时间才能完成
  2. 系统决定创建时间过长,并且由于超时而应该删除实体,因此它会发送一条消息来删除当前仍在创建的实体
  3. 删除消息被消耗,系统检查实体是否存在,但由于实体仍在创建过程中,因此找不到它
  4. 删除实体消息使用者由于找不到实体而返回错误

我们如何确保在create消息完成后消耗delete消息,从而不阻止其他消息的消耗?

我们如何确保删除消息在创建消息完成后被消耗,这样我们就不会阻止其他消息的消耗?

假设您的实体创建超时为N。负责创建实体的工作人员应该知道此超时,并且应该能够在达到N时取消实体创建。这不是绝对必要的,但听起来你的实体创建可能是资源密集型的,所以取消应该是你的一个功能。

如果您的工作人员知道在达到超时N时取消实体创建,那么也许您甚至不需要删除消息?

如果你保留删除消息,工人处理可以做以下事情:

  • 首先,确保您的队列配置了死信交换
  • 使用消息,并尝试删除实体
  • 如果删除成功,那太好了,用RabbitMQ确认消息,就完成了
  • 如果删除失败,nack使用RabbitMQ(拒绝(消息,并将requeue设置为false。这将导致消息路由到死信交换
  • 工作人员应该从绑定到此死信交换的队列中消费。您可以有一个专门用于重新尝试实体删除的队列。当工作进程使用此队列中的消息时,它可以重新尝试删除。如果失败,您可以再次拒绝它(当然是在延迟之后(,如果该队列具有相同的死信设置,则会发生相同的过程
  • 最后,确保您的删除工作人员尊重count属性,并且只尝试删除实体一定次数。如果超过限制,这将在您的系统中创建一个异常

注意:RabbitMQ团队监控rabbitmq-users邮件列表,有时只回答StackOverflow上的问题

最新更新