Spark: SQL Context:从Scala对象中创建数据框架



我使用Spark 1.5.2从scala对象使用以下语法创建一个数据框架。我的目的是为单元测试创建一个数据。

class Address (first:String = null, second: String = null, zip: String = null){}
class Person (id: String = null, name: String = null, address: Seq[Address] = null){}

def test () = {
  val sqlContext = new SQLContext(sc)
  import sqlContext.implicits._
  val persons = Seq(
    new Person(id = "1", name = "Salim", 
      address = Seq(new Address(first = "1st street"))),
    new Person(name = "Sana",
      address = Seq(new Address(zip = "60088")))
  )
  // The code can't infer schema automatically
  val claimDF = sqlContext.createDataFrame(sc.parallelize(persons, 2),classOf[Person])
  claimDF.printSchema() // This prints "root" not the schema of Person.
}

相反,如果我将Person和Address转换为case类,那么Spark可以使用上述语法或使用sc.parallelize(persons, 2).toDFsqlContext.createDataFrame(sc.parallelize(persons, 2),StructType)自动继承模式

我不能使用case类,因为它不能容纳超过20个字段,而我在类中有很多字段。使用StructType会带来很多不便。Case类是最方便的,但是不能包含太多的属性。

请帮忙,提前感谢。

非常感谢您的输入。

我们最终使用Scala 2.11迁移到Spark 2.1, Scala 2.11支持更大的case类,所以这个问题得到了解决。

对于Spark 1.6和Scala 2.10,我最终构建Row对象和Struct类型来构建Dataframe。

val rows = Seq(Row("data"))
val aRDD = sc.parallelize(rows)
val aDF = sqlContext.createDataFrame(aRDD,getSchema())
def getSchema(): StructType= {
    StructType(
        Array(
            StructField("jobNumber", StringType, nullable = true))
    )
}

对代码进行两处更改,将使printSchema()在不使用case类的情况下发出数据框架的完整结构。

首先,正如Daniel建议的,你需要让你的类扩展scala。产品特性(痛苦,但需要下面的.toDF方法):

class Address (first:String = null, second: String = null, zip: String = null) extends Product with Serializable
{
  override def canEqual(that: Any) = that.isInstanceOf[Address]
  override def productArity: Int = 3
  def productElement(n: Int) = n match {
    case 0 => first; case 1 => second; case 2 => zip
  }
}
class Person (id: String = null, name: String = null, address: Seq[Address] = null) extends Product with Serializable
{
  override def canEqual(that: Any) = that.isInstanceOf[Person]
  override def productArity: Int = 3
  def productElement(n: Int) = n match {
    case 0 => id; case 1 => name; case 2 => address
  }
}

其次,您应该使用.toDF隐式方法创建数据框架,该方法将import sqlContext.implicits._引入范围,而不是像这样使用sqlContext.createDataFrame(..):

val claimDF = sc.parallelize(persons, 2).toDF

则claimf . printschema()将打印:

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- address: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- first: string (nullable = true)
 |    |    |-- second: string (nullable = true)
 |    |    |-- zip: string (nullable = true)

或者,你可以使用Scala 2.11.0-M3,它删除了case类的22个字段限制。

相关内容

  • 没有找到相关文章

最新更新