RabbitMQ插件删除重复的消息



我有一个用于生成文档的RabbitMQ队列。基本上,每个文档都有typestate(新的、正在处理的、准备好的(,所以我使用主题交换和路由键,如type.state.每次文档更改时,我都会将带有最后一个文档描述的消息发送到交易所,并且效果很好。

但是,有时文档可以处理两次:

  1. 用户发送新文档。因此,新消息report.new被发送到交换。
  2. 当工作人员尚未开始文档处理(队列尚未到达(时,用户更新了文档。将发送同一文档的新邮件report.new
  3. 所以
  4. 现在工人收到第一条消息并开始他的工作,而文档被更改了,所以这项工作是完全没有意义的。

现在,我只是将小代码添加到工作线程中,将消息中的文档键与数据库中的文档键进行比较last_modified如果它们不同,则取消消息。但是,我认为这不是最好的解决方案。

我的想法是向消息标头添加ID,并有一些 RabbitMQ 插件,该插件将从队列中删除具有相同ID的旧消息。

谢谢。

附言也许另一个 MQ 引擎在这里很有用?例如,也许ActiveMQ具有这样的功能?

好的,我已经阅读了有关 RabbitMQ 内部架构的信息,发现这是不可能的。所以有人在寻找它的方式。

  1. 仅发送邮件正文中的文档ID
  2. 为 worker 创建一个键值存储(我为此使用 memcached(。键是ID值是此ID上次工作线程运行的时间戳。
  3. 当工作线程收到消息时,它会检查消息时间戳是否大于键值存储中的时间戳。如果是,则更新存储中的时间戳并运行任务,否则只需跳过它。

你可以检查我写的这个插件,它允许删除在代理中发布的重复消息。

您可以根据需要在交易所或队列中删除重复数据。发布商需要做的就是使用邮件的ID设置x-deduplicate-message邮件标头。

正如你所写的,ActiveMQ具有"重复消息检测",但它的工作方式不同。它不会从队列中删除旧邮件,但不会向其添加新邮件。所以它的工作原理与RabbitMQ的插件相同。

最新更新