火花是否在内存中加载基本RDD



我是Spark的新手。需要帮助了解火花的工作原理。假设README.md存储在 3 个节点的 HDFS 128 块中,我正在使用 Spark shell 来处理它。

val textFile = sc.textFile("README.md")
val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark.first()

在上述情况下,执行将由第 3 行触发。

Spark 是否会在 HDFS 节点的 RAM 中完成 3 次README.md拆分,然后过滤linesWithSpark并在内存中保留片刻。并从linesWithSpark发送第一行(从第一次拆分)?或者它只会从HDFS Node磁盘的Split1中提取带有"Spark"的第一行并将其发送到驱动程序。

如果我将第二行更改为

val linesWithSpark = textFile.filter(line => line.contains("Spark")).cache()

让我们从一个简单的实验开始。首先让我们加载数据并检查其分布:

val textFile = sc.textFile("README.md", 2)
textFile.glom.map(_.size).collect
// Array[Int] = Array(54, 41)

正如我们可以怀疑的那样,简单的filter只生成一个任务:

textFile.filter(line => line.contains("Spark")).toDebugString
// String = 
// (2) MapPartitionsRDD[11] at filter at <console>:30 []
//  |  MapPartitionsRDD[8] at textFile at <console>:27 []
//  |  README.md HadoopRDD[7] at textFile at <console>:27 []

现在,让我们在没有 out cache的情况下运行此作业并收集一些诊断信息:

val cnt = sc.accumulator(0L, "cnt")
val linesWithSpark = textFile.filter(line => {
  cnt += 1L
  line.contains("Spark")
})
linesWithSpark.first()
// String = # Apache Spark
cnt.value
/// Long = 1

如您所见,没有缓存的作业将仅处理一条记录。发生这种情况是因为first是作为take(1)执行的。在第一次迭代中,take仅在一个分区上运行作业,并在其迭代器上使用it.take(left),其中left等于 1。

由于Iterators很懒,我们的程序在处理第一行后立即返回。如果第一个分区未提供所需的结果take则在每次迭代时迭代处理的分区数量递增。

接下来,让我们对缓存重复相同的实验:

val cacheCntCache = sc.accumulator(0L, "cacheCnt")
val linesWithSparkCached = textFile.filter(line => {
  cacheCntCache  += 1L
  line.contains("Spark")
}).cache()
linesWithSparkCached.first()
// String = # Apache Spark
cacheCntCache.value
// Long = 54

此外,让我们检查存储信息:

sc.getRDDStorageInfo
// Array[org.apache.spark.storage.RDDInfo] = Array(
//   RDD "MapPartitionsRDD" (12)
//   StorageLevel: StorageLevel(false, true, false, true, 1); 
//   CachedPartitions: 1; TotalPartitions: 2; MemorySize: 1768.0 B; 
//   ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B)

正如你所看到的,缓存Spark将完成处理分区并将其缓存在内存中。虽然我无法提供导致此行为的源代码的确切部分,但它看起来像是合理的优化。由于分区已加载,因此没有理由停止作业。

另请参阅:Spark RDD 上的 Lazy foreach

最新更新