我有一个芹菜任务,它可能会使其他子任务排队。如果工作线程从高优先级队列中取出该任务,然后该任务将其他任务排在队列中,我希望新任务返回进入高优先级队列。但是,我如何通过编程获得当前执行任务来自的队列?
我知道我可以做一些事情,比如传递一个额外的参数,到原始的my_task.apply_async()
调用,它指定了一个用于子任务的队列,然后我可以通过通过任务的方法/类链传递它,但这似乎很混乱,很难维护。似乎队列信息可以在某个地方获得,只需询问芹菜。
我发现队列信息可以通过current_task.request.delivery_info['exchange']
获得。
所以,我最终使用的解决方案如下:
def get_source_queue(default=None):
"""
Finds and returns the queue that the currently-executing task (if any) came from.
"""
from celery import current_task
if current_task is not None and 'exchange' in current_task.request.delivery_info:
source_queue = current_task.request.delivery_info['exchange']
if source_queue is not None:
return source_queue
return default
然后在子任务中使用,像这样:
my_task.apply_async(args=('my', 'args'), queue=get_source_queue(default='foo_queue'))
我不知道这是不是最好的方法。也许芹菜中内置了一些东西,说"使用与源队列相同的队列"(?)但是,上面的工作