我刚开始用芹菜,有一个问题。我有这个简单的任务:
@app.task(name='test_install_queue')
def test_install_queue():
return subprocess.call("exit 0",shell=True)
,我稍后在像
这样的测试用例中调用这个任务result = tasks.test_default_queue.apply_async(queue="install")
任务在队列install
中成功运行(因为我在芹菜日志中看到它),并且它完成得很好。但是我想知道一种编程的方式,从存储在result
中的对象中找到任务test_install_queue
运行的队列。
谢谢!
编辑:我把任务改成:
@app.task(name='test_install_queue',bind=True)
def test_install_queue(self):
return self.request.__dict__
,然后我使用apply_async
的结果如下:
result = tasks.test_install_queue.apply_async(queue="install")
assert "install" in result.get()["hostname"]
,解决方法是worker (hostname)与worker中初始化的唯一队列具有相同的名称。
您可以尝试以下方法:
delivery_info = app.current_task.request.delivery_info
# by default celery uses the same name for queues and exchanges
original_queue = delivery_info['exchange']
for queue in app.amqp.queues.itervalues():
if queue.exchange.name == delivery_info['exchange']
and queue.routing_key == delivery_info['routing_key']:
original_queue = queue.name
break
这个方法是建立在你使用默认的芹菜设置和你的交换是直接的假设之上的。如果您需要更通用的fanout和主题交换解决方案,那么您将不得不检查app.amqp.queues
中每个声明队列的路由密钥。
我自己刚刚遇到过这个问题,我真的很怀疑是否需要一个复杂的解决方案,比如已经被接受的"lexabug"…因此,由于即使是芹菜文档也没有提供有效的替代方案,我自己使用反射进行了调查,以便了解哪个对象包含了我需要的信息,并且我提出了一个超级简单和直接的解决方案。具体来说,我正在编写一个钩子,或者更好地说,在芹菜术语中是一个信号,下面是我如何根据任务名称检索队列的名称:
@signals.after_task_publish.connect()
def on_task_publish(sender=None, headers=None, body=None, **kwargs):
# "sender" is a string containing task name
# ("celery" here is the celery app)
task: Task = celery.tasks.get(sender)
# once we have the task object, we can access the "queue" property
# which contains the name of the queue
# (it' a dynamic property so don't expect support by your IDE)
queue_name: str = task.queue if task is not None else 'unknown'
p。我用的是芹菜4.4
假设任务是绑定任务,可以使用self.request.delivery_info['routing_key']