使用链式和成组的方法设计芹菜的工作流程



我是芹菜的新手,我正在尝试使用Chain、Groups和Chord设计芹菜的工作流程。以下是我迄今为止所做的:

def __chainfileprocessing(config):
filelist, src = get_files_for_processing(config)
for fileattr in filelist:
chain(  download_file.s(fileattr, src)
,importdata.s(fileattr, src)
,post_processing.s(fileattr, src)
).apply_async()

当前执行顺序:

  • 正在执行所有的download_file((任务
  • 然后执行所有import_data((任务
  • 然后执行所有post_processing((任务

我需要什么:

对于文件列表中的每个项目,任务应按download_file() => import_data() => post_processing()顺序执行。

您的代码正在执行您所说的操作。对于文件列表中的每个项目,chain将依次启动download_fileimport_datepost_processing的任务。

您的代码的作用是:

  • 异步启动文件a的任务链(download_file,然后是importdata,然后是post_processing(;这将启动文件A的download_file。完成后,它将启动文件A.的importdata任务。apply_async立即返回;它不等待任何任务完成
  • 异步地为文件B启动一系列任务(download_file,然后是importdata,再是post_processing(;这将启动文件B的download_file。完成后,它将启动文件B.的importdata任务。文件A的download_file任务可能正在运行,但此调用不知道这一点;它只是将一个任务添加到队列中
  • 等等

在循环结束时,您发送给cerele的是文件A…n的download_file。当每个download_file任务完成时,它将提交其链中的下一个任务。

不同文件的任务之间没有依赖关系;从你发布的代码来看,它看起来不需要(为什么文件2必须等待文件1完成?(

最新更新