我将用Celery替换一些土生土长的代码,但很难复制当前的行为。我想要的行为如下:
- 创建新用户时,应使用
user.created
路由密钥将消息发布到tasks
交换机 - 此消息应触发两个Celery任务,即
send_user_activate_email
和check_spam
我尝试通过定义一个带有ignore_result=True
参数的user_created
任务,以及一个用于send_user_activate_email
和check_spam
的任务来实现这一点。
在我的配置中,我添加了以下路由和队列定义。当消息被传递到user_created
队列时,它不会被传递到其他两个队列。
理想情况下,消息仅传递到send_user_activate_email
和check_spam
队列。当使用vanilla RabbitMQ时,消息被发布到交换机,队列可以绑定到该交换机,但Celery似乎直接向队列传递消息。
我将如何在Celery中实现上述行为?
CELERY_QUEUES = {
'user_created': {'binding_key':'user.created', 'exchange': 'tasks', 'exchange_type': 'topic'},
'send_user_activate_email': {'binding_key':'user.created', 'exchange': 'tasks', 'exchange_type': 'topic'},
'check_spam': {'binding_key':'user.created', 'exchange': 'tasks', 'exchange_type': 'topic'},
}
CELERY_ROUTES = {
'user_created': {
'queue': 'user_created',
'routing_key': 'user.created',
'exchange': 'tasks',
'exchange_type': 'topic',
},
'send_user_activate_email': {
'queue': 'user_created',
'routing_key': 'user.created',
'exchange': 'tasks',
'exchange_type': 'topic',
},
'check_spam': {
'queue': 'user_created',
'routing_key': 'user.created',
'exchange': 'tasks',
'exchange_type': 'topic',
},
}
听起来像是期望一条消息触发/被两个队列消耗,但这不是Celery的工作方式。Exchange会将任务发布到符合条件的队列中,但一旦该任务被使用,其他队列就会忽略该消息。每个要触发的任务都需要一条消息。
由于"队列"在这个系统中有两种用法,所以新的Celery用户经常会感到困惑;Queues()和文档引用的Kombu队列,以及直接保存消息并由工作人员使用的AMQP队列。当我们发布到队列时,我们会想到AMQP,这是不正确的。(感谢下面链接的答案)。
回到你的问题,如果我理解正确的话,当user_created被消耗时,你希望它再生成两个任务;send_user_activate_email和check_spam。此外,这些不应相互依赖;它们可以在不同的机器上并行运行,不需要知道彼此的状态。
在这种情况下,您希望user_created"apply_async"这两个新任务并返回。这可以直接完成,也可以使用包含check_spam和send_user_activate_email的Celery"组"来实现。这个小组提供了一些很好的速记,并为你的任务提供了一些结构,所以就我个人而言,我会推动你朝着这个方向前进。
#pseudocode
group(check_spam.s(... checkspam kwargs ...), send_user_activate_email.s(... active email kwargs ...)).apply_async()
此设置将创建四条消息;要执行的每个Task一个,Group()一个,它本身会有一个结果。
在您的情况下,我不确定Exchange或ignore_result是否必要,但我需要查看Task代码并更多地了解系统才能做出判断。
http://docs.celeryproject.org/en/latest/userguide/canvas.html#groupshttp://celery.readthedocs.org/en/v2.2.6/userguide/routing.html#exchanges-队列和路由密钥为什么CELERY_ ROUTES同时具有";队列";和一个";routing_key";?
(如果我离开了,我会删除/删除答案…)
设计和解决问题的简单方法是使用Celery工作流
但首先,我会更改您的队列定义,为每个任务设置一个唯一的路由关键字,并将exchange_type设置为"direct"值
根据芹菜文档,直接交换通过精确的路由密钥匹配,因此我们将相同的交换设置为所有自定义任务和消费者队列,并映射routing_key(用于任务)和binding_key(对于队列),如下一个片段所示:
CELERY_QUEUES = {
'user_created': {'binding_key':'user_created', 'exchange': 'tasks', 'exchange_type': 'direct'},
'send_user_activate_email': {'binding_key':'send_user_activate_email', 'exchange': 'tasks', 'exchange_type': 'direct'},
'check_spam': {'binding_key':'check_spam', 'exchange': 'tasks', 'exchange_type': 'direct'},
}
CELERY_ROUTES = {
'user_created': {
'queue': 'user_created',
'routing_key': 'user_created',
'exchange': 'tasks',
'exchange_type': 'direct',
},
'send_user_activate_email': {
'queue': 'send_user_activate_email',
'routing_key': 'send_user_activate_email',
'exchange': 'tasks',
'exchange_type': 'direct',
},
'check_spam': {
'queue': 'check_spam',
'routing_key': 'check_spam',
'exchange': 'tasks',
'exchange_type': 'direct',
},
}
完成此更改后,您需要对可用列表使用正确的工作流(http://docs.celeryproject.org/en/latest/userguide/canvas.html#the-基元)。读到你的问题,我认为你需要一条链条,因为秩序需要保持。
sequential_tasks = []
sequential_tasks.append(user_created.s(**user_created_kwargs))
sequential_tasks.append(send_user_activate_email.s(**send_user_activate_email_kwargs))
sequential_tasks.append(check_spam.s(**check_spam_kwargs))
#you can add more tasks to the chain
chain(*sequential_tasks)()
Celery将透明地处理与队列相关的工作。