我有一个关于芹菜的问题。
我正在向任务队列发送大量数据。
我只是尝试了太多的任务数据,它不适合我的记忆。结果是芹菜被杀死了,因为它占用了整个系统内存。
消息不写入磁盘(代理rabbitmq),尽管
CELERY_DEFAULT_DELIVERY_MODE = 'persistent'
。
我的任务是这样的:
group(some_iterator).apply_async()
是否芹菜首先尝试序列化整个数据(我使用pickle),然后将其发送到任务队列?
如果我运行它,我可以看到任务被写入磁盘的数据越少。
谢谢你的帮助!
我发现了一种实现这种行为的可能性:
result_group = GroupResult(results = [])
for task_args task_arg_generator:
task = analyze_task.delay(*task_args)
result_group.add(task)
result_group.get()
# or :
result_group.join_native()
将每个任务单独序列化并发送,并将其收集在ResultGroup中,该ResultGroup可用于检索组结果。
但我选择的解决方案是将数据存储在数据库中,并让工作人员通过id获取它。
希望这能帮助一些面临同样问题的人!