我正在尝试实现一个相当简单的Celery工作流,在该工作流中,我将接收同一任务的多个并行调用的结果作为元组(或列表)。
@app.task
def add(x, y):
return x + y
@app.task
def master():
return group(add.s(1, 2), add.s(3, 4))()
由此,我希望以一种通用的方式检索(3, 7)
,也就是说,以一种不依赖于工作流本身的方式。我正在寻找某种"将异步结果图简化为基元"操作。我对以下内容进行了实验(为了简洁起见,我用#num
替换了结果ID)
r = master.delay()
r.get() # <GroupResult: #1 [#2, #3]>
r.collect() # [(<AsyncResult: #0>, <GroupResult #1 [#2, #3]>),
# (<GroupResult: #1 [#2, #3]>, [3, 7])
# (<AsyncResult: #2>, 3),
# (<GroupResult: #3>, 7)]
r.get()
返回一个围绕两个AsyncResult
ID的包装器,所以我必须递归地处理每个ID。r.collect()
是接近的,但它重复得太深。
我可以做一些类似的事情
r.children[0].get()
但这不是通用的,因为它明确地取决于结果图的结构。此外,我可以遍历r.collect()
,直到找到一个值不是ResultBase
实例的元组,比如
next(value for _, value in r.collect() if not isinstance(value, ResultBase))
但我不确定这是否真的在所有情况下都是正确的,我希望有一种更优雅的方法来做到这一点。
如果有一种方法可以重新构造master
任务,使检索结果更容易,我对此持开放态度,只要子任务是并行启动的。如有任何建议,我们将不胜感激。提前谢谢。
EDIT一个相关的问题是,如果我想以非阻塞的方式检索任务结果(例如,在调用r.get()
或r.collect()
之前手动轮询r.status
,我不能简单地执行此
r = master.delay()
# some time later...
if r.status in READY_STATES:
r.get()
因为r
是解析为GroupResult
的AsyncResult
,即它在GroupResult
或其子代之前完成。是否有一种方法可以以"跳过"顶级AsyncResult
的方式调用组?这将解决这两个问题,因为r.status
和r.get()
将分别反映子任务的状态和值。
当然,正确的解决方案是最简单的:调用master
作为函数,在当前进程中执行它。
r = master()
r.get() # [3, 7]
r.collect() # [(<GroupResult: #1 [#2, #3]>, [3, 7]),
# (<AsyncResult: #2>, 3),
# (<AsyncResult: #3>, 7)]
group
启动代码不是延迟到工作进程,而是在当前进程中启动。由于group
是完全异步的,因此行为不会改变,性能也会提高。