停止芹菜工人从所有队列中食用



干杯,

我在生产环境(在 Linux 上)中运行了一个芹菜设置,我需要从两个专用队列(每个队列一个)中使用两种不同的任务类型。出现的问题是,所有工作线程始终绑定到两个队列,即使我指定它们仅从其中一个队列中消耗。

博士

  • 芹菜与 2 队列运行
  • 消息按设计发布在正确的队列中
  • 工作人员不断使用两个队列
  • 导致死锁

基本信息

将我的两种不同的任务类型视为分层设置:

    任务是常规的芹菜任务,
  • 可能需要相当长的时间,因为它会动态调度其他芹菜任务,并且可能需要通过各自的结果进行链接

  • 节点是一个动态调度的子任务,它也是一个常规的芹菜任务,但它本身可以被视为一个原子单元。

因此,我的任务可以是更复杂的节点设置,其中一个或多个节点的结果作为一个或多个后续节点的输入,依此类推。由于我的任务可能需要更长的时间,并且只有在部署了所有节点后才能完成,因此必须由专门的工作线程处理它们,以保持足够数量的工作线程空闲来使用节点否则,这可能会导致系统卡住,当调度大量任务时,每个任务都被另一个工作线程使用,并且它们各自的节点只排队但永远不会被消耗,因为所有工作线程都被阻止。

如果这通常是一个糟糕的设计,请就如何改进它提出任何建议。我还没有设法使用芹菜的内置画布基元构建这些过程之一。如果可以的话,帮帮我?!

配置/设置

我用amqp运行芹菜,并在芹菜配置中设置了以下队列和路由:

CELERY_QUERUES = (
    Queue('prod_nodes', Exchange('prod'), routing_key='prod.node'),
    Queue('prod_tasks', Exchange('prod'), routing_key='prod.task')
)
CELERY_ROUTES = (
    'deploy_node': {'queue': 'prod_nodes', 'routing_key': 'prod.node'},
    'deploy_task': {'queue': 'prod_tasks', 'routing_key': 'prod.task'}
)

当我启动辅助角色时,我会发出类似于以下内容的调用:

celery multi start w_task_01 w_node_01 w_node_02 -A my.deployment.system 
  -E -l INFO -P gevent -Q:1 prod_tasks -Q:2-3 prod_nodes -c 4 --autoreload 
  --logfile=/my/path/to/log/%N.log --pidfile=/my/path/to/pid/%N.pid

问题所在

我的队列和路由设置似乎工作正常,因为我可以在 RabbitMQ 管理 Web UI 中看到消息正确排队。

但是,所有工作线程始终使用两个队列中的芹菜任务。当我启动并打开花卉 Web UI 并检查其中一个已部署的任务时,我可以看到这一点,例如w_node_01开始使用来自prod_tasks队列的消息,即使它不应该。

RabbitMQ 管理 Web UI 进一步告诉我,所有启动的工作线程都设置为两个队列的使用者

因此,我问你...

。我做错了什么?

我的设置或工作人员启动呼叫的问题在哪里;我怎样才能避免工人总是从两个队列中消耗的问题;我真的必须在运行时进行其他设置(我当然不想要)吗?

感谢您的时间和答案!

您可以

为每个队列创建 2 个单独的工作线程,每个工作线程使用 -Q 命令行参数定义它应该从哪个队列获取任务。

如果要保持进程数不变,默认情况下,会为每个工作线程的每个核心打开一个进程,您可以使用 --concurrency 标志(有关详细信息,请参阅 Celery 文档)

Celery 允许配置具有特定队列的工人。

1) 为不同类型的作业指定具有"queue"属性的队列名称

celery.send_task('job_type1', args=[], kwargs={}, queue='queue_name_1')
celery.send_task('job_type2', args=[], kwargs={}, queue='queue_name_2')

2) 在配置文件中添加以下条目

CELERY_CREATE_MISSING_QUEUES = True

3) 在启动工作线程时,传递 -Q 'queue_name' 作为参数,用于从所需的队列中消费。

celery -A proj worker -l info -Q queue_name_1 -n worker1
celery -A proj worker -l info -Q queue_name_2 -n worker2

相关内容

  • 没有找到相关文章

最新更新