更新:根本问题是Spark 3.2.0中修复的一个错误。
两次运行中的输入df结构相同,但输出不同。只有第二次运行返回所需的结果(df6
)。我知道我可以为数据帧使用别名,这将返回所需的结果。
这个问题创建df3
的基础Spark机制是什么?Spark在join
的on
子句中读取df1.c1 == df2.c2
,但很明显,它没有注意到所提供的dfs。引擎盖下面是什么?如何预测这种行为?
首次运行(df3
结果不正确):
data = [
(1, 'bad', 'A'),
(4, 'ok', None)]
df1 = spark.createDataFrame(data, ['ID', 'Status', 'c1'])
df1 = df1.withColumn('c2', F.lit('A'))
df1.show()
#+---+------+----+---+
#| ID|Status| c1| c2|
#+---+------+----+---+
#| 1| bad| A| A|
#| 4| ok|null| A|
#+---+------+----+---+
df2 = df1.filter((F.col('Status') == 'ok'))
df2.show()
#+---+------+----+---+
#| ID|Status| c1| c2|
#+---+------+----+---+
#| 4| ok|null| A|
#+---+------+----+---+
df3 = df2.join(df1, (df1.c1 == df2.c2), 'full')
df3.show()
#+----+------+----+----+----+------+----+----+
#| ID|Status| c1| c2| ID|Status| c1| c2|
#+----+------+----+----+----+------+----+----+
#| 4| ok|null| A|null| null|null|null|
#|null| null|null|null| 1| bad| A| A|
#|null| null|null|null| 4| ok|null| A|
#+----+------+----+----+----+------+----+----+
第二次运行(正确的df6
结果):
data = [
(1, 'bad', 'A', 'A'),
(4, 'ok', None, 'A')]
df4 = spark.createDataFrame(data, ['ID', 'Status', 'c1', 'c2'])
df4.show()
#+---+------+----+---+
#| ID|Status| c1| c2|
#+---+------+----+---+
#| 1| bad| A| A|
#| 4| ok|null| A|
#+---+------+----+---+
df5 = spark.createDataFrame(data, ['ID', 'Status', 'c1', 'c2']).filter((F.col('Status') == 'ok'))
df5.show()
#+---+------+----+---+
#| ID|Status| c1| c2|
#+---+------+----+---+
#| 4| ok|null| A|
#+---+------+----+---+
df6 = df5.join(df4, (df4.c1 == df5.c2), 'full')
df6.show()
#+----+------+----+----+---+------+----+---+
#| ID|Status| c1| c2| ID|Status| c1| c2|
#+----+------+----+----+---+------+----+---+
#|null| null|null|null| 4| ok|null| A|
#| 4| ok|null| A| 1| bad| A| A|
#+----+------+----+----+---+------+----+---+
我可以看到物理计划的不同之处在于内部使用了不同的联接(BroadcastNestedLoopJoin和SortMergeJoindf3.explain()
== Physical Plan ==
BroadcastNestedLoopJoin BuildRight, FullOuter, (c1#23335 = A)
:- *(1) Project [ID#23333L, Status#23334, c1#23335, A AS c2#23339]
: +- *(1) Filter (isnotnull(Status#23334) AND (Status#23334 = ok))
: +- *(1) Scan ExistingRDD[ID#23333L,Status#23334,c1#23335]
+- BroadcastExchange IdentityBroadcastMode, [id=#9250]
+- *(2) Project [ID#23379L, Status#23380, c1#23381, A AS c2#23378]
+- *(2) Scan ExistingRDD[ID#23379L,Status#23380,c1#23381]
df6.explain()
== Physical Plan ==
SortMergeJoin [c2#23459], [c1#23433], FullOuter
:- *(2) Sort [c2#23459 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(c2#23459, 200), ENSURE_REQUIREMENTS, [id=#9347]
: +- *(1) Filter (isnotnull(Status#23457) AND (Status#23457 = ok))
: +- *(1) Scan ExistingRDD[ID#23456L,Status#23457,c1#23458,c2#23459]
+- *(4) Sort [c1#23433 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(c1#23433, 200), ENSURE_REQUIREMENTS, [id=#9352]
+- *(3) Scan ExistingRDD[ID#23431L,Status#23432,c1#23433,c2#23434]
连接取决于连接的数据帧的结构,但如何构建这些数据帧也会产生影响。如果您连接的两个数据帧共享相同的沿袭,那么在连接条件中可能会出现不明确的列问题,从而导致您在问题中描述的内容。
在第一次运行中,当您从df1
构建df2
时,这两个数据帧共享相同的沿袭。当你连接这两个数据帧时,你实际上是在进行自连接,Spark选择了只属于其中一个连接数据帧的错误列作为连接条件,导致笛卡尔乘积后面跟着一个始终为false的过滤器。
在第二次运行中,由于两个数据帧是独立构建的,因此正确地定义了两列之间相等的联接条件,每列都属于不同的数据帧。因此Spark执行了一个经典的联接。
详细说明
正如pltc在他的回答中所解释的,在您的第一次运行中,Spark没有为您的联接选择正确的列。让我们找出原因。
引擎盖下面是什么
让我们从使用explain
获取df1
和df2
的物理计划开始。以下是df1
的物理计划:
== Physical Plan ==
*(1) Project [ID#0L, Status#1, c1#2, A AS c2#6]
+- *(1) Scan ExistingRDD[ID#0L,Status#1,c1#2]
这是df2
的物理计划:
== Physical Plan ==
*(1) Project [ID#0L, Status#1, c1#2, A AS c2#6]
+- *(1) Filter (isnotnull(Status#1) AND (Status#1 = ok))
+- *(1) Scan ExistingRDD[ID#0L,Status#1,c1#2]
在以(1) Project
开头的第一行中,您可以看到两个数据帧df1
和df2
具有相同的列名和ID:[ID#0L, Status#1, c1#2, A AS c2#6]
。这并不奇怪,因为df2
是从df1
创建的,所以您可以将df2
视为具有附加转换的df1
。因此,我们有以下参考资料:
df1.c1
<>CCD_ 24<>c1#2
- CCD_ 26<>CCD_ 27<>
A AS c2#6
当您加入df1
和df2
时,意味着您进行了自加入。您的条件的所有以下组合将被翻译为c1#2 = A AS c2#6
,这将给您留下简化的联接条件c1#2 = A
:
df1.c1 = df2.c2
df1.c2 = df2.c1
df2.c1 = df1.c2
df2.c2 = df1.c1
当您在Spark中执行自联接时,Spark将重新生成正确数据帧的列ID,以避免在最终数据帧中具有相同的列ID。因此,在您的情况下,它将重写df1
的列id所以列c1#2
将参考df2
的列c1
现在您的条件不包含df1
中的任何列,那么Spark将选择执行笛卡尔乘积作为联接策略。由于两个数据帧中的一个数据帧小到足以被广播,所以所选择的算法将是BroadcastNestedLoopJoin
。这就是df3
的物理计划所显示的:
== Physical Plan ==
BroadcastNestedLoopJoin BuildRight, FullOuter, (c1#2 = A)
:- *(1) Project [ID#0L, Status#1, c1#2, A AS c2#6]
: +- *(1) Filter (isnotnull(Status#1) AND (Status#1 = ok))
: +- *(1) Scan ExistingRDD[ID#0L,Status#1,c1#2]
+- BroadcastExchange IdentityBroadcastMode, [id=#75]
+- *(2) Project [ID#46L, Status#47, c1#48, A AS c2#45]
+- *(2) Scan ExistingRDD[ID#46L,Status#47,c1#48]
注意,df1
的四个新列id现在是[ID#46L, Status#47, c1#48, A AS c2#45]
当您执行此计划时,对于df2
的唯一行,c1
的值为null
,与A
不同,因此联接条件始终为false。当您选择完全外部联接时,您将获得三行(两行来自df1
,一行来自df2
),其中null
列来自另一个数据帧:
+----+------+----+----+----+------+----+----+
| ID|Status| c1| c2| ID|Status| c1| c2|
+----+------+----+----+----+------+----+----+
| 4| ok|null| A|null| null|null|null|
|null| null|null|null| 1| bad| A| A|
|null| null|null|null| 4| ok|null| A|
+----+------+----+----+----+------+----+----+
为什么第二次运行时我有想要的输出
对于第二次运行,您将创建两个独立的数据帧。因此,如果我们查看df4
和df5
的物理计划,您可以看到列ID是不同的。以下是df4
:的物理计划
== Physical Plan ==
*(1) Scan ExistingRDD[ID#98L,Status#99,c1#100,c2#101]
这是df5
:的物理计划
== Physical Plan ==
*(1) Filter (isnotnull(Status#124) AND (Status#124 = ok))
+- *(1) Scan ExistingRDD[ID#123L,Status#124,c1#125,c2#126]
您的加入条件是c1#100 = c2#126
,c1#100
是df4
的c1
列,c2#126
是df5
的c2
列。联接条件中相等的每一端都来自不同的数据帧,因此Spark可以按预期执行联接。
为什么这没有被检测为模糊自联接
自Spark 3.0以来,Spark会检查用于联接的列是否不明确。如果您在加入df2
和df1
时将它们的顺序颠倒如下:
df3 = df1.join(df2, (df1.c1 == df2.c2), 'full')
你会得到以下错误:
pyspark.sql.utils.AnalysisException:列c2#6不明确。
那么,为什么在执行df2.join(df1, ...)
时不出现此错误呢
你的答案在Spark代码中的DetectAmbigiousSelfJoin文件中:
//发生自联接时,分析器会要求右侧计划生成带有新exprId的
//属性。如果数据集的计划输出的属性
//由列引用引用,并且此属性的exprId与
//列引用的属性,则列引用是不明确的,因为它
//指通过自联接重新生成的列。
这意味着在执行df2.join(df1, ...)
时,我们将仅针对df1
检查联接条件中使用的列。在我们的案例中,我们没有对df1
执行任何转换,与筛选的df2
相反,df1
列的exprId没有更改,因此不会引发不明确的列错误。
我在Spark Jira上创建了一个关于这种行为的问题,请参阅Spark-36874(该错误在3.2.0版本中修复)
如何预测这种行为
你必须非常小心你的加入是否是自加入。如果从数据帧df1
开始,对其执行一些转换以获得df2
,然后加入df1
和df2
,则可能会出现这种行为。为了减轻这种情况,在执行联接时,应该始终将原始数据帧作为第一个数据帧,因此使用df1.join(df2, ...)
而不是df2.join(df1, ...)
。这样一来,如果Spark无法选择正确的列,您将得到一个Analysis Exception: Column x are ambiguous
。
c1
和c2
列。这是df3
的修复程序,以获得预期结果:
df3 = df2.alias('df2').join(df1.alias('df1'), (F.col('df1.c1') == F.col('df2.c2')), 'full')
df3.show()
# Output
# +----+------+----+----+---+------+----+---+
# | ID|Status| c1| c2| ID|Status| c1| c2|
# +----+------+----+----+---+------+----+---+
# | 4| ok|null| A| 1| bad| A| A|
# |null| null|null|null| 4| ok|null| A|
# +----+------+----+----+---+------+----+---+