我的工作流程如下,我使用芹菜与rabbitMQ
步骤1。一个大文件被分成多个部分(假设是4个),并放入MQ,
步骤2。一些worker(假设是2)将处理这些文件并存储在某个地方。
现在,我的问题是,我有另一个任务要完成,那就是连接这些文件,这当然是一个同步任务,即文件的所有部分都应该被处理,那么,我通过芹菜做什么来使连接任务依赖于步骤2。
我是否创建一个单独的应用程序来连接文件,它可以以某种方式接收这些工人的状态,无论他们是否完成了这些文件的处理。
或将文件连接作为MQ中的任务,这同样可以(阻塞等待)确保所有部分都被处理,然后连接文件(这同样可以由任何worker完成)
哪一种方法是可行的?使这两个任务相互依赖
是两个芹菜应用程序/任务可以相互依赖。
为了达到你的目标,我会使用芹菜画布:http://celery.readthedocs.org/en/latest/userguide/canvas.html更准确地说是'和弦'
一个和弦是一个任务,只有在一个组中的所有任务完成后才执行执行。
from celery import chord
@task
def process_parts(part):
pass
@task
def join_parts(parts)
pass
def split_file(f)
return file_parts_array
def process_file(f):
process_parts = [process_part.s(x) for x in split_file(f)]
join_parts = join_files.s()
result = chord(process_parts)(join_parts)
return result
您将任务join_parts映射到一个特定的队列,因此只有存储机器上的工作线程才能使用join_files任务。