from celery.task import Task
from django.db import transaction
class MyTask(Task):
    """Base task class that runs every task body inside a DB transaction.

    The transaction is opened in ``__call__`` rather than in ``run``: the
    task decorator installs the decorated function as ``run`` on the task
    class it generates, which shadows any ``run`` defined on the base class,
    so a ``run`` override here would never be executed.
    """

    # ...
    def __call__(self, *args, **kwargs):
        """Execute the task inside ``transaction.atomic()``.

        Positional and keyword arguments are forwarded unchanged to the
        parent ``__call__``; its return value is returned to the caller.
        An exception raised by the task body propagates through the atomic
        block, so the transaction is rolled back instead of committed.
        """
        with transaction.atomic():
            return super().__call__(*args, **kwargs)
# Task decorator factory: tasks declared with @celery_task use MyTask as
# their base class and are created with ignore_result=True.
# NOTE(review): celery_app is not defined in this snippet -- presumably the
# project's Celery application instance; confirm.
celery_task = celery_app.task(ignore_result=True, base=MyTask)
# Stacking @transaction.atomic beneath @celery_task should also work, but the
# goal is for the base task class to supply the transaction instead.
@celery_task
def foo_task():
    """Example task with an intentionally empty body."""
我需要在不使用其他装饰器的情况下，仅通过 celery_task 装饰器为每个任务添加原子事务。
可以尝试在任务基类中重写 __call__(*args, **kwargs) 方法。Celery 的 task 装饰器会把被装饰的函数设置为生成的任务类的 run 方法，因此基类中重写的 run 不会被调用，而 __call__ 不受影响：
class AtomicTask(celery.Task):
    """Task base class whose invocation is wrapped in a DB transaction."""

    def __call__(self, *args, **kwargs):
        """Run the parent ``__call__`` inside ``transaction.atomic()``.

        All arguments are forwarded unchanged and the parent's return value
        is handed back to the caller once the atomic block has exited.
        """
        with transaction.atomic():
            outcome = super().__call__(*args, **kwargs)
        return outcome
# Task decorator factory: tasks declared with @atomic_task use AtomicTask as
# their base class and are created with ignore_result=True.
# NOTE(review): the name `celery` is not bound by the imports visible in this
# snippet -- presumably the Celery app instance or package; confirm.
atomic_task = celery.task(ignore_result=True, base=AtomicTask)
@atomic_task
def update_user(user_id):
    """Set the first name of the user with primary key ``user_id`` to 'John'.

    The row is fetched through ``select_for_update()``, which can run only
    inside a transaction; otherwise ``TransactionManagementError`` is
    raised.
    """
    locked_users = User.objects.select_for_update()
    account = locked_users.get(pk=user_id)
    account.first_name = 'John'
    account.save()