如何估计联接两个或多个表后 Spark 数据帧的行数?



我正在开发一个功能,支持动态sql作为输入,然后使用输入提交一个spark作业。但是输入是不可预测的,有些输入可能会超过限制,这对我来说是一个危险。我想检查sql的成本提交作业之前,是一种方式,我可以准确地估计成本?

我的Spark配置:

Spark Version: 3.3.1
conf:  
spark.sql.cbo.enabled: true
spark.sql.statistics.histogram.enabled:true

示例:
我有一个像这样的dataFramedf1

n   x   y   z
'A' 1   2   3
'A' 4   5   6
'A' 7   8   9
'A' 10  11  12
'A' 13  14  15
'A' 16  17  18
'A' 19  20  21
'A' 22  23  24
'A' 25  26  27
'A' 28  29  30

df1.join(df1,"n","left").join(df1,"n","left")的行数应为1000

df1.join(df1,"n","left").join(df1,"n","left")的行数应为10

dataFrame.queryExecution.optimizedPlan.stats的结果总是1000以上的例子。

我已经试过了:

  1. dataFrame.queryExecution.optimizedPlan.stats,但是es的行数比实际的行数要大得多,特别是当join操作存在时。
  2. 使用dataFrame.rdd.countApprox。问题是,当dataFrame很大时,需要很长时间才能得到实际结果
  3. 我也尝试使用org.apache.spark.sql.execution.command.CommandUtils#calculateMultipleLocationSizesInParallel,它比dataFrame.rdd.countApprox好,但在一些极端的情况下,它也花费了几十分钟以上。

首先让我们计算每个表中的行数

df1_count = df1.count()
df2_count = df2.count()
df3_count = df3.count()

然后使用cogroup创建包含每个表的行计数的DataFrame

counts_df = df1.cogroup(df2, df3)

将行计数相加以获得连接的DataFrame中的估计总行数

estimated_row_count = counts_df.sum()

最后,当您加入时,您可以尝试这种方法

joined_df = df1.join(df2, on=..., how=...).join(df3, on=..., how=...)
exact_row_count = joined_df.count()

相关内容

  • 没有找到相关文章

最新更新