Splitting columns in a Spark DataFrame into new rows [Scala]



I have a Spark DataFrame whose output looks like this:

Amt   | id    | num  | Start_date               | Identifier
43.45 | 19840 | A345 | (2014-12-26, 2013-12-12) | (232323, 45466)
43.45 | 19840 | A345 | (2010-03-16, 2013-16-12) | (34343, 45454)

From the above output, I need to produce output in the following format:

Amt   | id    | num  | Start_date | Identifier
43.45 | 19840 | A345 | 2014-12-26 | 232323
43.45 | 19840 | A345 | 2013-12-12 | 45466
43.45 | 19840 | A345 | 2010-03-16 | 34343
43.45 | 19840 | A345 | 2013-16-12 | 45454

Can anyone help me achieve this?

Is this what you are looking for?

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
val sparkSession = ...
import sparkSession.implicits._
val input = sc.parallelize(Seq(
  (43.45, 19840, "A345", Seq("2014-12-26", "2013-12-12"), Seq(232323, 45466)),
  (43.45, 19840, "A345", Seq("2010-03-16", "2013-16-12"), Seq(34343, 45454))
)).toDF("amt", "id", "num", "start_date", "identifier")
// Zip the two array columns element-wise into an array of (date, identifier) pairs.
val zipArrays = udf { (dates: Seq[String], identifiers: Seq[Int]) =>
  dates.zip(identifiers)
}
// explode turns each pair into its own row; the tuple fields _1/_2 are then unpacked.
val output = input.select($"amt", $"id", $"num", explode(zipArrays($"start_date", $"identifier")))
  .select($"amt", $"id", $"num", $"col._1".as("start_date"), $"col._2".as("identifier"))
output.show()

This returns:

+-----+-----+----+----------+----------+
|  amt|   id| num|start_date|identifier|
+-----+-----+----+----------+----------+
|43.45|19840|A345|2014-12-26|    232323|
|43.45|19840|A345|2013-12-12|     45466|
|43.45|19840|A345|2010-03-16|     34343|
|43.45|19840|A345|2013-16-12|     45454|
+-----+-----+----+----------+----------+
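
As a side note, if you are on Spark 2.4 or later, the built-in arrays_zip function can replace the UDF entirely; a minimal sketch, assuming the same input DataFrame as above:

// Sketch assuming Spark 2.4+, where arrays_zip is available as a built-in function.
// It pairs the two array columns element-wise into an array of structs, so no UDF
// is needed; the struct fields keep the original column names.
val zipped = input
  .select($"amt", $"id", $"num", explode(arrays_zip($"start_date", $"identifier")).as("col"))
  .select($"amt", $"id", $"num", $"col.start_date", $"col.identifier")
zipped.show()
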
EDIT:

Since you would like multiple columns to be zipped, you could try this:

val input = sc.parallelize(Seq(
  (43.45, 19840, "A345", Seq("2014-12-26", "2013-12-12"), Seq("232323", "45466"), Seq("123", "234")),
  (43.45, 19840, "A345", Seq("2010-03-16", "2013-16-12"), Seq("34343", "45454"), Seq("345", "456"))
)).toDF("amt", "id", "num", "start_date", "identifier", "another_column")
// Generic version: zip any number of string-array columns by position.
val zipArrays = udf { seqs: Seq[Seq[String]] =>
  for (i <- seqs.head.indices) yield seqs.fold(Seq.empty)((accu, seq) => accu :+ seq(i))
}
val columnsToSelect = Seq($"amt", $"id", $"num")
val columnsToZip = Seq($"start_date", $"identifier", $"another_column")
// After explode, each zipped row is an array; getItem(index) restores the original column names.
val outputColumns = columnsToSelect ++ columnsToZip.zipWithIndex.map { case (column, index) =>
  $"col".getItem(index).as(column.toString())
}
val output = input.select($"amt", $"id", $"num", explode(zipArrays(array(columnsToZip: _*)))).select(outputColumns: _*)
output.show()
/*
+-----+-----+----+----------+----------+--------------+
|  amt|   id| num|start_date|identifier|another_column|
+-----+-----+----+----------+----------+--------------+
|43.45|19840|A345|2014-12-26|    232323|           123|
|43.45|19840|A345|2013-12-12|     45466|           234|
|43.45|19840|A345|2010-03-16|     34343|           345|
|43.45|19840|A345|2013-16-12|     45454|           456|
+-----+-----+----+----------+----------+--------------+
*/
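
One caveat: this generic zipArrays assumes every zipped column holds string arrays of the same length, and will throw an IndexOutOfBoundsException otherwise. A small defensive variant (my own sketch, not part of the original answer) zips only up to the shortest array and can be swapped in for zipArrays in the select above without other changes:

// Sketch: zip only up to the shortest array so rows with ragged lengths do not fail.
// Trailing elements of the longer arrays are silently dropped.
val zipArraysSafe = udf { seqs: Seq[Seq[String]] =>
  val n = seqs.map(_.length).min
  for (i <- 0 until n) yield seqs.map(_(i))
}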

If I understand you correctly, you want the first element of col3 and col4. Does that make sense?

// Iterate over the underlying RDD of Rows, keeping only the first element
// of each of the two array columns.
val newRows = for {
    row <- oldDataFrame.rdd
} yield {
  val zro = row(0)                    // 43.45
  val one = row(1)                    // 19840
  val two = row(2)                    // A345
  val dates = row.getSeq[String](3)   // [2014-12-26, 2013-12-12]
  val numbers = row.getSeq[Int](4)    // [232323, 45466]
  Row(zro, one, two, dates(0), numbers(0))
}
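
Since this comprehension produces an RDD[Row] rather than a DataFrame, you would typically convert it back by supplying a schema; a hedged sketch, assuming a SparkSession named sparkSession and the column names and types of the example data:

import org.apache.spark.sql.types._

// Assumed schema matching the example data; adjust names and types as needed.
val schema = StructType(Seq(
  StructField("amt", DoubleType),
  StructField("id", IntegerType),
  StructField("num", StringType),
  StructField("start_date", StringType),
  StructField("identifier", IntegerType)
))
val newDataFrame = sparkSession.createDataFrame(newRows, schema)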

You can use Spark SQL.

  • First, create a view containing the data we need to work with:

    df.createOrReplaceTempView("tableTest")

  • Then you can select the data with the exploded columns (see the positional-pairing note after this list):

    sparkSession.sqlContext.sql(
        "SELECT Amt, id, num, expanded_start_date, expanded_id " +
        "FROM tableTest " +
        "LATERAL VIEW explode(Start_date) Start_date AS expanded_start_date " +
        "LATERAL VIEW explode(Identifier) Identifier AS expanded_id")
    .show()

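Note that the two independent LATERAL VIEW explodes above produce every combination of dates and identifiers (a cross product per input row). If the arrays should instead be paired by position, as in the desired output, one option (my own sketch, not part of the original answer) is posexplode with a filter on matching positions:

// Sketch: posexplode emits (position, value) pairs; keeping only rows where the two
// positions match pairs the arrays element-wise instead of cross-joining them.
sparkSession.sqlContext.sql(
    "SELECT Amt, id, num, expanded_start_date, expanded_id " +
    "FROM tableTest " +
    "LATERAL VIEW posexplode(Start_date) d AS date_pos, expanded_start_date " +
    "LATERAL VIEW posexplode(Identifier) i AS id_pos, expanded_id " +
    "WHERE date_pos = id_pos")
  .show()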