我有一个脚本在一个名为vv
的特定队列中调用芹菜的send_task
函数3047次。使用rabbitmqadmin list queues
命令,我注意到队列中只有2870条消息,没有任何worker连接到vv
队列。
你自己经历过这种行为吗?这只是rabbitmqadmin
的显示错误吗?
提前感谢您的帮助,如果您需要更多关于脚本的细节我将完成我的问题!
编辑:脚本看起来像这样,有大量的文档,并由celery-beat
触发
from celery import Celery
app = Celery()
app.config_from_object('celeryconfig')
@app.task
def document_monitoring(index, query, kind):
# generate random messages
documents = [{'_id': str(i)} for i in range(10000)]
for document in documents:
app.send_task(
"vv_consumer",
kwargs=document,
queue='vv'
)
return 0
在深入研究我的RabbitMQ实例的日志后,我发现我达到了由变量vm_memory_high_watermark
定义的内存警报的限制。默认值是0.4,这意味着一旦使用了实例内存的40%,RabbitMQ就会阻塞发布者(如示例中的脚本)并停止接收新消息。由于这种行为,我错过了最初发送到队列的几条消息。
要解决这个问题,我的解决方案是:
- 硬件端:增加实例的RAM
- 软件端:将
vm_memory_high_watermark
值增加到0.7(遵循文档)