我尝试根据一列对DataFrame进行重新分区。DataFrame在分区列x
中具有不同的N
(比如N=3
)值,例如:
val myDF = sc.parallelize(Seq(1,1,2,2,3,3)).toDF("x") // create dummy data
我喜欢实现的是通过x
重新分配myDF
,而不产生空分区。有比这更好的方法吗?
val numParts = myDF.select($"x").distinct().count.toInt
myDF.repartition(numParts,$"x")
(如果我没有在repartiton
中指定numParts
,我的大多数分区都是空的(因为repartition
创建了200个分区)…)
我想到的解决方案是在df
分区上迭代,并在其中获取记录计数以查找非空分区。
val nonEmptyPart = sparkContext.longAccumulator("nonEmptyPart")
df.foreachPartition(partition =>
if (partition.length > 0) nonEmptyPart.add(1))
由于我们得到了非空分区(nonEmptyPart
),我们可以通过使用coalesce()
来清理空分区(check coalize()vs repartition())。
val finalDf = df.coalesce(nonEmptyPart.value.toInt) //coalesce() accepts only Int type
它可能是最好的,也可能不是最好的,但这个解决方案将避免混洗,因为我们没有使用repartition()
地址注释示例
val df1 = sc.parallelize(Seq(1, 1, 2, 2, 3, 3)).toDF("x").repartition($"x")
val nonEmptyPart = sc.longAccumulator("nonEmptyPart")
df1.foreachPartition(partition =>
if (partition.length > 0) nonEmptyPart.add(1))
val finalDf = df1.coalesce(nonEmptyPart.value.toInt)
println(s"nonEmptyPart => ${nonEmptyPart.value.toInt}")
println(s"df.rdd.partitions.length => ${df1.rdd.partitions.length}")
println(s"finalDf.rdd.partitions.length => ${finalDf.rdd.partitions.length}")
输出
nonEmptyPart => 3
df.rdd.partitions.length => 200
finalDf.rdd.partitions.length => 3