sparks执行计划中 Project
节点的含义是什么?
我有一个包含以下内容的计划:
+- Project [dm_country#population#6a1ad864-235f-4761-9a6d-0ca2a2b40686#834, dm_country#population#country#839, population#17 AS dm_country#population#population#844]
+- Project [dm_country#population#6a1ad864-235f-4761-9a6d-0ca2a2b40686#834, country#12 AS dm_country#population#country#839, population#17]
+- Project [6a1ad864-235f-4761-9a6d-0ca2a2b40686#22 AS dm_country#population#6a1ad864-235f-4761-9a6d-0ca2a2b40686#834, country#12, population#17]
+- RepartitionByExpression [country#12], 1000
+- Union
:- Project [ind#7 AS 6a1ad864-235f-4761-9a6d-0ca2a2b40686#22, country#12, population#17]
: +- Project [ind#7, country#12, population#2 AS population#17]
: +- Project [ind#7, country#1 AS country#12, population#2]
: +- Project [ind#0 AS ind#7, country#1, population#2]
: +- Relation[ind#0,country#1,population#2] JDBCRelation(schema_dbadmin.t_350) [numPartitions=100]
+- LogicalRDD [ind#45, country#46, population#47]
注意:由于计划使用RepartitionByExpression
节点,因此必须是逻辑查询计划。
逻辑查询计划中的项目节点代表Project
Unary逻辑运算符,并且只要您明确或隐式使用某种投影。
引用Wikipedia的投影(关系代数):
实际上,可以大致认为是选择所有可用列的子集。
Project
节点可以明确出现在以下逻辑查询计划中:
- 数据集操作员,即
joinWith
,select
,unionByName
-
KeyValueGroupedDataset
操作员,即keys
,mapValues
- SQL的
SELECT
查询
Project
节点也可以出现分析和优化阶段。
在Spark SQL中,数据集API给出了高级操作员,例如select
,filter
或groupBy
,最终构建了结构化查询的催化剂逻辑计划。换句话说,这个看起来简单的数据集。选择操作员只是用Project
节点创建LogicalPlan
。
val query = spark.range(4).select("id")
scala> println(query.queryExecution.logical)
'Project [unresolvedalias('id, None)]
+- Range (0, 4, step=1, splits=Some(8))
(您可以使用query.explain(extended = true)
用于上述,但这会给您所有可能隐藏的4个计划)
您可以查看Dataset.select
操作员的代码。
def select(cols: Column*): DataFrame = withPlan {
Project(cols.map(_.named), logicalPlan)
}
这个外观简单的select
操作员是围绕催化剂操作员的包装器,可以建造逻辑操作员的催化剂树,以提供逻辑计划。
NOTE Spark SQL的催化剂的好是,它使用了代表逻辑运算符或逻辑运算符树的递归逻辑平面抽象。
Note 同样适用于Good Ol'SQL,在解析后,SQL文本转换为逻辑运算符的AST。请参阅下面的示例。
Project
可以来来去去,因为投影是针对输出中的列数,并且可能出现在您的计划和查询中。
催化剂DSL
您可以使用Spark SQL的 Catalyst DSL (在org.apache.spark.sql.catalyst.dsl
软件包对象中)用于使用Scala隐式转换来构建催化剂数据结构。如果您参加火花测试,那可能非常有用。
scala> spark.version
res0: String = 2.3.0-SNAPSHOT
import org.apache.spark.sql.catalyst.dsl.plans._ // <-- gives table and select
import org.apache.spark.sql.catalyst.dsl.expressions.star
val plan = table("a").select(star())
scala> println(plan.numberedTreeString)
00 'Project [*]
01 +- 'UnresolvedRelation `a`
好ol'sql
scala> spark.range(4).createOrReplaceTempView("nums")
scala> spark.sql("SHOW TABLES").show
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
| | nums| true|
+--------+---------+-----------+
scala> spark.sql("SELECT * FROM nums").explain
== Physical Plan ==
*Range (0, 4, step=1, splits=8)
scala> spark.sql("SELECT * FROM nums").explain(true)
== Parsed Logical Plan ==
'Project [*]
+- 'UnresolvedRelation `nums`
== Analyzed Logical Plan ==
id: bigint
Project [id#40L]
+- SubqueryAlias nums
+- Range (0, 4, step=1, splits=Some(8))
== Optimized Logical Plan ==
Range (0, 4, step=1, splits=Some(8))
== Physical Plan ==
*Range (0, 4, step=1, splits=8)