Suddenly getting "This RDD lacks a SparkContext"; it worked earlier, when all the code was in the main method



This code used to work, but after I tried creating the SparkSession from a different Scala object it suddenly stopped working:

val b = a.filter { x => (!x._2._1.isEmpty) && (!x._2._2.isEmpty) }
val primary_key_distinct = b.map(rec => (rec._1.split(",")(0))).distinct
for (i <- primary_key_distinct) {
  b.foreach(println)
}

Error:

ERROR Executor: Exception in task 0.0 in stage 5.0 (TID 5)
org.apache.spark.SparkException: This RDD lacks a SparkContext. It could happen in the following cases: 
(1) RDD transformations and actions are NOT invoked by the driver, but inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
(2) When a Spark Streaming job recovers from checkpoint, this exception will be hit if a reference to an RDD not defined by the streaming job is used in DStream operations. For more information, See SPARK-13758.

Even after I reverted the change and stopped using any other object, it still doesn't work.

Updated code:

import org.apache.spark.sql.SparkSession

// "try" is a reserved word in Scala, so the object name needs backticks
object `try` {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().master("local").appName("50columns3nodes").getOrCreate()
    var s = spark.read.csv("/home/hadoopuser/Desktop/input/source.csv").rdd.map(_.mkString(","))
    var k = spark.read.csv("/home/hadoopuser/Desktop/input/destination.csv").rdd.map(_.mkString(","))
    val source_primary_key = s.map(rec => (rec.split(",")(0), rec))
    val destination_primary_key = k.map(rec => (rec.split(",")(0), rec))
    val a = source_primary_key.cogroup(destination_primary_key).filter { x => (x._2._1) != (x._2._2) }
    val b = a.filter { x => (!x._2._1.isEmpty) && (!x._2._2.isEmpty) }
    var extra_In_Dest = a.filter(x => x._2._1.isEmpty && !x._2._2.isEmpty).map(rec => (rec._2._2.mkString("")))
    var extra_In_Src = a.filter(x => !x._2._1.isEmpty && x._2._2.isEmpty).map(rec => (rec._2._1.mkString("")))
    val primary_key_distinct = b.map(rec => (rec._1.split(",")(0))).distinct

    // This loop triggers the exception: the for comprehension runs on the RDD
    // primary_key_distinct, so b.foreach is invoked inside another RDD operation
    for (i <- primary_key_distinct) {
      var lengthofarray = 0
      println(i)
      b.foreach(println)
    }
  }
}

The input data is as follows:

s=1,david 2,ajay 3,jijo 4,abi 5,surendhar

k=1,david 2,ajay 3,jijoaa 4,abisdsdd 5,surendhar

val a contains {3,(jijo,jijoaa)} and {4,(abi,abisdsdd)}
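
For reference, here is a minimal sketch showing how a is derived from that sample data, using parallelize on the rows above instead of the CSV files and reusing the spark session from the code above (CompactBuffer is simply how the Iterables produced by cogroup print):

val sc = spark.sparkContext
val s = sc.parallelize(Seq("1,david", "2,ajay", "3,jijo", "4,abi", "5,surendhar"))
val k = sc.parallelize(Seq("1,david", "2,ajay", "3,jijoaa", "4,abisdsdd", "5,surendhar"))

val source_primary_key = s.map(rec => (rec.split(",")(0), rec))
val destination_primary_key = k.map(rec => (rec.split(",")(0), rec))

// cogroup pairs each key with (values from s, values from k);
// the filter keeps only the keys whose two groups differ
val a = source_primary_key.cogroup(destination_primary_key).filter { x => x._2._1.toSeq != x._2._2.toSeq }

a.collect().foreach(println)
// e.g. (3,(CompactBuffer(3,jijo),CompactBuffer(3,jijoaa)))
//      (4,(CompactBuffer(4,abi),CompactBuffer(4,abisdsdd)))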

If you read the first case in the error message carefully:

(1) RDD transformations and actions are NOT invoked by the driver, but inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.

It explicitly states that actions and transformations cannot be performed inside another transformation.

primary_key_distinct is a transformation on b, b itself is a transformation on a, and b.foreach(println) is an action invoked inside the operation on primary_key_distinct: the for comprehension desugars to primary_key_distinct.foreach(...), so its body runs on the executors, where b no longer carries a SparkContext.
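
Distilled, that is exactly the invalid rdd1/rdd2 pattern quoted above. A minimal sketch (illustrative RDDs, reusing the spark session from the code above):

val rdd1 = spark.sparkContext.parallelize(1 to 3)
val rdd2 = spark.sparkContext.parallelize(4 to 6)

// Invalid: the closure is shipped to the executors, where rdd2 no longer has a SparkContext
rdd1.foreach { _ => rdd2.foreach(println) } // throws SparkException, see SPARK-5063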

So if you collect b and primary_key_distinct in the driver, the code should run fine:

val b = a.filter { x => (!x._2._1.isEmpty) && (!x._2._2.isEmpty) }.collect

val primary_key_distinct = b.map(rec => (rec._1.split(",")(0))).distinct // b is already a local Array here, so no further collect is needed
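
Putting that fix back into the full program, a sketch of the corrected main (same paths and logic as the updated code above):

import org.apache.spark.sql.SparkSession

object `try` {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("50columns3nodes").getOrCreate()

    val s = spark.read.csv("/home/hadoopuser/Desktop/input/source.csv").rdd.map(_.mkString(","))
    val k = spark.read.csv("/home/hadoopuser/Desktop/input/destination.csv").rdd.map(_.mkString(","))
    val source_primary_key = s.map(rec => (rec.split(",")(0), rec))
    val destination_primary_key = k.map(rec => (rec.split(",")(0), rec))
    val a = source_primary_key.cogroup(destination_primary_key).filter { x => x._2._1 != x._2._2 }

    // Collected to the driver: b and primary_key_distinct are plain local Arrays, not RDDs
    val b = a.filter { x => !x._2._1.isEmpty && !x._2._2.isEmpty }.collect()
    val primary_key_distinct = b.map(rec => rec._1.split(",")(0)).distinct

    // Both loops iterate over local collections, so no RDD operation is nested in another
    for (i <- primary_key_distinct) {
      println(i)
      b.foreach(println)
    }

    spark.stop()
  }
}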

Alternatively, the code also runs fine if you do not invoke an action inside another transformation. In the loop below, the for comprehension iterates over a plain local Range on the driver, so each b.foreach(println) is an ordinary top-level action:

for (i <- 1 to 2) {
  var lengthofarray = 0
  println(i)
  b.foreach(println)
}

I hope that makes the explanation clear.
