我有一个用例,在该用例中,我必须使用排队机制,以确保消息由消费者("工作者")按顺序依次处理。
我过去使用过RabbitMQ,它保证了它接收消息的顺序。但如果这个顺序不正确怎么办?
假设我提交消息[4,5,3,2,1],RabbitMQ消费者将按该顺序处理消息。如果我希望它们按[1、2、3、4、5]的顺序处理,因为消息相互依赖,该怎么办?
此外,我不想让消费者在确认消息2之前消费消息3(没有间隙)。
是否有支持此用例的排队解决方案?目前,我们将消息转储到数据库中,并让工作人员定期按顺序提取数据。
考虑以下两种模式之一:"消息序列";或";重新排序器";。
消息序列的作用如下:
当一大组数据可能需要分解成消息大小的块时,将数据作为消息序列发送,并用序列标识字段标记每条消息。
重序列器有点不同:
使用有状态过滤器,即重新排序器,来收集和重新排序消息,以便可以按指定顺序将它们发布到输出通道。
重排序器可以接收可能未按顺序到达的消息流。重序列器包含在内部缓冲区中,用于存储无序消息,直到获得完整的序列。然后将按顺序排列的消息发布到输出通道。重要的是,输出通道要保持顺序,这样才能保证消息按顺序到达下一个组件。像大多数其他路由器一样,重序列器通常不会修改消息内容。
我强烈建议阅读这本书中的这些模式";企业集成模式;Gregor Hohpe/Bobby Woolf(Martin Fowler等人的贡献)
这里有比我所能描述的更多的细节,但本质上,消息序列取决于是否具有序列标识符、位置和";结束";(布尔字段)。您需要在队列的末尾有一个适配器来处理序列。
另一方面,一个定序器;将无序消息存储在内部缓冲器中,直到获得完整的序列,然后以适当的序列将消息发布到输出通道";(第285页)。
这与您的";转储到数据库,然后有一个工人拉";您当前使用的策略。
这些模式的实现细节将基于您的应用程序语言和队列选择(在您的案例中是RabbitMQ),但这些模式已经非常完善,所以我会仔细研究一下。
我不知道RabbitMQ本身有任何内置机制可以帮助您获得此功能。
希望这能有所帮助。
编辑
我在谷歌上搜索了";RabbitMQ重排序器";并发现了以下内容(但不能保证有效性):rabbus序列(GitHub)。
在任何情况下,看看他们的代码在那里做什么可能会有所帮助,看看你是否可以从中获得一些灵感。