Kryo serialization of a class (task object) in Apache Spark returns null on deserialization

I am writing a test application using the Java Spark API. I am using a class that does not implement the Serializable interface, so to make the application work I configured the Kryo serializer to serialize it. The problem I observed while debugging is that during deserialization the class object comes back as null, which in turn throws a NullPointerException. It looks like a closure-serialization problem, but I am not sure. Since I am new to this kind of serialization, I don't know where to start digging.

Here is the code I am testing:

package org.apache.spark.examples;

import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.UnknownHostException;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;


/**
 * Spark application to test the Serialization issue in spark
 */
public class Test {
    static PrintWriter outputFileWriter;
    static FileWriter file;
    static JavaSparkContext ssc;
    public static void main(String[] args) {

        String inputFile = "/home/incubator-spark/examples/src/main/scala/org/apache/spark/examples/InputFile.txt";
        String master = "local";
        String jobName = "TestSerialization";
        String sparkHome = "/home/test/Spark_Installation/spark-0.7.0";
        String sparkJar = "/home/test/TestSerializationIssesInSpark/TestSparkSerIssueApp/target/TestSparkSerIssueApp-0.0.1-SNAPSHOT.jar";

        SparkConf conf = new SparkConf();
        conf.set("spark.closure.serializer","org.apache.spark.serializer.KryoSerializer");
        conf.set("spark.kryo.registrator", "org.apache.spark.examples.MyRegistrator");
        // create the Spark context
        if(master.equals("local")){
            ssc = new JavaSparkContext("local", jobName, conf);
            //ssc = new JavaSparkContext("local", jobName);
        } else {
            ssc = new JavaSparkContext(master, jobName, sparkHome, sparkJar);
        }
        JavaRDD<String> testData = ssc.textFile(inputFile).cache();
        final NotSerializableJavaClass notSerializableTestObject = new NotSerializableJavaClass("Hi ");
        @SuppressWarnings({ "serial", "unchecked"})
        JavaRDD<String> classificationResults = testData.map(
                new Function<String, String>() {
                    @Override
                    public String call(String inputRecord) throws Exception {                   
                        if(!inputRecord.isEmpty()) {
                            //String[] pointDimensions = inputRecord.split(",");
                            String result = "";
                            try {
                                FileWriter file = new FileWriter("/home/test/TestSerializationIssesInSpark/results/test_result_" + (int) (Math.random() * 100));
                                PrintWriter outputFile = new PrintWriter(file); 
                                InetAddress ip;
                                ip = InetAddress.getLocalHost();
                                outputFile.println("IP of the server: " + ip);
                                result = notSerializableTestObject.testMethod(inputRecord);
                                outputFile.println("Result: " + result);
                                outputFile.flush();
                                outputFile.close();
                                file.close();
                            } catch (UnknownHostException e) {
                                e.printStackTrace();
                            } catch (IOException e1) {
                                e1.printStackTrace();
                            }
                            return result;
                        } else {
                            System.out.println("End of elements in the stream.");
                            String result = "End of elements in the input data";
                            return result;
                        }
                    }
                }).cache(); 
        long processedRecords = classificationResults.count();
        ssc.stop();
        System.out.println("Processed records: " + processedRecords);
    }
}
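Note that the anonymous Function passed to map() captures notSerializableTestObject as a hidden field, so the closure serializer has to serialize that object along with the closure. A minimal, Spark-free sketch of that capture (class names here are made up for illustration; the SerializableRunnable interface plays the role of Spark's Function):

```java
import java.io.ByteArrayOutputStream;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ClosureCaptureDemo {

    // Helper interface so the anonymous class below is itself Serializable,
    // as Spark's Function interface is.
    interface SerializableRunnable extends Runnable, Serializable {}

    // Hypothetical stand-in for NotSerializableJavaClass above.
    static class NotSerializable {
        final String value;
        NotSerializable(String value) { this.value = value; }
    }

    public static void main(String[] args) throws Exception {
        final NotSerializable captured = new NotSerializable("Hi ");

        // The anonymous class stores 'captured' in a synthetic field,
        // just as the map() closure stores notSerializableTestObject.
        Runnable closure = new SerializableRunnable() {
            @Override
            public void run() {
                System.out.println(captured.value);
            }
        };

        try (ObjectOutputStream out =
                     new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(closure);
            System.out.println("closure serialized OK");
        } catch (NotSerializableException e) {
            // Java serialization rejects the closure because the captured
            // object does not implement Serializable.
            System.out.println("failed to serialize: " + e.getMessage());
        }
    }
}
```

Running this hits the catch block: plain Java serialization refuses the closure because of the captured field, which is why the class the closure captures matters so much here.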

Here is the KryoRegistrator class:

package org.apache.spark.examples;
import org.apache.spark.serializer.KryoRegistrator;
import com.esotericsoftware.kryo.Kryo;
public class MyRegistrator implements KryoRegistrator {
    public void registerClasses(Kryo kryo) {
        kryo.register(NotSerializableJavaClass.class);
    }
}

Here is the class I am serializing:

package org.apache.spark.examples;
public class NotSerializableJavaClass {
    public String testVariable;
    public NotSerializableJavaClass(String testVariable) {
        super();
        this.testVariable = testVariable;
    }
    public String testMethod(String vartoAppend){
        return this.testVariable + vartoAppend;
    }
}

This is because spark.closure.serializer only supports the Java serializer. See http://spark.apache.org/docs/latest/configuration.html regarding spark.closure.serializer.
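Given that restriction, the usual fix is to have the captured class implement java.io.Serializable and drop the spark.closure.serializer setting. A minimal sketch (assuming you are free to modify the class; the main method just verifies a plain Java serialization round-trip, the mechanism the closure serializer actually uses):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializableJavaClass implements Serializable {
    private static final long serialVersionUID = 1L;

    public String testVariable;

    public SerializableJavaClass(String testVariable) {
        this.testVariable = testVariable;
    }

    public String testMethod(String varToAppend) {
        return this.testVariable + varToAppend;
    }

    // Round-trip through Java serialization to show the object survives
    // intact instead of coming back null.
    public static void main(String[] args) throws Exception {
        SerializableJavaClass original = new SerializableJavaClass("Hi ");
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(original);
        }
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            SerializableJavaClass copy = (SerializableJavaClass) in.readObject();
            System.out.println(copy.testMethod("there")); // prints "Hi there"
        }
    }
}
```

Kryo is still fully supported for data serialization via spark.serializer; it is only spark.closure.serializer that is limited to Java serialization, so the MyRegistrator class above remains useful for serializing RDD data.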