我是芹菜的新手,我正在尝试使用Chain、Groups和Chord设计芹菜的工作流程。以下是我迄今为止所做的:
def __chainfileprocessing(config):
filelist, src = get_files_for_processing(config)
for fileattr in filelist:
chain( download_file.s(fileattr, src)
,importdata.s(fileattr, src)
,post_processing.s(fileattr, src)
).apply_async()
当前执行顺序:
- 正在执行所有的download_file((任务
- 然后执行所有import_data((任务
- 然后执行所有post_processing((任务
我需要什么:
对于文件列表中的每个项目,任务应按download_file() => import_data() => post_processing()
顺序执行。
您的代码正在执行您所说的操作。对于文件列表中的每个项目,chain
将依次启动download_file
、import_date
和post_processing
的任务。
您的代码的作用是:
- 异步启动文件a的任务链(
download_file
,然后是importdata
,然后是post_processing
(;这将启动文件A的download_file
。完成后,它将启动文件A.的importdata
任务。apply_async
立即返回;它不等待任何任务完成 - 异步地为文件B启动一系列任务(
download_file
,然后是importdata
,再是post_processing
(;这将启动文件B的download_file
。完成后,它将启动文件B.的importdata
任务。文件A的download_file
任务可能正在运行,但此调用不知道这一点;它只是将一个任务添加到队列中 - 等等
在循环结束时,您发送给cerele的是文件A…n的download_file
。当每个download_file
任务完成时,它将提交其链中的下一个任务。
不同文件的任务之间没有依赖关系;从你发布的代码来看,它看起来不需要(为什么文件2必须等待文件1完成?(