chain一个Celery任务,它将一个列表返回到链中间的一个组中



这个问题与这个问题相同:如何将返回列表的Celery任务链接到组中?除了我需要这发生在链的中间,并且只有当中间任务是链中的最后一个"环节"时,接受的解决方案才有效。

这是一个稍微修改过的例子,再现了这个问题:

from random import random
from celery import 
@app.task
def get_list(amount):
return [i for i in range(amount)]
@app.task
def process_item(item):
return [f'id-{item}', random() > .5]
@app.task
def dmap(it, callback):
# Map a callback over an iterator and return as a group
callback = subtask(callback)
return group(callback.clone([arg,]) for arg in it)()
@app.task
def handle_results(results):
for result in results:
if result[1] == None:
continue
return result[1] # return the first True value
def foo():
return chain(
get_list.s(10),
dmap.s(process_item.s()),
handle_results.s() # <-- if I add this, it fails
)
# in a terminal, or somewhere
foo()()

我得到的错误是:

文件"/usr/local/Cellar/python/3.7.4_1/Frameworks/python.framework/Versions/3.7/lib/python3.7/json/encoder.py",默认第179行raise TypeError(f'类型为{o..名称}的对象'kombu.exceptions.EncodeError:GroupResult类型的对象不是可序列化的JSON

这毕竟是dmap的返回值。。不,它不能序列化。。但请注意,如果我这样做:

>>> lst = [i for i in range(amount)]
>>> chain(group(process_item.s(i) for i in lst), handle_results.s())

那就行了。我对链中的一个成员到底需要传递什么感到困惑。。因为group(...)的结果是:

>>> from app.manager_tasks import process_item
>>> group(process_item.s(e) for e in [1, 2, 3, 4])
group([app.manager_tasks.process_item(1), process_item(2), process_item(3), process_item(4)])
>>> group(process_item.s(e) for e in [1, 2, 3, 4]).delay()
<GroupResult: 07c9be1a-b3e3-4da2-af54-7177f3d91d0f [cf777f54-4763-46bd-a405-2c1993ddbf66, 103298fc-8f1f-4183-ba45-670224fcd319, 3ad87c2c-7b64-4309-a61b-e53ae17302b9, bf2766a3-662a-415d-a35b-037a0476f4a4]>

它本身是一个GroupResult(称为延迟(,否则只是一个组。由于dmap本身就是一个签名,我猜这就是为什么delay()需要在其内部调用chain的原因。。🤔

如果我像在其他stackoverflow(与第一个链接相同(示例中那样调用结果,我将得到一个GroupResult,只有当它是链的最后一个成员(().delay().apply_async()(时,它才会成功。如果我在GroupResult上调用.get()来获得可序列化的东西,那么我会得到以下错误:RuntimeError: Never call result.get() within a task!,这给我带来了一个难题;我怎样才能做到这一点?

在这个问题上很困惑。。但我也是芹菜新手。非常感谢任何关于我如何解决这个问题的建议!

更多的背景知识,我打算重复使用这个链作为另一个链的一部分,该链位于指定管道中阶段的顶层。

正如@DejanLekic所提到的,我应该使用chord。这将解决上述问题:

def foo():
return chord(
get_list.s(10),
dmap.s(process_item.s())
)(handle_results.s())

我一直希望这仍然是chain的一部分,但现在看来还不支持。


下面的内容与这个问题的关系不大,但可能对一些人有用。

使用github问题线程的解决方案,我仍然可以通过嵌套和弦和链来做我需要的事情(在解决了主要问题之后(。不是最干净的,但它有效。。会是这样的:

def foo():
return chord(
get_list.s(10),
dmap.s(process_item.s())
)(chain(handle_results.s(), log_stuff.s()))

由于我无法得到公认的答案,这里有另一个观点,和弦在dmap函数中处理。

这里有一个可重复的例子:

from celery import Celery, subtask, group
app = Celery('tasks', backend='redis://', broker='redis://')

@app.task
def get_list(n):
return [i for i in range(n)]

@app.task
def process_item(item):
return f'id-{item}'

@app.task
def handle_results(results):
return ' - '.join(results)

@app.task
def dmap(it, callback, chord_callback):
# Map a callback over an iterator and return as a group
callback = subtask(callback)
chord_callback = subtask(chord_callback)
final_res = (group(callback.clone((arg,)) for arg in it) |
chord_callback)()
return final_res

if __name__ == "__main__":
pipeline = (get_list.s(10) |
dmap.s(process_item.s(), handle_results.s()))()
task_ids = pipeline.get()
chord_task_id = task_ids[0][0]
print(app.AsyncResult(chord_task_id).get())
# id-0 - id-1 - id-2 - id-3 - id-4 - id-5 - id-6 - id-7 - id-8 - id-9

这里发生的是:

  • get_list任务生成0:9范围,该范围与dmap任务链接
  • dmap任务反序列化两个回调签名,并异步运行一个chord任务,其中第一个回调(此处,转换为"id-."标签(应用于列表的每个元素,第二个回调随后应用于结果列表(此处,将标签连接到单个字符串(。(这里的和弦是隐含的,因为"将一个组与另一个任务链接在一起会自动将其升级为和弦",来自芹菜文档(
  • 这个管道最终返回所有任务id,并且可以从chord id中获取最终结果,例如get

在芹菜v.2.7 上测试

相关内容

  • 没有找到相关文章

最新更新