我有一个执行批量更新的任务。此外,此任务会将任务的状态发送到某个位置,例如,任务已启动,任务已成功完成。我想把这个动作包装在一个装饰器中,这样我就可以在后续任务中使用它。但我遇到了一个问题,我不能将装饰器附加到芹菜任务中。
@celery.task
def change_statuses(*args, **kwargs) -> None:
call class which update status of task to pending
making bulk update
call class which update status of task to success
所以我想把这个装饰器作为基础装饰器
def update_state(state):
def decorator(func):
def wrapper(*args, **kwargs):
call class{state} which update status of task to pending
func(*args, **kwargs)
call class{state} which update status of task to success
return wrapper
return decorator
并在每个芹菜任务中调用这个装饰器,例如:
@celery.task
@update_state(UpdateBuildingsStatusEvent)
def change_status(*args, **kwargs) -> None:
making bulk update
但我有错误:
The full contents of the message body was:
'{"task": "tasks.buildings_changes_statuses.change_status", "name": "change_status", "id": "******", "args": [], "kwargs": {"building_ids": [1, 2], "macroservice_id": 1, "user_id": 1, "macroservice_status": "connect", "task_id": "***"}}' (314b)
Traceback (most recent call last):
File "/home/aidar/Work/services.background_tasks/.env/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 581, in on_task_received
strategy = strategies[type_]
KeyError: 'tasks.buildings_changes_statuses.change_status'
我怀疑您遇到的问题是celery.task
装饰器不再在名为change_status
的函数上被调用(该函数在其他地方被名称引用(。相反,它被应用于名为wrapper
的decorator的返回值。由于它有一个不同的名称,celery
代码不知道它代表的是change_status
。
为了解决这个问题,您可能可以使用functools.wraps
来更新wrapper
函数的各种属性,以匹配它所包含的函数。试试这个:
from functools import wraps
def update_state(state):
def decorator(func):
@wraps(func) # apply fuctools.wraps here
def wrapper(*args, **kwargs):
...
return wrapper
return decorator