django.core.cache.lock 在 Celery 任务中不起作用



我有以下(tasks.py(:

from celery import shared_task
from django.core.cache import cache
@shared_task
def test(param1: str) -> None:
with cache.lock("lock-1"):
print("hello")

当我做test.delay()时,没有打印任何内容,这让我相信cache.lock("lock-1")有问题。

我使用Redis作为我的缓存和Celery后端,这是在settings.py中配置的。

这里怎么了?如果django.core.cache不能用作锁定机制(为了确保一次只能运行一个test,可以使用什么?(谢谢!

弄清楚了原因-原来我没有阅读文档。

添加此项有效:

# https://docs.celeryproject.org/en/latest/tutorials/task-cookbook.html
@contextmanager
def redis_lock(lock_id):
timeout_at = time.monotonic() + LOCK_EXPIRE - 3
# cache.add fails if the key already exists
# Second value is arbitrary
status = cache.add(lock_id, "lock", timeout=LOCK_EXPIRE)
try:
yield status
finally:
# memcache delete is very slow, but we have to use it to take
# advantage of using add() for atomic locking
if time.monotonic() < timeout_at and status:
# don't release the lock if we exceeded the timeout
# to lessen the chance of releasing an expired lock
# owned by someone else
# also don't release the lock if we didn't acquire it
cache.delete(lock_id)

然后,锁定语句变成

with redis_lock("lock-1"):

最新更新