如何在芹菜任务中使用优先级.apply_async



我在芹菜中有一个test队列,我已经为其定义了一个任务:

@celery_app.task(queue='test', ignore_result=True)
def priority_test(priority):
    print(priority)

只需打印参数即可。我想设置appy_async定义的priority属性。所以,我写了一个for loop

for i in range(100):
    priority_test.apply_async((i%10,), queue="test", priority=i%10)

我示出了这样的结果:

[2017-12-26 17:21:37,309: WARNING/ForkPoolWorker-1] 10
[2017-12-26 17:21:37,311: WARNING/ForkPoolWorker-1] 10
[2017-12-26 17:21:37,314: WARNING/ForkPoolWorker-1] 10
[2017-12-26 17:21:37,317: WARNING/ForkPoolWorker-1] 9
[2017-12-26 17:21:37,319: WARNING/ForkPoolWorker-1] 9
[2017-12-26 17:21:37,321: WARNING/ForkPoolWorker-1] 9
[2017-12-26 17:21:37,323: WARNING/ForkPoolWorker-1] 8
[2017-12-26 17:21:37,326: WARNING/ForkPoolWorker-1] 8
[2017-12-26 17:21:37,329: WARNING/ForkPoolWorker-1] 8
[2017-12-26 17:21:37,332: WARNING/ForkPoolWorker-1] 7
[2017-12-26 17:21:37,334: WARNING/ForkPoolWorker-1] 7
[2017-12-26 17:21:37,336: WARNING/ForkPoolWorker-1] 7
[2017-12-26 17:21:37,341: WARNING/ForkPoolWorker-1] 6
[2017-12-26 17:21:37,344: WARNING/ForkPoolWorker-1] 6
[2017-12-26 17:21:37,346: WARNING/ForkPoolWorker-1] 6
[2017-12-26 17:21:37,349: WARNING/ForkPoolWorker-1] 5
[2017-12-26 17:21:37,351: WARNING/ForkPoolWorker-1] 5
[2017-12-26 17:21:37,353: WARNING/ForkPoolWorker-1] 5
[2017-12-26 17:21:37,355: WARNING/ForkPoolWorker-1] 4
[2017-12-26 17:21:37,358: WARNING/ForkPoolWorker-1] 4
[2017-12-26 17:21:37,360: WARNING/ForkPoolWorker-1] 4

表示彼此之后执行相同的优先级,但它以正常方式执行它们:

[2017-12-26 17:21:37,309: WARNING/ForkPoolWorker-1] 10
[2017-12-26 17:21:37,311: WARNING/ForkPoolWorker-1] 9
[2017-12-26 17:21:37,314: WARNING/ForkPoolWorker-1] 8
[2017-12-26 17:21:37,317: WARNING/ForkPoolWorker-1] 7
[2017-12-26 17:21:37,319: WARNING/ForkPoolWorker-1] 6
[2017-12-26 17:21:37,321: WARNING/ForkPoolWorker-1] 5
[2017-12-26 17:21:37,323: WARNING/ForkPoolWorker-1] 4
[2017-12-26 17:21:37,326: WARNING/ForkPoolWorker-1] 3
[2017-12-26 17:21:37,329: WARNING/ForkPoolWorker-1] 2
[2017-12-26 17:21:37,332: WARNING/ForkPoolWorker-1] 1
[2017-12-26 17:21:37,334: WARNING/ForkPoolWorker-1] 10
[2017-12-26 17:21:37,336: WARNING/ForkPoolWorker-1] 9
[2017-12-26 17:21:37,341: WARNING/ForkPoolWorker-1] 8
[2017-12-26 17:21:37,344: WARNING/ForkPoolWorker-1] 7
[2017-12-26 17:21:37,346: WARNING/ForkPoolWorker-1] 6
[2017-12-26 17:21:37,349: WARNING/ForkPoolWorker-1] 5
[2017-12-26 17:21:37,351: WARNING/ForkPoolWorker-1] 4
[2017-12-26 17:21:37,353: WARNING/ForkPoolWorker-1] 3
[2017-12-26 17:21:37,355: WARNING/ForkPoolWorker-1] 2
[2017-12-26 17:21:37,358: WARNING/ForkPoolWorker-1] 1
[2017-12-26 17:21:37,360: WARNING/ForkPoolWorker-1] 10
[2017-12-26 17:21:37,362: WARNING/ForkPoolWorker-1] 9
[2017-12-26 17:21:37,364: WARNING/ForkPoolWorker-1] 8
[2017-12-26 17:21:37,365: WARNING/ForkPoolWorker-1] 7
[2017-12-26 17:21:37,367: WARNING/ForkPoolWorker-1] 6
[2017-12-26 17:21:37,369: WARNING/ForkPoolWorker-1] 5
[2017-12-26 17:21:37,371: WARNING/ForkPoolWorker-1] 4
[2017-12-26 17:21:37,373: WARNING/ForkPoolWorker-1] 3
[2017-12-26 17:21:37,374: WARNING/ForkPoolWorker-1] 2
[2017-12-26 17:21:37,376: WARNING/ForkPoolWorker-1] 1

我应该如何用兔子在芹菜中应用priority,上面文档中的priority属性是什么?

为了使priority正常工作,您需要正确配置几个设置,并且至少需要Rabbitmq的3.5.0版。

首先将您的队列的x-max-priority设置为10。从文档中:

from kombu import Exchange, Queue
app.conf.task_queues = [
    Queue('tasks', Exchange('tasks'), routing_key='tasks',
          queue_arguments={'x-max-priority': 10},
]

可以使用task_queue_max_priority设置设置所有队列的默认值:

app.conf.task_queue_max_priority = 10

然后配置以下设置:

CELERY_ACKS_LATE = True
CELERYD_PREFETCH_MULTIPLIER = 1

默认情况下,预取乘数为4,在您的情况下,这将导致前4个任务具有优先级10、9、8和7的优先级,然后在队列中存在其他任务之前。CELERY_ACKS_LATE设置将导致执行任务后确认任务。您可以尝试此设置以查看自己喜欢的行为。

相关内容

  • 没有找到相关文章

最新更新