干杯,
我在生产环境(在 Linux 上)中运行了一个芹菜设置,我需要从两个专用队列(每个队列一个)中使用两种不同的任务类型。出现的问题是,所有工作线程始终绑定到两个队列,即使我指定它们仅从其中一个队列中消耗。
博士
- 芹菜与 2 队列运行
- 消息按设计发布在正确的队列中
- 工作人员不断使用两个队列
- 导致死锁
基本信息
将我的两种不同的任务类型视为分层设置:
- 任务是常规的芹菜任务,
可能需要相当长的时间,因为它会动态调度其他芹菜任务,并且可能需要通过各自的结果进行链接
节点是一个动态调度的子任务,它也是一个常规的芹菜任务,但它本身可以被视为一个原子单元。
因此,我的任务可以是更复杂的节点设置,其中一个或多个节点的结果作为一个或多个后续节点的输入,依此类推。由于我的任务可能需要更长的时间,并且只有在部署了所有节点后才能完成,因此必须由专门的工作线程处理它们,以保持足够数量的工作线程空闲来使用节点。否则,这可能会导致系统卡住,当调度大量任务时,每个任务都被另一个工作线程使用,并且它们各自的节点只排队但永远不会被消耗,因为所有工作线程都被阻止。
如果这通常是一个糟糕的设计,请就如何改进它提出任何建议。我还没有设法使用芹菜的内置画布基元构建这些过程之一。如果可以的话,帮帮我?!
配置/设置
我用amqp
运行芹菜,并在芹菜配置中设置了以下队列和路由:
CELERY_QUERUES = (
Queue('prod_nodes', Exchange('prod'), routing_key='prod.node'),
Queue('prod_tasks', Exchange('prod'), routing_key='prod.task')
)
CELERY_ROUTES = (
'deploy_node': {'queue': 'prod_nodes', 'routing_key': 'prod.node'},
'deploy_task': {'queue': 'prod_tasks', 'routing_key': 'prod.task'}
)
当我启动辅助角色时,我会发出类似于以下内容的调用:
celery multi start w_task_01 w_node_01 w_node_02 -A my.deployment.system
-E -l INFO -P gevent -Q:1 prod_tasks -Q:2-3 prod_nodes -c 4 --autoreload
--logfile=/my/path/to/log/%N.log --pidfile=/my/path/to/pid/%N.pid
问题所在
我的队列和路由设置似乎工作正常,因为我可以在 RabbitMQ 管理 Web UI 中看到消息正确排队。
但是,所有工作线程始终使用两个队列中的芹菜任务。当我启动并打开花卉 Web UI 并检查其中一个已部署的任务时,我可以看到这一点,例如w_node_01开始使用来自prod_tasks队列的消息,即使它不应该。
RabbitMQ 管理 Web UI 进一步告诉我,所有启动的工作线程都设置为两个队列的使用者。
因此,我问你...
。我做错了什么?
我的设置或工作人员启动呼叫的问题在哪里;我怎样才能避免工人总是从两个队列中消耗的问题;我真的必须在运行时进行其他设置(我当然不想要)吗?
感谢您的时间和答案!
为每个队列创建 2 个单独的工作线程,每个工作线程使用 -Q
命令行参数定义它应该从哪个队列获取任务。
如果要保持进程数不变,默认情况下,会为每个工作线程的每个核心打开一个进程,您可以使用 --concurrency
标志(有关详细信息,请参阅 Celery 文档)
Celery 允许配置具有特定队列的工人。
1) 为不同类型的作业指定具有"queue"属性的队列名称
celery.send_task('job_type1', args=[], kwargs={}, queue='queue_name_1')
celery.send_task('job_type2', args=[], kwargs={}, queue='queue_name_2')
2) 在配置文件中添加以下条目
CELERY_CREATE_MISSING_QUEUES = True
3) 在启动工作线程时,传递 -Q 'queue_name' 作为参数,用于从所需的队列中消费。
celery -A proj worker -l info -Q queue_name_1 -n worker1
celery -A proj worker -l info -Q queue_name_2 -n worker2