在 Spark-Scala 中将列表或 RDD 列表转换为数据帧



所以基本上我想要实现的是 - 我有一个有 4 列的表(比如说(,我把它暴露给数据帧 - DF1。现在我想将 DF1 的每一行存储到另一个配置单元表(基本上是 DF2,其架构为 - 列 1、列 2、列 3(,而列 3 值将是数据帧 DF1 的"-"分隔行。

val df = hiveContext.sql("from hive_table SELECT *")
val writeToHiveDf = df.filter(new Column("id").isNotNull)
var builder : List[(String, String, String)] = Nil
    var finalOne  =  new ListBuffer[List[(String, String, String)]]()
    writeToHiveDf.rdd.collect().foreach {
      row =>
        val item = row.mkString("-@")
        builder = List(List("dummy", "NEVER_NULL_CONSTRAINT", "some alpha")).map{case List(a,b,c) => (a,b,c)}
        finalOne += builder
    }

现在我有了finalOne作为列表列表,我想直接或通过RDD将其转换为数据帧。

var listRDD = sc.parallelize(finalOne) //Converts to RDD - It works. 
val dataFrameForHive : DataFrame = listRDD.toDF("table_name", "constraint_applied", "data") //Doesn't work

错误:

java.lang.ClassCastException: org.apache.spark.sql.types.ArrayType cannot be cast to org.apache.spark.sql.types.StructType
    at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:414)
    at org.apache.spark.sql.SQLImplicits.rddToDataFrameHolder(SQLImplicits.scala:94)

有人可以帮助我了解将其转换为数据帧的正确方法吗?提前感谢您的支持。

如果你在数据帧中想要 3 列 String 类型,你应该将List[List[(String,String,String)]]展平为List[(String,String,String)]

var listRDD = sc.parallelize(finalOne.flatten) // makes List[(String,String,String)]
val dataFrameForHive : DataFrame = listRDD.toDF("table_name", "constraint_applied", "data") 
我相信

在将"finalOne"数据帧传递给sc.parallelize((函数之前将其展平化应该会给出符合您期望的结果。

var listRDD = sc.parallelize(finalOne)

val dataFrameForHive : DataFrame = listRDD.toDF("table_name", "constraint_applied", "data")

最新更新