芹菜+Django——带有数据库相关任务的原子事务



在我目前使用Django, Docker-Compose和芹菜(以及其他东西)的项目中,基本的上传文件函数insertIntoDatabase是从任务中调用的,而在views.py中,任务是用delay调用的。

在databaseinserter.py:

def insertIntoDatabase(datapoints, user, description): # datapoints is a list of dictionaries, user and description are just strings
    # convert data and upload to our database
在tasks.py:

@app.task()
def db_ins_task(datapoints, user, description):
    from databaseinserter import insertIntoDatabase
    insertIntoDatabase(datapoints, user, description)
在views.py:

with transaction.atomic():
    db_ins_task.delay(datapoints, user, description)

在芹菜被引入到项目之前,insertIntoDatabase只是直接在views.py中调用,所以任何无效的数据点列表(即格式不正确)都不会被插入,整个上传将被取消并回滚。但是,既然上传是在异步的芹菜任务中,那么无效的上传就不能再正确回滚了。我怎么能确保一个无效的上传仍然被取消和撤消,现在上传是一个任务?看起来Django 1.9有一些新的东西可能是我需要的:transaction.on_commit。然而,目前切换到1.9的主要问题是,Django-Hstore在我们的项目中似乎不是一个重要的依赖,是兼容的。1.9也在alpha中,所以即使两者兼容,目前也不理想使用。在Django 1.8中有办法做到这一点吗?

我还研究了django - transaction_barrier,并尝试使用它,但没有运气。在tasks.py中,我将任务更改为

@task(base=TransactionBarrierTask)
def db_ins_task(datapoints, user, description):
    from databaseinserter import insertIntoDatabase
    insertIntoDatabase(datapoints, user, description)

在views.py中我更改了任务执行:

with transaction.atomic():
    db_ins_task.apply_async_with_barrier(args=(data, user, description,))
但是,这里的主要问题是,一旦接收到任务,Celery就会抛出一个关于意外关键字参数的错误:
worker_1   | Traceback (most recent call last):
worker_1   |   File "/usr/local/lib/python2.7/site-packages/celery/app/trace.py", line 240, in trace_task
worker_1   |     R = retval = fun(*args, **kwargs)
worker_1   |   File "/usr/local/lib/python2.7/site-packages/celery/app/trace.py", line 438, in __protected_call__
worker_1   |     return self.run(*args, **kwargs)
worker_1   | TypeError: db_ins_task() got an unexpected keyword argument '__transaction_barrier'

那么,最好的方法是什么呢?我应该继续尝试使用django_transaction_barrier(如果我确实在使用它做正确的事情)吗?如果是这样,我做错了什么/缺失会导致错误?如果没有,有什么更好的方法从数据库中清除无效上传?

芹菜是一个异步任务运行器,基本上一旦任务被交给芹菜,它就会被遗忘。你不能有跨进程边界的事务,因为芹菜将作为工作线程运行。

您总是可以运行另一个任务来查找无效数据点并清理数据库。简而言之,你想要一个两阶段提交的分布式事务,这并不容易实现,因为它有自己的问题,而且不确定Python是否可用。

您考虑过将transaction.atomic语句移动到任务中吗?或者甚至插入函数本身?

最新更新