为什么CELERY_ROUTES既有"queue"又有"routing_key"?



我对AMQP的理解是消息只有以下组件:

  1. 消息正文
  2. 路由密钥
  3. 交易所

队列附加到交换机。消息无法了解队列。它们只是发布到交换机,然后根据交换机类型和路由密钥,将消息路由到一个或多个队列。

在Celery中,推荐的路由任务的方式是通过CELERY_ROUTES设置。从文档来看,CELERY_ROUTES是…

路由器列表,或用于将任务路由到队列的单个路由器。http://celery.readthedocs.org/en/latest/configuration.html#message-路由

其中包括一个示例。。。

要将任务路由到feed_tasks队列,可以在CELERY_ROUTES设置:

CELERY_ROUTES = {
'feeds.tasks.import_feed': {
'queue': 'feed_tasks',
'routing_key': 'feed.import',
},
}

但请稍等--根据AMQP,消息只带有路由密钥!"队列"到底在干什么?

此外,还有默认队列的概念。如果您调用了一个未被CELERY_ROUTES捕获的任务,它将回退到CELERY_DEFAULT_QUEUE。但是,在AMQP中,消息不知道队列。那不应该是默认的路由密钥吗?

的确,在Celery上,当您进入Queues时会有一些混乱,您必须记住的一件事是,队列参数指的是Celery Kombu队列对象,而不是直接指的AMQP队列,您可以通过阅读文档中的摘录来理解这一点。当然,芹菜创建了具有相同名称的队列和交换,这是混淆队列参数用法的原因。在文档中,您可以阅读以下段落:

如果您有另一个队列,但在另一个要添加的交易所上,只需指定一个自定义交易所和交易所类型:

CELERY_QUEUES = (
Queue('feed_tasks',    routing_key='feed.#'),
Queue('regular_tasks', routing_key='task.#'),
Queue('image_tasks',   exchange=Exchange('mediatasks', type='direct'),
routing_key='image.compress'),
)

因此,通过这种方式,您可以在同一交换机上绑定两个不同的队列。之后,只使用交换机和密钥路由任务,您可以使用路由器类

class MyRouter(object):
def route_for_task(self, task, args=None, kwargs=None):
if task == 'myapp.tasks.compress_video':
return {'exchange': 'video',
'exchange_type': 'topic',
'routing_key': 'video.compress'}
return None

点击此处了解更多信息http://celery.readthedocs.org/en/latest/userguide/routing.html#routers

在其中声明队列的目的是让芹菜创建这些队列并使用RabbitMQ设置配置。

对于较低级别的AMQP客户端,您需要首先声明队列,然后声明交换,最后将交换绑定到队列。稍后发布消息时,您只需将消息发布到交易所即可。

芹菜似乎是利用这种结构自动完成的。

相关内容

  • 没有找到相关文章

最新更新