将任务的结果输入到 Celery 中的地图中



我觉得这是一个非常简单的用例,但我没有从文档中成功解决。

这是我的问题的人为版本:

我有一个任务,它接受单个参数并返回一个列表。我想对列表中的每个元素应用另一个任务。然后(根据情况(我可能想继续处理其他任务。

chain(
my.tasks.divide.s(data),
my.tasks.conquer.map(),
).apply_async().get()

chain(
my.tasks.divide.s(data),
my.tasks.conquer.map(),
my.tasks.unite.s(),
my.tasks.rule.s(),
).apply_async().get()

地图不是那样工作的,但我觉得它应该!我想要的只是从单个参数链接到列表,然后再返回。.这当然不难做到吗?

这里和这里的"解决方案"似乎过于复杂,我不敢相信这是最佳实践。

关于和弦、映射和链的文档对这种特殊情况没有帮助。

编辑:到目前为止我有什么。这里的问题是我必须调用.apply_async().get()两次才能获得实际结果,因为中间步骤返回的是和弦而不是实际结果:

@app.task()
def divide(item):
return [x for x in item]
@app.task()
def conquer(item):
return item.upper()
@app.task()
def rule(items):
return "".join(items)
@app.task()
def conquer_group(items):
return group([conquer.s(x) for x in items])
@app.task()
def rule_group(items):
return chord([conquer.s(x) for x in items], rule.s())
def main():
print chain(
divide.s("foobarbaz"),
rule_group.s(),
).apply_async().get().apply_async().get()

这里的问题是map()需要一个可迭代对象来实例化。 这就是为什么可以做(sample.map(range(100)) | another_task.s()).apply_async()但不能实例化接受链中最后一个任务结果的map()签名。 我认为最简单的方法是在任务中实例化map()签名。

@app.task()
def divide(item):
return [x for x in item]
@app.task()
def conquer(item):
return item.upper()
@app.task()
def process(items):
return conquer.map(items)
def run():
(divide.s("foobar") | process.s()).apply_async()

相关内容

  • 没有找到相关文章

最新更新