从上一个任务列表结果运行组任务



我一直在尝试使用芹菜执行管道。初始任务应该创建一个要处理的项目列表,我将使用组来进一步并行化每个项目处理。最后,我应该从小组任务中收集结果。

@app.task()
def prepare():
return [item1, item2, item3]
@app.task()
def parallel_process(items, additional_param):
return group(process.s(i, additional_param) for i in items)() # I get an error kombu.exceptions.EncodeError: Object of type GroupResult is not JSON serializable
@app.task()
def process(i, param):
return mapping_func(item, param)
@app.task()
def collect(results):
print(results)
pipeline = prepare.s() | parallel_process.s(param) | collect.s()
pipeline.apply_async()

我收到一个错误kombu.exceptions.EncodeError: 类型为 GroupResult 的对象不可 JSON 序列化

进程任务被调用,但收集任务不会被调用。 最终结果永远不会到来。还有其他方法可以做到这一点吗?无法在网上找到合适的示例。

根据您的错误消息,您应该将CELERY_RESULT_SERIALIZER更改为pickle,因为GroupResult类型不可序列化。

参考: https://docs.celeryproject.org/en/stable/userguide/configuration.html#result-serializer

最新更新