为什么连接结构标识数据帧会产生不同的结果



更新:根本问题是Spark 3.2.0中修复的一个错误。


两次运行中的输入df结构相同,但输出不同。只有第二次运行返回所需的结果(df6)。我知道我可以为数据帧使用别名,这将返回所需的结果。

这个问题创建df3的基础Spark机制是什么?Spark在joinon子句中读取df1.c1 == df2.c2,但很明显,它没有注意到所提供的dfs。引擎盖下面是什么?如何预测这种行为?

首次运行(df3结果不正确):

data = [
(1, 'bad', 'A'),
(4, 'ok', None)]
df1 = spark.createDataFrame(data, ['ID', 'Status', 'c1'])
df1 = df1.withColumn('c2', F.lit('A'))
df1.show()
#+---+------+----+---+
#| ID|Status|  c1| c2|
#+---+------+----+---+
#|  1|   bad|   A|  A|
#|  4|    ok|null|  A|
#+---+------+----+---+
df2 = df1.filter((F.col('Status') == 'ok'))
df2.show()
#+---+------+----+---+
#| ID|Status|  c1| c2|
#+---+------+----+---+
#|  4|    ok|null|  A|
#+---+------+----+---+
df3 = df2.join(df1, (df1.c1 == df2.c2), 'full')
df3.show()
#+----+------+----+----+----+------+----+----+
#|  ID|Status|  c1|  c2|  ID|Status|  c1|  c2|
#+----+------+----+----+----+------+----+----+
#|   4|    ok|null|   A|null|  null|null|null|
#|null|  null|null|null|   1|   bad|   A|   A|
#|null|  null|null|null|   4|    ok|null|   A|
#+----+------+----+----+----+------+----+----+

第二次运行(正确的df6结果):

data = [
(1, 'bad', 'A', 'A'),
(4, 'ok', None, 'A')]
df4 = spark.createDataFrame(data, ['ID', 'Status', 'c1', 'c2'])
df4.show()
#+---+------+----+---+
#| ID|Status|  c1| c2|
#+---+------+----+---+
#|  1|   bad|   A|  A|
#|  4|    ok|null|  A|
#+---+------+----+---+
df5 = spark.createDataFrame(data, ['ID', 'Status', 'c1', 'c2']).filter((F.col('Status') == 'ok'))
df5.show()
#+---+------+----+---+
#| ID|Status|  c1| c2|
#+---+------+----+---+
#|  4|    ok|null|  A|
#+---+------+----+---+
df6 = df5.join(df4, (df4.c1 == df5.c2), 'full')
df6.show()
#+----+------+----+----+---+------+----+---+
#|  ID|Status|  c1|  c2| ID|Status|  c1| c2|
#+----+------+----+----+---+------+----+---+
#|null|  null|null|null|  4|    ok|null|  A|
#|   4|    ok|null|   A|  1|   bad|   A|  A|
#+----+------+----+----+---+------+----+---+

我可以看到物理计划的不同之处在于内部使用了不同的联接(BroadcastNestedLoopJoinSortMergeJoin

df3.explain()
== Physical Plan ==
BroadcastNestedLoopJoin BuildRight, FullOuter, (c1#23335 = A)
:- *(1) Project [ID#23333L, Status#23334, c1#23335, A AS c2#23339]
:  +- *(1) Filter (isnotnull(Status#23334) AND (Status#23334 = ok))
:     +- *(1) Scan ExistingRDD[ID#23333L,Status#23334,c1#23335]
+- BroadcastExchange IdentityBroadcastMode, [id=#9250]
+- *(2) Project [ID#23379L, Status#23380, c1#23381, A AS c2#23378]
+- *(2) Scan ExistingRDD[ID#23379L,Status#23380,c1#23381]
df6.explain()
== Physical Plan ==
SortMergeJoin [c2#23459], [c1#23433], FullOuter
:- *(2) Sort [c2#23459 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(c2#23459, 200), ENSURE_REQUIREMENTS, [id=#9347]
:     +- *(1) Filter (isnotnull(Status#23457) AND (Status#23457 = ok))
:        +- *(1) Scan ExistingRDD[ID#23456L,Status#23457,c1#23458,c2#23459]
+- *(4) Sort [c1#23433 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(c1#23433, 200), ENSURE_REQUIREMENTS, [id=#9352]
+- *(3) Scan ExistingRDD[ID#23431L,Status#23432,c1#23433,c2#23434]

连接取决于连接的数据帧的结构,但如何构建这些数据帧也会产生影响。如果您连接的两个数据帧共享相同的沿袭,那么在连接条件中可能会出现不明确的列问题,从而导致您在问题中描述的内容。

在第一次运行中,当您从df1构建df2时,这两个数据帧共享相同的沿袭。当你连接这两个数据帧时,你实际上是在进行自连接,Spark选择了只属于其中一个连接数据帧的错误列作为连接条件,导致笛卡尔乘积后面跟着一个始终为false的过滤器。

在第二次运行中,由于两个数据帧是独立构建的,因此正确地定义了两列之间相等的联接条件,每列都属于不同的数据帧。因此Spark执行了一个经典的联接。


详细说明

正如pltc在他的回答中所解释的,在您的第一次运行中,Spark没有为您的联接选择正确的列。让我们找出原因。

引擎盖下面是什么

让我们从使用explain获取df1df2的物理计划开始。以下是df1的物理计划:

== Physical Plan ==
*(1) Project [ID#0L, Status#1, c1#2, A AS c2#6]
+- *(1) Scan ExistingRDD[ID#0L,Status#1,c1#2]

这是df2的物理计划:

== Physical Plan ==
*(1) Project [ID#0L, Status#1, c1#2, A AS c2#6]
+- *(1) Filter (isnotnull(Status#1) AND (Status#1 = ok))
+- *(1) Scan ExistingRDD[ID#0L,Status#1,c1#2]

在以(1) Project开头的第一行中,您可以看到两个数据帧df1df2具有相同的列名和ID:[ID#0L, Status#1, c1#2, A AS c2#6]。这并不奇怪,因为df2是从df1创建的,所以您可以将df2视为具有附加转换的df1。因此,我们有以下参考资料:

  • df1.c1<>CCD_ 24<>c1#2
  • CCD_ 26<>CCD_ 27<>A AS c2#6

当您加入df1df2时,意味着您进行了自加入。您的条件的所有以下组合将被翻译为c1#2 = A AS c2#6,这将给您留下简化的联接条件c1#2 = A:

  • df1.c1 = df2.c2
  • df1.c2 = df2.c1
  • df2.c1 = df1.c2
  • df2.c2 = df1.c1

当您在Spark中执行自联接时,Spark将重新生成正确数据帧的列ID,以避免在最终数据帧中具有相同的列ID。因此,在您的情况下,它将重写df1的列id所以列c1#2将参考df2的列c1

现在您的条件不包含df1中的任何列,那么Spark将选择执行笛卡尔乘积作为联接策略。由于两个数据帧中的一个数据帧小到足以被广播,所以所选择的算法将是BroadcastNestedLoopJoin。这就是df3的物理计划所显示的:

== Physical Plan ==
BroadcastNestedLoopJoin BuildRight, FullOuter, (c1#2 = A)
:- *(1) Project [ID#0L, Status#1, c1#2, A AS c2#6]
:  +- *(1) Filter (isnotnull(Status#1) AND (Status#1 = ok))
:     +- *(1) Scan ExistingRDD[ID#0L,Status#1,c1#2]
+- BroadcastExchange IdentityBroadcastMode, [id=#75]
+- *(2) Project [ID#46L, Status#47, c1#48, A AS c2#45]
+- *(2) Scan ExistingRDD[ID#46L,Status#47,c1#48]

注意,df1的四个新列id现在是[ID#46L, Status#47, c1#48, A AS c2#45]

当您执行此计划时,对于df2的唯一行,c1的值为null,与A不同,因此联接条件始终为false。当您选择完全外部联接时,您将获得三行(两行来自df1,一行来自df2),其中null列来自另一个数据帧:

+----+------+----+----+----+------+----+----+
|  ID|Status|  c1|  c2|  ID|Status|  c1|  c2|
+----+------+----+----+----+------+----+----+
|   4|    ok|null|   A|null|  null|null|null|
|null|  null|null|null|   1|   bad|   A|   A|
|null|  null|null|null|   4|    ok|null|   A|
+----+------+----+----+----+------+----+----+

为什么第二次运行时我有想要的输出

对于第二次运行,您将创建两个独立的数据帧。因此,如果我们查看df4df5的物理计划,您可以看到列ID是不同的。以下是df4:的物理计划

== Physical Plan ==
*(1) Scan ExistingRDD[ID#98L,Status#99,c1#100,c2#101]

这是df5:的物理计划

== Physical Plan ==
*(1) Filter (isnotnull(Status#124) AND (Status#124 = ok))
+- *(1) Scan ExistingRDD[ID#123L,Status#124,c1#125,c2#126]

您的加入条件是c1#100 = c2#126c1#100df4c1列,c2#126df5c2列。联接条件中相等的每一端都来自不同的数据帧,因此Spark可以按预期执行联接。

为什么这没有被检测为模糊自联接

自Spark 3.0以来,Spark会检查用于联接的列是否不明确。如果您在加入df2df1时将它们的顺序颠倒如下:

df3 = df1.join(df2, (df1.c1 == df2.c2), 'full')

你会得到以下错误:

pyspark.sql.utils.AnalysisException:列c2#6不明确。

那么,为什么在执行df2.join(df1, ...)时不出现此错误呢

你的答案在Spark代码中的DetectAmbigiousSelfJoin文件中:

//发生自联接时,分析器会要求右侧计划生成带有新exprId的
//属性。如果数据集的计划输出的属性
//由列引用引用,并且此属性的exprId与
//列引用的属性,则列引用是不明确的,因为它
//指通过自联接重新生成的列。

这意味着在执行df2.join(df1, ...)时,我们将仅针对df1检查联接条件中使用的列。在我们的案例中,我们没有对df1执行任何转换,与筛选的df2相反,df1列的exprId没有更改,因此不会引发不明确的列错误。

我在Spark Jira上创建了一个关于这种行为的问题,请参阅Spark-36874(该错误在3.2.0版本中修复)

如何预测这种行为

你必须非常小心你的加入是否是自加入。如果从数据帧df1开始,对其执行一些转换以获得df2,然后加入df1df2,则可能会出现这种行为。为了减轻这种情况,在执行联接时,应该始终将原始数据帧作为第一个数据帧,因此使用df1.join(df2, ...)而不是df2.join(df1, ...)。这样一来,如果Spark无法选择正确的列,您将得到一个Analysis Exception: Column x are ambiguous

由于某些原因,Spark无法正确区分c1c2列。这是df3的修复程序,以获得预期结果:
df3 = df2.alias('df2').join(df1.alias('df1'), (F.col('df1.c1') == F.col('df2.c2')), 'full')
df3.show()
# Output
# +----+------+----+----+---+------+----+---+
# |  ID|Status|  c1|  c2| ID|Status|  c1| c2|
# +----+------+----+----+---+------+----+---+
# |   4|    ok|null|   A|  1|   bad|   A|  A|
# |null|  null|null|null|  4|    ok|null|  A|
# +----+------+----+----+---+------+----+---+

最新更新