使用django,芹菜和redis的逐一任务



我使用 django、celery 和 redis 异步启动任务。

# tasks.py
@shared_task
def my_task():
# Do stuff of task 1
return True
# Somewhere in test1.py
my_task.delay()
# Few milli seconds later in test2.py
my_task.delay()

使用该配置,my_task在 2 个不同的文件上启动 2 次。因此,它们几乎同时在不同的线程上执行。

我需要这两个任务一一执行。如果my_task #1 正在执行并且启动了另一个my_task #2,我需要my_task #2 等待 #1 结束再执行。

我不想只使用一个线程将参数传递给芹菜celery worker --concurrency=1

我 settings.py 中的芹菜配置是基本的:

# settings.py
CELERY_BROKER_URL = 'redis://localhost:6379/0'

我找到了很多谈论这个主题的资源,但我真的不明白如何实现我的目标

  • https://docs.celeryproject.org/en/latest/tutorials/task-cookbook.html#ensuring-a-task-is-only-executed-one-at-a-time
  • 用芹菜运行"独特"任务
  • http://loose-bits.com/2010/10/distributed-task-locking-in-celery.html

http://loose-bits.com/2010/10/distributed-task-locking-in-celery.html 的解决方案几乎有效。以下是一些改编:

import redis
REDIS_CLIENT = redis.Redis()
def only_one(function=None, key="", timeout=None):
"""Enforce only one celery task at a time."""
def _dec(run_func):
"""Decorator."""
def _caller(*args, **kwargs):
"""Caller."""
ret_value = None
have_lock = False
lock = REDIS_CLIENT.lock(key, timeout=timeout)
try:
have_lock = lock.acquire(blocking=True)
if have_lock:
ret_value = run_func(*args, **kwargs)
finally:
if have_lock:
lock.release()
return ret_value
return _caller
return _dec(function) if function is not None else _dec
@task(name='my_app.sample.tasks.single_task')
@only_one(key="SingleTask", timeout=60 * 5)
def single_task(self, **kwargs):
"""Run task."""
print("test")

问题是,我没有在 settings.py 的任何地方配置 Redis,所以我不明白它是如何找到正确的 redis 数据库的。我想它来自芹菜的配置。

相关内容

  • 没有找到相关文章

最新更新