从芹菜中撤销一个任务



我想明确地从芹菜中撤销一个任务。这是我目前正在做的:-

from celery.task.control import revoke
revoke(task_id, terminate=True)

其中task_id是string(也尝试将其转换为UUID uuid.UUID(task_id).hex))。

在上述过程之后,当我再次启动芹菜celery worker -A proj时,它仍然消耗相同的消息并开始处理它。为什么?

当通过flower查看时,消息仍然在代理部分中。如何删除消息,使其不再被使用?

revoke是如何工作的?

当调用revoke方法时,任务不会立即从队列中删除,它所做的只是告诉芹菜(不是您的代理!)将task_id保存在内存中的set中(如果您像我一样喜欢阅读源代码,请查看这里)。

当任务到达队列顶部时,Celery将检查它是否在已撤销集合中,如果是,它将不执行它。

这样做是为了防止对每个revoke调用进行O(n)搜索,其中检查task_id是否在内存集中只是O(1)

为什么在重新启动芹菜后,你撤销的任务执行?

理解事情是如何工作的,你现在知道set只是一个普通的python集合,被保存在内存中-这意味着当你重新启动时,你会丢失这个集合,但任务(当然)是持久化的,当任务来临时,它将像往常一样执行。

你能做什么?

你需要有一个持久化集,这是通过初始化你的worker来完成的:

celery worker -A proj --statedb=/var/run/celery/worker.state

这将把集合保存在文件系统上。

引用:

  • 内存集芹菜源代码
  • 吊销医生
  • 持久化撤销文档

相关内容

  • 没有找到相关文章

最新更新