芹菜任务未从队列中完全删除



我有一个自称的芹菜任务(带有do_stuff.apply_async(queue="foo"))。以前我运行了app.control.add_consumer("foo", reply=True),所以我的工作人员可以从这个队列中消费。

一段时间后,我想停止该队列中的所有任务以及从do_stuff启动的所有正在运行的任务。

所以我运行了这段代码:

app.control.cancel_consumer("foo", reply=True)
i = app.control.inspect()
for queue in [i.active, i.scheduled, i.reserved]:
    for worker_name, worker_tasks in queue().items():
        for task in worker_tasks:
            args = ast.literal_eval(task["args"])
            if "do_stuff" in task["name"] and args[0] == crawler.name:
                app.control.revoke(task["id"], terminate=True)

这种"工作"有点。它确实会阻止所有正在运行的任务do_stuff并且确实清除了计划任务(或者至少在运行此代码后,我在 Flower 中看不到任何任务)。

问题是,如果我再次运行app.control.add_consumer("foo", reply=True),而不运行任何其他内容,新任务将开始运行。这意味着芹菜/redis以某种方式设法将任务保留在某个地方。

为什么会这样?这些"隐藏"的任务保存在哪里?我怎样才能删除它们?

回答我自己的问题:发生这种情况是因为当我让工人不要从队列中消费(通过调用cancel_consumer),队列本身仍然包含所有内容。

我找到了一种(以编程方式)刷新队列的方法:

from celery.bin.celery import CeleryCommand
cmd = CeleryCommand()
super(CeleryCommand, cmd).execute_from_commandline([
    '',
    'purge',
    '-f',
    '-Q', queue_name,
    '-A', 'main'
])

相关内容

  • 没有找到相关文章

最新更新