我正在尝试将现有的芹菜组调用转换为和弦以防止死锁。以前的代码具有重试次数和过期时间。我设法在没有这些设置的情况下让和弦工作,但是当我尝试应用该设置时,我看不到正在运行的任务。我在文档中没有看到任何关于在整个和弦上应用相同设置的内容。我正在运行芹菜版本 3.1.6。
以前的代码:
jobs = group([reset_device.s(topoid, dev_list[i],
waittime_list[i], skipflag) for i in range(len(dev_list))]
).apply_async(expires=waittime, retry=True, retry_policy={
'max_retries': 3,
'interval_start': 0.5,
'interval_step': 0.2,
'interval_max': 0.2})
results = jobs.join_native(timeout=waittime + 600, propagate=True)
工作和弦(无设置(:
jobs = chord([reset_device.s(topoid, dev_list[i],
waittime_list[i], skipflag) for i in range(len(dev_list))])(callback)
非工作和弦 #1:
jobs = chord([reset_device.s(topoid, dev_list[i], waittime_list[i],
skipflag).set(expires=datetime.now() + timedelta(seconds=waittime)).set(retry=True).set(retry_policy=retry_policy)
for i in range(len(dev_list))])(callback)
非工作和弦 #2
jobs = chord([reset_device.subtask(args=(topoid, dev_list[i], waittime_list[i],skipflag),
expires=datetime.now()+timedelta(seconds=waittime), retry=True, retry_policy=retry_policy)
for i in range(len(dev_list))])(callback)
在 #1 和 #2 的情况下,和弦中的任务似乎都没有运行。 如何为和弦中调用的每个任务应用过期时间并重试?
我想通了,这是混合问题。
第一个问题是 expires 字段不接受和弦中的整数,只接受日期时间对象(也可能是组和链(,尽管文档没有进行任何区分。这已在更高版本中修复,我使用 3.1.25 进行了测试并能够验证修复程序。
第二个问题是芹菜 3.1.6 不会在和弦中记录错误(我认为组和链也是如此(。这也已修复,我在 3.1.25 中进行了测试,并且能够看到失败。
第三个问题与错误消息有关:
[2017-08-07 18:39:56,043: ERROR/Worker-5] Chord '98246849-0d5d-4be2-85e3-3fc08e90011d' raised: TaskRevokedError(u'expired',)
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/celery/app/builtins.py", line 90, in unlock_chord
ret = j(timeout=3.0, propagate=propagate)
File "/usr/local/lib/python2.7/dist-packages/celery/result.py", line 691, in join_native
raise value
TaskRevokedError: expired
这是因为时区不正确。我使用了datetime.now()
而不是datetime.utcnow()
,这解决了该问题并在 3.1.6 中工作。
或者,我可以将芹菜配置设置为CELERY_ENABLE_UTC = False
,默认情况下设置为 True。这让我感到困惑,因为我们将配置CELERY_TIMEZONE
设置为本地时间。当与日期时间对象一起使用时,expires 字段使用本地时间或 UTC,具体取决于CELERY_ENABLE_UTC
设置的值。我建议保持两个配置设置相同。
有趣的是,创建回调函数并轮询以查看和弦是否完成,尽管和弦任务从未执行并且它永远停留在那里。我相信这可能已在芹菜 4.1 中修复。