我正在使用气流和芹菜。我想为芹菜工人指定一个 CPU 内核列表,芹菜工人应确保仅在该特定内核上分配进程。
我已经浏览了芹菜文档(这里(,并额外关注并发、优化和工作线程指南等部分。但是找不到解决方案。
那么,我如何确保芹菜仅将任务分配给特定的 CPU 内核。我不能使用taskset
.如果可以将每个ForkPoolWorker
绑定并处理它生成的特定 CPU 内核,那么芹菜将成为我们的完美解决方案。但是,如果芹菜管理一组核心,我们也对此感到满意。但是我们需要为芹菜工人专用 CPU 内核。
截至目前,Airflow(实际上是Celery(并没有提供一种简单的方法来做到这一点。 因此,我们在谢尔文顿的建议下,使用了一些技巧解决了这个问题@Iain。
我们在气流中放置了一行新行.cfg它为我们提供了 CPU 核心编号,芹菜应该运行它选择的进程。
然后,我们 grep 所有芹菜ForkPoolWorker
的进程 id,并使用taskset
设置对提供的核心的亲和力。
我们编写了一个 bash 脚本start_worker.sh来启动 worker。此外,我们将工作人员数量限制为 1 人,但增加了worker_concurrency
$AIRFLOW_HOME/airflow.cfg看起来像这样:
...
worker_concurrency = 36
# We added the following line
cores_available = 12,13,14,15...45,46,47
...
start_worker.sh的伪代码如下所示:
# start a worker in daemon mode
airflow worker -D
# get the list of cpu cores
CORES=$(grep "cores_available" | cut -d '=' -f 2)
# get the PID of celery ForkWorkers
PIDS=$(ps -eaf | grep celery.*ForkPoolWorker*)
# for every PID, use the core
for idx in len(workers_count); do
taskset -c CORES[idx] PIDS[idx]
你试过设置吗worker_concurrency = # of cores
?
http://docs.celeryproject.org/en/latest/userguide/configuration.html#worker-concurrency
这几乎总是一个坏主意,除非你在一个像大这样的花哨的架构上。LITTLE,并且您希望确保,例如,系统调度程序不会在错误的核心上运行计算成本高昂的任务(线程(。芹菜做不到,但你可以在 Linux 上使用taskset
手动做到。如果设置 Celery 工作线程的相关性,则所有工作进程都将在该特定核心上运行,这是一个糟糕的主意,除非您的并发设置为 1(只有一个工作进程(。
如果您仍然认为这是一个好主意,并且它解决了您的问题,那么我建议您尝试编写一个worker_process_init处理程序,该处理程序基本上在新创建的工作进程上调用os.sched_setaffinity
。请记住,此处理程序需要快速。