如何并行处理消息,同时确保每个实体的FIFO



假设您的系统中有一个实体,例如"Person",并且您希望处理修改各种Person实体的事件。重要的是:

  • 同一个人的事件按FIFO顺序处理
  • 多个Person事件流由不同的线程/进程并行处理

我们有一个使用共享数据库和锁来解决这个问题的实现。线程竞争获取Person的锁,然后在获得锁后按顺序处理事件。我们希望转移到消息队列以避免轮询和锁定,我们认为这将减少DB的负载并简化消费者代码的实现。

我对ActiveMQ, RabbitMQ和HornetQ做了一些研究,但我没有看到一个明显的方法来实现这个

ActiveMQ支持消费者订阅通配符,但我没有看到将每个队列的并发性限制为1的方法。如果我能做到这一点,那么解决方案就很简单了:

  • 以某种方式告诉broker允许所有以:/queue/person开头的队列并发为1。发布者使用队列名称中的Person ID将事件写入队列。例如:/队列/person.20
  • 消费者使用通配符订阅队列:/queue/person。>
  • 每个消费者将接收来自不同人员队列的消息。如果所有的人队列都在使用中,一些消费者可能处于空闲状态,这是ok的
  • 在处理消息之后,消费者发送一个ACK,它告诉代理它已经完成了该消息,并允许将Person队列的另一个消息发送给另一个消费者(可能是同一个)

ActiveMQ很接近:您可以进行通配符订阅并启用"独占消费者",但这种组合会导致单个消费者接收发送到所有匹配队列的所有消息,从而将所有person的并发性降低到1。我觉得我错过了一些明显的东西。

问题:

  • 是否有办法实现上述方法与任何主要的消息队列实现?我们对各种选择持相当开放的态度。唯一的要求是它在Linux上运行。
  • 是否有不同的方法来解决我没有考虑的一般问题?

谢谢!

看起来JMSXGroupID就是我要找的。来自ActiveMQ文档:

http://activemq.apache.org/message-groups.html

他们关于股票价格的例子用例正是我所追求的。我唯一关心的是如果单个消费者死了会发生什么。希望代理能够检测到这一点,并选择与该组id关联的另一个消费者。

解决这个问题的一个一般方法(如果我得到你的问题正确)是为Person引入一些独特的属性(例如,Person的数据库级id),并使用该属性的哈希值作为FIFO队列的索引,将该Person放入。
由于该属性的哈希值可能非常大(您负担不起2^32个队列/线程),因此只使用该哈希值的N个最低有效位。每个FIFO队列都应该有专门的工作人员来处理它——瞧,您的要求得到了满足!

这种方法有一个缺点——person必须具有分布良好的id,以使所有队列在或多或少相等的负载下工作。如果您不能保证这一点,请考虑使用队列的循环集,并跟踪正在处理的person,以确保对同一个人进行顺序处理。

如果您已经有了一个允许共享锁的系统,为什么不为每个队列都设置一个锁呢?

最新更新