如何按任务名称限制芹菜任务?



我正在使用Celery来处理来自Django应用程序的异步任务。大多数任务都很短,只需几秒钟即可运行,但我有一个任务可能需要几个小时。

由于我的服务器的处理限制,Celery 配置为一次只能运行 2 个任务。这意味着如果有人启动其中两个长时间运行的任务,它会有效地阻止所有其他 Celery 处理站点几个小时,这是非常糟糕的。

有没有办法配置 Celery,使其一次只处理一种类型的任务?像这样:

@task(max_running_instances=1)
def my_really_long_task():
for i in range(1000000000):
time.sleep(6000)

请注意,我不想取消my_really_long_task的所有其他发布。我只是不希望它们立即开始,并且只有在所有其他同名任务完成后才开始。

由于 Celery 似乎不支持这一点,我目前的黑客解决方案是查询任务中的其他任务,如果我们找到其他正在运行的实例,则重新安排自己稍后运行,例如

from celery.task.control import inspect
def get_all_active_celery_task_names(ignore_id=None):
"""
Returns Celery task names for all running tasks.
"""
i = inspect()
task_names = defaultdict(int) # {name: count}
if i:
active = i.active()
if active is not None:
for worker_name, tasks in i.active().iteritems():
for task in tasks:
if ignore_id and task['id'] == ignore_id:
continue
task_names[task['name']] += 1
return task_names
@task
def my_really_long_task():
all_names = get_all_active_celery_task_names()
if 'my_really_long_task' in all_names:
my_really_long_task.retry(max_retries=100, countdown=random.randint(10, 300))
return
for i in range(1000000000):
time.sleep(6000)

有没有更好的方法可以做到这一点?

我知道其他像这样的黑客解决方案,但是设置一个单独的memcache服务器来跟踪任务的唯一性甚至更不可靠,并且比我上面使用的方法更复杂。

另一种解决方案是将my_really_long_task排队到单独的队列中。

my_really_long_task.apply_async(*args, queue='foo')

然后启动并发性为 1 的工作线程来使用这些任务,以便一次只执行 1 个任务。

celery -A foo worker -l info -Q foo

相关内容

  • 没有找到相关文章

最新更新