我正在尝试创建一个 Celery 和弦来运行任务作为回调,即当队列中的其他人被标记为完成时。
问题是我仍然遇到此错误:KeyError: 'kwargs'
,我不知道为什么。
这是我的代码:
@celery_app.task(
base=Base,
bind=True,
ignore_result=True
)
def callback(self, date=None, queries=[], user_email=None, db_session=None, prestodb_session=None):
# Do something
chain(
another_task.si(user_email=user_email, db_session=db_session),
group(*tasks)
).apply_async()
@celery_app.task(
base=Base,
bind=True,
ignore_result=False
)
def run(self, user_email, pg_session=None, prestodb_session=None):
chord(
task1.s(
user_email=user_email,
db_session=pg_session
),
task2.s(
user_email=user_email,
db_session=pg_session
),
task3.s(
user_email=user_email,
pg_session=pg_session,
prestodb_session=prestodb_session
)
)(
callback.s(
user_email=user_email,
date=self.context.run_time,
db_session=pg_session,
prestodb_session=prestodb_session
)
)
和错误:
Traceback (most recent call last):
File "/home/virtualenv/lib/python3.6/site-packages/celery/app/trace.py", line 382, in trace_task
R = retval = fun(*args, **kwargs)
File "/home/virtualenv/lib/python3.6/site-packages/celery/app/trace.py", line 641, in __protected_call__
return self.run(*args, **kwargs)
File "/home/app/app/tasks/tasks.py", line 87, in run
prestodb_session=prestodb_session
File "/home/virtualenv/lib/python3.6/site-packages/celery/canvas.py", line 1189, in __call__
return self.apply_async((), {'body': body} if body else {}, **options)
File "/home/virtualenv/lib/python3.6/site-packages/celery/canvas.py", line 1223, in apply_async
kwargs = dict(self.kwargs['kwargs'], **kwargs)
KeyError: 'kwargs'
我尽我所能:将签名从可变更改为不可变,在回调方法定义中添加 *args 和 **kwargs 作为参数,删除所有参数 bu 我仍然有问题。
我不知道我怎样才能做到这一点。你能帮我吗?谢谢!!
--
蟒蛇版本:3.6 芹菜版本:4.2.1
显然,我还不能发表评论。 您如何运行run
任务? 我认为问题可能出在您的一项任务上,我尝试了类似的东西,只需打印一个字符串的简单任务就可以工作。
你看到有什么东西被执行了吗?也许是和弦?