How to match DataFrame column names to Scala case class attributes



The column names in the spark-sql example come from the case class Person.

case class Person(name: String, age: Int)
val people: RDD[Person] = ... // An RDD of case class objects, from the previous example.
// The RDD is implicitly converted to a SchemaRDD by createSchemaRDD, allowing it to be stored using Parquet.
people.saveAsParquetFile("people.parquet")
(Source: https://spark.apache.org/docs/1.1.0/sql-programming-guide.html)
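
To illustrate, a minimal sketch (assuming a SQLContext and the Spark 1.3+ DataFrame API, and the Person class above) showing that the inferred schema takes its column names from the case class fields:

val df = sqlContext.createDataFrame(Seq(Person("bob", 35)))
df.printSchema()
// root
//  |-- name: string (nullable = true)
//  |-- age: integer (nullable = false)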

However, in many cases the parameter names may be changed later. If the files holding the data are not updated to reflect the change, the columns can no longer be found.
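
For example (a hypothetical renaming, just to illustrate the failure mode): if the field name were later renamed to fullName, reading an old Parquet file and selecting the new column fails:

case class Person(fullName: String, age: Int)      // "name" was renamed to "fullName"
val df = sqlContext.read.parquet("people.parquet") // the file still has a column named "name"
df.select("fullName")                              // fails: AnalysisException, cannot resolve 'fullName'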

How do I specify a suitable mapping?

I was thinking of something like:

  import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
  import org.apache.spark.sql.{DataFrame, Row}

  val schema = StructType(Seq(
    StructField("name", StringType, nullable = false),
    StructField("age", IntegerType, nullable = false)
  ))

  val ps: Seq[Person] = ???
  // createDataFrame with an explicit schema expects an RDD[Row], so map the case objects to Rows first.
  val personRDD = sc.parallelize(ps).map(p => Row(p.name, p.age))
  // Apply the schema to the RDD.
  val personDF: DataFrame = sqlContext.createDataFrame(personRDD, schema)

Basically, all the mapping you need can be done with DataFrame.select(...). (Here I assume that no type conversions are necessary.) Given both the forward and the backward mapping as Maps, the essential part is

val mapping = from.map{ (x:(String, String)) => personsDF(x._1).as(x._2) }.toArray
// personsDF is your original DataFrame
val mappedDF = personsDF.select( mapping: _* )

where mapping is an array of Columns with the desired aliases.

Example code

object Example {   
  import org.apache.spark.rdd.RDD
  import org.apache.spark.sql.{DataFrame, SQLContext}
  import org.apache.spark.{SparkContext, SparkConf}
  case class Person(name: String, age: Int)
  object Mapping {
    val from = Map("name" -> "a", "age" -> "b")
    val to = Map("a" -> "name", "b" -> "age")
  }
  def main(args: Array[String]) : Unit = {
    // init
    val conf = new SparkConf()
      .setAppName( "Example." )
      .setMaster( "local[*]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    // create persons
    val persons = Seq(Person("bob", 35), Person("alice", 27))
    val personsRDD = sc.parallelize(persons, 4)
    val personsDF = personsRDD.toDF
    writeParquet( personsDF, "persons.parquet", sc, sqlContext)
    val otherPersonDF = readParquet( "persons.parquet", sc, sqlContext )
  }
  def writeParquet(personsDF: DataFrame, path:String, sc: SparkContext, sqlContext: SQLContext) : Unit = {
    import Mapping.from
    val mapping = from.map{ (x:(String, String)) => personsDF(x._1).as(x._2) }.toArray
    val mappedDF = personsDF.select( mapping: _* )
    mappedDF.write.parquet(path) // Parquet file with columns "a" and "b"
  }
  def readParquet(path: String, sc: SparkContext, sqlContext: SQLContext) : DataFrame = {
    import Mapping.to
    val df = sqlContext.read.parquet(path) // this df has columns a and b
    val mapping = to.map{ (x:(String, String)) => df(x._1).as(x._2) }.toArray
    df.select( mapping: _* ) // returns the DataFrame with columns renamed back to "name" and "age"
  }
}
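
As a side note (not part of the mapping approach above): if the renaming is purely positional, roughly the same effect can be achieved with toDF or withColumnRenamed; a minimal sketch, assuming Spark 1.3+:

val renamed = personsDF.toDF("a", "b")    // rename all columns by position: name -> a, age -> b
val renamedBack = renamed
  .withColumnRenamed("a", "name")         // rename individual columns back
  .withColumnRenamed("b", "age")
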
Comments

If you need to convert the DataFrame back into an RDD[Person], then

val rdd: RDD[Row] = personsDF.rdd
val personsRDD: RDD[Person] = rdd.map { r: Row =>
  Person(r.getAs[String]("name"), r.getAs[Int]("age"))
}
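
Putting it together, a usage sketch (assuming readParquet returns the DataFrame as in the example above, and that writeParquet has already produced persons.parquet):

val restoredDF = readParquet("persons.parquet", sc, sqlContext) // columns renamed back to "name" and "age"
val restored: RDD[Person] = restoredDF.rdd.map { r =>
  Person(r.getAs[String]("name"), r.getAs[Int]("age"))
}
restored.collect().foreach(println) // Person(bob,35), Person(alice,27)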

Alternatively, have a look at How to convert spark SchemaRDD into RDD of my case class?
