如何确保芹菜任务的子任务与父任务进入相同的队列?



我有一个芹菜任务,它可能会使其他子任务排队。如果工作线程从高优先级队列中取出该任务,然后该任务将其他任务排在队列中,我希望新任务返回进入高优先级队列。但是,我如何通过编程获得当前执行任务来自的队列?

我知道我可以做一些事情,比如传递一个额外的参数,到原始的my_task.apply_async()调用,它指定了一个用于子任务的队列,然后我可以通过通过任务的方法/类链传递它,但这似乎很混乱,很难维护。似乎队列信息可以在某个地方获得,只需询问芹菜。

我发现队列信息可以通过current_task.request.delivery_info['exchange']获得。
所以,我最终使用的解决方案如下:

def get_source_queue(default=None):
    """
    Finds and returns the queue that the currently-executing task (if any) came from.
    """
    from celery import current_task
    if current_task is not None and 'exchange' in current_task.request.delivery_info:
        source_queue = current_task.request.delivery_info['exchange']
        if source_queue is not None:
            return source_queue
    return default

然后在子任务中使用,像这样:

my_task.apply_async(args=('my', 'args'), queue=get_source_queue(default='foo_queue'))


我不知道这是不是最好的方法。也许芹菜中内置了一些东西,说"使用与源队列相同的队列"(?)但是,上面的工作

相关内容

  • 没有找到相关文章

最新更新