I am using Spark 1.5.1. Inside a streaming context, I obtain an SQLContext as follows:
SQLContext sqlContext = SQLContext.getOrCreate(records.context());
DataFrame dataFrame = sqlContext.createDataFrame(records, SchemaRecord.class);
dataFrame.registerTempTable("records");
records is a JavaRDD&lt;SchemaRecord&gt;, where each record has the following structure:
public class SchemaRecord implements Serializable {
    private static final long serialVersionUID = 1L;
    private String msisdn;
    private String application_type;
    //private long uplink_bytes = 0L;
}
Everything works fine as long as all the fields, such as msisdn and application_type, are of type String. As soon as I add another field such as uplink_bytes of type Long, createDataFrame throws the following NullPointerException:
Exception in thread "main" java.lang.NullPointerException
    at org.spark-project.guava.reflect.TypeToken.method(TypeToken.java:465)
    at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:103)
    at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:102)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:102)
    at org.apache.spark.sql.catalyst.JavaTypeInference$.inferDataType(JavaTypeInference.scala:47)
    at org.apache.spark.sql.SQLContext.getSchema(SQLContext.scala:1031)
    at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:519)
    at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:548)
Please advise.
Your problem is probably that your model class is not a clean JavaBean. At present, Spark has no code to handle properties that have a setter but no getter. You can check how Spark sees your class with something like this:
import java.beans.Introspector;
import java.beans.PropertyDescriptor;

// Print every property the bean Introspector finds, with its getter and setter.
PropertyDescriptor[] props = Introspector.getBeanInfo(YourClass.class).getPropertyDescriptors();
for (PropertyDescriptor prop : props) {
    System.out.println(prop.getDisplayName());
    System.out.println("\t" + prop.getReadMethod());
    System.out.println("\t" + prop.getWriteMethod());
}
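For a field that has only a setter, getReadMethod() prints null here, and that null read method appears to be exactly what the TypeToken.method frame at the top of your stack trace trips over.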
The Introspector also recognizes fields that only have a setter as properties, which then causes the NullPointerException in Spark. The fix is to give every field a matching getter/setter pair, as sketched below.
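For example, here is a minimal sketch of the SchemaRecord from your question rewritten as a complete JavaBean (the field names come from your question; the getter/setter pairs are the assumed fix, not code from your original post):

import java.io.Serializable;

public class SchemaRecord implements Serializable {
    private static final long serialVersionUID = 1L;

    private String msisdn;
    private String application_type;
    private Long uplink_bytes;

    // Each field needs BOTH a getter and a setter so the Introspector
    // reports a complete property and Spark can infer its type.
    public String getMsisdn() { return msisdn; }
    public void setMsisdn(String msisdn) { this.msisdn = msisdn; }

    public String getApplication_type() { return application_type; }
    public void setApplication_type(String applicationType) { this.application_type = applicationType; }

    public Long getUplink_bytes() { return uplink_bytes; }
    public void setUplink_bytes(Long uplinkBytes) { this.uplink_bytes = uplinkBytes; }
}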
Here is what I tried, and it worked:
Here is the POJO, which stores String, Long and int values:
import java.io.*;

public class TestingSQLPerson implements Serializable {
    // Sample data in a comma-separated file:
    // Sumit,20,123455
    // Ramit,40,12345

    private String name;
    private int age;
    private Long testL;

    public Long getTestL() {
        return testL;
    }

    public void setTestL(Long testL) {
        this.testL = testL;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }
}
And here is the Spark SQL code in Java:
import org.apache.spark.*;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.*;

public class TestingLongSQLTypes {

    public static void main(String[] args) {
        SparkConf javaConf = new SparkConf();
        javaConf.setAppName("Test Long Types");
        JavaSparkContext javaCtx = new JavaSparkContext(javaConf);

        SQLContext sqlContext = new org.apache.spark.sql.SQLContext(javaCtx);

        String dataFile = "file:///home/ec2-user/softwares/crime-data/testfile.txt";

        // Parse each comma-separated line into a TestingSQLPerson bean.
        JavaRDD<TestingSQLPerson> people = javaCtx.textFile(dataFile).map(
            new Function<String, TestingSQLPerson>() {
                public TestingSQLPerson call(String line) throws Exception {
                    String[] parts = line.split(",");
                    TestingSQLPerson person = new TestingSQLPerson();
                    person.setName(parts[0]);
                    person.setAge(Integer.parseInt(parts[1].trim()));
                    person.setTestL(Long.parseLong(parts[2].trim()));
                    return person;
                }
            });

        // Apply a schema to an RDD of JavaBeans and register it as a table.
        DataFrame schemaPeople = sqlContext.createDataFrame(people, TestingSQLPerson.class);
        schemaPeople.registerTempTable("TestingSQLPerson");
        schemaPeople.printSchema();
        schemaPeople.show();
    }
}
All of the above works, and at the end I can see the results on the driver console without any exceptions or errors. @Yukti - in your case, this should work too if you follow the same steps defined in the example above. If there is any deviation, let me know and I can try to help.
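If it helps to sanity-check the result, you can also query the registered temp table directly (a hypothetical query, not part of the original test):

// Select two of the bean-derived columns from the registered temp table.
DataFrame result = sqlContext.sql("SELECT name, testL FROM TestingSQLPerson WHERE age > 18");
result.show();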