我使用Spark 1.5.2从scala对象使用以下语法创建一个数据框架。我的目的是为单元测试创建一个数据。
class Address (first:String = null, second: String = null, zip: String = null){}
class Person (id: String = null, name: String = null, address: Seq[Address] = null){}
def test () = {
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
val persons = Seq(
new Person(id = "1", name = "Salim",
address = Seq(new Address(first = "1st street"))),
new Person(name = "Sana",
address = Seq(new Address(zip = "60088")))
)
// The code can't infer schema automatically
val claimDF = sqlContext.createDataFrame(sc.parallelize(persons, 2),classOf[Person])
claimDF.printSchema() // This prints "root" not the schema of Person.
}
相反,如果我将Person和Address转换为case类,那么Spark可以使用上述语法或使用sc.parallelize(persons, 2).toDF
或sqlContext.createDataFrame(sc.parallelize(persons, 2),StructType)
自动继承模式
我不能使用case类,因为它不能容纳超过20个字段,而我在类中有很多字段。使用StructType会带来很多不便。Case类是最方便的,但是不能包含太多的属性。
请帮忙,提前感谢。
非常感谢您的输入。
我们最终使用Scala 2.11迁移到Spark 2.1, Scala 2.11支持更大的case类,所以这个问题得到了解决。
对于Spark 1.6和Scala 2.10,我最终构建Row对象和Struct类型来构建Dataframe。
val rows = Seq(Row("data"))
val aRDD = sc.parallelize(rows)
val aDF = sqlContext.createDataFrame(aRDD,getSchema())
def getSchema(): StructType= {
StructType(
Array(
StructField("jobNumber", StringType, nullable = true))
)
}
对代码进行两处更改,将使printSchema()在不使用case类的情况下发出数据框架的完整结构。
首先,正如Daniel建议的,你需要让你的类扩展scala。产品特性(痛苦,但需要下面的.toDF
方法):
class Address (first:String = null, second: String = null, zip: String = null) extends Product with Serializable
{
override def canEqual(that: Any) = that.isInstanceOf[Address]
override def productArity: Int = 3
def productElement(n: Int) = n match {
case 0 => first; case 1 => second; case 2 => zip
}
}
class Person (id: String = null, name: String = null, address: Seq[Address] = null) extends Product with Serializable
{
override def canEqual(that: Any) = that.isInstanceOf[Person]
override def productArity: Int = 3
def productElement(n: Int) = n match {
case 0 => id; case 1 => name; case 2 => address
}
}
其次,您应该使用.toDF隐式方法创建数据框架,该方法将import sqlContext.implicits._
引入范围,而不是像这样使用sqlContext.createDataFrame(..)
:
val claimDF = sc.parallelize(persons, 2).toDF
则claimf . printschema()将打印:
root
|-- id: string (nullable = true)
|-- name: string (nullable = true)
|-- address: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- first: string (nullable = true)
| | |-- second: string (nullable = true)
| | |-- zip: string (nullable = true)
或者,你可以使用Scala 2.11.0-M3,它删除了case类的22个字段限制。