Spark count vs take and length



I am using com.datastax.spark:spark-cassandra-connector_2.11:2.4.0 when running a Zeppelin notebook and I don't understand the difference between two operations in Spark. One operation takes a lot of time to compute, the other executes immediately. Could someone explain to me the difference between these two operations:

import com.datastax.spark.connector._
import org.apache.spark.sql.cassandra._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import spark.implicits._
case class SomeClass(val someField:String)
val timelineItems = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(scala.collection.immutable.Map(
    "spark.cassandra.connection.host" -> "127.0.0.1",
    "table" -> "timeline_items",
    "keyspace" -> "timeline"))
  .load()
//some simplified code:
val timelineRow = timelineItems
  .map(x => SomeClass("test"))
  .filter(x => x != null)
  .toDF()
  .limit(4)
//first operation (takes a lot of time. It seems spark iterates through all items in Cassandra and doesn't use laziness with limit 4)
println(timelineRow.count()) //return: 4
//second operation (executes immediately); 300 - just random number which doesn't affect the result
println(timelineRow.take(300).length) //return: 4

What you see here is the difference between the implementations of Limit (a transformation-like operation) and CollectLimit (an action-like operation). However, the difference in timings is highly misleading and not something you can expect in the general case.

First, let's create an MCVE:

// Force tiny input partitions so README.md is split across multiple partitions
spark.conf.set("spark.sql.files.maxPartitionBytes", 500)

val ds = spark.read
  .text("README.md")
  .as[String]
  .map { x =>
    Thread.sleep(1000) // make the per-record work visible in the timings
    x
  }

val dsLimit4 = ds.limit(4)

To make sure we start with a clean slate:

spark.sparkContext.statusTracker.getJobIdsForGroup(null).isEmpty
Boolean = true

Now call count

dsLimit4.count()

and check the execution plan (from the Spark UI):

== Parsed Logical Plan ==
Aggregate [count(1) AS count#12L]
+- GlobalLimit 4
   +- LocalLimit 4
      +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#7]
         +- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#6: java.lang.String
            +- DeserializeToObject cast(value#0 as string).toString, obj#5: java.lang.String
               +- Relation[value#0] text
== Analyzed Logical Plan ==
count: bigint
Aggregate [count(1) AS count#12L]
+- GlobalLimit 4
   +- LocalLimit 4
      +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#7]
         +- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#6: java.lang.String
            +- DeserializeToObject cast(value#0 as string).toString, obj#5: java.lang.String
               +- Relation[value#0] text
== Optimized Logical Plan ==
Aggregate [count(1) AS count#12L]
+- GlobalLimit 4
   +- LocalLimit 4
      +- Project
         +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#7]
            +- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#6: java.lang.String
               +- DeserializeToObject value#0.toString, obj#5: java.lang.String
                  +- Relation[value#0] text
== Physical Plan ==
*(2) HashAggregate(keys=[], functions=[count(1)], output=[count#12L])
+- *(2) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#15L])
   +- *(2) GlobalLimit 4
      +- Exchange SinglePartition
         +- *(1) LocalLimit 4
            +- *(1) Project
               +- *(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#7]
                  +- *(1) MapElements <function1>, obj#6: java.lang.String
                     +- *(1) DeserializeToObject value#0.toString, obj#5: java.lang.String
                        +- *(1) FileScan text [value#0] Batched: false, Format: Text, Location: InMemoryFileIndex[file:/path/to/README.md], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<value:string>

The core component is

+- *(2) GlobalLimit 4
   +- Exchange SinglePartition
      +- *(1) LocalLimit 4

which indicates that we should expect a wide operation with multiple stages. We can see a single job

spark.sparkContext.statusTracker.getJobIdsForGroup(null)
Array[Int] = Array(0)

with two stages

spark.sparkContext.statusTracker.getJobInfo(0).get.stageIds
Array[Int] = Array(0, 1)

with eight

spark.sparkContext.statusTracker.getStageInfo(0).get.numTasks
Int = 8

and one

spark.sparkContext.statusTracker.getStageInfo(1).get.numTasks
Int = 1

tasks, respectively.

Now let's compare this with

dsLimit4.take(300).size

which generates the following

== Parsed Logical Plan ==
GlobalLimit 300
+- LocalLimit 300
   +- GlobalLimit 4
      +- LocalLimit 4
         +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#7]
            +- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#6: java.lang.String
               +- DeserializeToObject cast(value#0 as string).toString, obj#5: java.lang.String
                  +- Relation[value#0] text
== Analyzed Logical Plan ==
value: string
GlobalLimit 300
+- LocalLimit 300
   +- GlobalLimit 4
      +- LocalLimit 4
         +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#7]
            +- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#6: java.lang.String
               +- DeserializeToObject cast(value#0 as string).toString, obj#5: java.lang.String
                  +- Relation[value#0] text
== Optimized Logical Plan ==
GlobalLimit 4
+- LocalLimit 4
   +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#7]
      +- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#6: java.lang.String
         +- DeserializeToObject value#0.toString, obj#5: java.lang.String
            +- Relation[value#0] text
== Physical Plan ==
CollectLimit 4
+- *(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#7]
   +- *(1) MapElements <function1>, obj#6: java.lang.String
      +- *(1) DeserializeToObject value#0.toString, obj#5: java.lang.String
         +- *(1) FileScan text [value#0] Batched: false, Format: Text, Location: InMemoryFileIndex[file:/path/to/README.md], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<value:string>

While the global and local limits are still present, there is no exchange in between. Therefore we can expect a single-stage operation. Note that the planner narrowed the limit down to the more restrictive value.

As expected, we see a single new job:

spark.sparkContext.statusTracker.getJobIdsForGroup(null)
Array[Int] = Array(1, 0)

which generated only one stage:

spark.sparkContext.statusTracker.getJobInfo(1).get.stageIds
Array[Int] = Array(2)

with only one task:

spark.sparkContext.statusTracker.getStageInfo(2).get.numTasks
Int = 1

What does this mean for us?

  • In the count case, Spark used a wide transformation: it applied LocalLimit on each partition and shuffled the partial results to perform GlobalLimit.
  • In the take case, Spark used a narrow transformation and evaluated LocalLimit only on the first partition (a rough RDD-level sketch of both paths follows below).
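
To make the two paths concrete, here is a rough RDD-level sketch (my own illustration, not Spark's actual code): limiting each partition and then shuffling to a single partition mirrors the LocalLimit / Exchange SinglePartition / GlobalLimit plan used by count, while running a job on partition 0 only mirrors what CollectLimit does when the first partition is sufficient.

import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

// "count" path: LocalLimit on every partition, shuffle to a single partition
// (Exchange SinglePartition), apply GlobalLimit, then count.
def countPath[T: ClassTag](rdd: RDD[T], n: Int): Long =
  rdd
    .mapPartitions(_.take(n))   // LocalLimit: at most n rows per partition
    .repartition(1)             // Exchange SinglePartition (a full shuffle)
    .mapPartitions(_.take(n))   // GlobalLimit
    .count()

// "take" path: evaluate only the first partition, assuming it already
// contains at least n rows.
def takePath[T: ClassTag](rdd: RDD[T], n: Int): Array[T] =
  rdd.sparkContext.runJob(
    rdd,
    (it: Iterator[T]) => it.take(n).toArray,
    Seq(0)                      // run the job on partition 0 only
  ).head

For example, countPath(ds.rdd, 4) touches every partition, while takePath(ds.rdd, 4) touches only the first one.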

Obviously, the latter approach won't work if the number of values in the first partition is lower than the requested limit.

val dsLimit105 = ds.limit(105) // There are 105 lines

In that case the first count will use exactly the same logic as before (I encourage you to confirm that empirically), but take will follow a rather different path. So far we have triggered only two jobs:

spark.sparkContext.statusTracker.getJobIdsForGroup(null)
Array[Int] = Array(1, 0)

Now if we execute

dsLimit105.take(300).size

you'll see that it required 3 more jobs:

spark.sparkContext.statusTracker.getJobIdsForGroup(null)
Array[Int] = Array(4, 3, 2, 1, 0)

So what is going on here? As noted before, evaluating a single partition is not enough to satisfy the limit in the general case. In such a case, Spark iteratively evaluates LocalLimit on partitions until GlobalLimit is satisfied, increasing the number of partitions taken in each iteration.
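
Conceptually, the fallback looks roughly like the simplified sketch below (my own approximation, not Spark's actual source; the growth rate of partitions per attempt is governed by the spark.sql.limit.scaleUpFactor configuration, which defaults to 4):

import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

// Simplified sketch: scan a growing prefix of partitions until n rows have
// been collected or the input is exhausted. Each runJob call below shows up
// as a separate Spark job in the status tracker, which is why the take above
// added several jobs.
def iterativeTake[T: ClassTag](rdd: RDD[T], n: Int, scaleUpFactor: Int = 4): Array[T] = {
  val buf = scala.collection.mutable.ArrayBuffer.empty[T]
  val totalParts = rdd.getNumPartitions
  var partsScanned = 0
  var partsToTry = 1
  while (buf.length < n && partsScanned < totalParts) {
    val range = partsScanned until math.min(partsScanned + partsToTry, totalParts)
    val left = n - buf.length
    val chunks = rdd.sparkContext.runJob(
      rdd, (it: Iterator[T]) => it.take(left).toArray, range)
    chunks.foreach(buf ++= _)
    partsScanned += range.size
    partsToTry *= scaleUpFactor
  }
  buf.take(n).toArray
}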

Such a strategy can have significant performance implications. Starting Spark jobs alone is not cheap, and in cases where the upstream object is the result of a wide transformation things can get quite ugly (in the best case scenario you can read shuffle files, but if these are lost for some reason, Spark might be forced to re-execute all the dependencies).
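
If the upstream really is expensive or wide, one possible mitigation (my own suggestion, not part of the answer above) is to persist it before calling take, so that later actions, and any recomputation forced by lost shuffle files, can fall back to cached blocks instead of re-executing the whole lineage:

import org.apache.spark.storage.StorageLevel

// Hypothetical mitigation: cache the expensive upstream once; repeated jobs
// over the same data can then read cached blocks rather than re-running the
// wide dependency from scratch.
val expensive = ds.repartition(8)        // stand-in for a wide, costly upstream
  .persist(StorageLevel.MEMORY_AND_DISK)

expensive.limit(105).take(300)           // may still trigger several jobs

expensive.unpersist()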

To summarize:

  • take is an action, and it can short-circuit in the specific case where the upstream process is narrow and the LocalLimits can satisfy the GlobalLimit using just the first few partitions.
  • limit is a transformation, and it always evaluates all LocalLimits, as there is no iterative escape hatch.

While one can behave better than the other in specific cases, they are not interchangeable, and neither guarantees better performance in general.
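
As a rough rule of thumb (again my own illustration, not part of the answer above): reach for take when you just need a handful of rows on the driver, and for limit when the bound is part of a larger plan that stays inside Spark.

// Fetch a few rows to the driver: take is an action and may short-circuit
// to the first partition(s).
val sample: Array[String] = ds.take(4)

// Bound an intermediate result inside a larger lazy plan: limit is a
// transformation and is optimized together with the rest of the query.
val bounded = ds.limit(4).groupBy("value").count()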
