>假设我每个执行程序有 36 个内核,每个节点有一个执行程序,每个节点有 3 个节点,每个节点有 48 个可用内核。 我注意到的基本要点是,当我将每个任务设置为使用 1 个内核(默认值)时,我对工作线程的 CPU 利用率约为 70%,每个执行程序将同时执行 36 个任务(正如我所期望的那样)。 但是,当我将配置更改为每个任务有 6 个内核 (--conf spark.task.cpus=6
) 时,每个执行程序一次下降到 6 个任务(如预期的那样),但我的 CPU 利用率也降至 10% 以下(意外)。 我本以为 Spark 会知道如何在 6 个内核上并行化工作负载。
重要的实现细节是,我在DataFrame
的列上运行 UDF 函数,并将结果作为新列附加到该数据帧上。 此 UDF 函数使用一个@transient
对象,该对象提供我正在使用的机器学习算法。 此 UDF 函数不是聚合或合并操作的一部分,它只是对实现的列的map
操作,如下所示:
def myUdf = udf { ... }
val resultSet = myUdf(dataFrame.col("originalCol"))
val dataFrameWithResults = dataFrame.withColumn("originalColMetric", resultSet)
我原以为 Spark 会执行 6myUdf
,一次处理 6 条记录,每个内核一条,但事实似乎并非如此。 有没有办法解决这个问题(无需向 Spark 项目提交 PR),或者至少有人可以解释为什么会发生这种情况吗?
考虑到这个问题,我正在尝试增加每个任务的核心数量,以减少每个执行程序所需的 RAM 量。 在这种情况下,一次执行太多任务会成倍增加 RAM 使用率。
spark.task.cpus
是为每个任务分配的内核数。它用于在用户代码是多线程的情况下将多个内核分配给单个任务。如果您的udf
不使用多个(不会在单个函数调用中生成多个线程)线程,那么内核就会被浪费。
一次处理 6 条记录
分配 6 个内核,spark.task.cpus
设置为 1。如果要限制节点上的任务数,请减少每个节点提供的核心数。
从本质上讲,Spark 可以通过在每个任务之间拆分记录(根据分区)并确定每个执行器可以处理多少个并发任务来自行确定如何在多个记录上拆分映射 UDF。 但是,Spark 不能自动拆分每个任务的每个核心的工作。要利用每个任务的多个内核,需要编写 UDF 中的代码(每次(按顺序)在每个任务上执行一条记录,以并行化该 UDF 中对单个记录的计算。