Celery chord not waiting for subtasks (a group of chains)



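My real-world scenario is that I want to fetch a list of campaigns from an API call and fire off a chain of functions for each campaign. Once all of the chains have completed, I need to call a function that reports the results.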

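I have simplified this as far as I can and written the code below. It runs, but the chord unlock is called before the chains have completed. In this code, that means it cannot sum the array of results.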

import time
from celery import Celery, chain, chord, group
app = Celery('tasks', broker='amqp://guest@localhost//', backend='amqp')

@app.task
def generate():
    return [1, 2, 3, 4, 5]

@app.task
def dmap(it, first, second):
    chains = []
    for arg in it:
        c = chain(first.clone([arg, ]), second)
        chains.append(c)
    return group(chains)()

@app.task
def add(x, y):
    print 'add {x} {y}'.format(x=x, y=y)
    time.sleep(3)
    return x + y

@app.task
def mul(x, y):
    print 'mul {x} {y}'.format(x=x, y=y)
    time.sleep(2)
    return x * y

@app.task
def xsum(numbers):
    print numbers
    to_sum = []
    for x in numbers[0]:
        to_sum.append(x.result)
    print to_sum
    return sum(to_sum)


if __name__ == '__main__':
    x = add.s(0)
    y = mul.s(1)
    workers = generate.si() | dmap.s(x, y)
    result = chord(workers)(xsum.s())
    print result.get()

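The dmap function is based on this answer. I have also seen this answer. That last link implies that what I want to do may not be possible, because "there is nothing to synchronize, as the group happens in parallel."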

I can't see how to bend that solution to fit the case where the generate function returns an array rather than a single item.

Running the above, the log shows the (premature?) chord unlock, so xsum tries to sum a set of results in which 3 are None:

[2014-11-11 14:03:10,308: INFO/MainProcess] Received task: tasks.generate[2eedc847-ff67-4e0c-90e1-48314133bb51]
[2014-11-11 14:03:10,311: INFO/MainProcess] Received task: celery.chord_unlock[7d07e506-1aae-40e5-bd05-bbc53b286103] eta:[2014-11-11 14:03:11.307477+00:00]
[2014-11-11 14:03:10,338: INFO/MainProcess] Received task: tasks.dmap[0f2efa72-402d-412e-807e-bbf191850c18]
[2014-11-11 14:03:10,365: INFO/MainProcess] Task tasks.generate[2eedc847-ff67-4e0c-90e1-48314133bb51] succeeded in 0.0523488249746s: [1, 2, 3, 4, 5]
[2014-11-11 14:03:10,386: INFO/MainProcess] Received task: tasks.add[eccf5faa-069c-4634-826e-af5793a11c68]
[2014-11-11 14:03:10,388: WARNING/Worker-2] add 1 0
[2014-11-11 14:03:10,390: INFO/MainProcess] Received task: tasks.add[6b66167b-2767-4bde-a0a0-32f5fab7a961]
[2014-11-11 14:03:10,392: WARNING/Worker-1] add 2 0
[2014-11-11 14:03:10,394: INFO/MainProcess] Received task: tasks.add[d74659b0-b512-44f9-88b4-1908f79bfc52]
[2014-11-11 14:03:10,397: INFO/MainProcess] Received task: tasks.add[e9b3336f-9b37-4f25-81a7-cbac819da38c]
[2014-11-11 14:03:10,398: INFO/MainProcess] Received task: tasks.add[63b2ce22-1288-4cac-9018-8ddefaab575d]
[2014-11-11 14:03:10,399: WARNING/Worker-4] add 3 0
[2014-11-11 14:03:10,401: INFO/MainProcess] Task tasks.dmap[0f2efa72-402d-412e-807e-bbf191850c18] succeeded in 0.061700456019s: <GroupResult: 9a3972ff-0976-46d2-937f-9ea4a1ead56b [925ec9c3-09da-43c1-9b94-c04dbe67f195,...
[2014-11-11 14:03:10,402: WARNING/Worker-3] add 4 0
[2014-11-11 14:03:13,409: INFO/MainProcess] Received task: tasks.mul[f696aa0a-844f-4e81-9722-0693c6e8c344]
[2014-11-11 14:03:13,410: INFO/MainProcess] Received task: tasks.mul[538c3c60-67f8-409d-b4ce-bf09184aa03b]
[2014-11-11 14:03:13,418: INFO/MainProcess] Received task: tasks.mul[4ffb6d04-0cf2-4300-a0de-bf53acf6662d]
[2014-11-11 14:03:13,419: INFO/MainProcess] Received task: tasks.mul[925ec9c3-09da-43c1-9b94-c04dbe67f195]
[2014-11-11 14:03:13,436: INFO/MainProcess] Task tasks.add[d74659b0-b512-44f9-88b4-1908f79bfc52] succeeded in 3.03667491797s: 3
[2014-11-11 14:03:13,437: INFO/MainProcess] Task tasks.add[e9b3336f-9b37-4f25-81a7-cbac819da38c] succeeded in 3.03460178198s: 4
[2014-11-11 14:03:13,438: INFO/MainProcess] Task tasks.add[6b66167b-2767-4bde-a0a0-32f5fab7a961] succeeded in 3.04608612298s: 2
[2014-11-11 14:03:13,439: WARNING/Worker-4] mul 4 1
[2014-11-11 14:03:13,450: WARNING/Worker-2] add 5 0
[2014-11-11 14:03:13,452: INFO/MainProcess] Task tasks.add[eccf5faa-069c-4634-826e-af5793a11c68] succeeded in 3.06420573901s: 1
[2014-11-11 14:03:13,454: WARNING/Worker-3] mul 3 1
[2014-11-11 14:03:13,481: INFO/MainProcess] Task celery.chord_unlock[7d07e506-1aae-40e5-bd05-bbc53b286103] succeeded in 0.0413383140112s: None
[2014-11-11 14:03:13,485: INFO/MainProcess] Received task: tasks.xsum[575f5375-bf0f-4d41-b9a3-57661eaf4373]
[2014-11-11 14:03:15,470: INFO/MainProcess] Task tasks.mul[f696aa0a-844f-4e81-9722-0693c6e8c344] succeeded in 2.031282346s: 4
[2014-11-11 14:03:15,472: WARNING/Worker-1] mul 1 1
[2014-11-11 14:03:15,477: INFO/MainProcess] Task tasks.mul[538c3c60-67f8-409d-b4ce-bf09184aa03b] succeeded in 2.02354899806s: 3
[2014-11-11 14:03:15,479: WARNING/Worker-4] [<GroupResult: 9a3972ff-0976-46d2-937f-9ea4a1ead56b [925ec9c3-09da-43c1-9b94-c04dbe67f195, 4ffb6d04-0cf2-4300-a0de-bf53acf6662d, 538c3c60-67f8-409d-b4ce-bf09184aa03b, f696aa0a-844f-4e81-9722-0693c6e8c344, 82a6b814-53a5-45f1-a0dc-43885f92eca4]>]
[2014-11-11 14:03:15,555: WARNING/Worker-4] [None, None, 3, 4, None]
[2014-11-11 14:03:15,564: ERROR/MainProcess] Task tasks.xsum[575f5375-bf0f-4d41-b9a3-57661eaf4373] raised unexpected: TypeError("unsupported operand type(s) for +: 'int' and 'NoneType'",)
Traceback (most recent call last):
  File "/home/duncan/VEnvs/adwords/local/lib/python2.7/site-packages/celery/app/trace.py", line 240, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/home/duncan/VEnvs/adwords/local/lib/python2.7/site-packages/celery/app/trace.py", line 437, in __protected_call__
    return self.run(*args, **kwargs)
  File "/home/duncan/projects/celerychordtest/tasks.py", line 47, in xsum
    return sum(to_sum)
TypeError: unsupported operand type(s) for +: 'int' and 'NoneType'
[2014-11-11 14:03:16,460: INFO/MainProcess] Received task: tasks.mul[82a6b814-53a5-45f1-a0dc-43885f92eca4]
[2014-11-11 14:03:16,462: WARNING/Worker-3] mul 5 1
[2014-11-11 14:03:16,476: WARNING/Worker-2] mul 2 1
[2014-11-11 14:03:16,476: INFO/MainProcess] Task tasks.add[63b2ce22-1288-4cac-9018-8ddefaab575d] succeeded in 3.02716274199s: 5
[2014-11-11 14:03:17,480: INFO/MainProcess] Task tasks.mul[925ec9c3-09da-43c1-9b94-c04dbe67f195] succeeded in 2.00813938997s: 1
[2014-11-11 14:03:18,485: INFO/MainProcess] Task tasks.mul[4ffb6d04-0cf2-4300-a0de-bf53acf6662d] succeeded in 2.00837794197s: 2
[2014-11-11 14:03:18,471: INFO/MainProcess] Task tasks.mul[82a6b814-53a5-45f1-a0dc-43885f92eca4] succeeded in 2.009012155s: 5

I would hope/expect the chord unlock to wait until every chain has completed.

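As @ChillarAnand suggested, I ended up redesigning the tasks, but I did so in a way that removes the need for a chord. I want to be able to run a group of chains, which (as far as I can tell) means I cannot combine it with a chord.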

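What I do now is fire the "final" task as part of firing the group of chains. To make this work, the final task has to check whether the other tasks have completed. Since I know that the last task in each chain (in my real-world program) writes to the database, I can simply check that the database contains a row for each of the generated items.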

For anyone facing a similar problem, the relevant part of the final function looks roughly like this:

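from celery import shared_task


class NotReady(Exception):
    pass


# bind=True passes the task instance as `self`, which provides `self.retry`.
@shared_task(bind=True, default_retry_delay=30, max_retries=10)
def output(self, generated_list):
    list_from_db = query db ...
    try:
        raise_if_not_equal(list_from_db, generated_list)
    except NotReady as exc:
        # Not every row is present yet: reschedule this task in 30 seconds.
        raise self.retry(exc=exc, countdown=30)
    ... everything is ready do stuff ...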
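
The snippet leaves `raise_if_not_equal` undefined. A minimal sketch of what such a helper might look like, assuming both arguments are collections of hashable item identifiers; the set comparison and the error message are assumptions for illustration, not the author's actual code:

```python
class NotReady(Exception):
    """Raised while some generated items have no matching database row yet."""


def raise_if_not_equal(list_from_db, generated_list):
    # Hypothetical helper: compare as sets so ordering and duplicates are ignored.
    missing = set(generated_list) - set(list_from_db)
    if missing:
        raise NotReady('still waiting for: {0}'.format(sorted(missing)))


# All items present: returns without raising.
raise_if_not_equal([3, 1, 2], [1, 2, 3])

# One item missing: NotReady is raised, which the task would turn into a retry.
try:
    raise_if_not_equal([1, 2], [1, 2, 3])
except NotReady as exc:
    print(exc)
```

Comparing sets keeps the check independent of the order in which the chains happen to finish.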

FWIW: I will probably update the retry to back off, roughly based on the code in the following thread
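
The linked code is not reproduced here, so the following is only one possible way to compute a backing-off delay. It assumes exponential doubling from the 30-second base used above, with a hypothetical cap of 600 seconds; in a bound Celery task the `retries` argument would typically come from `self.request.retries`:

```python
def retry_countdown(retries, base=30, cap=600):
    """Seconds to wait before the next attempt: doubles each time, up to cap."""
    # `base` and `cap` are illustrative values, not taken from the original post.
    return min(cap, base * (2 ** retries))


# Show the delay that would be used for the first few retries.
for attempt in range(6):
    print(attempt, retry_countdown(attempt))
```

The returned value could then be passed as the `countdown` argument of the retry call in place of the fixed 30 seconds.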

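This feels like a good answer. Crucially, because the task raises an exception and is retried later, I never have a worker sitting there polling to see whether everything has finished.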

A chord is a task that only executes after all of the tasks in a group have finished executing.

If you have a simple chord like this

>>> callback = tsum.s()
>>> header = [add.s(i, i) for i in range(100)]
>>> result = chord(header)(callback)
>>> result.get()

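it first executes the group of tasks in the header and stores their async objects in a list. Then, when it calls the callback, it iterates over that list and fetches each task's result from its async object.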
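
To make that behaviour concrete, here is a small stand-in that uses the standard library's `concurrent.futures` rather than Celery itself. It only models the "list of async handles, then callback" idea; `run_chain` and `chord_like` are hypothetical names, and nothing here reflects Celery's internals:

```python
from concurrent.futures import ThreadPoolExecutor


def add(x, y):
    return x + y


def mul(x, y):
    return x * y


def run_chain(item):
    # One "chain": add(item, 0) feeds its result into mul(result, 1).
    return mul(add(item, 0), 1)


def chord_like(items, callback):
    with ThreadPoolExecutor(max_workers=4) as pool:
        # Header: one future (async handle) per chain, kept in a list.
        handles = [pool.submit(run_chain, item) for item in items]
        # Callback: called only once every handle has produced its result.
        return callback([handle.result() for handle in handles])


total = chord_like([1, 2, 3, 4, 5], sum)
print(total)
```

The point of the model is that the callback is given one handle per unit of work. If the header yields a single handle for a task that merely launches the real work, waiting on that handle says nothing about whether the launched work has finished.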

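In your case, you are passing workers as the header. workers is a pipeline (in effect a single large task), and when it is executed it yields only a single async object rather than a list of them. So once xsum receives it, it iterates over the contents of that async object, whose tasks have not all finished yet, and ends up trying to sum objects of different types. That is why it throws the error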

TypeError: unsupported operand type(s) for +: 'int' and 'NoneType'

So you have to redesign your tasks so that you pass only a group of tasks as the header of the chord.
