以编程方式撤销芹菜任务的正确过程



我写了一个用于撤销芹菜任务的小 Python 脚本:

from mydjangoproj.celery import app
i = app.control.inspect()
active = i.active(safe=True)
reserved = i.reserved(safe=True)
# Get the uuids for the active and reserved tasks
revoke_ids_list_a = [task['id'] for (worker, tasks) in active_filtered.iteritems() for task in tasks]
revoke_ids_list_r = [task['id'] for (worker, tasks) in reserved_filtered.iteritems() for task in tasks]
# combine them into one big list
revoke_ids_list = revoke_ids_list_a + revoke_ids_list_r
# Perform the revoking
revoked = []
for tid in revoke_ids_list:
    app.control.revoke(tid, terminate=True)
    revoked.append(tid)

似乎这种工作。尽管如此,通常还是有一些剩余的任务不会被撤销,而且我似乎偶尔也需要多次运行脚本来杀死所有内容。

我错过了什么吗?(我知道停止守护进程并发出celery -A mydjangoproj purge -f就可以了,但由于各种原因我不想清除)

另一个问题是,在重新启动 celery 守护程序并 beat 后,我认为已被撤销的一些任务在 worker 中再次开始运行。这是为什么呢?

这取决于

您在项目上使用的后端,revoke() 仅与 AMQP 和 Redis 代理一起使用。

http://docs.celeryproject.org/en/latest/userguide/workers.html#remote-control

请注意,远程控制命令必须有效,吊销才能正常工作。 远程控制命令仅受 RabbitMQ (amqp) 和 此时的雷迪斯。

最新更新