我正在开发一个功能,支持动态sql作为输入,然后使用输入提交一个spark作业。但是输入是不可预测的,有些输入可能会超过限制,这对我来说是一个危险。我想检查sql的成本提交作业之前,是一种方式,我可以准确地估计成本?
我的Spark配置:
Spark Version: 3.3.1
conf:
spark.sql.cbo.enabled: true
spark.sql.statistics.histogram.enabled:true
示例:
我有一个像这样的dataFramedf1
n x y z
'A' 1 2 3
'A' 4 5 6
'A' 7 8 9
'A' 10 11 12
'A' 13 14 15
'A' 16 17 18
'A' 19 20 21
'A' 22 23 24
'A' 25 26 27
'A' 28 29 30
df1.join(df1,"n","left").join(df1,"n","left")
的行数应为1000
df1.join(df1,"n","left").join(df1,"n","left")
的行数应为10
但dataFrame.queryExecution.optimizedPlan.stats
的结果总是1000以上的例子。
我已经试过了:
dataFrame.queryExecution.optimizedPlan.stats
,但是es的行数比实际的行数要大得多,特别是当join
操作存在时。- 使用
dataFrame.rdd.countApprox
。问题是,当dataFrame很大时,需要很长时间才能得到实际结果 - 我也尝试使用
org.apache.spark.sql.execution.command.CommandUtils#calculateMultipleLocationSizesInParallel
,它比dataFrame.rdd.countApprox
好,但在一些极端的情况下,它也花费了几十分钟以上。
首先让我们计算每个表中的行数
df1_count = df1.count()
df2_count = df2.count()
df3_count = df3.count()
然后使用cogroup创建包含每个表的行计数的DataFrame
counts_df = df1.cogroup(df2, df3)
将行计数相加以获得连接的DataFrame中的估计总行数
estimated_row_count = counts_df.sum()
最后,当您加入时,您可以尝试这种方法
joined_df = df1.join(df2, on=..., how=...).join(df3, on=..., how=...)
exact_row_count = joined_df.count()