我使用celry .chord(…)创建了一组任务和一个方法,该方法在组中的所有任务完成后被调用。
我使用amqp结果后端(但我想切换到memcached)。
我的worker每秒钟一遍又一遍地打印这一行。我不知道如何打破这个无限循环。我可以访问rabbitMQ web界面,但我找不到ID为"32ba5fe4-…"的东西。
[2013-03-22 14:18:26,896: INFO/MainProcess] Task celery.chord_unlock[32ba5fe4-918c-480f-8a78-a310c11d0c3a] retry: Retry in 1s
[2013-03-22 14:18:26,897: INFO/MainProcess] Got task from broker: celery.chord_unlock[32ba5fe4-918c-480f-8a78-a310c11d0c3a] eta:[2013-03-22 13:18:27.895123+00:00]
这是一个测试环境。没有数据可以丢失。
我用芹菜3.0.16
不应该是一个无限循环。
芹菜。Chord_unlock任务检查和弦子任务是否完成以调用合并回调任务。如果没有,它会安排自己在一秒钟后再次检查。当您的和弦任务完成时,您将不再在日志中看到这些消息。
编辑:你可以撤销chord_unlock任务来停止循环
celery.control.revoke('32ba5fe4-918c-480f-8a78-a310c11d0c3a')
为了进行完整性检查,我在worker启动时通过信号设置max_retries
:
from celery.signals import worker_init
@worker_init.connect
def limit_chord_unlock_tasks(worker, **kwargs):
"""
Set max_retries for chord.unlock tasks to avoid infinitely looping
tasks. (see celery/celery#1700 or celery/celery#2725)
"""
task = worker.app.tasks['celery.chord_unlock']
if task.max_retries is None:
retries = getattr(worker.app.conf, 'CHORD_UNLOCK_MAX_RETRIES', None)
task.max_retries = retries
然后在我的芹菜配置中添加一个CHORD_UNLOCK_MAX_RETRIES
变量
我也有同样的问题。为了停止循环,我安装了flower,然后在web界面的Tasks菜单中撤销了该任务。Revoke按钮位于单击任务UUID后出现的任务详细信息页面中。