我正在使用芹菜来运行成千上万个任务的组,每个任务需要几分钟才能运行。下面的代码是我对multiprocessing.pool.Pool.map
的简单插入替换:
def map(task, data):
"""
Perform the *task* on *data* in distributed way. Blocks until finished.
"""
ret = celery_module.group(task.s(val) for val in data).apply_async()
return ret.get(interval = 0.1)
只要工人永远不会破裂,这就像魅力一样。但是有时候,一个节点死亡,并用它执行一些运行的任务。然后发生的事情是,所有其他任务都完成了,工人变得空闲,但是get
永远等待死者工人的结果。
如何在超时后重试死去的任务?这些任务是掌握的,我一点都不担心重复执行。我已经尝试使用CELERY_ACKS_LATE
玩弄,然后将超时放在这里,但是似乎没有什么能补救这种情况。我觉得我错过了一些明显的东西,但找不到。
编辑:用于经纪人和结果所用的运输是Redis。
正确的行为是设置超时,而当死亡重试的整个map
任务时。