正在删除Apache Spark中的空DataFrame分区



我尝试根据一列对DataFrame进行重新分区。DataFrame在分区列x中具有不同的N(比如N=3)值,例如:

val myDF = sc.parallelize(Seq(1,1,2,2,3,3)).toDF("x") // create dummy data

我喜欢实现的是通过x重新分配myDF,而不产生空分区。有比这更好的方法吗?

val numParts = myDF.select($"x").distinct().count.toInt
myDF.repartition(numParts,$"x")

(如果我没有在repartiton中指定numParts,我的大多数分区都是空的(因为repartition创建了200个分区)…)

我想到的解决方案是df分区上迭代,并在其中获取记录计数以查找非空分区。

val nonEmptyPart = sparkContext.longAccumulator("nonEmptyPart") 
df.foreachPartition(partition =>
if (partition.length > 0) nonEmptyPart.add(1))

由于我们得到了非空分区(nonEmptyPart),我们可以通过使用coalesce()来清理空分区(check coalize()vs repartition())。

val finalDf = df.coalesce(nonEmptyPart.value.toInt) //coalesce() accepts only Int type

它可能是最好的,也可能不是最好的,但这个解决方案将避免混洗,因为我们没有使用repartition()


地址注释示例

val df1 = sc.parallelize(Seq(1, 1, 2, 2, 3, 3)).toDF("x").repartition($"x")
val nonEmptyPart = sc.longAccumulator("nonEmptyPart")
df1.foreachPartition(partition =>
if (partition.length > 0) nonEmptyPart.add(1))
val finalDf = df1.coalesce(nonEmptyPart.value.toInt)
println(s"nonEmptyPart => ${nonEmptyPart.value.toInt}")
println(s"df.rdd.partitions.length => ${df1.rdd.partitions.length}")
println(s"finalDf.rdd.partitions.length => ${finalDf.rdd.partitions.length}")

输出

nonEmptyPart => 3
df.rdd.partitions.length => 200
finalDf.rdd.partitions.length => 3

相关内容

  • 没有找到相关文章

最新更新