传递前面任务的结果



这是我的代码。我想传递任务myname结果传递为任务reverse在签名中作为参数。

这是我的代码。我想传递任务myname结果传递为任务reverse在签名中作为参数。
from app import app
from app import app
from time import sleep
from celery.utils.log import get_task_logger
import os
from celery import signature, chain, group, chord
from celery.result import allow_join_result

MyQUEUE = os.getenv("SCANS_QUEUE")
logger = get_task_logger(__name__)
@app.task(queue=MyQUEUE, ignore_result=True)
def reverse(text):
logger.info('reverse order '.format(text))
return {"reversename": str(text[::-1])}
@app.task(queue=MyQUEUE, ignore_result=True)
def add(a,b):
logger.info('Addition --> a : {0} & b : {1} '.format(a,b))
return {"addition": str(a+b)}
@app.task(queue=MyQUEUE, ignore_result=True)
def myname(a):
logger.info('Name --> a : {0}'.format(a))
return {"name": str(a)}

@app.task(queue=MyQUEUE, ignore_result=True)
def run_pipeline(a,b,n):
resultchain = chain([
group([
signature(
add,
args=(a,b),
queue=MyQUEUE
),
signature(
myname,
args=(n),
queue=MyQUEUE
)
]),
signature
(
reverse,
args=(-------),
queue=MyQUEUE
)
]).apply_async()
with allow_join_result():
results = resultchain.join()
return results

首先,最重要的是,如果您要使用chain,group,starmap或其他类型的任务工作流,则需要使用ignore_result=False设置结果或省略参数(默认值为False)。需要存储值,至少在mynameadd

@app.task(queue=MyQUEUE)
def add(a,b):
logger.info('Addition --> a : {0} & b : {1} '.format(a,b))
return {"addition": str(a+b)}
@app.task(queue=MyQUEUE)
def myname(a):
logger.info('Name --> a : {0}'.format(a))
return {"name": str(a)}

现在,为了使reverse获得addmynamegroup中的结果,您需要调整reverse来处理组结果(结果列表)。

对于chain,一个任务的结果将被用作下一个任务的第一个参数,在这种情况下,组结果将被注入到反向任务的第一个值[{'addition': ...}, {'name': ...}]中,这样您可以访问正确的值。

@app.task(queue=MyQUEUE)
def reverse(group_data):
# group_data value: [{'addition': '3'}, {'name': 'VALUE'}]
text = group_data[1]['name']
logger.info('reverse order '.format(text))
return {"reversename": str(text[::-1])}

最后,如果你只想反转myname的结果,你必须只链接mynamereverse

resultchain = chain([
signature(myname, args=(n,)),
signature(reverse)
]).apply_async()

最新更新