我使用芹菜(3.0.15)与Redis作为代理。
是否有一种简单的方法来查询存在于芹菜队列中的给定名称的任务数量?
并且,作为后续问题,是否有一种方法可以取消芹菜队列中存在的具有给定名称的所有任务?
我已经看过了《监控和管理指南》,但没有看到解决方案。
# Retrieve tasks
# Reference: http://docs.celeryproject.org/en/latest/reference/celery.events.state.html
query = celery.events.state.tasks_by_type(your_task_name)
# Kill tasks
# Reference: http://docs.celeryproject.org/en/latest/userguide/workers.html#revoking-tasks
for uuid, task in query:
celery.control.revoke(uuid, terminate=True)
有一个问题是以前的答案没有解决的,如果人们没有意识到这一点,可能会让他们感到困惑。
在已经发布的解决方案中,我将使用Danielle的解决方案进行一个小修改:我将任务导入到我的文件中,并使用其.name
属性来获取任务名称以传递给.tasks_by_type()
。
app.control.revoke(
[uuid for uuid, _ in
celery.events.state.State().tasks_by_type(task.name)])
然而,此解决方案将忽略那些已计划为将来执行的任务。像一些评论其他答案的人一样,当我检查.tasks_by_type()
返回时,我有一个空列表。事实上,我的队伍是空的。但我知道有任务计划在未来执行,这些是我的主要目标。我可以通过执行celery -A [app] inspect scheduled
看到它们,但是它们不受上面代码的影响。
我设法撤销计划任务,执行以下命令:
app.control.revoke(
[scheduled["request"]["id"] for scheduled in
chain.from_iterable(app.control.inspect().scheduled()
.itervalues())])
app.control.inspect().scheduled()
返回一个字典,它的键是工人名,值是调度信息列表(因此,需要从itertools
导入chain.from_iterable
)。任务信息在调度信息的"request"
字段中,"id"
为任务id。请注意,即使在撤消之后,计划任务仍将显示在计划任务中。被撤销的计划任务不会从计划任务列表中删除,直到它们的计时器到期或直到Celery执行一些清理操作。(重启worker会触发这样的清理)
和芹菜一样,这里的答案都不适合我,所以我做了我通常的事情,并把一个解决方案直接检查redis。
# First, get a list of tasks from redis:
import redis, json
r = redis.Redis(
host=settings.REDIS_HOST,
port=settings.REDIS_PORT,
db=settings.REDIS_DATABASES['CELERY'],
)
l = r.lrange('celery', 0, -1)
# Now import the task you want so you can get its name
from my_django.tasks import my_task
# Now, import your celery app and iterate over all tasks
# from redis and nuke the ones that have a matching name.
from my_django.celery_init import app
for task in l:
task_headers = json.loads(task)['headers']
task_name = task_headers["task"]
if task_name == my_task.name:
task_id = task_headers['id']
print("Terminating: %s" % task_id)
app.control.revoke(task_id, terminate=True)
请注意,以这种方式撤销可能不会撤销预取的任务,因此您可能不会立即看到结果。
同样,这个答案不支持优先级任务。如果你想修改它,你会想要一些技巧在我的另一个答案,hack redis。
您可以在一个请求中完成此操作:
app.control.revoke([
uuid
for uuid, _ in
celery.events.state.State().tasks_by_type(task_name)
])
看起来flower
提供了监控:
使用芹菜事件进行实时监控
任务进度和历史记录显示任务详细信息(参数、图和统计远程控制
查看worker状态和统计信息关闭并重启worker控制工作池大小和自动缩放设置修改工作实例当前从View中消耗的队列running tasks查看计划任务(ETA/countdown)查看预留任务和撤销的任务应用时间和速率限制配置查看器撤销或终止任务HTTP API
OpenID身份验证