芹菜画布组的链条将太多的论点传递给了组成任务



我在以下类型的芹菜工作流程中遇到一些非常奇怪的行为:

workflow = group(
    chain(task1.s(), task2.s()),
    chain(task3.s(), task4.s()),
)

这是在django的背景下。

当我调用工作流时如下:

workflow.apply_async((n,))

...对于任何n个整数值,每个链中的第一个任务(task1task3)将使用如下的typeError失败(从celery events取):

   args: [9, 8, 7, 5, 4, 3]
   kwargs: {} 
   retries: 0 
   exception: TypeError('task1() takes exactly 1 argument (6 given)',) 
   state: FAILURE

第一个之后的参数总是以前调用了工作流程的参数。因此,在此示例中,我在这种情况下称为workflow.apply_async((9,)),而其他数字是以前通过的值。每种情况下,传递给task1task3的错误论点都是相同的。

我很想将其作为芹菜的错误报告发布,但我还不确定这个错误不是我的。

我排除的事情:

  • 我肯定会通过我认为我传递给workflow.apply_async的论点。我已经单独构建并记录了我通过的元组,以确保这一点。
  • 与将列表(即可变)传递给apply_async而不是元组无关。我肯定会通过元组(即不可变)。

关于我的设置的唯一中等不寻常的事情,尽管我看不到它是如何连接的,但task1task3是用不同的队列配置的。

我使用芹菜任务时也遇到了类似的问题。

我通过将包含的项目列表列入一个元组中来解决。例如,

假设任务log_i()是一个共享的_task,它本质上是登录变量i,我希望通过块来记录所有i s的列表 -

# log_i shared Task
@shared_task(bind=True)
def log_i(self, i):
    logger.info(i)
    return i

# some calling site 
# ...
very_long_list = [{"i1": i, "i2": i+i, "i3": i**2} for i in range(1000)]
res = log_i.chunks(zip(very_long_list), 10)()
print(res.get())
# ...

注意,做诸如 -

之类的事情
# ...
res = log_i.chunks(very_long_list, 10)()
# ...

将因列表中的项目不是迭代时所说的错误而失败。

拉链将项目按原样移动到新的元组中,因此,您可以将相同的内容捕获到log_i任务中的一个参数中。

最新更新