当我使用芹菜组和链条安排以下任务
时(group([group_task]) | sum_task).apply_async()
可以在许多工人中执行组任务,在所有组任务完成后,SUM_TASK开始执行(也许在其他工人中),因此谁能告诉我芹菜多么熟悉组任务,然后开始sum_task?
您可以为每个链式任务指定队列和组/和弦回调任务。
片段喜欢:
@shared_task(name="analyze_atom", queue="atom")
def analyze_atom(image_urls, targetdir=target_path, studentuid=None):
return {}
@shared_task(name="summary_up", queue="summary")
def summary_up(rets, studentuid, images):
return {}
chord(analyze_atom.s([image]) for image in images)(summary_up.s(studentuid, images))
,并且,当任务运行时,您可以检查经纪人内容,假设您正在使用RabbitMQ作为经纪人,则可以通过RabbitMQ Management插件来检查队列深度,或者可以在此处使用Pyrabbit Interface片段:
from pyrabbit.api import Client
cl = Client('localhost:15672', 'guest', 'guest')
count = cl.get_queue_depth('/', 'summary') # this guy check queue depth
cl.get_messages('/','paperanalyzer') # this guy get messages within queue
,您应该有结果后端,您可以通过任务ID获得每个任务结果。
我认为上面的技能,很容易检查芹菜任务的进行方式。
祝你好运: - )