收集并行Celery任务执行的结果



我正在尝试实现一个相当简单的Celery工作流,在该工作流中,我将接收同一任务的多个并行调用的结果作为元组(或列表)。

@app.task
def add(x, y):
    return x + y
@app.task
def master():
    return group(add.s(1, 2), add.s(3, 4))()

由此,我希望以一种通用的方式检索(3, 7),也就是说,以一种不依赖于工作流本身的方式。我正在寻找某种"将异步结果图简化为基元"操作。我对以下内容进行了实验(为了简洁起见,我用#num替换了结果ID)

r = master.delay()
r.get()      # <GroupResult: #1 [#2, #3]>
r.collect()  # [(<AsyncResult: #0>, <GroupResult #1 [#2, #3]>),
             #  (<GroupResult: #1 [#2, #3]>, [3, 7])
             #  (<AsyncResult: #2>, 3),
             #  (<GroupResult: #3>, 7)]

r.get()返回一个围绕两个AsyncResult ID的包装器,所以我必须递归地处理每个ID。r.collect()是接近的,但它重复得太深。

我可以做一些类似的事情

r.children[0].get()

但这不是通用的,因为它明确地取决于结果图的结构。此外,我可以遍历r.collect(),直到找到一个值不是ResultBase实例的元组,比如

next(value for _, value in r.collect() if not isinstance(value, ResultBase))

但我不确定这是否真的在所有情况下都是正确的,我希望有一种更优雅的方法来做到这一点。

如果有一种方法可以重新构造master任务,使检索结果更容易,我对此持开放态度,只要子任务是并行启动的。如有任何建议,我们将不胜感激。提前谢谢。


EDIT一个相关的问题是,如果我想以非阻塞的方式检索任务结果(例如,在调用r.get()r.collect()之前手动轮询r.status,我不能简单地执行此

r = master.delay()
# some time later...
if r.status in READY_STATES:
    r.get()

因为r是解析为GroupResultAsyncResult,即它在GroupResult或其子代之前完成。是否有一种方法可以以"跳过"顶级AsyncResult的方式调用组?这将解决这两个问题,因为r.statusr.get()将分别反映子任务的状态和值。

当然,正确的解决方案是最简单的:调用master作为函数,在当前进程中执行它。

r = master()
r.get()      # [3, 7]
r.collect()  # [(<GroupResult: #1 [#2, #3]>, [3, 7]),
             #  (<AsyncResult: #2>, 3),
             #  (<AsyncResult: #3>, 7)]

group启动代码不是延迟到工作进程,而是在当前进程中启动。由于group是完全异步的,因此行为不会改变,性能也会提高。

相关内容

  • 没有找到相关文章

最新更新