删除celery / rabbitmq中所有挂起的任务



如何在不知道每个任务的task_id的情况下删除所有挂起的任务?

From the docs:

$ celery -A proj purge

from proj.celery import app
app.control.purge()

(编辑:更新当前方法)

对于芹菜3.0+:

$ celery purge

清除特定队列:

$ celery -Q queue_name purge

For芹菜2。X和3.x:

当使用带有-Q参数的worker定义队列时,例如

celery worker -Q queue1,queue2,queue3

celery purge将无法工作,因为您无法将队列参数传递给它。它只会删除默认队列。解决方案是启动您的工作与--purge参数如下:

celery worker -Q queue1,queue2,queue3 --purge

这将运行worker。

另一个选项是使用芹菜的amqp子命令

celery amqp queue.delete queue1
celery amqp queue.delete queue2
celery amqp queue.delete queue3

In芹菜3+:

CLI:

$ celery -A proj purge

编程:

>>> from proj.celery import app
>>> app.control.purge()

http://docs.celeryproject.org/en/latest/faq.html#how-do-i-purge-all-waiting-tasks

我发现celery purge不适合我更复杂的芹菜配置。我使用多个命名队列用于不同的目的:

$ sudo rabbitmqctl list_queues -p celery name messages consumers
Listing queues ...  # Output sorted, whitespaced for readability
celery                                          0   2
celery@web01.celery.pidbox                      0   1
celery@web02.celery.pidbox                      0   1
apns                                            0   1
apns@web01.celery.pidbox                        0   1
analytics                                       1   1
analytics@web01.celery.pidbox                   0   1
bcast.361093f1-de68-46c5-adff-d49ea8f164c0      0   1
bcast.a53632b0-c8b8-46d9-bd59-364afe9998c1      0   1
celeryev.c27b070d-b07e-4e37-9dca-dbb45d03fd54   0   1
celeryev.c66a9bed-84bd-40b0-8fe7-4e4d0c002866   0   1
celeryev.b490f71a-be1a-4cd8-ae17-06a713cc2a99   0   1
celeryev.9d023165-ab4a-42cb-86f8-90294b80bd1e   0   1

第一列是队列名称,第二列是队列中等待的消息数量,第三列是该队列的侦听器数量。队列如下:

  • celery -标准的,幂等的芹菜任务队列
  • apns - Apple推送通知服务任务队列,不完全是幂等的
  • analytics -长时间运行的夜间分析队列
  • *。pidbox -工人命令队列,如关闭和重置,每个工人一个(2个芹菜工人,一个apns工人,一个分析工人)
  • 广播地址。广播队列,用于向侦听队列的所有工作人员发送消息(而不仅仅是第一个获取它)
  • celeryev。芹菜事件队列,用于报告任务分析

分析任务是一个蛮力任务,在小数据集上工作得很好,但现在需要超过24小时来处理。偶尔,会出现一些问题,它会在等待数据库时卡住。它需要重写,但在此之前,当它卡住时,我将终止任务,清空队列,然后再次尝试。我通过查看分析队列的消息计数来检测"阻塞",该计数应该是0(已完成的分析)或1(等待昨晚的分析完成)。2或更高的是不好的,我会收到一封电子邮件。

celery purge提供从一个广播队列中擦除任务,并且我没有看到选择不同命名队列的选项。

这是我的过程:

$ sudo /etc/init.d/celeryd stop  # Wait for analytics task to be last one, Ctrl-C
$ ps -ef | grep analytics  # Get the PID of the worker, not the root PID reported by celery
$ sudo kill <PID>
$ sudo /etc/init.d/celeryd stop  # Confim dead
$ python manage.py celery amqp queue.purge analytics
$ sudo rabbitmqctl list_queues -p celery name messages consumers  # Confirm messages is 0
$ sudo /etc/init.d/celeryd start

如果你想删除所有挂起的任务,以及活动和保留的任务,以完全停止芹菜,这是对我有用的:

from proj.celery import app
from celery.task.control import inspect, revoke
# remove pending tasks
app.control.purge()
# remove active tasks
i = inspect()
jobs = i.active()
for hostname in jobs:
    tasks = jobs[hostname]
    for task in tasks:
        revoke(task['id'], terminate=True)
# remove reserved tasks
jobs = i.reserved()
for hostname in jobs:
    tasks = jobs[hostname]
    for task in tasks:
        revoke(task['id'], terminate=True)

In芹菜3+

http://docs.celeryproject.org/en/3.1/faq.html how-do-i-purge-all-waiting-tasks

CLI

清除命名队列:

 celery -A proj amqp queue.purge <queue name>

清除配置队列

celery -A proj purge

我已经清除了消息,但是队列中仍然有消息?答:任务在实际执行时就会得到确认(从队列中删除)。当worker接收到一个任务后,它需要一段时间才能真正执行,特别是当有很多任务已经在等待执行时。未被确认的消息由worker保存,直到它关闭与代理(AMQP服务器)的连接。当该连接关闭时(例如,因为worker被停止),任务将由broker重新发送到下一个可用的worker(或重新启动时的同一个worker),因此要正确清除等待任务队列,必须停止所有worker,然后使用celery.control.purge()清除任务。

因此,要清除整个队列,必须停止工作线程。

对于芹菜5.0+,从CLI执行此操作并针对特定队列:

celery -A APP_NAME purge --queues QUEUE_NAME

添加-f选项,以跳过确认步骤,如果你试图做它在一个步骤像我是。

芹菜4 + 芹菜清除命令清除所有配置的任务队列

celery -A *APPNAME* purge

编程:

from proj.celery import app
app.control.purge()

所有挂起的任务将被清除。参考文献:celerydoc

要正确清除等待任务队列,必须停止所有工作(http://celery.readthedocs.io/en/latest/faq.html#i-ve-purged-messages-but-there-are-still-messages-left-in-the-queue):

)
$ sudo rabbitmqctl stop

或者(如果RabbitMQ/消息代理是由Supervisor管理的):

$ sudo supervisorctl stop all

2。…然后从特定队列中清除任务:

$ cd <source_dir>
$ celery amqp queue.purge <queue name>

3。开始RabbitMQ:

$ sudo rabbitmqctl start

或者(如果RabbitMQ由Supervisor管理):

$ sudo supervisorctl start all

对于芹菜版本5.0+与RabbitMQ作为代理

我们需要首先建立一个从程序到broker的新连接,并将连接与要清除的队列绑定。

# proj/celery.py
from celery import Celery
app = Celery('proj')
from proj.celery import app
queues = ['queue_A', 'queue_B', 'queue_C']
with app.connection_for_write() as conn:
    conn.connect()
    for queue in queues:
        count = app.amqp.queues[queue].bind(conn).purge()
        print(f'Purge {queue} with {count} message(s)')

我想这可能已经解决了。旧的任务还在django- celry -beat>periodic tasks在我的django admin中,所以去django admin页面删除它们,然后如果你在docker容器中,重新启动它所有的问题都解决了

相关内容

  • 没有找到相关文章

最新更新