我正在学习AMQP,所以这可能是对我正在做的事情的误解。我正试图在我自己的私人服务器上设置rabbitmq,为一个系统农场提供工作(我有一堆需要处理的图像)。我建立了一个三步管道:
| put image on queue | --work_queue--> | process | --results--> | archive results |
我安装了rabbitmq,并在服务器上创建了两个队列work_queue和results。我使用amqplib编写了一些python脚本,并且使用一个图像处理器工作人员可以很好地运行管道。我已经向队列中添加了100个图像,我的一台机器很高兴地一次抓取一个图像,大量地处理数据,并将结果放在结果队列中。
问题是,我曾假设,如果我在另一台机器上启动另一个图像处理程序,它也会简单地将工作从队列中删除。这似乎正是rabbitmq网站上"工作队列"教程中列出的情况。我原以为这会奏效。然而,真正发生的是,无论我开始雇佣多少其他员工,他们都只是永远等待,永远不会得到任何工作,即使有很多消息在work_queue中等待。
我误解了什么?在服务器上排队工作的相关代码:
from amqplib import client_0_8 as amqp
conn = amqp.Connection(host="foo:5672 ", userid="pipeline",
password="XXXXX", virtual_host="/", insist=False)
chan = conn.channel()
....
msg = { 'filename': os.path.basename(i) }
chan.basic_publish(amqp.Message(json.dumps(msg)), exchange='',
routing_key='work_queue')
而流程工作者的消费者端:
from amqplib import client_0_8 as amqp
conn = amqp.Connection(host="foo:5672 ", userid="pipeline",
password="XXXXX", virtual_host="/", insist=False)
chan = conn.channel()
def work_callback(msg):
....
while True:
chan.basic_consume(callback=work_callback, queue='work_queue')
try:
chan.wait()
except KeyboardInterrupt:
print "nExiting cleanly"
sys.exit(0)
我可以看到其他工人都有联系:
$ sudo rabbitmqctl list_queues
Listing queues ...
results 0
work_queue 246
...done.
$ sudo rabbitmqctl list_connections
Listing connections ...
pipeline 192.168.8.1 41553 running
pipeline XX.YY.ZZ.WW 46676 running
pipeline 192.168.8.4 44482 running
pipeline 192.168.8.6 41884 running
...done.
其中XX.YYY.ZZ.WW是外部IP。192.168.8.6的工作程序正在队列中大量工作,但外部IP上的工作程序闲置在那里,而246条消息在它应该等待的队列中等待。
想法?
很抱歉回答了我自己的问题,但我终于得到了我想要的行为。我需要将QoS预取计数设置为1。显然,通过不设置预取计数,一旦一个客户端连接,所有200多条消息都会被传递到其"通道"并从队列中排出,因此其他人看不到任何工作,除非原始工作已断开连接(从而关闭通道并将消息放回队列)。
通过添加:
chan.basic_qos(0,1,False)
对我的员工来说,他们现在一次只收到一条信息。这并不完全是我所期望的,但它现在仍按预期运行。