>我有 2 个数据帧:
df_1
,包含大约 5 亿条记录和 ~100 列df_2
~5000 万条记录和 4 列
我需要将df_1
与df_2
保持连接,其中两列完全匹配,第三列最佳匹配。 我所说的最佳匹配是指从左到右有一个一:多关系,但我只想在长度方面获得右侧的最佳匹配。
例如
# df_1
col1 col2 col3
---------------------------
a b abcde
# df_2
col1 col2 col3 col4
-------------------------------
a b a 90
a b ab 100
a b abc 150
a c abc 90
因此,当我匹配col1
并完全col2
并col3
所包含字符串的最佳匹配时,连接的预期结果是:
col1 col2 col3 col4
-------------------------------
a b abcde 150
这里有一些对我不利的观点:
- 左侧的
col3
长度通常在 10 到 15 个字符之间,右侧的长度可以从 1 个字符到 9 个字符不等 df_1
和df_2
在col3
上都同样偏斜
虽然我已经完成了这项工作,但我的表现很糟糕。
我已经尝试了以下解决方案,但仍然一无所获:
- 广播
df_2
(因太大而无法广播而倒下( - 完全加入
col1
和col1
,并在col3
上使用like
(可怕(
在 df_2
中分解出col3
上的值,以尝试对抗倾斜(改进但仍然缓慢(- 持久化数据并在右侧遍历每个长度,并完全连接
col1
、col2
和col3
的串联(其中左侧的串联是col3
的子字符串((改进但仍然很慢(
使用 Spark 执行此联接的最高性能方法是什么?
更好的选择是在连接之前减小数据大小(我们无法根除连接(。我们可以减少如下:
一、加载数据
scala> import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.expressions.Window
scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._
scala> df1.show
+---+---+-----+
| c1| c2| c3|
+---+---+-----+
| a| b|abcde|
| c| d| fd|
+---+---+-----+
scala> df2.show
+---+---+----+---+
| c1| c2| c3| c4|
+---+---+----+---+
| a| b| a| 90|
| a| b| abd|100|
| a| b|abcd|150|
| c| d|wewe| 79|
+---+---+----+---+
现在我们需要在连接之前减小 df2 的大小(这将减少连接所需的时间,因为数据大小比较较少(并使用窗口函数找出两列的最大值
scala> df2.withColumn("len", length($"c3")).withColumn("res", row_number().over(wind1)).filter($"res" === 1).withColumn("res2", row_number().over(wind2)).filter($"res2"=== 1).select("c1", "c2", "c3", "c4").show()
+---+---+----+---+
| c1| c2| c3| c4|
+---+---+----+---+
| c| d|wewe| 79|
| a| b|abcd|150|
+---+---+----+---+
要尝试的事情 :
1> 您可以联接此缩减的数据帧并应用正在使用的逻辑
2> 尝试做联合df1.withColumn("c4", lit(0)).union(df2)
,然后应用上述逻辑。
希望这有帮助