我在芹菜中有一个test
队列,我已经为其定义了一个任务:
@celery_app.task(queue='test', ignore_result=True)
def priority_test(priority):
print(priority)
只需打印参数即可。我想设置appy_async
定义的priority
属性。所以,我写了一个for loop
:
for i in range(100):
priority_test.apply_async((i%10,), queue="test", priority=i%10)
我示出了这样的结果:
[2017-12-26 17:21:37,309: WARNING/ForkPoolWorker-1] 10
[2017-12-26 17:21:37,311: WARNING/ForkPoolWorker-1] 10
[2017-12-26 17:21:37,314: WARNING/ForkPoolWorker-1] 10
[2017-12-26 17:21:37,317: WARNING/ForkPoolWorker-1] 9
[2017-12-26 17:21:37,319: WARNING/ForkPoolWorker-1] 9
[2017-12-26 17:21:37,321: WARNING/ForkPoolWorker-1] 9
[2017-12-26 17:21:37,323: WARNING/ForkPoolWorker-1] 8
[2017-12-26 17:21:37,326: WARNING/ForkPoolWorker-1] 8
[2017-12-26 17:21:37,329: WARNING/ForkPoolWorker-1] 8
[2017-12-26 17:21:37,332: WARNING/ForkPoolWorker-1] 7
[2017-12-26 17:21:37,334: WARNING/ForkPoolWorker-1] 7
[2017-12-26 17:21:37,336: WARNING/ForkPoolWorker-1] 7
[2017-12-26 17:21:37,341: WARNING/ForkPoolWorker-1] 6
[2017-12-26 17:21:37,344: WARNING/ForkPoolWorker-1] 6
[2017-12-26 17:21:37,346: WARNING/ForkPoolWorker-1] 6
[2017-12-26 17:21:37,349: WARNING/ForkPoolWorker-1] 5
[2017-12-26 17:21:37,351: WARNING/ForkPoolWorker-1] 5
[2017-12-26 17:21:37,353: WARNING/ForkPoolWorker-1] 5
[2017-12-26 17:21:37,355: WARNING/ForkPoolWorker-1] 4
[2017-12-26 17:21:37,358: WARNING/ForkPoolWorker-1] 4
[2017-12-26 17:21:37,360: WARNING/ForkPoolWorker-1] 4
表示彼此之后执行相同的优先级,但它以正常方式执行它们:
[2017-12-26 17:21:37,309: WARNING/ForkPoolWorker-1] 10
[2017-12-26 17:21:37,311: WARNING/ForkPoolWorker-1] 9
[2017-12-26 17:21:37,314: WARNING/ForkPoolWorker-1] 8
[2017-12-26 17:21:37,317: WARNING/ForkPoolWorker-1] 7
[2017-12-26 17:21:37,319: WARNING/ForkPoolWorker-1] 6
[2017-12-26 17:21:37,321: WARNING/ForkPoolWorker-1] 5
[2017-12-26 17:21:37,323: WARNING/ForkPoolWorker-1] 4
[2017-12-26 17:21:37,326: WARNING/ForkPoolWorker-1] 3
[2017-12-26 17:21:37,329: WARNING/ForkPoolWorker-1] 2
[2017-12-26 17:21:37,332: WARNING/ForkPoolWorker-1] 1
[2017-12-26 17:21:37,334: WARNING/ForkPoolWorker-1] 10
[2017-12-26 17:21:37,336: WARNING/ForkPoolWorker-1] 9
[2017-12-26 17:21:37,341: WARNING/ForkPoolWorker-1] 8
[2017-12-26 17:21:37,344: WARNING/ForkPoolWorker-1] 7
[2017-12-26 17:21:37,346: WARNING/ForkPoolWorker-1] 6
[2017-12-26 17:21:37,349: WARNING/ForkPoolWorker-1] 5
[2017-12-26 17:21:37,351: WARNING/ForkPoolWorker-1] 4
[2017-12-26 17:21:37,353: WARNING/ForkPoolWorker-1] 3
[2017-12-26 17:21:37,355: WARNING/ForkPoolWorker-1] 2
[2017-12-26 17:21:37,358: WARNING/ForkPoolWorker-1] 1
[2017-12-26 17:21:37,360: WARNING/ForkPoolWorker-1] 10
[2017-12-26 17:21:37,362: WARNING/ForkPoolWorker-1] 9
[2017-12-26 17:21:37,364: WARNING/ForkPoolWorker-1] 8
[2017-12-26 17:21:37,365: WARNING/ForkPoolWorker-1] 7
[2017-12-26 17:21:37,367: WARNING/ForkPoolWorker-1] 6
[2017-12-26 17:21:37,369: WARNING/ForkPoolWorker-1] 5
[2017-12-26 17:21:37,371: WARNING/ForkPoolWorker-1] 4
[2017-12-26 17:21:37,373: WARNING/ForkPoolWorker-1] 3
[2017-12-26 17:21:37,374: WARNING/ForkPoolWorker-1] 2
[2017-12-26 17:21:37,376: WARNING/ForkPoolWorker-1] 1
我应该如何用兔子在芹菜中应用priority
,上面文档中的priority
属性是什么?
为了使priority
正常工作,您需要正确配置几个设置,并且至少需要Rabbitmq的3.5.0版。
首先将您的队列的x-max-priority
设置为10。从文档中:
from kombu import Exchange, Queue
app.conf.task_queues = [
Queue('tasks', Exchange('tasks'), routing_key='tasks',
queue_arguments={'x-max-priority': 10},
]
可以使用task_queue_max_priority
设置设置所有队列的默认值:
app.conf.task_queue_max_priority = 10
然后配置以下设置:
CELERY_ACKS_LATE = True
CELERYD_PREFETCH_MULTIPLIER = 1
默认情况下,预取乘数为4,在您的情况下,这将导致前4个任务具有优先级10、9、8和7的优先级,然后在队列中存在其他任务之前。CELERY_ACKS_LATE
设置将导致执行任务后确认任务。您可以尝试此设置以查看自己喜欢的行为。