我想明确地从芹菜中撤销一个任务。这是我目前正在做的:-
from celery.task.control import revoke
revoke(task_id, terminate=True)
其中task_id是string
(也尝试将其转换为UUID uuid.UUID(task_id).hex)
)。
在上述过程之后,当我再次启动芹菜celery worker -A proj
时,它仍然消耗相同的消息并开始处理它。为什么?
当通过flower
查看时,消息仍然在代理部分中。如何删除消息,使其不再被使用?
revoke
是如何工作的?
当调用revoke
方法时,任务不会立即从队列中删除,它所做的只是告诉芹菜(不是您的代理!)将task_id
保存在内存中的set
中(如果您像我一样喜欢阅读源代码,请查看这里)。
当任务到达队列顶部时,Celery将检查它是否在已撤销集合中,如果是,它将不执行它。
这样做是为了防止对每个revoke
调用进行O(n)搜索,其中检查task_id是否在内存集中只是O(1)
为什么在重新启动芹菜后,你撤销的任务执行?
理解事情是如何工作的,你现在知道set
只是一个普通的python集合,被保存在内存中-这意味着当你重新启动时,你会丢失这个集合,但任务(当然)是持久化的,当任务来临时,它将像往常一样执行。
你需要有一个持久化集,这是通过初始化你的worker来完成的:
celery worker -A proj --statedb=/var/run/celery/worker.state
这将把集合保存在文件系统上。
引用:
- 内存集芹菜源代码
- 吊销医生
- 持久化撤销文档