我在django项目中使用celery任务,并如本文所述使用锁。它效果很好,但是我的任务创建了一个对象,我不希望在数据库中释放该对象之前释放锁。我如何更改此上下文管理器等待任务中的对象?
@contextmanager
def lock(lock_id, oid, expire=600):
timeout_at = monotonic() + expire - 3
status = cache.add(lock_id, oid, expire)
try:
yield status
finally:
if monotonic() < timeout_at:
cache.delete(lock_id)
@celery.task(bind=True, ignore_result=True)
def my_task(self, object_id):
with lock('my_task.{}'.format(object_id), self.app.oid) as acquired, transaction.atomic():
if not acquired:
self.retry(countdown=1)
def on_commit():
# release the lock only in this moment
pass
transaction.on_commit(on_commit)
MyModel.objects.create(object_id=object_id)
此上下文管理器创建锁定并将身体包裹在事务中。它仅在进行交易或升级(celery.exceptions.Retry
除外)时才释放锁。
如芹菜文档中所述:
为了使此操作正确,您需要使用.ADD操作为原子的缓存后端。众所周知,Memcached可以为此目的运作良好。
from celery.exceptions import Retry
from contextlib import contextmanager
from time import monotonic
from django.core.cache import cache
from django.db import transaction
@contextmanager
def lock_transaction(lock_id, oid, expire=600):
status = cache.add(lock_id, oid, expire)
timeout_at = monotonic() + expire - 3
is_retry = False
def on_commit():
if not is_retry and monotonic() < timeout_at:
cache.delete(lock_id)
with transaction.atomic():
transaction.on_commit(on_commit)
try:
yield status
except Retry as e:
is_retry = True
except:
if monotonic() < timeout_at:
cache.delete(lock_id)
raise
使用:
的示例@celery.task(bind=True, ignore_result=True, max_retries=90, time_limit=60)
def create_or_add_counter_task(self, object_id):
with lock_transaction('object_id.{}'.format(object_id), self.app.oid) as acquired:
if not acquired:
self.retry(countdown=1)
try:
obj = MyModel.objects.get(object_id=object_id)
obj.counter += 1
obj.save()
except MyModel.DoesNotExist:
MyModel.objects.create(object_id=object_id)