假设我有一个芹菜任务,它需要两个参数:X(a,b)
我需要使用以下两个规则来实现自定义并发逻辑:
-
如果
X
的实例具有不同的a
值,则它们可以同时运行。也就是说,如果当X(a=2,b=20)
被添加到队列时X(a=1,b=10)
正在运行,则后者被从队列中拉出并立即执行。 -
如果
X
的实例具有与a
相同的值,则它们不能同时运行。也就是说,如果在将X(a=1,b=20)
添加到队列时X(a=1,b=10)
正在运行,则后者必须在队列上等待,直到前者完成为止。
规则#1通过设置worker_concurrency>1
(docs)与芹菜一起开箱即用。第二条规则很棘手。
分布式任务锁定,如文档和本博客中所述,是一种让我接近所需的方法。甚至还有一些库可以为您实现它(芹菜单例)。然而,回过头来看规则#2,这种方法似乎可以防止第二个任务排队,直到第一个任务完成。我需要它排队,只是在第一个任务完成之前不要在工作人员上执行。
有没有办法实现这一点?这个SO问题提出了一个类似的问题,但到目前为止还没有答案。
这似乎是使用redis和绑定芹菜任务的一个很好的例子。如果你还没有这样做,你也可以使用redis作为你的芹菜代理,如果你需要的话,也可以使用它作为缓存层。这真是一把瑞士军刀。部署redis也非常简单。我强烈鼓励任何人更加熟悉它。它是一个很好的工具。
我会稍微改变一下这个例子,因为我总是被单字符函数和变量弄糊涂。
# Think of this as X(a,b) from the question
@task
def add(num1, num2):
return num1 + num2
然后我们可以升级add
,使其看起来更像这样:
# "bind" the task so we have access to all the Task base class functionality
# via "self".
# https://docs.celeryproject.org/en/latest/reference/celery.app.task.html#celery.app.task.Task.retry
@task(bind=True)
def add(self, num1, num2):
if does_running_task_exist_with(num1):
# requeue. Please visit the docs for "retry" mentioned above.
# There are also max_retries and some other nice things.
# Try again in 10s
self.retry(countdown=10)
return
return num1 + num2
然后,我们的does_running_task_exist_with
辅助函数将使用redisSet。像所有Set实现一样,它们保证唯一性,并且可以快速检查成员的存在。
# Using https://github.com/andymccurdy/redis-py
import redis
def does_running_task_exist_with(some_number):
# Connect to redis.
# Using database number 2. You might be using db 0 for celery brokerage,
# and db 1 for celery result storage. Using a separate DB is just nice
# for isolation. Redis has up to 16.
# Connects to localhost by default.
redis_conn = redis.StrictRedis(db=2)
# we try adding this number to the Set of currently processing numbers
# https://redis.io/commands/sadd
# Return value: the number of elements that were added to the set,
# not including all the elements already present into the set.
members_added = redis_conn.sadd("manager_task_args", str(some_number))
# Or shortcut it as "return members_added == 0". This here is
# more expressive though
if members_added == 0:
return True
return False
好吧。现在跟踪和决策已经到位。缺少的一件重要的事情是:一旦add
任务完成,我们需要从redis集中删除num1
。让我们稍微调整一下函数。
import redis
@task(bind=True)
def add(self, num1, num2):
if does_running_task_exist_with(num1):
self.retry(countdown=10)
return
# Do actual work…
result = num1 + num2
# Cleanup
redis_conn = redis.StrictRedis(db=2)
redis_conn.srem("manager_task_args", str(num1))
return result
但如果出了问题怎么办?如果添加失败怎么办?然后我们的num1
永远不会从集合中删除,我们的队列开始变得越来越长。我们不想那样。您可以在这里做两件事:要么用on_failure
方法创建一个基于类的任务,要么在try中包装它,除非finally。我们将走最后一次尝试的路线,因为在这种情况下更容易遵循:
import redis
@task(bind=True)
def add(self, num1, num2):
if does_running_task_exist_with(num1):
self.retry(countdown=10)
return
try:
result = num1 + num2
finally:
redis_conn = redis.StrictRedis(db=2)
redis_conn.srem("manager_task_args", str(num1))
return result
注意,如果你有大量的任务,你可能还想研究redis连接池。
我也遇到过同样的问题,只需使用CeleryOnce就可以解决简单地说,这个库为芹菜任务提供了一个基类,并使用Redis或文件备份的锁定机制。您甚至可以编写自己的Celery Task类,并从该库的QueueOnce类继承它。