我在我的系统中成功地使用了和弦。但是,现在我有一个情况,我必须按顺序运行和弦的数量(第二个在第一个结束时开始(。所以,我试图使用和弦链。但它对我不起作用。
@app.task(bind=True)
def some_celery_beat_worker(self):
feed_chain = []
for feed in feed_list:
celery_task_list = [perform single_task.si(url)
for url in some_url_list]
per_feed_chord = chord(celery_task_list, chord_callback.si(feed['_id'], feed['xml_file_name']))
feed_chain.append(per_feed_chord)
chain(*feed_chain).delay()
获取此作为回溯:
回溯(最近一次调用(: 文件 "python3.4/site-packages/celery/app/trace.py",第 374 行,trace_task R = retval = fun(*args, **kwargs( 文件 "python3.4/site-packages/celery/app/trace.py",第 629 行,__protected_call__ return self.run(*args, **kwargs( 文件"workers.py",第 156 行,joblist_updater_worker chain(*feed_chain(.delay(( 文件 "python3.4/site-packages/celery/canvas.py",第 182 行,延迟 返回self.apply_async(partial_args、partial_kwargs( 文件 "/python3.4/site-packages/celery/canvas.py",第 566 行,apply_async dict(self.options, **options( if options else self.options(( 文件"python3.4/site-packages/celery/canvas.py",第 596 行,正在运行 first_task.apply_async(**options( 文件 "python3.4/site-packages/celery/canvas.py",第 1241 行,apply_async return (self.tasks[0] | body(.set(task_id=task_id(.apply_async( KeyError: 0
我需要的是修复这个特定的工作流程,或任何替代工作流程,它可以解决这个特定的问题(一个接一个地运行和弦的数量(
问题是关于我如何形成chain
.这是不正确的(至少对于我的芹菜版本(,尽管在很多地方都建议。
对我有用的方法。
feed_chain = chain()
for feed in feed_list:
feed_chain |= chord(args)
feed_chain.delay()