通过spark-csv README运行的示例Java代码类似于import org.apache.spark.sql.SQLContext;导入org.apache.spark.sql.types.*;
SQLContext sqlContext = new SQLContext(sc);
StructType customSchema = new StructType(
new StructField("year", IntegerType, true),
new StructField("make", StringType, true),
new StructField("model", StringType, true),
new StructField("comment", StringType, true),
new StructField("blank", StringType, true));
DataFrame df = sqlContext.read()
.format("com.databricks.spark.csv")
.option("inferSchema", "true")
.option("header", "true")
.load("cars.csv");
df.select("year", "model").write()
.format("com.databricks.spark.csv")
.option("header", "true")
.save("newcars.csv");
它并不是开箱即用编译的,所以经过一些争论,我将不正确的FooType
语法改为DataTypes.FooType
,并将StructFields作为new StructField[]
传递;编译器在StructField
的构造函数中为metadata
请求了第四个参数,但我很难找到关于它的含义的文档(javadocs描述了它的用例,但并没有真正说明如何决定在StructField构造过程中传递什么)。使用以下代码,它现在运行,直到出现任何副作用方法,如collect()
:
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
// Read features.
System.out.println("Reading features from " + args[0]);
StructType featuresSchema = new StructType(new StructField[] {
new StructField("case_id", DataTypes.StringType, false, null),
new StructField("foo", DataTypes.DoubleType, false, null)
});
DataFrame features = sqlContext.read()
.format("com.databricks.spark.csv")
.schema(featuresSchema)
.load(args[0]);
for (Row r : features.collect()) {
System.out.println("Row: " + r);
}
我得到以下异常:
Exception in thread "main" java.lang.NullPointerException
at org.apache.spark.sql.catalyst.expressions.AttributeReference.hashCode(namedExpressions.scala:202)
at scala.runtime.ScalaRunTime$.hash(ScalaRunTime.scala:210)
at scala.collection.immutable.HashSet.elemHashCode(HashSet.scala:65)
at scala.collection.immutable.HashSet.computeHash(HashSet.scala:74)
at scala.collection.immutable.HashSet.$plus(HashSet.scala:56)
at scala.collection.immutable.HashSet.$plus(HashSet.scala:59)
at scala.collection.immutable.Set$Set4.$plus(Set.scala:127)
at scala.collection.immutable.Set$Set4.$plus(Set.scala:121)
at scala.collection.mutable.SetBuilder.$plus$eq(SetBuilder.scala:24)
at scala.collection.mutable.SetBuilder.$plus$eq(SetBuilder.scala:22)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:153)
at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
at scala.collection.SetLike$class.map(SetLike.scala:93)
at scala.collection.AbstractSet.map(Set.scala:47)
at org.apache.spark.sql.catalyst.expressions.AttributeSet.foreach(AttributeSet.scala:114)
at scala.collection.TraversableOnce$class.size(TraversableOnce.scala:105)
at org.apache.spark.sql.catalyst.expressions.AttributeSet.size(AttributeSet.scala:56)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProjectRaw(DataSourceStrategy.scala:307)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProject(DataSourceStrategy.scala:282)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:56)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:926)
at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:924)
at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:930)
at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:930)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:53)
at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1903)
at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1384)
...
知道怎么了吗?
README似乎已经过时了,需要对Java示例进行一些重要的编辑。我找到了添加了元数据字段的实际JIRA,它指出了Scala案例中默认Map.empty
值的使用,尽管输入参数缺乏相同的默认值,但编写文档的人一定只是直接将Scala翻译成了Java。
在SparkSQL代码的1.5分支中,我们可以看到它引用metadata.hashCode()
而不进行检查,这就是导致NullPointerException
的原因。Metadata.empty()方法的存在,再加上关于在Scala中使用空映射作为默认映射的讨论,似乎意味着正确的实现是继续并传递Metadata.empty()
,如果你不在乎的话
SQLContext sqlContext = new SQLContext(sc);
StructType customSchema = new StructType(new StructField[] {
new StructField("year", DataTypes.IntegerType, true, Metadata.empty()),
new StructField("make", DataTypes.StringType, true, Metadata.empty()),
new StructField("model", DataTypes.StringType, true, Metadata.empty()),
new StructField("comment", DataTypes.StringType, true, Metadata.empty()),
new StructField("blank", DataTypes.StringType, true, Metadata.empty())
});
DataFrame df = sqlContext.read()
.format("com.databricks.spark.csv")
.schema(customSchema)
.option("header", "true")
.load("cars.csv");
df.select("year", "model").write()
.format("com.databricks.spark.csv")
.option("header", "true")
.save("newcars.csv");
即使我也会遇到同样的异常。我通过提供元数据修复了它。
所以像一样更改代码
StructType customSchema = new StructType(
new StructField("year", IntegerType, true,Metadata.empty()),
new StructField("make", StringType, true,Metadata.empty()),
new StructField("model", StringType, true,Metadata.empty()),
new StructField("comment", StringType, true,Metadata.empty()),
new StructField("blank", StringType, true,Metadata.empty()));
这将修复