火花加入最佳匹配效率问题



>我有 2 个数据帧:

  • df_1,包含大约 5 亿条记录和 ~100 列
  • df_2~5000 万条记录和 4 列

我需要将df_1df_2保持连接,其中两列完全匹配,第三列最佳匹配。 我所说的最佳匹配是指从左到右有一个一:多关系,但我只想在长度方面获得右侧的最佳匹配。

例如

# df_1
col1    col2    col3
---------------------------
a       b       abcde
# df_2
col1    col2    col3    col4
-------------------------------
a       b       a       90
a       b       ab      100
a       b       abc     150
a       c       abc     90

因此,当我匹配col1并完全col2col3所包含字符串的最佳匹配时,连接的预期结果是:

col1    col2    col3    col4
-------------------------------
a       b       abcde   150

这里有一些对我不利的观点:

  • 左侧的col3长度通常在 10 到 15 个字符之间,右侧的长度可以从 1 个字符到 9 个字符不等
  • df_1df_2col3上都同样偏斜

虽然我已经完成了这项工作,但我的表现很糟糕

我已经尝试了以下解决方案,但仍然一无所获:

  • 广播df_2(因太大而无法广播而倒下(
  • 完全加入col1col1,并在col3上使用like(可怕(
  • df_2中分解出col3上的值,以尝试对抗倾斜(改进但仍然缓慢(
  • 持久化数据并在右侧遍历每个长度,并完全连接col1col2col3的串联(其中左侧的串联是col3的子字符串((改进但仍然很慢(

使用 Spark 执行此联接的最高性能方法是什么?

更好的选择是在连接之前减小数据大小(我们无法根除连接(。我们可以减少如下:

一、加载数据

scala> import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.expressions.Window
scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._
scala> df1.show
+---+---+-----+
| c1| c2|   c3|
+---+---+-----+
|  a|  b|abcde|
|  c|  d|   fd|
+---+---+-----+
scala> df2.show
+---+---+----+---+
| c1| c2|  c3| c4|
+---+---+----+---+
|  a|  b|   a| 90|
|  a|  b| abd|100|
|  a|  b|abcd|150|
|  c|  d|wewe| 79|
+---+---+----+---+

现在我们需要在连接之前减小 df2 的大小(这将减少连接所需的时间,因为数据大小比较较少(并使用窗口函数找出两列的最大值

scala> df2.withColumn("len", length($"c3")).withColumn("res", row_number().over(wind1)).filter($"res" === 1).withColumn("res2", row_number().over(wind2)).filter($"res2"=== 1).select("c1", "c2", "c3", "c4").show()
+---+---+----+---+
| c1| c2|  c3| c4|
+---+---+----+---+
|  c|  d|wewe| 79|
|  a|  b|abcd|150|
+---+---+----+---+

要尝试的事情 :

1> 您可以联接此缩减的数据帧并应用正在使用的逻辑

2> 尝试做联合df1.withColumn("c4", lit(0)).union(df2),然后应用上述逻辑。

希望这有帮助

最新更新