Scala Spark: Flatten array of key/value structs



I have an input dataframe which contains an array-typed column. Each entry in the array is a struct consisting of a key (one of about four values) and a value. I want to turn this into a dataframe with one column for each possible key, containing null if that key's value is not in the array for that row. Keys are never duplicated within an array, but they may be out of order or missing.

The best I have so far is:

val wantedCols = df.columns
  .filter(_ != arrayCol)
  .filter(_ != "col")
val flattened = df
  .select((wantedCols.map(col(_)) ++ Seq(explode(col(arrayCol)))):_*)
  .groupBy(wantedCols.map(col(_)):_*)
  .pivot("col.key")
  .agg(first("col.value"))

This does exactly what I want, but it's hideous, and I have no idea what ramifications grouping on every column but one might have. What's the right way to do this?

EDIT: Example input/output:

case class testStruct(name : String, number : String)
val dfExampleInput = Seq(
  (0, "KY", Seq(testStruct("A", "45"))),
  (1, "OR", Seq(testStruct("A", "30"), testStruct("B", "10"))))
  .toDF("index", "state", "entries")
dfExampleInput.show
+-----+-----+------------------+
|index|state|           entries|
+-----+-----+------------------+
|    0|   KY|         [[A, 45]]|
|    1|   OR|[[A, 30], [B, 10]]|
+-----+-----+------------------+
val dfExampleOutput = Seq(
  (0, "KY", "45", null),
  (1, "OR", "30", "10"))
  .toDF("index", "state", "A", "B")
dfExampleOutput.show
+-----+-----+---+----+
|index|state|  A|   B|
+-----+-----+---+----+
|    0|   KY| 45|null|
|    1|   OR| 30|  10|
+-----+-----+---+----+

FURTHER EDIT:

I've submitted a solution myself (see below), which handles this well as long as you know the keys in advance (in my case I do). If finding the keys is an issue, another answer contains code that handles that.

Without groupBy, pivot, agg, first.

Please check the code below.

scala> val df = Seq((0, "KY", Seq(("A", "45"))),(1, "OR", Seq(("A", "30"),("B", "10")))).toDF("index", "state", "entries").withColumn("entries",$"entries".cast("array<struct<name:string,number:string>>"))
df: org.apache.spark.sql.DataFrame = [index: int, state: string ... 1 more field]
scala> df.printSchema
root
|-- index: integer (nullable = false)
|-- state: string (nullable = true)
|-- entries: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- name: string (nullable = true)
|    |    |-- number: string (nullable = true)

scala> df.show(false)
+-----+-----+------------------+
|index|state|entries           |
+-----+-----+------------------+
|0    |KY   |[[A, 45]]         |
|1    |OR   |[[A, 30], [B, 10]]|
+-----+-----+------------------+

scala> val finalDFColumns = df.select(explode($"entries").as("entries")).select("entries.*").select("name").distinct.map(_.getAs[String](0)).orderBy($"value".asc).collect.foldLeft(df.limit(0))((cdf,c) => cdf.withColumn(c,lit(null))).columns
finalDFColumns: Array[String] = Array(index, state, entries, A, B)
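
Note that max in the next line is never defined anywhere in this transcript; it has to be the maximum number of structs in any entries array (2 for this data). One way it could have been computed (an assumed helper, not shown in the original) is:

scala> // assumed definition: the largest number of structs in any entries array (2 here)
scala> val max = df.selectExpr("max(size(entries))").first.getInt(0)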
scala> val finalDF = df.select($"*" +: (0 until max).map(i => $"entries".getItem(i)("number").as(i.toString)): _*)
finalDF: org.apache.spark.sql.DataFrame = [index: int, state: string ... 3 more fields]
scala> finalDF.show(false)
+-----+-----+------------------+---+----+
|index|state|entries           |0  |1   |
+-----+-----+------------------+---+----+
|0    |KY   |[[A, 45]]         |45 |null|
|1    |OR   |[[A, 30], [B, 10]]|30 |10  |
+-----+-----+------------------+---+----+

scala> finalDF.printSchema
root
|-- index: integer (nullable = false)
|-- state: string (nullable = true)
|-- entries: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- name: string (nullable = true)
|    |    |-- number: string (nullable = true)
|-- 0: string (nullable = true)
|-- 1: string (nullable = true)
scala> finalDF.columns.zip(finalDFColumns).foldLeft(finalDF)((fdf,column) => fdf.withColumnRenamed(column._1,column._2)).show(false)
+-----+-----+------------------+---+----+
|index|state|entries           |A  |B   |
+-----+-----+------------------+---+----+
|0    |KY   |[[A, 45]]         |45 |null|
|1    |OR   |[[A, 30], [B, 10]]|30 |10  |
+-----+-----+------------------+---+----+

scala>

Final output


scala> finalDF.columns.zip(finalDFColumns).foldLeft(finalDF)((fdf,column) => fdf.withColumnRenamed(column._1,column._2)).drop($"entries").show(false)
+-----+-----+---+----+
|index|state|A  |B   |
+-----+-----+---+----+
|0    |KY   |45 |null|
|1    |OR   |30 |10  |
+-----+-----+---+----+

I wouldn't worry too much about grouping by several columns, other than it potentially making things confusing. In that vein, if there is a simpler, more maintainable way, go for it. Without example input/output I'm not sure this will get you all the way there, but maybe it'll be useful:

Seq(Seq("k1" -> "v1", "k2" -> "v2")).toDS() // some basic input based on my understanding of your description
.select(explode($"value")) // flatten the array
.select("col.*") // de-nest the struct
.groupBy("_2") // one row per distinct value
.pivot("_1") // one column per distinct key
.count // or agg(first) if you want the value in each column
.show
+---+----+----+
| _2|  k1|  k2|
+---+----+----+
| v2|null|   1|
| v1|   1|null|
+---+----+----+

Based on what you've now said, I get the impression that there are many columns like "state" that aren't required for the aggregation, but need to be in the final result.

For reference, if you didn't need to pivot, you could add a struct column with all of those fields nested within it, and simply add it to your aggregation, e.g.: .agg(first($"myStruct"), first($"number")). The main advantage is that only the actual key columns are referenced in the groupBy. But when pivot is involved, things get a little weird, so we'll set that option aside.
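
As a rough sketch of that idea (my own illustration, not part of this answer; it assumes the pivot is dropped and reuses dfExampleInput from the question):

import org.apache.spark.sql.functions.{explode, first, struct}

// Nest the pass-through columns (just "state" here) into one struct, so that
// only the real key column ("index") has to appear in the groupBy.
val noPivot = dfExampleInput
  .withColumn("myStruct", struct($"state"))
  .select($"index", $"myStruct", explode($"entries").as("entry"))
  .groupBy("index")
  .agg(first($"myStruct").as("myStruct"), first($"entry.number").as("number"))
  .select($"index", $"myStruct.*", $"number")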

The simplest method I can think of for this use case is to split the dataframe and join it back together after the aggregation, using some row key. In this example I'm assuming that "index" works for that purpose:

val mehCols = dfExampleInput.columns.filter(_ != "entries").map(col)
val mehDF = dfExampleInput.select(mehCols:_*)
val aggDF = dfExampleInput
  .select($"index", explode($"entries").as("entry"))
  .select($"index", $"entry.*")
  .groupBy("index")
  .pivot("name")
  .agg(first($"number"))
scala> mehDF.join(aggDF, Seq("index")).show
+-----+-----+---+----+
|index|state|  A|   B|
+-----+-----+---+----+
|    0|   KY| 45|null|
|    1|   OR| 30|  10|
+-----+-----+---+----+

I doubt you would see much difference in performance, if any. Perhaps in extreme cases, e.g.: a very large number of meh columns, or a very large number of pivot columns, or something like that, or perhaps nothing at all. Personally, I would test both with decently sized input, and if there isn't a significant difference, use whichever one seems easier to maintain.

Here is another way, based on the assumption that there are no duplicates in the entries column, i.e. Seq(testStruct("A", "30"), testStruct("A", "70"), testStruct("B", "10")) would cause an error. The next solution combines the RDD and Dataframe APIs to achieve this:

import org.apache.spark.sql.functions.explode
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.Row
case class testStruct(name : String, number : String)
val df = Seq(
  (0, "KY", Seq(testStruct("A", "45"))),
  (1, "OR", Seq(testStruct("A", "30"), testStruct("B", "10"))),
  (2, "FL", Seq(testStruct("A", "30"), testStruct("B", "10"), testStruct("C", "20"))),
  (3, "TX", Seq(testStruct("B", "60"), testStruct("A", "19"), testStruct("C", "40")))
)
  .toDF("index", "state", "entries")
  .cache
// get all possible keys from entries i.e Seq[A, B, C]
val finalCols = df.select(explode($"entries").as("entry"))
.select($"entry".getField("name").as("entry_name"))
.distinct
.collect
.map{_.getAs[String]("entry_name")}
.sorted // Attention: we need to retain the order of the columns 
// 1. when generating row values and
// 2. when creating the schema
val rdd = df.rdd.map{ r =>
  // transform the entries array into a map i.e Map(A -> 30, B -> 10)
  val entriesMap = r.getSeq[Row](2).map{e => (e.getString(0), e.getString(1))}.toMap
  // transform finalCols into a map with null values i.e Map(A -> null, B -> null, C -> null)
  val finalColsMap = finalCols.map{c => (c, null)}.toMap
  // replace null values with those present in the current row by merging the two previous maps
  // Attention: this should retain the order of finalColsMap
  val merged = finalColsMap ++ entriesMap
  // concatenate the two first row values ["index", "state"] with the values from merged
  val finalValues = Seq(r(0), r(1)) ++ merged.values
  Row.fromSeq(finalValues)
}
val extraCols = finalCols.map{c => s"`${c}` STRING"}
val schema = StructType.fromDDL("`index` INT, `state` STRING," + extraCols.mkString(","))
val finalDf = spark.createDataFrame(rdd, schema)
finalDf.show
// +-----+-----+---+----+----+
// |index|state|  A|   B|   C|
// +-----+-----+---+----+----+
// |    0|   KY| 45|null|null|
// |    1|   OR| 30|  10|null|
// |    2|   FL| 30|  10|  20|
// |    3|   TX| 19|  60|  40|
// +-----+-----+---+----+----+

Note: the solution requires one extra action in order to retrieve the unique keys, although it doesn't cause any shuffling since it is based only on narrow transformations.

I worked out a solution myself:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, lit, when}

def extractFromArray(colName : String, key : String, numKeys : Int, keyName : String) = {
  val indexCols = (0 to numKeys-1).map(col(colName).getItem(_))
  indexCols.foldLeft(lit(null))((innerCol : Column, indexCol : Column) =>
    when(indexCol.isNotNull && (indexCol.getItem(keyName) === key), indexCol)
      .otherwise(innerCol))
}

Example:

case class testStruct(name : String, number : String)
val df = Seq(
  (0, "KY", Seq(testStruct("A", "45"))),
  (1, "OR", Seq(testStruct("A", "30"), testStruct("B", "10"))),
  (2, "FL", Seq(testStruct("A", "30"), testStruct("B", "10"), testStruct("C", "20"))),
  (3, "TX", Seq(testStruct("B", "60"), testStruct("A", "19"), testStruct("C", "40")))
)
  .toDF("index", "state", "entries")
  .withColumn("A", extractFromArray("entries", "B", 3, "name"))
df.show

which produces:

+-----+-----+--------------------+-------+
|index|state|             entries|      A|
+-----+-----+--------------------+-------+
|    0|   KY|           [[A, 45]]|   null|
|    1|   OR|  [[A, 30], [B, 10]]|[B, 10]|
|    2|   FL|[[A, 30], [B, 10]...|[B, 10]|
|    3|   TX|[[B, 60], [A, 19]...|[B, 60]|
+-----+-----+--------------------+-------+

This solution differs a bit from the other answers:

  • It works on only a single key at a time
  • It requires the key names and the number of keys to be known in advance
  • It produces a column of structs, rather than doing the extra step of extracting specific values
  • It works as a simple column-to-column operation, rather than requiring a transformation of the entire DF
  • It can be evaluated lazily

The first three issues can be handled by the calling code, which leaves it somewhat more flexible for cases where you already know the keys, or where the structs contain additional values to extract.
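
For example, a small wrapper along these lines (illustrative only; extractAllKeys and its parameters are my own names, not part of the answer) could apply it once per known key and pull out just the number field:

import org.apache.spark.sql.DataFrame

// Assumed helper: run extractFromArray for every known key and keep only the
// "number" field, giving one value column per key.
def extractAllKeys(input : DataFrame, colName : String, keys : Seq[String], numKeys : Int) : DataFrame =
  keys.foldLeft(input)((acc, k) =>
    acc.withColumn(k, extractFromArray(colName, k, numKeys, "name").getItem("number")))

// e.g.: extractAllKeys(df, "entries", Seq("A", "B", "C"), 3).drop("entries").show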
