我正在使用 Django Celery 和 Redis 来运行一些这样的任务:
header = [
tasks.invalidate_user.subtask(args = (user)),
tasks.invalidate_details.subtask(args = (user))
]
callback = tasks.rebuild.subtask()
chord(header)(callback)
所以基本上与文档中所述的相同。
我的问题是,当调用此任务和弦时 ,celery.chord_unlock
任务会永远重试。中的任务header
成功完成,但由于chord_unlock
从未完成,因此 永远不会调用callback
。
猜测我的问题是无法检测到header
的任务已完成,我转向文档以查看如何对其进行自定义。我找到了一个部分,描述了如何实现同步,提供了一个示例,我缺少的是如何调用该示例函数(即是否有信号?
注意的是,此方法不与 Redis 后端一起使用:
除 Redis 和 Memcached 之外的所有结果后端都使用它,它们在标头中的每个任务之后增加一个计数器,然后在计数器超过集合中的任务数时应用回调。
但也说,Redis方法更好:
Redis和Memcached方法是一个更好的解决方案
这是什么方法?它是如何实施的?
那么,为什么chord_unlock
从未完成,如何让它检测已完成的header
任务?
我正在使用: 姜戈 1.4, 芹菜 2.5.3, 姜戈-芹菜 2.5.5, 红薯 2.4.12
您没有任务示例,但我遇到了同样的问题,我的解决方案可能适用。
我ignore_result=True
了我添加到和弦的任务,定义如下:
@task(ignore_result=True)
显然,忽略结果会使chord_unlock任务不知道它们已完成。在我删除ignore_result(即使任务只返回 true(后,和弦正确地调用了回调。
我遇到了同样的错误,我将代理更改为 rabbitmq,并且chord_unlock工作直到我的任务完成(2-3 分钟任务(
使用 Redis 时,任务完成,chord_unlock每 1 秒重试 8-10 次,因此回调未正确执行。
[2012-08-24 16:31:05,804: INFO/MainProcess] Task celery.chord_unlock[5a46e8ac-de40-484f-8dc1-7cf01693df7a] retry: Retry in 1s
[2012-08-24 16:31:06,817: INFO/MainProcess] Got task from broker: celery.chord_unlock[5a46e8ac-de40-484f-8dc1-7cf01693df7a] eta:[2012-08-24 16:31:07.815719-05:00]
... just like 8-10 times....
更改代理对我有用,现在我正在测试@Chris解决方案,我的回调函数永远不会收到来自标头子任务 :S 的结果,因此,它对我不起作用。
芹菜==3.0.6
姜戈==1.4
姜戈-芹菜==3.0.6
雷迪斯==2.6
代理:Mac OS X 上的 redis-2.4.16
这可能会导致这样的问题;从文档中;
注意:
如果您在 Redis 结果后端中使用和弦并覆盖 Task.after_return(( 方法,则需要确保调用 super 方法,否则将不会应用和弦回调。
def after_return(self, *args, **kwargs):
do_something()
super(MyTask, self).after_return(*args, **kwargs)
据我了解,如果您在任务中覆盖了after_return
函数,则必须将其删除或至少调用 superone。
主题底部:http://celery.readthedocs.org/en/latest/userguide/canvas.html#important-notes