Exploding a nested Struct in a Spark DataFrame



I am working through a Databricks example. The schema of the DataFrame looks like this:

> parquetDF.printSchema
root
|-- department: struct (nullable = true)
|    |-- id: string (nullable = true)
|    |-- name: string (nullable = true)
|-- employees: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- firstName: string (nullable = true)
|    |    |-- lastName: string (nullable = true)
|    |    |-- email: string (nullable = true)
|    |    |-- salary: integer (nullable = true)

In the example, they show how to explode the employees column into 4 additional columns:

val explodeDF = parquetDF.explode($"employees") {
  case Row(employee: Seq[Row]) => employee.map { employee =>
    val firstName = employee(0).asInstanceOf[String]
    val lastName = employee(1).asInstanceOf[String]
    val email = employee(2).asInstanceOf[String]
    val salary = employee(3).asInstanceOf[Int]
    Employee(firstName, lastName, email, salary)
  }
}.cache()
display(explodeDF)
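
For comparison, the same expansion can be written without pattern matching on Row, using the column-level explode function from org.apache.spark.sql.functions. This is only a sketch against the schema above, not part of the original Databricks example:

import org.apache.spark.sql.functions.explode

// Each element of the employees array becomes its own row; the struct
// fields are then pulled out into top-level columns. The $"..." syntax
// assumes the implicits that Databricks notebooks pre-import.
val employeesDF = parquetDF
  .select(explode($"employees").as("employee"))
  .select(
    $"employee.firstName",
    $"employee.lastName",
    $"employee.email",
    $"employee.salary")
display(employeesDF)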

How do I do something similar with the department column (i.e. add two additional columns called "id" and "name" to the DataFrame)? The methods aren't exactly the same, and I can only figure out how to create a brand new DataFrame using:

val explodeDF = parquetDF.select("department.id","department.name")
display(explodeDF)

If I try:

val explodeDF = parquetDF.explode($"department") { 
  case Row(dept: Seq[String]) => dept.map{dept => 
  val id = dept(0) 
  val name = dept(1)
  } 
}.cache()
display(explodeDF)

I get this warning and error:

<console>:38: warning: non-variable type argument String in type pattern Seq[String] is unchecked since it is eliminated by erasure
            case Row(dept: Seq[String]) => dept.map{dept => 
                           ^
<console>:37: error: inferred type arguments [Unit] do not conform to    method explode's type parameter bounds [A <: Product]
  val explodeDF = parquetDF.explode($"department") { 
                                   ^

In my opinion, the most elegant solution is to star expand the Struct using the select operator, as shown below. (The explode approach fails because department is a single struct, not an array: there is nothing to explode over, and the closure's body returns Unit, which violates explode's A <: Product bound.)

var explodedDF2 = parquetDF.select("department.*", "*")
https://docs.databricks.com/spark/latest/spark-sql/complex-types.html
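
As a sketch of how this composes (column names taken from the schema in the question), star expansion can flatten department and explode employees in the same select chain:

import org.apache.spark.sql.functions.explode

// Flatten the department struct and explode the employees array in one pass.
val flatDF = parquetDF
  .select($"department.*", explode($"employees").as("employee"))
  .select($"id", $"name", $"employee.*")
display(flatDF)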

You could use something like:

var explodeDeptDF = explodeDF.withColumn("id", explodeDF("department.id"))
explodeDeptDF = explodeDeptDF.withColumn("name", explodeDeptDF("department.name"))

These questions helped me:

  • Flattening Rows in Spark
  • Spark 1.4.1 DataFrame explode list of JSON objects

This seems to work (although it may not be the most elegant solution):

var explodeDF2 = explodeDF.withColumn("id", explodeDF("department.id"))
explodeDF2 = explodeDF2.withColumn("name", explodeDF2("department.name"))
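
A compact variant of the same approach (a sketch, assuming the original struct column is no longer needed) chains the withColumn calls and drops department afterwards:

val flatDeptDF = explodeDF
  .withColumn("id", $"department.id")
  .withColumn("name", $"department.name")
  .drop("department")
display(flatDeptDF)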
