芹菜一个代理多个队列和工人



我有一个名为tasks.py的python文件,其中我定义了4个单个任务。我想配置芹菜以使用 4 个队列,因为每个队列都会分配不同数量的工作人员。我正在阅读我应该使用route_task属性,但我尝试了几个选项,但没有成功。

我正在关注这个文档芹菜route_tasks文档

我的目标是运行 4 个工作线程,每个任务一个,并且不要在不同的队列中混合来自不同工作人员的任务。有可能?这是个好办法?

如果我做错了什么,我很乐意更改我的代码以使其工作

这是我到目前为止的配置

tasks.py

app = Celery('tasks', broker='pyamqp://guest@localhost//')
app.conf.task_default_queue = 'default'
app.conf.task_queues = (
Queue('queueA',    routing_key='tasks.task_1'),
Queue('queueB',    routing_key='tasks.task_2'),
Queue('queueC',    routing_key='tasks.task_3'),
Queue('queueD',    routing_key='tasks.task_4')
)

@app.task
def task_1():
print "Task of level 1"

@app.task
def task_2():
print "Task of level 2"

@app.task
def task_3():
print "Task of level 3"

@app.task
def task_4():
print "Task of level 4"

为每个队列运行一个芹菜

celery -A tasks worker --loglevel=debug -Q queueA --logfile=celery-A.log -n W1&
celery -A tasks worker --loglevel=debug -Q queueB --logfile=celery-B.log -n W2&
celery -A tasks worker --loglevel=debug -Q queueC --logfile=celery-C.log -n W3&
celery -A tasks worker --loglevel=debug -Q queueD --logfile=celery-D.log -n W4&

无需进入复杂的路由即可将任务提交到不同的队列中。像往常一样定义任务。

from celery import celery
app = Celery('tasks', broker='pyamqp://guest@localhost//')
@app.task
def task_1():
print "Task of level 1"

@app.task
def task_2():
print "Task of level 2"

现在,在排队任务时,将任务放入正确的队列中。这是有关如何执行此操作的示例。

In [12]: from tasks import *
In [14]: result = task_1.apply_async(queue='queueA')
In [15]: result = task_2.apply_async(queue='queueB')

这会将task_1放入名为queueA的队列中,并将task_2放在queueB中。

现在,您可以启动您的工人来消费它们。

celery -A tasks worker --loglevel=debug -Q queueA --logfile=celery-A.log -n W1&
celery -A tasks worker --loglevel=debug -Q queueB --logfile=celery-B.log -n W2&

注意:taskmessage在答案中可以互换使用。它基本上是producer发送到 RabbitMQ 的有效负载

。您可以遵循 Chillar 建议的方法,也可以定义并使用task_routes配置将消息路由到适当的队列。这样,您就不需要在每次调用apply_async时都指定队列名称。

示例:将任务 1路由到QueueA并将任务 2路由到QueueB

app = Celery('my_app')
app.conf.update(
task_routes={
'task1': {'queue': 'QueueA'},
'task2': {'queue': 'QueueB'}
}
)

将任务发送到多个队列有点棘手。您必须声明交换,然后使用适当的routing_key路由您的任务。您可以在此处获取有关交换类型的更多信息。为了说明的目的,让我们使用direct

  1. 创建交易所

    from kombu import Exchange, Queue, binding
    exchange_for_queueA_and_B = Exchange('exchange_for_queueA_and_B', type='direct')
    
  2. 在队列上创建与该交换的绑定

    app.conf.update(
    task_queues=(
    Queue('QueueA', [
    binding(exchange_for_queueA_and_B, routing_key='queue_a_and_b')
    ]),
    Queue('QueueB', [
    binding(exchange_for_queueA_and_B, routing_key='queue_a_and_b')
    ])
    )
    )
    
  3. 定义将任务 1发送到交易所task_route

    app.conf.update(
    task_routes={
    'task1': {'exchange': 'exchange_for_queueA_and_B', 'routing_key': 'queue_a_and_b'}
    }
    )
    

您还可以按照 Chillar 在上述答案中的建议,在apply_async方法中声明这些exchangerouting_key选项。

之后,您可以在同一台计算机或不同计算机上定义工作线程,以从这些队列中使用。

celery -A my_app worker -n consume_from_QueueA_and_QueueB -Q QueueA,QueueB
celery -A my_app worker -n consume_from_QueueA_only -Q QueueA

相关内容

  • 没有找到相关文章

最新更新