如何将Django交易应用于每个芹菜任务


from celery.task import Task
from django.db import transaction
class MyTask(Task):
    # ...
    def run(self, *args, **kwargs):
        # doesn't work
        with transaction.atomic():
             super().run(*args, **kwargs)
celery_task = celery_app.task(ignore_result=True, base=MyTask)
@celery_task
# @transaction.atomic  # this should work, but I want to add transaction through base task class
def foo_task():
    pass

我需要在不使用其他装饰器的情况下使用celery_task装饰器向每个任务添加原子交易。

尝试覆盖__call__(*args, **kwargs)方法。

class AtomicTask(celery.Task):
    def __call__(self, *args, **kwargs):
        with transaction.atomic():
            return super().__call__(*args, **kwargs)

atomic_task = celery.task(ignore_result=True, base=AtomicTask)
@atomic_task
def update_user(user_id):
    # This code can run only inside a transaction. 
    # Otherwise, the TransactionManagementError will be raised.
    user = User.objects.select_for_update().get(pk=user_id)
    user.first_name = 'John'
    user.save()

最新更新