释放芹菜锁定时



我在django项目中使用celery任务,并如本文所述使用锁。它效果很好,但是我的任务创建了一个对象,我不希望在数据库中释放该对象之前释放锁。我如何更改此上下文管理器等待任务中的对象?

@contextmanager
def lock(lock_id, oid, expire=600):
    timeout_at = monotonic() + expire - 3
    status = cache.add(lock_id, oid, expire)
    try:
        yield status
    finally:
        if monotonic() < timeout_at:
            cache.delete(lock_id)
@celery.task(bind=True, ignore_result=True)
def my_task(self, object_id):
    with lock('my_task.{}'.format(object_id), self.app.oid) as acquired, transaction.atomic():
        if not acquired:
            self.retry(countdown=1)
        def on_commit():
            # release the lock only in this moment
            pass
        transaction.on_commit(on_commit)
        MyModel.objects.create(object_id=object_id)

此上下文管理器创建锁定并将身体包裹在事务中。它仅在进行交易或升级(celery.exceptions.Retry除外)时才释放锁。

如芹菜文档中所述:

为了使此操作正确,您需要使用.ADD操作为原子的缓存后端。众所周知,Memcached可以为此目的运作良好。

from celery.exceptions import Retry
from contextlib import contextmanager
from time import monotonic
from django.core.cache import cache
from django.db import transaction

@contextmanager
def lock_transaction(lock_id, oid, expire=600):
    status = cache.add(lock_id, oid, expire)
    timeout_at = monotonic() + expire - 3
    is_retry = False
    def on_commit():
        if not is_retry and monotonic() < timeout_at:
            cache.delete(lock_id)
    with transaction.atomic():
        transaction.on_commit(on_commit)
        try:
            yield status
        except Retry as e:
            is_retry = True
        except:
            if monotonic() < timeout_at:
                cache.delete(lock_id)
            raise

使用:

的示例
@celery.task(bind=True, ignore_result=True, max_retries=90, time_limit=60)
def create_or_add_counter_task(self, object_id):
    with lock_transaction('object_id.{}'.format(object_id), self.app.oid) as acquired:
        if not acquired:
            self.retry(countdown=1)
        try:
            obj = MyModel.objects.get(object_id=object_id)
            obj.counter += 1
            obj.save()
        except MyModel.DoesNotExist:
            MyModel.objects.create(object_id=object_id)

相关内容

  • 没有找到相关文章

最新更新