为什么RDD.foreach在"SparkException: This RDD lacks a SparkContext"时失败?



我有一个数据集(作为RDD),我通过使用不同的filter运算符将其分为4个RDD。

val RSet = datasetRdd.
flatMap(x => RSetForAttr(x, alLevel, hieDict)).
map(x => (x, 1)).
reduceByKey((x, y) => x + y)
val Rp:RDD[(String, Int)] = RSet.filter(x => x._1.split(",")(0).equals("Rp"))
val Rc:RDD[(String, Int)] = RSet.filter(x => x._1.split(",")(0).equals("Rc"))
val RpSv:RDD[(String, Int)] = RSet.filter(x => x._1.split(",")(0).equals("RpSv"))
val RcSv:RDD[(String, Int)] = RSet.filter(x => x._1.split(",")(0).equals("RcSv"))

我将RpRpSV发送到以下功能calculateEntropy

def calculateEntropy(Rx: RDD[(String, Int)], RxSv: RDD[(String, Int)]): Map[Int, Map[String, Double]] = {
RxSv.foreach{item => {
val string = item._1.split(",")
val t = Rx.filter(x => x._1.split(",")(2).equals(string(2)))
.
.
}
}

我有两个问题:

1-当我循环操作时RxSv为:

RxSv.foreach{item=> { ... }}

它收集分区的所有项目,但我只想收集我所在的分区。如果你说那个用户map功能,但我不会改变RDD的任何内容。

因此,当我在具有 4 个工作线程和一个驱动程序的群集上运行代码时,数据集分为 4 个分区,每个工作线程运行代码。但是例如,我使用我在代码中指定的foreach循环。驱动程序从工作人员收集所有数据。

2-我在此代码上遇到了问题

val t = Rx.filter(x => x._1.split(",")(2).equals(abc(2)))

错误 :

org.apache.spark.SparkException: This RDD lacks a SparkContext.


它可能发生在以下情况下:

(1) 驱动程序不会调用RDDtransformationsactions,而是在其他转换中调用;
例如,rdd1.map(x => rdd2.values.count() * x)无效,因为无法在rdd1.maptransformation内执行值transformationcountaction。有关详细信息,请参阅 SPARK-5063。

(2) 当Spark Streaming作业从检查点恢复时,如果在DStream操作中使用对未由流作业定义的RDD的引用,则会命中此异常。有关详细信息,请参阅 SPARK-13758。

首先,我强烈建议使用cache运算符缓存第一个RDD。

RSet.cache

这将避免每次filter其他RDD时扫描和转换数据集:RpRcRpSvRcSv

引用缓存的 scaladoc:

cache()使用默认存储级别 (MEMORY_ONLY) 保留此 RDD。

性能应该提高。


其次,我会非常小心地使用术语"分区"来指代过滤的RDD,因为该术语在Spark中具有特殊含义。

分区表示 Spark 为一个操作执行了多少任务。它们是 Spark 的提示,因此您(Spark 开发人员)可以微调您的分布式管道。

管道分布在群集节点上,每个分区方案具有一个或多个 Spark 执行程序。如果您决定在 RDD 中拥有一个分区,一旦您在该 RDD 上执行操作,您将在一个执行器上拥有一个任务。

filter转换不会更改分区数(换句话说,它保留分区)。分区数,即任务数,正好是RSet的分区数。


1-当我在RxSv上循环操作时,它会收集分区的所有项目,但我只想在我所在的分区

你是。不用担心,因为Spark将在数据所在的执行器上执行任务。foreach是一个操作,它收集项目,但描述在执行程序上运行的计算,数据分布在群集中(作为分区)。

如果要在每个分区一次处理所有项目,请使用 foreachPartition:

foreachPartition将函数 f 应用于此 RDD 的每个分区。


2-我在此代码上遇到了问题

在以下代码行中:

RxSv.foreach{item => {
val string = item._1.split(",")
val t = Rx.filter(x => x._1.split(",")(2).equals(string(2)))

您正在执行foreach操作,该操作又使用RDD[(String, Int)]Rx。这是不允许的(如果可能的话,不应该被编译)。

该行为的原因是 RDD 是一种数据结构,它只是描述当执行操作并驻留在驱动程序(业务流程协调程序)上时数据集会发生什么。驱动程序使用数据结构来跟踪数据源、转换和分区数。

当驱动程序在执行器上生成任务时,作为实体的 RDD 消失 (= 消失)。

当任务运行时,没有任何可用的东西可以帮助他们知道如何运行作为其工作一部分的RDD。因此错误。Spark对此非常谨慎,并在执行任务后导致问题之前检查此类异常。

最新更新