我在运行齐柏林飞艇笔记本时使用com.datastax.spark:spark-cassandra-connector_2.11:2.4.0
,但不了解 Spark 中两个操作之间的区别。一个操作需要大量时间进行计算,第二个操作立即执行。有人可以向我解释两个操作之间的区别:
import com.datastax.spark.connector._
import org.apache.spark.sql.cassandra._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import spark.implicits._
case class SomeClass(val someField:String)
val timelineItems = spark.read.format("org.apache.spark.sql.cassandra").options(scala.collection.immutable.Map("spark.cassandra.connection.host" -> "127.0.0.1", "table" -> "timeline_items", "keyspace" -> "timeline" )).load()
//some simplified code:
val timelineRow = timelineItems
.map(x => {SomeClass("test")})
.filter(x => x != null)
.toDF()
.limit(4)
//first operation (takes a lot of time. It seems spark iterates through all items in Cassandra and doesn't use laziness with limit 4)
println(timelineRow.count()) //return: 4
//second operation (executes immediately); 300 - just random number which doesn't affect the result
println(timelineRow.take(300).length) //return: 4
您看到的是Limit
(类似转换的操作)和CollectLimit
(类似操作的操作)的实现之间的差异。然而,时间的差异具有高度误导性,在一般情况下不是你能预料到的。
首先,让我们创建一个 MCVE
spark.conf.set("spark.sql.files.maxPartitionBytes", 500)
val ds = spark.read
.text("README.md")
.as[String]
.map{ x => {
Thread.sleep(1000)
x
}}
val dsLimit4 = ds.limit(4)
确保我们从干净的石板开始:
spark.sparkContext.statusTracker.getJobIdsForGroup(null).isEmpty
Boolean = true
调用count
:
dsLimit4.count()
并查看执行计划(来自Spark UI):
== Parsed Logical Plan ==
Aggregate [count(1) AS count#12L]
+- GlobalLimit 4
+- LocalLimit 4
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#7]
+- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#6: java.lang.String
+- DeserializeToObject cast(value#0 as string).toString, obj#5: java.lang.String
+- Relation[value#0] text
== Analyzed Logical Plan ==
count: bigint
Aggregate [count(1) AS count#12L]
+- GlobalLimit 4
+- LocalLimit 4
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#7]
+- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#6: java.lang.String
+- DeserializeToObject cast(value#0 as string).toString, obj#5: java.lang.String
+- Relation[value#0] text
== Optimized Logical Plan ==
Aggregate [count(1) AS count#12L]
+- GlobalLimit 4
+- LocalLimit 4
+- Project
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#7]
+- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#6: java.lang.String
+- DeserializeToObject value#0.toString, obj#5: java.lang.String
+- Relation[value#0] text
== Physical Plan ==
*(2) HashAggregate(keys=[], functions=[count(1)], output=[count#12L])
+- *(2) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#15L])
+- *(2) GlobalLimit 4
+- Exchange SinglePartition
+- *(1) LocalLimit 4
+- *(1) Project
+- *(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#7]
+- *(1) MapElements <function1>, obj#6: java.lang.String
+- *(1) DeserializeToObject value#0.toString, obj#5: java.lang.String
+- *(1) FileScan text [value#0] Batched: false, Format: Text, Location: InMemoryFileIndex[file:/path/to/README.md], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<value:string>
核心组件是
+- *(2) GlobalLimit 4
+- Exchange SinglePartition
+- *(1) LocalLimit 4
这表明我们可以期待具有多个阶段的广泛操作。我们可以看到单个作业
spark.sparkContext.statusTracker.getJobIdsForGroup(null)
Array[Int] = Array(0)
具有两个阶段
spark.sparkContext.statusTracker.getJobInfo(0).get.stageIds
Array[Int] = Array(0, 1)
与八个
spark.sparkContext.statusTracker.getStageInfo(0).get.numTasks
Int = 8
和一
spark.sparkContext.statusTracker.getStageInfo(1).get.numTasks
Int = 1
任务分别。
现在让我们将其与
dsLimit4.take(300).size
产生以下
== Parsed Logical Plan ==
GlobalLimit 300
+- LocalLimit 300
+- GlobalLimit 4
+- LocalLimit 4
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#7]
+- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#6: java.lang.String
+- DeserializeToObject cast(value#0 as string).toString, obj#5: java.lang.String
+- Relation[value#0] text
== Analyzed Logical Plan ==
value: string
GlobalLimit 300
+- LocalLimit 300
+- GlobalLimit 4
+- LocalLimit 4
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#7]
+- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#6: java.lang.String
+- DeserializeToObject cast(value#0 as string).toString, obj#5: java.lang.String
+- Relation[value#0] text
== Optimized Logical Plan ==
GlobalLimit 4
+- LocalLimit 4
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#7]
+- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#6: java.lang.String
+- DeserializeToObject value#0.toString, obj#5: java.lang.String
+- Relation[value#0] text
== Physical Plan ==
CollectLimit 4
+- *(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#7]
+- *(1) MapElements <function1>, obj#6: java.lang.String
+- *(1) DeserializeToObject value#0.toString, obj#5: java.lang.String
+- *(1) FileScan text [value#0] Batched: false, Format: Text, Location: InMemoryFileIndex[file:/path/to/README.md], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<value:string>
虽然全球和局部限制仍然存在,但中间没有交换。因此,我们可以期待单阶段操作。请注意,规划师将限制范围缩小到更严格的值。
正如预期的那样,我们看到了一个新工作:
spark.sparkContext.statusTracker.getJobIdsForGroup(null)
Array[Int] = Array(1, 0)
仅生成一个阶段:
spark.sparkContext.statusTracker.getJobInfo(1).get.stageIds
Array[Int] = Array(2)
只有一个任务
spark.sparkContext.statusTracker.getStageInfo(2).get.numTasks
Int = 1
这对我们意味着什么?
- 在
count
的情况下,Spark 使用了广泛的转换,实际上在每个分区上应用了LocalLimit
,并随机播放部分结果以执行GlobalLimit
。 - 在
take
情况下,Spark 使用窄转换并仅在第一个分区上评估LocalLimit
。
显然,后一种方法不适用于第一个分区中的值数低于请求的限制。
val dsLimit105 = ds.limit(105) // There are 105 lines
在这种情况下,第一个count
将使用与以前完全相同的逻辑(我鼓励您根据经验确认这一点),但take
将采取完全不同的路径。到目前为止,我们只触发了两个作业:
spark.sparkContext.statusTracker.getJobIdsForGroup(null)
Array[Int] = Array(1, 0)
现在如果我们执行
dsLimit105.take(300).size
你会看到它还需要 3 个工作:
spark.sparkContext.statusTracker.getJobIdsForGroup(null)
Array[Int] = Array(4, 3, 2, 1, 0)
这到底是怎么回事呢?如前所述,在一般情况下,评估单个分区不足以满足限制。在这种情况下,Spark 会迭代评估分区上的LocalLimit
,直到满足GlobalLimit
,每次迭代中占用的分区数都会增加。
此类策略可能会对性能产生重大影响。单独启动 Spark 作业并不便宜,在这种情况下,当上游对象是广泛转换的结果时,事情可能会变得非常丑陋(在最好的情况下,您可以读取随机文件,但如果这些文件由于某种原因丢失,Spark 可能会被迫重新执行所有依赖项)。
总结一下:
take
是一个动作,在上游进程较窄的特定情况下可能会短路,并且使用前几个分区可以满足LocalLimits
GlobalLimits
。limit
是一个变换,并且总是评估所有LocalLimits
,因为没有迭代的逃生舱口。
虽然在特定情况下,一个可以表现得比另一个更好,但通常不可交换,也不能保证更好的性能。