带有python的Rabbitmq的工作方式出乎意料



我正在与rabbitmq合作进行一些练习。

但是rabbmitmq的行为与官方网站上的教程不同。

workertask_sender使用以下代码与rabbitmq。

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue')

task_sender通过调用发送任务

for i in range(10):
    message = "job%s %d %s" % (str(random.randint(1,10)), i , '.'*i)
    channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))

CCD_ 4通过调用并等待一段时间来获得任务。

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Job Done!")
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback,
                      queue='task_queue',
                      no_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

首先,我运行task_sender.py来发送十个作业,它运行得很好。

但当我在不同的shell中启动两个worker.py时,似乎只有一个工作者在获取任务,而另一个工作者什么也不做。

更重要的是,当工作的worker完成队列中的所有作业时,我再次运行task_sender.py来发送新任务,workers中的None将不再获取作业。

看来rabbitmq正在处理阻塞,我该怎么解决呢?

这是我的Rabbitmq状态

欢迎任何帮助,提前表示感谢。

在您的worker中,尝试将prefetch_count设置为合理的值,并将no_ack设置为false:

...
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                  queue='task_queue',
                  no_ack=False)
...

我不确定pika,但在使用以rabbitmq为后端的oslo.messageing时,我也遇到了同样的问题。实际情况是,通知消息的生产者发送了它,第一个捕获消息的侦听器(消费者)消耗了它,而其他侦听器从未得到它

oslo有一个功能"扇出",它向所有服务器发送通知消息,而不是"第一个赢家,其他人输家"的风格。

我想鼠兔可能也有类似的东西。

奥斯陆通知服务器文档

请尝试在此处搜索关键字"fanout"。然后你可以检查一下pika文档中是否有类似的内容。希望这能有所帮助。

最新更新