我对AMQP的理解是消息只有以下组件:
- 消息正文
- 路由密钥
- 交易所
队列附加到交换机。消息无法了解队列。它们只是发布到交换机,然后根据交换机类型和路由密钥,将消息路由到一个或多个队列。
在Celery中,推荐的路由任务的方式是通过CELERY_ROUTES
设置。从文档来看,CELERY_ROUTES
是…
路由器列表,或用于将任务路由到队列的单个路由器。http://celery.readthedocs.org/en/latest/configuration.html#message-路由
其中包括一个示例。。。
要将任务路由到feed_tasks队列,可以在
CELERY_ROUTES
设置:CELERY_ROUTES = { 'feeds.tasks.import_feed': { 'queue': 'feed_tasks', 'routing_key': 'feed.import', }, }
但请稍等--根据AMQP,消息只带有路由密钥!"队列"到底在干什么?
此外,还有默认队列的概念。如果您调用了一个未被CELERY_ROUTES
捕获的任务,它将回退到CELERY_DEFAULT_QUEUE
。但是,在AMQP中,消息不知道队列。那不应该是默认的路由密钥吗?
的确,在Celery上,当您进入Queues时会有一些混乱,您必须记住的一件事是,队列参数指的是Celery Kombu队列对象,而不是直接指的AMQP队列,您可以通过阅读文档中的摘录来理解这一点。当然,芹菜创建了具有相同名称的队列和交换,这是混淆队列参数用法的原因。在文档中,您可以阅读以下段落:
如果您有另一个队列,但在另一个要添加的交易所上,只需指定一个自定义交易所和交易所类型:
CELERY_QUEUES = ( Queue('feed_tasks', routing_key='feed.#'), Queue('regular_tasks', routing_key='task.#'), Queue('image_tasks', exchange=Exchange('mediatasks', type='direct'), routing_key='image.compress'), )
因此,通过这种方式,您可以在同一交换机上绑定两个不同的队列。之后,只使用交换机和密钥路由任务,您可以使用路由器类
class MyRouter(object):
def route_for_task(self, task, args=None, kwargs=None):
if task == 'myapp.tasks.compress_video':
return {'exchange': 'video',
'exchange_type': 'topic',
'routing_key': 'video.compress'}
return None
点击此处了解更多信息http://celery.readthedocs.org/en/latest/userguide/routing.html#routers
在其中声明队列的目的是让芹菜创建这些队列并使用RabbitMQ设置配置。
对于较低级别的AMQP客户端,您需要首先声明队列,然后声明交换,最后将交换绑定到队列。稍后发布消息时,您只需将消息发布到交易所即可。
芹菜似乎是利用这种结构自动完成的。