将芹菜任务的结果链接到分布式组中



就像在另一个问题中一样,我想从芹菜任务返回的列表创建一个芹菜组。这个想法是,第一个任务将返回一个列表,第二个任务会将该列表分解为列表中每个项目的并发任务。

计划是在下载内容时使用它。第一个任务从网站获取链接,第二个任务是一个链,用于下载页面、处理页面,然后将其上传到 s3。最后,一旦完成所有子页面,网站就会在我们的数据库中标记为完成。像这样:

chain(
get_links_from_website.si('https://www.google.com'),
dmap.s(  # <-- Distributed map
download_sub_page.s() | 
process_sub_page.s() | 
upload_sub_page_to_s3.s()
),
mark_website_done.s()
)

到目前为止,我看到的解决方案似乎在这方面做得足够好,但是当第二个任务是链时,由于clone不进行深度复制的问题而失败(有关详细信息,请参阅此答案的评论):

@task
def dmap(it, callback):
# Map a callback over an iterator and return as a group
callback = subtask(callback)
return group(callback.clone([arg,]) for arg in it)()

它还存在一个问题,即如果可迭代对象长度为 10,000 个项目,它将创建一个包含 10,000 个项目的组。正如您可以想象的那样,这正在消耗我们的内存使用量。

所以,我正在寻找一种dmap的方法:

  • 不会通过创建可怕的组来炸毁 RAM(也许有一种方法可以对可迭代对象进行分块?
  • 适用于芹菜链,深拷贝没有问题。

芹菜画布提供了将任务拆分为块的块。不幸的是,这不适用于链、组等原语。

您可以使用芹菜信号来防止 dmap/克隆出现问题。

ch = chain(
download_sub_page.s(),
process_sub_page.s(),
upload_sub_page.s(),
)
@task_success.connect(sender='get_links_from_website')
def task_success_handler(sender=None, headers=None, body=None, **kwargs):
result = kwargs['result']    
header = [ch(i) for i in result]
callback = mark_website_done.si()
chord(header)(callback)

创建一个用于处理页面的链,并使用和弦将最后一个任务挂接到它。只要get_links_from_website成功运行,就会执行此函数。

根据链所花费的时间,您还可以将get_links_from_website的结果保存在某处。然后迭代其中一批以排队链,使用最后一批,您可以将回调挂接到最后一个任务。

这有点笨拙,但我们使用 deepcopy 来克隆回调,这修复了签名浅拷贝的错误

def dmap(it, callback, final=None):
# Map a callback over an iterator and return as a group
callback = subtask(callback)
run_in_parallel = group(subtask(copy.deepcopy(dict(callback))).clone([arg, ]) for arg in it)
if len(run_in_parallel.tasks) == 0:
return []
if final:
return chord(run_in_parallel)(final)
return run_in_parallel.delay()

请注意,这仅适用于一个嵌套级别(即回调是链/组/和弦),但不适用于深度嵌套回调

对于深度嵌套的回调图,我们使用这个技巧,它有点慢,但工作完美

# Hack to completely clone a signature with possibly complex subtasks (chains, chords, etc...)
run_in_parallel = group(pickle.loads(pickle.dumps(callback)).clone([arg, ]) for arg in it)

对于组的大小,您始终可以将迭代器拆分为块

如果有人遇到这种情况,Jether的回答有很大帮助,但并不完美。对我们来说,有三个问题:

  1. 如果callback本身是一个链,答案不会将参数传递到链上。 https://stackoverflow.com/a/59023231/19882725 有助于通过clone_signature提供解决方案。这似乎适用于使用 RabbitMQ 作为代理的合理嵌套链,但我们没有尝试任何极端的方法(因此不需要调整它以使用pickle)。
  2. 如果callback是组或和弦,我们需要将参数应用于克隆的每个任务,因此我们修改了 (1) 中的clone_signature以适应这种情况。
  3. 添加 (1) 后,通过final中断 - 我们采用了 https://github.com/celery/celery/issues/5265 的解决方案,将 finaldict转换为Signature
  4. 最后,我们发现final在许多情况下实际上不会执行,因为chord收到的是Group而不是任务列表。

对于任何好奇的人,这是我们的最终解决方案:

import copy
from celery import Signature, chord, group, shared_task, subtask

def clone_signature(sig, args=(), kwargs=(), **opts):
"""
Turns out that a chain clone() does not copy the arguments properly - this
clone does.
From: https://stackoverflow.com/a/53442344/3189
"""
if sig.subtask_type and sig.subtask_type not in ["chain", "group", "chord"]:
raise NotImplementedError(
"Cloning only supported for tasks, chains, groups, and chords, not {}".format(
sig.subtask_type
)
)
clone = sig.clone()
# if the task we're cloning is a group or chord, apply the arguments to each of the children
if sig.subtask_type and sig.subtask_type in ["group", "chord"]:
clone.tasks = [
clone_signature(task, args=args, kwargs=kwargs, opts=opts)
for task in clone.tasks
]
# otherwise, apply the arguments to either the task itself (if it's a single task)
# or the first child task (if it's a chain)
else:
if hasattr(clone, "tasks"):
task_to_apply_args_to = clone.tasks[0]
else:
task_to_apply_args_to = clone
args, kwargs, opts = task_to_apply_args_to._merge(
args=args, kwargs=kwargs, options=opts
)
task_to_apply_args_to.update(
args=args, kwargs=kwargs, options=copy.deepcopy(opts)
)
return clone

@shared_task
def dmap(it, callback, final=None):
if not len(it):
return []
callback = subtask(callback)
run_in_parallel = [
clone_signature(callback, args if type(args) is list else [args]) for args in it
]
if not final:
return group(*run_in_parallel).delay()
# see https://github.com/celery/celery/issues/5265
if not isinstance(final, Signature):
final["immutable"] = True
final = Signature.from_dict(final)
return chord(run_in_parallel)(final)

这使我们能够成功执行如下所示的嵌套dmap

chain(
taskA.s(),
dmap.s(
chain(
taskB.s(),
taskC.s(),
dmap.s(
taskD.s(),
final=chain(
taskE.s(),
taskF.s(),
),
),
),
),
).delay()

相关内容

  • 没有找到相关文章

最新更新