现在,我想将register
事件发布到某个特殊的交换机,我可以使用芹菜远程检索和处理它。
事实上,我已经使用了send_task
函数来实现这一点,但它必须通过task_name
来指示应该执行哪个任务并使用它。所以它似乎不太适合我的目标。
我想要的就是:
- 将
register
消息发布到某个Exchange
-
远程机器1订阅此
topic
或route_key
并捕获消息,使用is执行任务 -
远程机器2-与机器1相同,但执行另一个任务-接收该任务(可能需要回复特定的
queue
(
例如,就像这个工作流一样:
寄存器:
- send_email
-
generate_info
如果我需要非标准交换,我会这样做。
在我的celeryconfig中,我指定了那个交换,并将队列分配给它,如下所示(在我的情况下,我需要fanout
交换(:
from kombu.common import Broadcast
from kombu import Exchange, Queue
CELERY_QUEUES = (
Broadcast(name='queue_name', exchange=Exchange('queue_name', type='fanout')),
)
然后,我用芹菜multi生成worker,并将其分配给我的特定队列,如下所示:
celery multi start 1 -A my_project -Q:1 queue_name -c:1 1 (other options go here)
然后我可以将我的任务插入到队列中,如下所示:
from my_project import my_fancy_task
my_fancy_task.apply_async(args=(x, y, z), queue="my_queue")
我不太理解您的具体用例,如果您需要一台主机上的工作人员来使用一个队列中的任务,然后需要另一台主机的工作人员使用另一个队列的任务,那么只需将您的任务拆分为两个队列,并将每个主机配置为启动工作人员,并将其分配给对您有意义的任何队列。也许这会有所帮助:RabbitMQ 的主题交换歧义