如何按名称从Celery队列中删除任务



我正试图找到一种方法,从Celery队列中删除所有当前排队的具有特定名称的任务。从官方文件中,我知道我可以通过查找工人的姓名,然后获取他们的身份证来检查他们并撤销任务,比如:

def drop_celery_task(options):
def _get_tasks_id(workers: list, tasks_ids: list, task_name: str):
"""
Get task ids with the given name included inside the given `workers` tasks.
{'worker1.example.com': [
{'name': 'tasks.sleeptask', 'id': '32666e9b-809c-41fa-8e93-5ae0c80afbbf',
'args': '(8,)', 'kwargs': '{}'}]
}
"""
for worker in workers:
if not workers[worker]:
continue
for _task in workers[worker]:
if _task["name"].split(".")[-1] == task_name:
tasks_ids.append(_task["id"])
task_name = options.drop_celery_task["name"]
i = Inspect(app=celery_app)  # Inspect all nodes.
registered = i.registered()
if not registered:
raise Exception("No registered tasks found")
if not any(task_name == worker.split(".")[-1] for worker in chain(*list(registered.values()))):
raise Exception(f"Task not registered: {task_name}")
tasks_ids = []
_get_tasks_id(i.active(), tasks_ids, task_name)
_get_tasks_id(i.scheduled(), tasks_ids, task_name)
_get_tasks_id(i.reserved(), tasks_ids, task_name)
if tasks_ids:
for task_id in tasks_ids:
Control(app=celery_app).revoke(task_id)
else:
logging.info(f"No active/scheduled/registered task found with the name {task_name}")

但是,这段代码只撤销了芹菜工作者提取或预提取的任务,而不是仍在队列中的任务(使用Redis作为后端(。关于如何使用芹菜命令删除Redis中的任务,或者防止工作人员接受指定名称的任务,有什么建议吗?

我最终在Redis中用我想要的名称识别了任务的ID(使用Redis客户端,而不是芹菜命令(,然后通过Control(app=celery_app).revoke(task_id)命令撤销了这些ID。在Redis中,队列是带有队列名称的键下的列表对象。

相关内容

  • 没有找到相关文章

最新更新