我想使用芹菜管理任务。我想拥有一个任务队列(并发1(,并能够以不同的优先级将任务推到队列上,以便更高的优先级任务可以抢占其他任务。
我将三个任务添加到这样的队列:
add_tasks.py
from tasks import example_task
example_task.apply_async((1), priority=1)
example_task.apply_async((2), priority=3)
example_task.apply_async((3), priority=2)
我有以下配置:
tasks.py
from __future__ import absolute_import, unicode_literals
from celery import Celery
from kombu import Queue, Exchange
import time
app = Celery('tasks', backend='rpc://', broker='pyamqp://')
app.conf.task_queues = [Queue('celery', Exchange('celery'), routing_key='celery', queue_arguments={'x-max-priority': 10})]
@app.task
def example_task(task_num):
time.sleep(3)
print('Started {}'.format(task_num)
return True
我希望我添加的第二个任务在第三个任务之前运行,因为它具有更高的优先级,但事实并非如此。他们按添加的顺序运行。
我正在关注文档,以为我已经正确配置了该应用程序。
我是在做错事还是我误解了优先功能?
有可能队列没有机会优先考虑消息(因为它们在分类发生之前已下载(。尝试这两个设置(根据需要适应您的项目(:
CELERY_ACKS_LATE = True
CELERYD_PREFETCH_MULTIPLIER = 1
预取乘数默认为4。
我已经开发了一个示例应用程序,以实现芹菜的优先级任务。请在这里看看。在开发它的过程中,我遇到了一个非常相似的问题,设置的这种变化实际上解决了。
请注意,您还需要RabbitMQ版本3.5.0或更高版本。
如果您在6.0上方使用芹菜,请使用以下内容:
celery_instance.conf.task_acks_late = True
celery_instance.conf.worker_prefetch_multiplier = 1