我正在与rabbitmq合作进行一些练习。
但是rabbmitmq的行为与官方网站上的教程不同。
worker
和task_sender
使用以下代码与rabbitmq。
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue')
和task_sender
通过调用发送任务
for i in range(10):
message = "job%s %d %s" % (str(random.randint(1,10)), i , '.'*i)
channel.basic_publish(exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode = 2, # make message persistent
))
CCD_ 4通过调用并等待一段时间来获得任务。
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
time.sleep(body.count(b'.'))
print(" [x] Job Done!")
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(callback,
queue='task_queue',
no_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
首先,我运行task_sender.py
来发送十个作业,它运行得很好。
但当我在不同的shell中启动两个worker.py
时,似乎只有一个工作者在获取任务,而另一个工作者什么也不做。
更重要的是,当工作的worker
完成队列中的所有作业时,我再次运行task_sender.py
来发送新任务,workers
中的None将不再获取作业。
看来rabbitmq正在处理阻塞,我该怎么解决呢?
这是我的Rabbitmq状态
欢迎任何帮助,提前表示感谢。
在您的worker中,尝试将prefetch_count设置为合理的值,并将no_ack设置为false:
...
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
queue='task_queue',
no_ack=False)
...
我不确定pika,但在使用以rabbitmq为后端的oslo.messageing时,我也遇到了同样的问题。实际情况是,通知消息的生产者发送了它,第一个捕获消息的侦听器(消费者)消耗了它,而其他侦听器从未得到它
oslo有一个功能"扇出",它向所有服务器发送通知消息,而不是"第一个赢家,其他人输家"的风格。
我想鼠兔可能也有类似的东西。
奥斯陆通知服务器文档
请尝试在此处搜索关键字"fanout"。然后你可以检查一下pika文档中是否有类似的内容。希望这能有所帮助。