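Suppose I have a transformation like the snippet below, in which I join two DataFrames derived from the same parent DataFrame in Spark. How will the DAG be optimized (or not) for these computations, and is there any benefit to persisting the initial read?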
val dataFrame = readDataframe() // .persist() ?
val derived1 = dataFrame.transform(/* transformation1 */)
val derived2 = dataFrame.transform(/* transformation2 */)
val result = derived1.join(derived2, /* condition */)
result.show()
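persist is of no use here: because of lazy evaluation, there is no action anywhere in the code before the final result.show(). The physical plans below show that persisting does not optimize the physical plan at all; the Range node still appears in both branches of the join.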
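However, if you call something like .count() or .show() partway through the transformations, Spark is forced to evaluate the query at that point, and persist does help in that case.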
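The case where persist helps can be sketched as follows. This is a minimal illustration, not the asker's actual pipeline: spark.range(10) stands in for readDataframe(), the local SparkSession is an assumption, and the object name, the run helper and the column aliases are hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object PersistWithIntermediateAction {
  // Returns (rows counted by the intermediate action, rows in the joined result).
  def run(spark: SparkSession): (Long, Long) = {
    // Stand-in for readDataframe(); persist() only marks the data for caching.
    val parent = spark.range(10).toDF("id").persist()

    // Intermediate action: computes the parent once and fills the cache.
    val parentRows = parent.count()

    val derived1 = parent.select(col("id"), (col("id") * 2).as("doubled"))
    val derived2 = parent.select(col("id"), (col("id") + 2).as("plus_two"))

    // Final action: both join branches can read the already cached parent.
    val joinedRows = derived1.join(derived2, "id").count()

    parent.unpersist()
    (parentRows, joinedRows)
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a local session, for illustration only.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("persist-sketch")
      .getOrCreate()
    println(run(spark))
    spark.stop()
  }
}
```

With two actions over the same parent, the second one can reuse what the first one cached instead of repeating the read, which is where persist starts to pay for itself when the read is expensive.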
Without persist:
scala> val df = spark.range(10)
df: org.apache.spark.sql.Dataset[Long] = [id: bigint]
scala> val df1 = df.transform(x => x.select($"id", $"id" * 2))
df1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: bigint, (id * 2): bigint]
scala> val df2 = df.transform(x => x.select($"id", $"id" + 2))
df2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: bigint, (id + 2): bigint]
scala> val result = df1.join(df2, "id")
result: org.apache.spark.sql.DataFrame = [id: bigint, (id * 2): bigint ... 1 more field]
scala> result.explain()
== Physical Plan ==
*(2) Project [id#8L, (id * 2)#15L, (id + 2)#18L]
+- *(2) BroadcastHashJoin [id#8L], [id#21L], Inner, BuildRight
   :- *(2) Project [id#8L, (id#8L * 2) AS (id * 2)#15L]
   :  +- *(2) Range (0, 10, step=1, splits=24)
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false])), [id=#39]
      +- *(1) Project [id#21L, (id#21L + 2) AS (id + 2)#18L]
         +- *(1) Range (0, 10, step=1, splits=24)
With persist:
scala> val df0 = df.persist()
df0: df.type = [id: bigint]
scala> val df1 = df0.transform(x => x.select($"id", $"id" * 2))
df1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: bigint, (id * 2): bigint]
scala> val df2 = df0.transform(x => x.select($"id", $"id" + 2))
df2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: bigint, (id + 2): bigint]
scala> val result = df1.join(df2, "id")
result: org.apache.spark.sql.DataFrame = [id: bigint, (id * 2): bigint ... 1 more field]
scala> result.explain()
== Physical Plan ==
*(2) Project [id#8L, (id * 2)#50L, (id + 2)#53L]
+- *(2) BroadcastHashJoin [id#8L], [id#56L], Inner, BuildRight
   :- *(2) Project [id#8L, (id#8L * 2) AS (id * 2)#50L]
   :  +- *(2) ColumnarToRow
   :     +- InMemoryTableScan [id#8L]
   :           +- InMemoryRelation [id#8L], StorageLevel(disk, memory, deserialized, 1 replicas)
   :                 +- *(1) Range (0, 10, step=1, splits=24)
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false])), [id=#100]
      +- *(1) Project [id#56L, (id#56L + 2) AS (id + 2)#53L]
         +- *(1) ColumnarToRow
            +- InMemoryTableScan [id#56L]
                  +- InMemoryRelation [id#56L], StorageLevel(disk, memory, deserialized, 1 replicas)
                        +- *(1) Range (0, 10, step=1, splits=24)
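The difference between the two plans above can also be checked in code rather than by reading explain() output. The following is a rough sketch under a few assumptions: a local SparkSession, spark.range(10) as the parent, and hypothetical helper names (readsFromCache, buildJoin, compare). Matching on the plan text is a convenience for this sketch only, since the textual form of a plan is not a stable API and may differ between Spark versions.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col
import org.apache.spark.storage.StorageLevel

object InspectCachedPlan {
  // True when the physical plan text mentions Spark's in-memory cache scan.
  def readsFromCache(df: DataFrame): Boolean =
    df.queryExecution.executedPlan.toString.contains("InMemoryTableScan")

  // Same shape as the transcripts: two projections of one parent, joined on id.
  def buildJoin(parent: DataFrame): DataFrame = {
    val left = parent.select(col("id"), (col("id") * 2).as("doubled"))
    val right = parent.select(col("id"), (col("id") + 2).as("plus_two"))
    left.join(right, "id")
  }

  // Returns (plan uses cache before persist, plan uses cache after persist).
  def compare(spark: SparkSession): (Boolean, Boolean) = {
    val parent = spark.range(10).toDF("id")

    // Plan the join before the parent is marked for caching.
    val before = readsFromCache(buildJoin(parent))

    // Mark the parent for caching, then plan the same join again.
    parent.persist()
    val isMarked = parent.storageLevel != StorageLevel.NONE
    val after = isMarked && readsFromCache(buildJoin(parent))

    parent.unpersist()
    (before, after)
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a local session, for illustration only.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("inspect-plan-sketch")
      .getOrCreate()
    println(compare(spark))
    spark.stop()
  }
}
```

Note that no action runs inside compare: the plan changes as soon as the parent is marked with persist(), while the cache itself is only filled when an action eventually executes.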