在映射Javardd时获取Java.io.Notserializable Exception



以下是导致java.io.notserializable Exception的代码。

    JavaRDD<Row> rddToWrite = dataToWrite.toJavaRDD();
    JavaRDD<String> stringRdd = rddToWrite.map(new Function<Row, String>() {
        /**
         * Serial Version Id
         */
        private static final long serialVersionUID = 6766320395808127072L;
        @Override
        public String call(Row row) throws Exception {
            return row.mkString(dataFormat.getDelimiter());
        }
    });

但是,当我执行以下操作时,任务已成功地序列化:

JavaRDD<Row> rddToWrite = dataToWrite.toJavaRDD();
List<String> dataList = rddToWrite.collect().stream().parallel()
                           .map(row -> row.mkString(dataFormat.getDelimiter()))
                           .collect(Collectors.<String>toList());
JavaSparkContext javaSparkContext = new JavaSparkContext(sessionContext.getSparkContext());
JavaRDD<String> stringRDD = javaSparkContext.parallelize(dataList);

任何人都可以帮我指出我在这里做错了什么?

编辑:DataFormat是一类的私人成员字段,其中包含此代码的功能。这是类dataformat的对象,该对象定义了两个字段,即Spark DataFormat(例如" com.databricks.spark.csv")和定界符(例如" t")。

new Function ...创建的匿名类需要对封闭实例的引用,并且序列化该函数需要序列化封闭实例,包括dataFormat 所有其他字段。如果该类未标记为Serializable,或者具有任何不可序列化的非transient字段,则它将无法使用。即使这样做,它的表现也比必要的更差。

不幸的是,要完全解决此问题,您需要创建一个命名静态内部类(或仅是单独的类),甚至不能是局部性(因为Java中的匿名或本地类都不能静态):

static class MyFunction extends Function<Row, String> {
    private String delimiter;
    private static final long serialVersionUID = 6766320395808127072L;
    MyFunction(String delimiter) {
        this.delimiter = delimiter;
    }
    @Override
    public String call(Row row) throws Exception {
        return row.mkString(delimiter);
    }
}

,然后

JavaRDD<String> stringRdd = rddToWrite.map(new MyFunction(dataFormat.getDelimiter()));

访问dataFormat时,它表示this.dataFormat。因此,Spark将尝试序列化this并遇到NotSerializableException

尝试制作一个本地副本,例如:

DataFormat dataformat = this.dataformat;
JavaRDD<Row> rddToWrite = dataToWrite.toJavaRDD();
JavaRDD<String> stringRdd = rddToWrite.map(new Function<Row, String>() ...

有关更多信息,请参阅http://spark.apache.org/docs/latest/programprammming-guide.html#passing-paspasing-functions-to-spark

最新更新