Squl SQL确实通过重复表达式优化查询



给定以下

from pyspark.sql import functions, window
f = functions.rank()
w1 = window.Window.partitionBy("column")
w2 = window.Window.partitionBy("column")
col = functions.col("column * 42")

和一个dataframe df

的性能是否有任何差异
df.select(f.over(w1), f.over(w2))

vs

df.select(f.over(w1), f.over(w1))

df.select(col + 1, col + 2)

vs

df.select(functions.expr("column * 42 + 1"), functions.expr("column * 42 + 2")

(随意想象任意复杂的表达方式代替column * 42

即。重复使用列和窗口的现场与即时构建这些表达式有任何好处?

我希望Spark SQL能够适当地对此进行优化,但找不到结论性的答案。

另外,我应该通过检查df.explain()的结果来自己回答这个问题,如果是,我应该寻找什么?

随时可以想象任意复杂的表达式代替列 * 42

...甚至任何非确定性表达式(例如生成随机数或当前时间戳)。

每当您提出这样的问题时,请使用explain操作员查看SPARK SQL在封面下处理的内容(实际上应该与编程语言和功能或使用的功能或方法无关,不是吗?)

因此,在以下非确定性查询(或完全确定性的,但乍一看是非确定性)下发生的情况:

):
val q = spark.range(1)
 .select(
   current_timestamp as "now",  // <-- this should be the same as the following line?
   current_timestamp as "now_2",
   rand as "r1", // <-- what about this and the following lines?
   rand as "r2",
   rand as "r3")
scala> q.show(truncate = false)
+-----------------------+-----------------------+-------------------+------------------+------------------+
|now                    |now_2                  |r1                 |r2                |r3                |
+-----------------------+-----------------------+-------------------+------------------+------------------+
|2017-12-13 15:17:46.305|2017-12-13 15:17:46.305|0.33579358107333823|0.9478025260069644|0.5846726225651472|
+-----------------------+-----------------------+-------------------+------------------+------------------+

实际上,我很惊讶地注意到rand S都产生了不同的结果,因为我认为结果是相同的。答案是在... RAND的源代码中,您可以看到它使用不同的种子,如果未明确定义(今天就学会了!谢谢)。

)。
def rand(): Column = rand(Utils.random.nextLong)

答案是将rand的版本与显式seed一起使用,因为这将为您提供相同的Rand逻辑操作员,并在整个查询中使用相同的seed

val seed = 1
val q = spark.range(1)
 .select(
   current_timestamp as "now",  // <-- this should be the same as the following line?
   current_timestamp as "now_2",
   rand(seed) as "r1", // <-- what about this and the following lines?
   rand(seed) as "r2",
   rand(seed) as "r3")
scala> q.show(false)
+-----------------------+-----------------------+-------------------+-------------------+-------------------+
|now                    |now_2                  |r1                 |r2                 |r3                 |
+-----------------------+-----------------------+-------------------+-------------------+-------------------+
|2017-12-13 15:43:59.019|2017-12-13 15:43:59.019|0.06498948189958098|0.06498948189958098|0.06498948189958098|
+-----------------------+-----------------------+-------------------+-------------------+-------------------+

spark sql知道您在结构化查询中使用了什么,因为SPARK SQL的高级别API称为DataFrameDataset只是围绕逻辑运算符的包装器,它们跨语言相同(Python,Scala,Java,Java,R,R,SQL)。

只需查看任何函数的源代码,您就会看到催化剂表达式(例如rand)或数据集操作员(例如选择),您将看到一个或一棵逻辑运算符树。

最后,Spark SQL使用基于规则的优化器,该优化器使用规则来优化查询并查找重复。

所以,让我们看一下您的情况(比rand更确定性)。

(我正在使用Scala,但差异在语言不是优化级别)

import org.apache.spark.sql.expressions.Window
val w1 = Window.partitionBy("column").orderBy("column")
val w2 = Window.partitionBy("column").orderBy("column")

在您的情况下,您使用了需要订购数据集的rank,因此我添加orderBy子句以使窗口规范完成。

scala> w1 == w2
res1: Boolean = false

它们确实与Scala的观点不同

val df = spark.range(5).withColumnRenamed("id", "column")
scala> df.show
+------+
|column|
+------+
|     0|
|     1|
|     2|
|     3|
|     4|
+------+

使用数据集(与我们的讨论无关),让我们创建一个结构化查询并explain,以查看Spark SQL执行的物理计划。

val q = df.select(rank over w1, rank over w2)
scala> q.explain
== Physical Plan ==
*Project [RANK() OVER (PARTITION BY column ORDER BY column ASC NULLS FIRST unspecifiedframe$())#193, RANK() OVER (PARTITION BY column ORDER BY column ASC NULLS FIRST unspecifiedframe$())#194]
+- Window [rank(column#156L) windowspecdefinition(column#156L, column#156L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS RANK() OVER (PARTITION BY column ORDER BY column ASC NULLS FIRST unspecifiedframe$())#193, rank(column#156L) windowspecdefinition(column#156L, column#156L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS RANK() OVER (PARTITION BY column ORDER BY column ASC NULLS FIRST unspecifiedframe$())#194], [column#156L], [column#156L ASC NULLS FIRST]
   +- *Sort [column#156L ASC NULLS FIRST, column#156L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(column#156L, 200)
         +- *Project [id#153L AS column#156L]
            +- *Range (0, 5, step=1, splits=8)

让我们使用编号的输出,以便我们可以参考描述中的每一行。

val plan = q.queryExecution.executedPlan
scala> println(plan.numberedTreeString)
00 *Project [RANK() OVER (PARTITION BY column ORDER BY column ASC NULLS FIRST unspecifiedframe$())#193, RANK() OVER (PARTITION BY column ORDER BY column ASC NULLS FIRST unspecifiedframe$())#194]
01 +- Window [rank(column#156L) windowspecdefinition(column#156L, column#156L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS RANK() OVER (PARTITION BY column ORDER BY column ASC NULLS FIRST unspecifiedframe$())#193, rank(column#156L) windowspecdefinition(column#156L, column#156L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS RANK() OVER (PARTITION BY column ORDER BY column ASC NULLS FIRST unspecifiedframe$())#194], [column#156L], [column#156L ASC NULLS FIRST]
02    +- *Sort [column#156L ASC NULLS FIRST, column#156L ASC NULLS FIRST], false, 0
03       +- Exchange hashpartitioning(column#156L, 200)
04          +- *Project [id#153L AS column#156L]
05             +- *Range (0, 5, step=1, splits=8)

您可以查看查询是否与另一个相似,如果有的话有什么区别。那是您可以得到的最明确的答案,而且……令人惊讶...事情可能(通常会)在火花版本之间发生变化。

即。重复使用列和窗口的现场与即时构建这些表达式有任何好处?

我不会对此有太多考虑,因为我希望Spark能够内部处理它(而且您可能已经注意到我很惊讶rand的工作方式有所不同)。

只需使用explain查看物理计划,您就可以自己回答问题。

相关内容

  • 没有找到相关文章

最新更新