我正在使用芹菜的apply_async方法对任务进行排队。我预计每天会运行大约100000个这样的任务(这个数字只会上升)。我使用RabbitMQ作为代理。几天前我运行了代码,几个小时后RabbitMQ崩溃了。我注意到apply_async为每个任务创建了一个新队列,其中x-expires设置为1天。我的假设是,当创建这么多队列时,RabbitMQ会阻塞。如何才能阻止芹菜为每个任务创建这些额外的队列?
我还尝试将队列参数赋予apply_async,并为该队列分配了一个x-message-ttl。消息确实进入了这个新队列,但它们立即被消耗掉,而且从未达到我放入的30秒的ttl。这并没有阻止芹菜产生额外的队列。
这是我的代码:
views.py
from celery import task, chain
chain(task1.s(a), task2.s(b),)
.apply_async(link_error=error_handler.s(a), queue="async_tasks_queue")
tasks.py
from celery.result import AsyncResult
@shared_task
def error_handler(uuid, a):
#Handle error
@shared_task
def task1(a):
#Do something
return a
@shared_task
def task2(a, b):
#Do something more
celery.py
app = Celery(
'app',
broker=settings.QUEUE_URL,
backend=settings.QUEUE_URL,
)
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
app.amqp.queues.add("async_tasks_queue", queue_arguments={'durable' : True , 'x-message-ttl': 30000})
来自芹菜日志:
〔2016-01-05 01:17:24398:INFO/MainProcess〕收到的任务:project.tasks.task1[615e094c-2ec9-4568-9fe1-82ead 2cd303b]
〔2016-01-05 01:17:24834:INFO/MainProcess〕收到的任务:project.decorators.wrapper[bf9a0a94-8e71-4ad6-9eaa-359f93446a3f]
执行这些任务时,RabbitMQ有两个新队列,名称分别为"615e094c2ec945689fe182ead2cd303b"one_answers"bf9a0a948e714ad69eaa359f93446a3f"我的代码运行在Django 1.7.7、芹菜3.1.17和RabbitMQ 3.5.3上。
异步执行任务的任何其他建议也欢迎
尝试使用不同的后端-我推荐Redis。当我们尝试同时使用Rabbitmq作为代理和后端时,我们发现它不适合代理角色。