I am fairly new to Celery and its fundamentals, so please excuse my ignorance. I am trying to design a workflow like the following:
import os

from celery import Celery, group, chord, chain


class CeleryConfig:
    """
    Configuration for Celery
    """
    broker_url = os.environ.get('CELERY_BROKER_URL', 'mongodb://localhost:9001/jobs')
    result_backend = os.environ.get('CELERY_RESULT_BACKEND', 'mongodb://localhost:9001/')
    celery_mongodb_backend_settings = {
        "database": "results",
        "taskmeta_collection": "test",
    }
    enable_utc = True
    timezone = "UTC"
    celery_imports = ('test',)
    task_always_eager = os.environ.get('CELERY_ALWAYS_EAGER', False)
    task_eager_propagates = os.environ.get('CELERY_EAGER_PROPAGATES', False)
    task_serializer = 'json'


celery_app = Celery("test")
celery_app.config_from_object(CeleryConfig)


@celery_app.task(bind=True)
def dummy_task(self, *args, **kwargs):
    return True


@celery_app.task(bind=True)
def a(self, pagination, *args, **kwargs):
    return pagination[0] + pagination[1]


@celery_app.task(bind=True)
def b(self, pagination, *args, **kwargs):
    return pagination[0] + pagination[1]


@celery_app.task(bind=True)
def workflow(self):
    batch_size = 10
    flow_a = chord(group(a.s((k, batch_size), max_retries=None) for k in range(5)), dummy_task.s())()
    flow_b = chord(group(b.si((k, batch_size), immutable=True, max_retries=None) for k in range(5)), dummy_task.s())()
    r = chain(flow_a, flow_b)()
    return r


task = workflow.apply_async()
I start the Celery worker with:
celery -A test worker -l info
When I run the script above, I get the following error:
[2018-07-20 22:51:11,221: ERROR/ForkPoolWorker-1] Task test.workflow[aaea266e-bd5a-4414-a42f-83ccb5d16145] raised unexpected: TypeError("unsupported operand type(s) for |: 'AsyncResult' and 'AsyncResult'",)
Traceback (most recent call last):
  File "/Users/sohaibfarooqi/projects/code/env/lib/python3.5/site-packages/celery/app/trace.py", line 382, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/Users/sohaibfarooqi/projects/code/env/lib/python3.5/site-packages/celery/app/trace.py", line 641, in __protected_call__
    return self.run(*args, **kwargs)
  File "/Users/sohaibfarooqi/projects/code/test.py", line 42, in workflow
    r = chain(flow_a, flow_b)()
  File "/Users/sohaibfarooqi/projects/code/env/lib/python3.5/site-packages/celery/canvas.py", line 784, in __new__
    return reduce(operator.or_, tasks)
TypeError: unsupported operand type(s) for |: 'AsyncResult' and 'AsyncResult'
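Despite this error, the tasks themselves execute fine. Can someone tell me what I am doing wrong and point me in the right direction?
Celery version: 4.2.0
Python version: 3.5.0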
I ran into a similar error before I did something like this:
chain(task1.si(param1, param2), task2.si(paramX))()
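As for the TypeError itself, the traceback ends at reduce(operator.or_, tasks), which suggests that chain joins its arguments with the | operator. In the question, the trailing () on each chord(...)() call appears to dispatch the chord right away and return an AsyncResult, so chain receives two result objects rather than two signatures. That would also explain why the tasks still run. Below is a minimal pure-Python sketch of that situation; ToySignature, ToyResult and toy_chain are hypothetical stand-ins written for illustration, not Celery classes.

```python
import operator
from functools import reduce


class ToyResult:
    """Hypothetical stand-in for a result handle; it deliberately has no `|`."""

    def __init__(self, name):
        self.name = name


class ToySignature:
    """Hypothetical stand-in for a signature: work described but not yet sent."""

    def __init__(self, name):
        self.name = name

    def __or__(self, other):
        # Signatures define `|`, so two of them can be linked into a pipeline.
        return ToySignature(self.name + " | " + other.name)

    def __call__(self):
        # Calling a signature dispatches it and hands back a result handle.
        return ToyResult(self.name)


def toy_chain(*steps):
    # Mirrors the failing line in the traceback: reduce(operator.or_, tasks).
    return reduce(operator.or_, steps)


flow_a = ToySignature("chord_a")
flow_b = ToySignature("chord_b")

# Chaining the signatures themselves works.
pipeline = toy_chain(flow_a, flow_b)
print(pipeline.name)

# Chaining the values returned by calling them fails, as in the question.
error_message = None
try:
    toy_chain(flow_a(), flow_b())
except TypeError as exc:
    error_message = str(exc)
    print(error_message)
```

Applied to the question's code, the analogous change would be to build flow_a and flow_b without the trailing () and call only the final chain(flow_a, flow_b)(). This is a sketch of the idea and has not been verified against Celery 4.2.0.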
If you use "si" instead of "s", the signature is immutable, and the result of the first task is not passed on to the next task.
More on this: http://docs.celeryproject.org/en/latest/userguide/canvas.html#immutability
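To make the difference between "s" and "si" concrete, here is a small pure-Python model of how a chain might hand results along. It assumes that a mutable signature receives the previous result as its first argument, while an immutable one ignores it. ToySig, run_chain and the free functions s and si are hypothetical helpers for illustration only, not the Celery API.

```python
_NO_PARENT = object()  # sentinel meaning "there was no previous task"


class ToySig:
    """Hypothetical model of a signature with stored arguments."""

    def __init__(self, func, *args, immutable=False):
        self.func = func
        self.args = args
        self.immutable = immutable

    def run(self, parent_result=_NO_PARENT):
        if self.immutable or parent_result is _NO_PARENT:
            # Immutable (or first in the chain): use only the stored arguments.
            return self.func(*self.args)
        # Mutable: the previous result is prepended to the stored arguments.
        return self.func(parent_result, *self.args)


def s(func, *args):
    return ToySig(func, *args)


def si(func, *args):
    return ToySig(func, *args, immutable=True)


def run_chain(*sigs):
    # Run each signature in order, offering it the previous result.
    result = _NO_PARENT
    for sig in sigs:
        result = sig.run(result)
    return result


def add(x, y):
    return x + y


# Mutable: add(2, 2) gives 4, which is then passed on as add(4, 10).
mutable_total = run_chain(s(add, 2, 2), s(add, 10))

# Immutable: add(2, 2) still runs, but its result is dropped; add(10, 5) runs as written.
immutable_total = run_chain(si(add, 2, 2), si(add, 10, 5))

print(mutable_total, immutable_total)
```

In this model an immutable signature must carry all of its own arguments, which is why the second si call is given two values.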