以下是导致java.io.notserializable Exception的代码。
JavaRDD<Row> rddToWrite = dataToWrite.toJavaRDD();
JavaRDD<String> stringRdd = rddToWrite.map(new Function<Row, String>() {
/**
* Serial Version Id
*/
private static final long serialVersionUID = 6766320395808127072L;
@Override
public String call(Row row) throws Exception {
return row.mkString(dataFormat.getDelimiter());
}
});
但是,当我执行以下操作时,任务已成功地序列化:
JavaRDD<Row> rddToWrite = dataToWrite.toJavaRDD();
List<String> dataList = rddToWrite.collect().stream().parallel()
.map(row -> row.mkString(dataFormat.getDelimiter()))
.collect(Collectors.<String>toList());
JavaSparkContext javaSparkContext = new JavaSparkContext(sessionContext.getSparkContext());
JavaRDD<String> stringRDD = javaSparkContext.parallelize(dataList);
任何人都可以帮我指出我在这里做错了什么?
编辑:DataFormat是一类的私人成员字段,其中包含此代码的功能。这是类dataformat的对象,该对象定义了两个字段,即Spark DataFormat(例如" com.databricks.spark.csv")和定界符(例如" t")。
new Function ...
创建的匿名类需要对封闭实例的引用,并且序列化该函数需要序列化封闭实例,包括dataFormat
和所有其他字段。如果该类未标记为Serializable
,或者具有任何不可序列化的非transient
字段,则它将无法使用。即使这样做,它的表现也比必要的更差。
不幸的是,要完全解决此问题,您需要创建一个命名静态内部类(或仅是单独的类),甚至不能是局部性(因为Java中的匿名或本地类都不能静态):
static class MyFunction extends Function<Row, String> {
private String delimiter;
private static final long serialVersionUID = 6766320395808127072L;
MyFunction(String delimiter) {
this.delimiter = delimiter;
}
@Override
public String call(Row row) throws Exception {
return row.mkString(delimiter);
}
}
,然后
JavaRDD<String> stringRdd = rddToWrite.map(new MyFunction(dataFormat.getDelimiter()));
访问dataFormat
时,它表示this.dataFormat
。因此,Spark将尝试序列化this
并遇到NotSerializableException
。
尝试制作一个本地副本,例如:
DataFormat dataformat = this.dataformat;
JavaRDD<Row> rddToWrite = dataToWrite.toJavaRDD();
JavaRDD<String> stringRdd = rddToWrite.map(new Function<Row, String>() ...
有关更多信息,请参阅http://spark.apache.org/docs/latest/programprammming-guide.html#passing-paspasing-functions-to-spark