芹菜任务优先



我想使用芹菜管理任务。我想拥有一个任务队列(并发1(,并能够以不同的优先级将任务推到队列上,以便更高的优先级任务可以抢占其他任务。

我将三个任务添加到这样的队列:

add_tasks.py

from tasks import example_task
example_task.apply_async((1), priority=1)
example_task.apply_async((2), priority=3)
example_task.apply_async((3), priority=2)

我有以下配置:

tasks.py

from __future__ import absolute_import, unicode_literals
from celery import Celery
from kombu import Queue, Exchange
import time
app = Celery('tasks', backend='rpc://', broker='pyamqp://')
app.conf.task_queues = [Queue('celery', Exchange('celery'), routing_key='celery', queue_arguments={'x-max-priority': 10})]

@app.task
def example_task(task_num):
    time.sleep(3)
    print('Started {}'.format(task_num)
    return True

我希望我添加的第二个任务在第三个任务之前运行,因为它具有更高的优先级,但事实并非如此。他们按添加的顺序运行。

我正在关注文档,以为我已经正确配置了该应用程序。

我是在做错事还是我误解了优先功能?

有可能队列没有机会优先考虑消息(因为它们在分类发生之前已下载(。尝试这两个设置(根据需要适应您的项目(:

CELERY_ACKS_LATE = True
CELERYD_PREFETCH_MULTIPLIER = 1

预取乘数默认为4。

我已经开发了一个示例应用程序,以实现芹菜的优先级任务。请在这里看看。在开发它的过程中,我遇到了一个非常相似的问题,设置的这种变化实际上解决了。

请注意,您还需要RabbitMQ版本3.5.0或更高版本。

如果您在6.0上方使用芹菜,请使用以下内容:

        celery_instance.conf.task_acks_late = True
        celery_instance.conf.worker_prefetch_multiplier = 1

相关内容

  • 没有找到相关文章