使用 Python、Pika 和 AMQP 设计异步 RPC 应用程序的最佳模式是什么?



我的应用程序的生产者模块是由想要在小型集群上提交工作的用户运行的。它通过RabbitMQ消息代理以JSON形式发送订阅。

我尝试了几种策略,到目前为止最好的是下面的,但仍然不能完全工作:

每台集群机器运行一个消费者模块,该模块为自己订阅AMQP队列,并发出一个prefetch_count来告诉代理它一次可以运行多少任务。

我能够使用来自Pika AMQP库的SelectConnection使其工作。消费者和生产者都启动两个通道,一个连接到每个队列。生产者在通道[A]上发送请求并在通道[B]上等待响应,消费者在通道[A]上等待请求并在通道[B]上发送响应。然而,似乎当消费者运行计算响应的回调时,它会阻塞,因此每次对每个消费者只执行一个任务。

最后我需要什么:

  1. 消费者[A]订阅他的任务(每次大约5k)到集群
  2. 代理为每个消费者分发N个消息/请求,其中N是它可以处理的并发任务的数量
  3. 当单个任务完成时,消费者向代理/生产者返回结果
  4. 生产者接收回复,更新计算状态,最后打印一些报告

限制:

  • 如果另一个用户提交工作,他的所有任务将在前一个用户之后排队(我猜这是队列系统自动实现的,但我还没有考虑对线程环境的影响)
  • 任务有一个要提交的顺序,但是它们被回复的顺序并不重要

我进一步研究了一下,我的实际问题似乎是我使用一个简单的函数作为皮卡的SelectConnection.channel.basic_consume()函数的回调。我的最后一个(未实现的)想法是传递一个线程函数,而不是一个常规的线程函数,这样回调就不会阻塞,消费者就可以继续监听。

正如您所注意到的,您的进程在运行回调时阻塞。有几种方法可以处理这个问题,这取决于你的回调做什么。

如果你的回调是IO绑定的(做大量的网络或磁盘IO),你可以使用线程或基于greenlet的解决方案,如gevent、eventlet或greenhouse。但是请记住,Python受GIL(全局解释器锁)的限制,这意味着在单个Python进程中只能运行一段Python代码。这意味着,如果你要用python代码进行大量计算,这些解决方案可能不会比你现有的解决方案快多少。

另一个选择是使用multiprocessing将消费者实现为多个进程。我发现多处理在并行工作时非常有用。您可以通过使用Queue(让父进程作为消费者并将工作分配给其子进程)或简单地启动多个进程(每个进程各自消费)来实现这一点。除非您的应用程序是高度并发的(数千个工作线程),否则我建议简单地启动多个工作线程,每个工作线程都使用自己的连接。通过这种方式,您可以使用AMQP的确认特性,因此,如果一个消费者在仍在处理任务时死亡,消息将自动发送回队列,并将由另一个worker拾取,而不是简单地丢失请求。

最后一个选项,如果你控制生产者,它也是用Python编写的,是使用一个任务库,比如芹菜,为你抽象任务/队列工作。我在几个大型项目中使用过芹菜,发现它写得非常好。它还可以通过适当的配置为您处理多个消费者问题。

你的设置听起来不错。你是对的,你可以简单地设置回调来启动一个线程,并在线程完成时将其链接到一个单独的回调,以便将响应排在通道b上。

基本上,你的消费者应该有自己的队列(大小为N,他们支持的并行度)。当请求通过通道a进入时,它应该将结果存储在主线程与Pika共享的队列中,并将工作线程存储在线程池中。一旦它进入队列,pika应该响应ACK,你的工作线程将唤醒并开始处理。

一旦worker完成了它的工作,它会将结果放到一个单独的结果队列中,并向主线程发出回调,将结果发送回消费者。

如果工作线程使用任何共享资源,您应该注意并确保它们不会相互干扰,但这是一个单独的主题。

由于在线程方面没有经验,我的设置将运行多个消费者进程(其数量基本上是您的预取计数)。每个队列都连接到两个队列,并且它们会愉快地处理作业,而不知道彼此的存在。

最新更新