我一直在尝试使用芹菜执行管道。初始任务应该创建一个要处理的项目列表,我将使用组来进一步并行化每个项目处理。最后,我应该从小组任务中收集结果。
@app.task()
def prepare():
return [item1, item2, item3]
@app.task()
def parallel_process(items, additional_param):
return group(process.s(i, additional_param) for i in items)() # I get an error kombu.exceptions.EncodeError: Object of type GroupResult is not JSON serializable
@app.task()
def process(i, param):
return mapping_func(item, param)
@app.task()
def collect(results):
print(results)
pipeline = prepare.s() | parallel_process.s(param) | collect.s()
pipeline.apply_async()
我收到一个错误kombu.exceptions.EncodeError: 类型为 GroupResult 的对象不可 JSON 序列化
进程任务被调用,但收集任务不会被调用。 最终结果永远不会到来。还有其他方法可以做到这一点吗?无法在网上找到合适的示例。
根据您的错误消息,您应该将CELERY_RESULT_SERIALIZER
更改为pickle
,因为GroupResult
类型不可序列化。
参考: https://docs.celeryproject.org/en/stable/userguide/configuration.html#result-serializer