我已经使用芹菜几个月了,偶然发现一个情况,我只是没有看到任何信息,甚至一个例子,我打算实现。
在这个特定的情况下,我有一个运行API的docker容器和另外两个独立的容器,其中包含芹菜工人。
我有我的队列和任务定义,我调用一个任务与send_task方法。例子:
r = celery_app.send_task('task_a')
同样,我有另一个容器"task_b"它的调用方法与"task_a"相同。
我通过更新我的芹菜应用程序的配置来定义我的任务,并详细说明它们各自的队列,因为它们在其他单独的容器上运行。
,
celery_app.conf.update({
'broker_url': 'amqp://admin:mypass@rabbit:5672',
'result_backend': 'redis://redis:6379/0',
'imports': (
'tasks_a_dev',
'tasks_b_dev',
),
'task_routes': {
'task_a': {'queue': 'qtasks_a_dev'},
'task_b': {'queue': 'qtasks_b_dev'},
},
'task_serializer': 'json',
'result_serializer': 'json',
'accept_content': ['json']
})
无论如何,我可以将这两个任务链在一起,同时将task_a的结果传递给task_b?
如果你使用芹菜>V3.0.0,你可以使用链接。
如果你想让task_b和task_a的结果一起运行,你可以这样做:
from celery import chain
@task()
def task_a(a, b):
time.sleep(5)
return a + b
@task()
def task_b(a, b):
time.sleep(5)
return a + b
# the result of the first job will be the first argument of the second job
res = chain(task_a.s(1, 2), task_b.s(3)).apply_async()
# Alternatively, you could do the following
res_2 = (task_a.s(1, 2) | task_b.s(3)).apply_async()
# check ret status to get result
if ret.status == u'SUCCESS':
print "result:", ret.get()