为什么UDF不在可用的执行器上并行运行?



我有一个很小的spark Dataframe,它将字符串推入UDF。由于.repartition(3)的长度与targets相同,我期望run_sequential内部的处理应用于可用的执行器-即应用于3个不同的执行器。

问题是只使用了一个执行器。我如何并行此处理,以迫使我的pyspark脚本将target的每个元素分配给不同的执行器?

import pandas as pd
import pyspark.sql.functions as F
def run_parallel(config):

def run_sequential(target):

#process with target variable
pass

return F.udf(run_sequential)
targets = ["target_1", "target_2", "target_3"]
config = {}
pdf = spark.createDataFrame(pd.DataFrame({"targets": targets})).repartition(3)
pdf.withColumn(
"apply_udf", run_training_parallel(config)("targets")
).collect()

这里的问题是,对DataFrame重新分区并不能保证所有创建的分区都具有相同的大小。对于如此少量的记录,它们中的一些很有可能映射到同一个分区。Spark不适合处理这么小的数据集,它的算法是为处理大量数据而量身定制的——如果你的数据集有300万条记录,你把它分成3个分区,每个分区大约有100万条记录,在大多数情况下,每个分区的几条记录的差异是微不足道的。这显然不是重新分区3条记录的情况。

您可以使用df.rdd.glom().map(len).collect()检查重分区前后的分区大小,以查看分布如何变化。

$ pyspark --master "local[3]"
...
>>> pdf = spark.createDataFrame([("target_1",), ("target_2",), ("target_3",)]).toDF("targets")
>>> pdf.rdd.glom().map(len).collect()
[1, 1, 1]
>>> pdf.repartition(3).rdd.glom().map(len).collect()
[0, 2, 1]

可以看到,生成的分区是不均匀的,在我的例子中,第一个分区实际上是空的。这里具有讽刺意味的是,原始数据帧具有所需的属性,而其中一个被repartition()销毁。

虽然您的特殊情况不是Spark的典型目标,但仍然可以将三个记录强制分布在三个分区中。您所需要做的就是提供一个显式分区键。rdd有zipWithIndex()方法,用它的ID扩展每条记录。该ID是完美的分区键,因为它的值从0开始,递增1。

>>> new_df = (pdf
.coalesce(1)  # not part of the solution - see below
.rdd                         # Convert to RDD
.zipWithIndex()              # Append ID to each record
.map(lambda x: (x[1], x[0])) # Make record ID come first
.partitionBy(3)              # Repartition
.map(lambda x: x[1])         # Remove record ID
.toDF())                     # Turn back into a dataframe
>>> new_df.rdd.glom().map(len).collect()
[1, 1, 1]

在上面的代码中,添加coalesce(1)只是为了证明最终的分区不受pdf最初在每个分区中有一条记录的影响。

只有dataframe的解决方案是首先将pdf合并到单个分区,然后使用repartition(3)。在没有提供分区列的情况下,DataFrame.repartition()使用循环分区器,因此可以实现所需的分区。您不能简单地执行pdf.coalesce(1).repartition(3),因为Catalyst (Spark查询优化引擎)优化了合并操作,因此必须在两者之间插入分区相关的操作。添加一个包含F.monotonically_increasing_id()的列是一个很好的候选操作。
>>> new_df = (pdf
.coalesce(1)
.withColumn("id", F.monotonically_increasing_id())
.repartition(3))
>>> new_df.rdd.glom().map(len).collect()
[1, 1, 1]

请注意,与基于rdd的解决方案不同,coalesce(1)是解决方案的一部分。

最新更新