AMQP/RabbitMQ有多个消费者,但只有一个起作用



我正在学习AMQP,所以这可能是对我正在做的事情的误解。我正试图在我自己的私人服务器上设置rabbitmq,为一个系统农场提供工作(我有一堆需要处理的图像)。我建立了一个三步管道:

| put image on queue | --work_queue--> | process | --results--> | archive results | 

我安装了rabbitmq,并在服务器上创建了两个队列work_queue和results。我使用amqplib编写了一些python脚本,并且使用一个图像处理器工作人员可以很好地运行管道。我已经向队列中添加了100个图像,我的一台机器很高兴地一次抓取一个图像,大量地处理数据,并将结果放在结果队列中。

问题是,我曾假设,如果我在另一台机器上启动另一个图像处理程序,它也会简单地将工作从队列中删除。这似乎正是rabbitmq网站上"工作队列"教程中列出的情况。我原以为这会奏效。然而,真正发生的是,无论我开始雇佣多少其他员工,他们都只是永远等待,永远不会得到任何工作,即使有很多消息在work_queue中等待。

我误解了什么?在服务器上排队工作的相关代码:

from amqplib import client_0_8 as amqp
conn = amqp.Connection(host="foo:5672 ", userid="pipeline",
    password="XXXXX", virtual_host="/", insist=False)
chan = conn.channel()
....
msg = { 'filename': os.path.basename(i) }
chan.basic_publish(amqp.Message(json.dumps(msg)), exchange='',
        routing_key='work_queue')

而流程工作者的消费者端:

from amqplib import client_0_8 as amqp
conn = amqp.Connection(host="foo:5672 ", userid="pipeline",
    password="XXXXX", virtual_host="/", insist=False)
chan = conn.channel()
def work_callback(msg):
.... 
while True:
    chan.basic_consume(callback=work_callback, queue='work_queue')
    try:
        chan.wait()
    except KeyboardInterrupt:
        print "nExiting cleanly"
        sys.exit(0)

我可以看到其他工人都有联系:

$ sudo rabbitmqctl list_queues
Listing queues ...
results 0
work_queue      246
...done.
$ sudo rabbitmqctl list_connections 
Listing connections ...
pipeline        192.168.8.1     41553   running
pipeline        XX.YY.ZZ.WW     46676   running
pipeline        192.168.8.4     44482   running
pipeline        192.168.8.6     41884   running
...done.

其中XX.YYY.ZZ.WW是外部IP。192.168.8.6的工作程序正在队列中大量工作,但外部IP上的工作程序闲置在那里,而246条消息在它应该等待的队列中等待。

想法?

很抱歉回答了我自己的问题,但我终于得到了我想要的行为。我需要将QoS预取计数设置为1。显然,通过不设置预取计数,一旦一个客户端连接,所有200多条消息都会被传递到其"通道"并从队列中排出,因此其他人看不到任何工作,除非原始工作已断开连接(从而关闭通道并将消息放回队列)。

通过添加:

chan.basic_qos(0,1,False)

对我的员工来说,他们现在一次只收到一条信息。这并不完全是我所期望的,但它现在仍按预期运行。

最新更新