我正在寻找一些建议,以最好的方式映射从一个任务生成的列表到另一个任务在芹菜。
假设我有一个名为parse
的任务,它解析PDF文档并输出页面列表。然后,每个页面都需要单独传递给另一个名为feed
的任务。这些都需要放入一个名为process
一种方法是:
@celery.task
def process:
pages = parse.s(path_to_pdf).get()
feed.map(pages)
当然,这不是一个好主意,因为我在一个任务中调用get()
。
此外,这是低效的,因为我的parse
任务是围绕生成器函数包装的,并且能够生成页面,这意味着应该可以在解析器生成最后一页之前将第一页排队以供提供。
@celery.task
def process:
for page in parse.s(path_to_pdf).get():
feed.delay(page)
这个例子仍然涉及到在任务中调用get()
。此外,这个例子是一个过度简化,我真的需要做一些事情后,所有的页面已经馈送(即在chord
)。
我正在寻找芹菜中做到这一点的最佳方法。如有任何建议,我将不胜感激。
谢谢!
这对您来说可能为时已晚,但您可能希望使用任务链:
@celery.task
def process():
return chain(parse.s(), feed_map.s())
@celery.task
def feed_map(pages):
return feed.map(pages)
如果你有一些最后的任务,比如final
,你可以这样做:
@celery.task
def feed_map(pages):
return chord(feed.map.s(page) for page in pages, final.s)