我正试图在Celery中运行一组链。我创建了一组链:
chains = [celery.chain(a.s(i), b.s(), c.s()) for i in items]
返回将其分组:
group = celery.group(*chains)
这里的期望是,Celery将安排每个完整的链作为一个独立的任务运行。事实上,从逻辑上讲,这似乎就是正在发生的事情。但有两个问题:
如果链的数量很大,则似乎什么都不会运行。Celery或rabbitmq控制台中没有错误。(是,使用rabbitmq.)
Celery似乎在组中的所有任务中执行每个链的第一个任务,然后转到每个链的第二个任务。(也就是说,它似乎将链展开为一组任务
a
s、任务b
s,然后是任务c
s。它们仍然链接到相应的链条目,但当某些任务a
s比其他任务完成得快得多时,会引入延迟。
你知道发生了什么事吗?
一个非常有趣的问题!
我已经编写了代码来测试您的案例,使用内存后端和一个进程(位于底部)。
celery -A module-name --loglevel=info -c 10
-
类似屏障的行为:这似乎不是问题。如果你应用不同的睡眠,或者以高并行度执行许多任务,你会看到
b
和c
任务是与a
并行执行的 -
在大链上失败:当我试图创建1000000个链时,代码实际上在创建链时默默地失败了,所以这看起来更像是python内存问题。100000长度的查韦斯是很好的
代码
from celery import Celery, chain, group
from pprint import pprint
import threading
from time import sleep
app = Celery('chaintext')
app.conf.update(
BROKER_BACKEND = 'memory',
CELERY_RESULT_BACKEND = 'cache',
CELERY_CACHE_BACKEND = 'memory',
CELERY_EAGER_PROPAGATES_EXCEPTIONS = True,
CELERY_TASK_SERIALIZER='json',
CELERY_ACCEPT_CONTENT=['json'], # Ignore other content
CELERY_ENABLE_UTC=True,
CELERYD_POOL = 'celery.concurrency.threads:TaskPool'
)
@app.task
def a(i):
result = 'A %s' % i
sleep((i%3)/ 10.0)
pprint(result)
return result
@app.task
def b(self,i):
result = 'B %s' % i
sleep((i%3)/ 10.0)
pprint(result)
return result
@app.task
def c(self,i):
result = 'C %s' % i
sleep((i%3)/ 10.0)
pprint(result)
return result
def main():
print "MAIN"
import time
time.sleep(5)
print "STARTING"
chains = [chain(a.s(i), b.s(i), c.s(i)) for i in range(1000000)]
print "CREATED CHAINS"
g = group(*chains)
print "CREATED GROUP"
result = g.apply_async()
print "QUEUED GROUP"
print result.get()
t1 = threading.Thread(target=main)
t1.start()