Spark SQL - Does renaming columns affect partitioning?



I have written an explicit join API which renames the columns in the datasets with an l_ or r_ prefix, both to remove ambiguity and to work around Spark lineage problems of the form columnName1#77 not found in columnName1#123, columnName2#55, ....

Part of the code looks like this:

def explicitJoin(other: Dataset[_], joinExpr: Column, joinType: String): ExplicitJoinExt = {
  // Prefix every column on each side so names can never collide.
  val left  = dataset.toDF(dataset.columns.map("l_" + _): _*)
  val right = other.toDF(other.columns.map("r_" + _): _*)
  new ExplicitJoinExt(left.join(right, joinExpr, joinType))
}

Users can then pass a join expression such as $"l_columnName1" === $"r_columnName1" && ..., so they are 100% explicit about which columns they are joining on.
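For context, the wrapper around that method presumably looks something like the sketch below. This is a reconstruction from the calls used in this question (explicitJoin, selectLeft, toDF); the exact class body is an assumption, not the real implementation:

import org.apache.spark.sql.{Column, DataFrame, Dataset}

// Minimal sketch of the assumed wrapper; only the methods used in this
// question are shown.
class ExplicitJoinExt(val dataset: DataFrame) {

  def explicitJoin(other: Dataset[_], joinExpr: Column, joinType: String): ExplicitJoinExt = {
    val left  = dataset.toDF(dataset.columns.map("l_" + _): _*)
    val right = other.toDF(other.columns.map("r_" + _): _*)
    new ExplicitJoinExt(left.join(right, joinExpr, joinType))
  }

  // Keep only the left-hand columns and strip the l_ prefix again.
  def selectLeft: ExplicitJoinExt = {
    val leftCols = dataset.columns.filter(_.startsWith("l_"))
    new ExplicitJoinExt(
      dataset.select(leftCols.map(c => dataset(c).as(c.stripPrefix("l_"))): _*))
  }

  def toDF: DataFrame = dataset
}

// Implicit conversion so explicitJoin can be called directly on a Dataset.
implicit class DatasetToExplicitJoinExt(dataset: Dataset[_]) {
  def explicitJoin(other: Dataset[_], joinExpr: Column, joinType: String): ExplicitJoinExt =
    new ExplicitJoinExt(dataset.toDF).explicitJoin(other, joinExpr, joinType)
}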

I have now hit a new problem where a partition is too large to load into memory (org.apache.spark.shuffle.FetchFailedException: Too large frame ...), yet reading the input (partitioned) datasets is no problem.
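For what it's worth, that particular error typically means a single shuffle block exceeded the 2 GB Netty frame limit; a common mitigation (independent of the rename question, and the value here is just an illustration) is to raise the shuffle partition count so each block is smaller:

// More shuffle partitions means smaller individual shuffle blocks.
spark.conf.set("spark.sql.shuffle.partitions", "2000")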

Does renaming the columns affect the underlying partitioning of the input datasets/dataframes?
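A quick way to probe this directly (a sketch, assuming the same spark-shell session as the examples below) is to compare the plan and partition count before and after the rename:

// Partition count and plan before renaming.
println(ds1.rdd.getNumPartitions)
ds1.explain()

// Rename every column with an l_ prefix, exactly as explicitJoin does.
val renamed = ds1.toDF(ds1.columns.map("l_" + _): _*)

// If the rename changed the physical partitioning, an extra Exchange would
// show up here; a plain Project on top of the old plan means it did not.
println(renamed.rdd.getNumPartitions)
renamed.explain()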

EDIT

Example 1 - a regular join

case class A(a: Int, b: String)
val l = (0 to 1000000).map(i => A(i, i.toString))
val r = (0 to 1000000).map(i => A(i, i.toString))
val ds1 = l.toDF.as[A].repartition(100, $"a")
val ds2 = r.toDF.as[A].repartition(100, $"a")
val joined = ds1.join(ds2, Seq("a"), "inner")
joined.explain
== Physical Plan ==
*Project [a#2, b#3, b#15]
+- *SortMergeJoin [a#2], [a#14], Inner
   :- *Sort [a#2 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(a#2, 100)
   :     +- LocalTableScan [a#2, b#3]
   +- *Sort [a#14 ASC NULLS FIRST], false, 0
      +- ReusedExchange [a#14, b#15], Exchange hashpartitioning(a#2, 100)

Example 2 - using my (possibly misguided) ExplicitJoinExt, which does the renaming

val joined = ds1
  .explicitJoin(ds2, $"l_a" === $"r_a", "inner") // Pimped via conversion to the ExplicitJoinExt type; columns prefixed with l_ or r_, datasets joined by expr and join type
  .selectLeft                                    // Select just the left-prefixed columns
  .toDF                                          // Convert back from ExplicitJoinExt to DF
  .as[A]
joined.explain

== Physical Plan ==
*Project [l_a#24 AS a#53, l_b#25 AS b#54]
+- *BroadcastHashJoin [l_a#24], [r_a#29], Inner, BuildRight
   :- *Project [a#2 AS l_a#24, b#3 AS l_b#25]
   :  +- Exchange hashpartitioning(a#2, 100)
   :     +- LocalTableScan [a#2, b#3]
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
      +- *Project [a#14 AS r_a#29]
         +- Exchange hashpartitioning(a#14, 100)
            +- LocalTableScan [a#14]

So for the second join we are presumably going to repartition again - right?
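One confounding factor when comparing the two plans: the second one uses a BroadcastHashJoin rather than a SortMergeJoin, so they are not directly comparable. Broadcast joins can be disabled to compare like with like (a sketch; spark is the usual spark-shell session):

// Disable automatic broadcast joins so both examples produce a
// SortMergeJoin, making the Exchange nodes directly comparable.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

val joinedNoBroadcast = ds1
  .explicitJoin(ds2, $"l_a" === $"r_a", "inner")
  .selectLeft
  .toDF
  .as[A]

joinedNoBroadcast.explain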

No - I checked this on Spark 2.3.1. Renaming does not affect partitioning, at least not with this method:

val ds11 = ds1.repartition(4) 

Nor with this one, which I also checked; renaming does not affect the partitioning here either:

val ds11 = ds1.repartition(2, $"cityid")
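For reference, the left and right frames used in the explain below were built roughly along these lines. This is a sketch only: the Person schema, the data, and the exact repartition calls are inferred from the column names and partition counts in the plan:

// Hypothetical test data; the schema is inferred from the plan below.
case class Person(personid: Int, personname: String, cityid: Int)

val people = Seq(
  Person(1, "alice", 10),
  Person(2, "bob",   20),
  Person(3, "carol", 10))

val ds = people.toDF.as[Person]

// Two differently repartitioned, prefixed copies, matching the
// hashpartitioning(cityid, 2) and hashpartitioning(personid, 4) Exchanges.
val left  = ds.repartition(2, $"cityid").toDF(ds.columns.map("l_" + _): _*)
val right = ds.repartition(4, $"personid").toDF(ds.columns.map("r_" + _): _*)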

The explain output of:

left.join(right, $"l_personid" === $"r_personid", "inner").explain

shows, in my case, 2 and 4 as the numbers of partitions:

== Physical Plan ==
*(2) BroadcastHashJoin [l_personid#641], [r_personid#647], Inner, BuildRight, false
:- *(2) Project [personid#612 AS l_personid#641, personname#613 AS l_personname#642, cityid#614 AS l_cityid#643]
:  +- Exchange hashpartitioning(cityid#614, 2)
:     +- LocalTableScan [personid#612, personname#613, cityid#614]
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
   +- *(1) Project [personid#612 AS r_personid#647, personname#613 AS r_personname#648, cityid#614 AS r_cityid#649]
      +- Exchange hashpartitioning(personid#612, 4)
         +- LocalTableScan [personid#612, personname#613, cityid#614]

You can see that the renamed columns map back to their original names.

While testing for a post elsewhere, we were also able to establish that new operations relying on aggregations or JOINs default to 200 shuffle partitions unless

sqlContext.setConf("spark.sql.shuffle.partitions", "some val")

is issued in the code to set this to the required value. With small sets of data being joined, etc., the outcome may differ (for example, a small enough side will be broadcast, avoiding the shuffle entirely).
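On Spark 2.x and later the same setting can also be applied through the SparkSession; the value below is just an illustration:

// Equivalent modern form: set the shuffle partition count on the session.
spark.conf.set("spark.sql.shuffle.partitions", "400")
println(spark.conf.get("spark.sql.shuffle.partitions"))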

For those still running into this: renaming columns does affect partitioning in Spark < 3.0.

Seq((1, 2))
  .toDF("a", "b")
  .repartition($"b")
  .withColumnRenamed("b", "c")
  .repartition($"c")
  .explain()

gives the following plan:

== Physical Plan ==
Exchange hashpartitioning(c#40, 10)
+- *(1) Project [a#36, b#37 AS c#40]
   +- Exchange hashpartitioning(b#37, 10)
      +- LocalTableScan [a#36, b#37]

This was fixed in this PR.
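To check the behaviour on your own Spark version, the shuffles in the executed plan can be counted directly (a sketch, assuming spark-shell on Spark 2.3+; per the above, a fixed version should show only one shuffle for this snippet):

// Count the shuffle Exchange operators in the executed plan: two means the
// rename broke partitioning awareness, one means it was preserved.
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

val plan = Seq((1, 2))
  .toDF("a", "b")
  .repartition($"b")
  .withColumnRenamed("b", "c")
  .repartition($"c")
  .queryExecution
  .executedPlan

val numShuffles = plan.collect { case e: ShuffleExchangeExec => e }.size
println(s"Shuffle exchanges in plan: $numShuffles")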
