我使用的是芹菜,我有几个任务需要按顺序执行。
例如,我有这样的任务:
@celery.task
def tprint(word):
print word
我想做这样的事情:
>>> chain(tprint.s('a') | tprint.s('b'))()
然后我得到TypeError: tprint() takes exactly 1 argument (2 given)
。
和弦也是如此,在这种情况下,我需要在一组任务之后执行一个任务:
>>> chord([tprint.s('a'), tprint.s('b')])(tprint.s('c'))
那么如何应对这种情况呢?我不在乎每项任务的结果,但它们需要按顺序执行。
添加第二个参数无效:
@celery.task
def tprint(word, ignore=None):
print word
>>> chain(tprint.s('a', 0) | tprint.s('b'))()
这将打印出"a"one_answers"无"。
有一个内置功能可以忽略链接和其他操作中的结果-不可变的子任务。您可以使用.si()快捷方式而不是.s()或.subtask(不可变=True)
更多详细信息请点击此处:http://docs.celeryproject.org/en/master/userguide/canvas.html#immutability
已经发布了一个可能的解决方案,但我想添加进一步的澄清和一个替代解决方案(在某些情况下还有一个更好的解决方案)。
您看到的错误表明您的任务签名需要考虑第二个参数,这是由于在调用chain
中的任务时,Celery会自动将每个任务result
作为下一个任务的第一个参数。
来自文档:
任务可以链接在一起,这实际上意味着添加一个回调任务:
>>> res = add.apply_async((2, 2), link=mul.s(16))
>>> res.get()
4
链接的任务将应用其父任务的结果作为第一个参数
因此,在您的情况下,您可以这样重写您的任务:
@celery.task
def tprint(result, word):
print word
如果你不打算对结果做任何事情,你也可以忽略它,通过这样更改装饰器:
@celery.task(ignore_result=True)
然后您就不必更改任务签名了
对不起,最后一点需要进一步研究。
您可以尝试这样做。您可以有两个参数,而不是只有一个参数用于功能tprint
def tprint(word, x=None):
print word
然后
chain(tprint.s('a', 0) | tprint.s('b'))()
最后找到一个解决方法,一个链式装饰器将完成这项工作。
我不知道西芹到底是怎么做到的,但西芹似乎强行将前一个任务的结果绑定到下一个任务中的第一个参数上。
这里有一个例子:
def chain_deco(func):
@functools.wraps(func)
def wrapper(chain=None, *args, **kwargs):
if chain is False:
return False
func(*args, **kwargs)
return True
return wrapper
@celery.task
@chain_deco
def hello(word):
print "hello %s" % word
现在这将给出正确的输出。
>>> (hello.s(word='a') | hello.s(word='b'))()
或
>>> (hello.s('a') | hello.s('b'))(True)
decorator还提供了在中间停止链的能力(使后面的级联失败)
同样的机制也应该适用于chord
。