芹菜装饰师



我有一个执行批量更新的任务。此外,此任务会将任务的状态发送到某个位置,例如,任务已启动,任务已成功完成。我想把这个动作包装在一个装饰器中,这样我就可以在后续任务中使用它。但我遇到了一个问题,我不能将装饰器附加到芹菜任务中。

@celery.task
def change_statuses(*args, **kwargs) -> None:
call class which update status of task to pending
making bulk update
call class which update status of task to success

所以我想把这个装饰器作为基础装饰器

def update_state(state):
def decorator(func):
def wrapper(*args, **kwargs):
call class{state} which update status of task to pending
func(*args, **kwargs)
call class{state} which update status of task to success
return wrapper
return decorator

并在每个芹菜任务中调用这个装饰器,例如:

@celery.task
@update_state(UpdateBuildingsStatusEvent)
def change_status(*args, **kwargs) -> None:
making bulk update

但我有错误:

The full contents of the message body was:
'{"task": "tasks.buildings_changes_statuses.change_status", "name": "change_status", "id": "******", "args": [], "kwargs": {"building_ids": [1, 2], "macroservice_id": 1, "user_id": 1, "macroservice_status": "connect", "task_id": "***"}}' (314b)
Traceback (most recent call last):
File "/home/aidar/Work/services.background_tasks/.env/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 581, in on_task_received
strategy = strategies[type_]
KeyError: 'tasks.buildings_changes_statuses.change_status'

我怀疑您遇到的问题是celery.task装饰器不再在名为change_status的函数上被调用(该函数在其他地方被名称引用(。相反,它被应用于名为wrapper的decorator的返回值。由于它有一个不同的名称,celery代码不知道它代表的是change_status

为了解决这个问题,您可能可以使用functools.wraps来更新wrapper函数的各种属性,以匹配它所包含的函数。试试这个:

from functools import wraps
def update_state(state):
def decorator(func):
@wraps(func)                     # apply fuctools.wraps here
def wrapper(*args, **kwargs):
...
return wrapper
return decorator

相关内容

  • 没有找到相关文章

最新更新