以编程方式向Spark DataFrame添加几列



我正在使用spark与scala。

我有一个数据框3列:ID,时间,RawHexdata。我有一个用户定义的函数,它接受RawHexData并将其扩展为X列。重要的是要声明,对于每一行X都是相同的(列没有变化)。但是,在收到第一个数据之前,我不知道列是什么。但一旦我有了头,我就能推断出来。

我想要第二个数据框的列:Id,时间,RawHexData,NewCol1,…,NewCol3.

我能想到的最简单的方法是:1. 将每一行反序列化为json(这里的每个数据类型都是可序列化的)2. 添加我的新列,3.从修改后的json中反序列化一个新的数据帧,

然而,这似乎是一种浪费,因为它涉及两个昂贵且冗余的json序列化步骤。我在寻找一个更干净的图案。

使用大小写类,似乎是一个坏主意,因为我不知道列的数量,或者提前知道列名。

动态扩展DataFrame的方法是对行RDD进行操作,通过调用dataFrame.rdd可以获得行RDD。有了Row实例,您可以访问RawHexdata列并解析包含的数据。通过将新解析的列添加到结果Row,您几乎解决了问题。要将RDD[Row]转换回DataFrame,唯一需要做的就是为新列生成模式数据。您可以通过收集驱动程序上的单个RawHexdata值,然后提取列类型来做到这一点。

下面的代码说明了这种方法。

object App {
  case class Person(name: String, age: Int)
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("Test").setMaster("local[4]")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    val input = sc.parallelize(Seq(Person("a", 1), Person("b", 2)))
    val dataFrame = input.df
    dataFrame.show()
    // create the extended rows RDD
    val rowRDD = dataFrame.rdd.map{
      row =>
        val blob = row(1).asInstanceOf[Int]
        val newColumns: Seq[Any] = Seq(blob, blob * 2, blob * 3)
        Row.fromSeq(row.toSeq.init ++ newColumns)
    }
    val schema = dataFrame.schema
    // we know that the new columns are all integers
    val newColumns = StructType{
      Seq(new StructField("1", IntegerType), new StructField("2", IntegerType), new StructField("3", IntegerType))
    }
    val newSchema = StructType(schema.init ++ newColumns)
    val newDataFrame = sqlContext.createDataFrame(rowRDD, newSchema)
    newDataFrame.show()
  }
}

SELECT是你的朋友在不回到RDD的情况下解决它。

case class Entry(Id: String, Time: Long)
val entries = Seq(
  Entry("x1", 100L),
  Entry("x2", 200L)
)
val newColumns = Seq("NC1", "NC2", "NC3")
val df = spark.createDataFrame(entries)
  .select(col("*") +: (newColumns.map(c => lit(null).as(c))): _*)
df.show(false)
+---+----+----+----+----+
|Id |Time|NC1 |NC2 |NC3 |
+---+----+----+----+----+
|x1 |100 |null|null|null|
|x2 |200 |null|null|null|
+---+----+----+----+----+

相关内容

  • 没有找到相关文章

最新更新