I am using Spark to process CSV files. I recently replaced my hand-rolled CSV line splitting with OpenCSV. Here is the simplified code:
import com.opencsv.CSVParser;
import com.opencsv.CSVParserBuilder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.util.List;

public class Main {
    public static void main(String[] args) {
        CSVParser parser = new CSVParserBuilder()
                .withSeparator(';')
                .build();
        SparkConf cfg = new SparkConf()
                .setMaster("local[4]")
                .setAppName("Testapp");
        JavaSparkContext sc = new JavaSparkContext(cfg);
        JavaRDD<String> textFile = sc.textFile("testdata.csv", 1);
        List<String> categories = textFile
                .map(line -> parser.parseLine(line)[10])   // the lambda captures parser
                .collect();
        System.out.println(categories);
    }
}
Unfortunately, the code does not work. It throws the following exception:
Caused by: java.io.NotSerializableException: com.opencsv.CSVParser
Serialization stack:
- object not serializable (class: com.opencsv.CSVParser, value: com.opencsv.CSVParser@1290c49)
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 1)
- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class test.Main, functionalInterfaceMethod=org/apache/spark/api/java/function/Function.call:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic test/Main.lambda$main$49bd2722$1:(Lcom/opencsv/CSVParser;Ljava/lang/String;)Ljava/lang/String;, instantiatedMethodType=(Ljava/lang/String;)Ljava/lang/String;, numCaptured=1])
- writeReplace data (class: java.lang.invoke.SerializedLambda)
- object (class test.Main$$Lambda$19/429639728, test.Main$$Lambda$19/429639728@72456279)
- field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
- object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
... 12 more
It seems that Spark tries to serialize the lambda expression, and because the lambda keeps a reference to parser, serialization fails with the error above.
The question is: is there any way to avoid the exception and use a non-serializable library inside a lambda expression passed to Spark? I really do not want to implement my own CSV parser.
Spark has built-in support for CSV files:
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder().getOrCreate();
Dataset<Row> df = spark.read().format("csv")
        .option("sep", ";")
        .option("header", "true") // or "false" if the file has no header row
        .load("filename.csv");
Edit (promoting a comment to the main answer):
If you really need it, you can get an RDD from the DataFrame with df.javaRDD(), although it is better to stay with the Dataset/DataFrame API (see, for example, here).
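A minimal sketch of that fallback, again assuming the df from the snippet above; each element of the resulting RDD is a Row, so fields are read by index (10 only mirrors the question's code):
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Row;
import java.util.List;

JavaRDD<Row> rows = df.javaRDD();        // drop down from the DataFrame to the RDD API
List<String> categories = rows
        .map(row -> row.getString(10))   // read the 11th field of each Row by index
        .collect();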
I realized that my problem has a very simple solution. Any use of an external library that causes serialization problems can be wrapped in a static method. The reference to parser is hidden behind the method parse, so the lambda no longer captures the non-serializable object. This is obviously not a perfect solution, but it works.
import com.opencsv.CSVParser;
import com.opencsv.CSVParserBuilder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
import java.util.List;

public class Main {
    // The parser lives in a static field: it is created when the class is loaded
    // in each JVM and never has to be serialized.
    private static CSVParser parser = new CSVParserBuilder()
            .withSeparator(';')
            .build();

    public static void main(String[] args) {
        SparkConf cfg = new SparkConf()
                .setMaster("local[4]")
                .setAppName("Testapp");
        JavaSparkContext sc = new JavaSparkContext(cfg);
        JavaRDD<String> textFile = sc.textFile("testdata.csv", 1);
        List<String> categories = textFile
                .map(line -> parse(line)[0])   // the lambda only references the static method
                .collect();
        System.out.println(categories);
    }

    static String[] parse(String line) throws IOException {
        return parser.parseLine(line);
    }
}