在芹菜中,我如何防止长时间延迟的任务阻塞新任务



我有两种任务。任务A由celerybeat每小时生成一次。它立即运行,并生成一千个(或数千个)任务B的实例,每个实例都有一个未来一天的ETA。

在启动时,任务A的一个实例运行并生成一千个b。从那以后,什么都没发生。我应该看到每小时又有一个A,还有另外1000个b。但事实上我什么也没看见。

在冻结时,rabbitmqctl显示1000条消息,其中968条已准备好,32条未确认。一小时后,有1001条信息,969条已发送,32条未确认。以此类推,每小时就有一条新消息被归类为准备就绪。可能发生的情况是,worker正在预取32条消息,但无法对它们进行操作,因为它们的ETA仍在未来。与此同时,现在应该运行的新任务无法运行。

处理这个的正确方法是什么?我猜我需要多个工人,也许还需要多个队列(但我不确定后一点)。有更简单的方法吗?我试过摆弄CELERYD_PREFETCH_MULTIPLIER和-Ofail(这里讨论过:http://celery.readthedocs.org/en/latest/userguide/optimizing.html),但是无法让它运行。我的问题和这个一样吗:[[Django芹菜]]芹菜阻塞了做IO任务?

在任何情况下:我可以解决这个问题,只是因为我知道很多关于任务的性质和他们的时间安排。如果未来的ETA任务足够多,就会锁定整个系统,这难道不是一个设计缺陷吗?如果我等待几个小时,然后杀死并重新启动worker,它会再次捕获前32个任务并冻结,即使此时队列中有任务已经准备好运行。难道不应该有一些组件足够聪明,能够查看eta并忽略不可运行的任务吗?

附录:我现在认为这个问题是一个已知的bug,当RabbitMQ 3.3与芹菜3.1.0一起使用时。更多信息请点击这里:https://groups.google.com/forum/!searchin/celery-users/倒计时|分类:日期/celery-users/FiAAESOzezA/499 oh-pylacj

更新到芹菜3.1.1后,情况似乎有所好转。任务A每小时运行一次(好吧,它已经运行了几个小时),并调度任务b的副本。这些似乎填满了worker:未确认消息的数量继续增长。我得看看它能不能不受约束地生长

这似乎是一个可以通过路由解决的问题:http://celery.readthedocs.org/en/latest/userguide/routing.html

在使用路由时,可以有多个队列,其中填充了不同类型的任务。如果你想让任务B不阻塞更多的任务A,你可以把它们分成不同优先级的单独的工人队列,这样你的工人将在满是任务B的大队列上工作,但是当一个任务A到达时,它被下一个可用的工人拉走。

这样做的额外好处是,您还可以将更多的工人分配到严重填充的队列,这些工人将只从指定的高容量队列中提取。

当前有多少工作线程以什么并发运行?

增加工作线程的并发性可能有助于解决问题。如果线程x卡在任务A上,而任务A花费了很长时间或者处于等待状态,那么另一个线程可以处理其他预取的任务

http://celery.readthedocs.io/en/latest/reference/celery.bin.worker.html cmdoption-celery-worker-c

相关内容

  • 没有找到相关文章

最新更新