Spark 1.5.1 | Spark Streaming | NullPointerException with SQL createDataFrame



I am using Spark 1.5.1.

Inside the streaming context, I obtain an SQLContext as follows:

SQLContext sqlContext = SQLContext.getOrCreate(records.context());
DataFrame dataFrame = sqlContext.createDataFrame(records, SchemaRecord.class);
dataFrame.registerTempTable("records");

records is a JavaRDD, and each record has the following structure:

public class SchemaRecord implements Serializable {
    private static final long serialVersionUID = 1L;
    private String msisdn;
    private String application_type;
    //private long uplink_bytes = 0L;
}

Everything works fine as long as the fields, such as msisdn and application_type, are of type String only.

As soon as I add another field such as uplink_bytes of type Long, createDataFrame fails with the following NullPointerException:

Exception in thread "main" java.lang.NullPointerException
at org.spark-project.guava.reflect.TypeToken.method(TypeToken.java:465)
at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:103)
at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:102)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:102)
at org.apache.spark.sql.catalyst.JavaTypeInference$.inferDataType(JavaTypeInference.scala:47)
at org.apache.spark.sql.SQLContext.getSchema(SQLContext.scala:1031)
at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:519)
at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:548)

Please advise.

Your problem is probably that your model class is not a clean JavaBean. Currently, Spark has no code to handle properties that have a setter but no getter. You can simply try something like this to check how Spark understands your class:

import java.beans.Introspector;
import java.beans.PropertyDescriptor;

PropertyDescriptor[] props = Introspector.getBeanInfo(YourClass.class).getPropertyDescriptors();
for (PropertyDescriptor prop : props) {
    System.out.println(prop.getDisplayName());
    System.out.println("\t" + prop.getReadMethod());
    System.out.println("\t" + prop.getWriteMethod());
}

The Introspector also recognizes setter-only fields as properties, which is what triggers the NullPointerException inside Spark.
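Applying that diagnosis to the class from the question, a minimal fix is to give every field a matching getter/setter pair and use the boxed Long type. This is a sketch, not the asker's actual code: the field names come from the question, while the accessor names are just the standard JavaBean forms. The main method reruns the Introspector check from above to confirm each property now has both a read and a write method:

```java
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.io.Serializable;

// SchemaRecord from the question, rewritten as a clean JavaBean:
// every field has both a getter and a setter.
public class SchemaRecord implements Serializable {
    private static final long serialVersionUID = 1L;

    private String msisdn;
    private String application_type;
    private Long uplink_bytes;

    public String getMsisdn() { return msisdn; }
    public void setMsisdn(String msisdn) { this.msisdn = msisdn; }

    public String getApplication_type() { return application_type; }
    public void setApplication_type(String application_type) { this.application_type = application_type; }

    public Long getUplink_bytes() { return uplink_bytes; }
    public void setUplink_bytes(Long uplink_bytes) { this.uplink_bytes = uplink_bytes; }

    public static void main(String[] args) throws Exception {
        // Rerun the Introspector check: every property should now report
        // a non-null read AND write method (Object.class as the stop class
        // excludes the built-in read-only "class" property).
        for (PropertyDescriptor prop : Introspector
                .getBeanInfo(SchemaRecord.class, Object.class)
                .getPropertyDescriptors()) {
            System.out.println(prop.getDisplayName()
                    + "\tread=" + (prop.getReadMethod() != null)
                    + "\twrite=" + (prop.getWriteMethod() != null));
        }
    }
}
```

With both accessors present for each field, JavaTypeInference should be able to resolve a getter for every property it discovers, so the TypeToken lookup no longer receives a null method.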

Here is what I tried, and it worked:

Here is the POJO, which stores String, Long and int values:

import java.io.*;

public class TestingSQLPerson implements Serializable {
    // Here is the data in a comma-separated file:
    // Sumit,20,123455
    // Ramit,40,12345
    private String name;
    private int age;
    private Long testL;

    public Long getTestL() {
        return testL;
    }
    public void setTestL(Long testL) {
        this.testL = testL;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public int getAge() {
        return age;
    }
    public void setAge(int age) {
        this.age = age;
    }
}

And here is the Spark SQL code in Java:

import org.apache.spark.*;
import org.apache.spark.sql.*;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;
public class TestingLongSQLTypes {
public static void main(String[] args) {
    SparkConf javaConf = new SparkConf();
    javaConf.setAppName("Test Long Types");
    JavaSparkContext javaCtx = new JavaSparkContext(javaConf);

    SQLContext sqlContext = new org.apache.spark.sql.SQLContext(javaCtx);
    String dataFile = "file:///home/ec2-user/softwares/crime-data/testfile.txt";
    JavaRDD<TestingSQLPerson> people = javaCtx.textFile(dataFile).map(
      new Function<String, TestingSQLPerson>() {
        public TestingSQLPerson call(String line) throws Exception {
          String[] parts = line.split(",");
          TestingSQLPerson person = new TestingSQLPerson();
          person.setName(parts[0]);
          person.setAge(Integer.parseInt(parts[1].trim()));
          person.setTestL(Long.parseLong(parts[2].trim()));
          return person;
        }
      });
    // Apply a schema to an RDD of JavaBeans and register it as a table.
    DataFrame schemaPeople = sqlContext.createDataFrame(people, TestingSQLPerson.class);
    schemaPeople.registerTempTable("TestingSQLPerson");
    schemaPeople.printSchema();
    schemaPeople.show();

}
}

All of the above works, and at the end I can see the results on the driver console without any exceptions. @Yukti - in your case, if you follow the same steps as in the example above, it should work too. If there is any deviation, let me know and I can try to help.
