Apache Spark map function org.apache.spark.SparkException:任务



我正在学习Apache Spark,我正在使用Java 8和Spark Core 2.3.2。

我发现当我在RDD上使用map函数时,它仅在使用Lambda表达式时才有效。

所以这有效:

JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
JavaRDD<Integer> result = rdd.map(x -> x*x );

但这不会,并抛出一个org.apache.spark.SparkException:任务不可序列化

JavaRDD<Integer> result = rdd.map(new Function<Integer, Integer>() {
public Integer call(Integer x) { return x*x; }
});

有人可以解释一下为什么吗? 谢谢

当您声明该new Function时,它包含对包含它的类的引用。当 Spark 尝试将新的匿名函数实例发送给工作线程时,它也尝试序列化包含类,但显然该类没有实现可序列化或具有其他不可序列化的成员。您可能会遇到类似object not serializable (class: YourClass, value: YourClass@e49bf8a)的错误,其中"YourClass"是包含函数声明的类。

如果改为将函数声明为类的静态成员:

static Function<Integer, Integer> f = new Function<Integer, Integer>() {
public Integer call(Integer x) {
return x * x;
}
};

并将其传递给您的地图函数:

JavaRDD<Integer> result = rdd.map(f);

那你可能没事了。我通常尝试将我将在此类转换中使用的所有函数声明为静态函数(如果它们太大而无法使用 lambda 形式(,因此当我只想要一个函数时,我不会意外地序列化整个类。

相关内容

最新更新