RabbitMQ队列中芹菜发送的消息丢失



我有一个脚本在一个名为vv的特定队列中调用芹菜的send_task函数3047次。使用rabbitmqadmin list queues命令,我注意到队列中只有2870条消息,没有任何worker连接到vv队列。

你自己经历过这种行为吗?这只是rabbitmqadmin的显示错误吗?

提前感谢您的帮助,如果您需要更多关于脚本的细节我将完成我的问题!

编辑:脚本看起来像这样,有大量的文档,并由celery-beat触发

from celery import Celery
app = Celery()
app.config_from_object('celeryconfig')
@app.task
def document_monitoring(index, query, kind):
# generate random messages
documents = [{'_id': str(i)} for i in range(10000)]
for document in documents:
app.send_task(
"vv_consumer", 
kwargs=document,
queue='vv'
)
return 0

在深入研究我的RabbitMQ实例的日志后,我发现我达到了由变量vm_memory_high_watermark定义的内存警报的限制。默认值是0.4,这意味着一旦使用了实例内存的40%,RabbitMQ就会阻塞发布者(如示例中的脚本)并停止接收新消息。由于这种行为,我错过了最初发送到队列的几条消息。

要解决这个问题,我的解决方案是:

  • 硬件端:增加实例的RAM
  • 软件端:将vm_memory_high_watermark值增加到0.7(遵循文档)

最新更新