如何在保证FIFO的情况下控制最大并行度



我正在研究使用rabbitmq来管理我的应用程序的事件。更具体地说,我想:

  1. 确保每个队列事件的FIFO处理:在所有以前的事件都被完全处理之前,不会处理新事件
  2. 确保我可以控制并行执行的事件的数量

一个典型的例子是,我有200到800个队列,并且我不希望并行工作人员超过8个。

我决定使用n+1个队列和n+m个工作者(n=200到800,m=8):

  • 第一类工人(n)负责确保队列中的所有事件
  • 第二类工作者(m)只是以并行方式执行事件

这里是伪代码:

def queues_declare(channel):
channel.queue_declare(queue='type1', durable=True)
channel.queue_declare(queue='type1_callback', durable=True)
channel.queue_declare(queue='type2', durable=True)
def type1(channel):
def callback_type1(ch, method, properties, body):
channel.basic_publish(exchange='',
routing_key='type2',
body=body,
properties=pika.BasicProperties(
reply_to = "type1_callback",
correlation_id = method.delivery_tag,
delivery_mode = 2,
))
def callback_type1_callback(ch, method, properties, body):
ch.basic_ack(delivery_tag = properties.correlation_id)
ch.basic_ack(delivery_tag = method.delivery_tag)
queues_declare(channel)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback_type1,
queue='type1')
channel.basic_consume(callback_type1_callback,
queue='type1_callback')
def type2(channel):
queues_declare(channel)
def callback_type2(ch, method, properties, body):
# XXX: do work !
channel.basic_publish(exchange='',
routing_key=properties.reply_to,
body='',
properties=pika.BasicProperties(
correlation_id = properties.correlation_id,
))
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(callback_type2,
queue='type2')

所以,我的问题是:这是用rabbitmq实现我想要的目标的正确方式吗?有没有更好的方法来控制并行性并确保FIFO处理?

这里有几个问题。

  1. 保证FIFO顺序的唯一方法是使用单个队列进行串行访问。而且,使用大量只将消息重新发布到这个队列的工作人员实际上会稍微放松这一保证——因此,最好以这样的方式设置消息结构,即消息将直接到达该队列。无论如何,最大的缺点是,性能受CPU单个核心性能的约束。

  2. 有一种方法可以仅使用RabbitMQ本身来限制并发性。为此,您需要创建一个单独的队列,并预先填充与所需并发级别相等的消息量。然后,你的员工应该做的第一件事是尝试获取该消息,但不要确认它——因此,该消息将在员工的一生中一直处于这种未确认状态。当工作人员死亡(或只是关闭AMQP连接)时,任何其他工作人员都可以访问该消息来获取它。但也有一个缺点——这只在非集群环境中可靠地工作。例如,请参阅https://aphyr.com/posts/315-jepsen-rabbitmq这个用例几乎正是在哪里测试的。

相关内容

最新更新