Spark DataFrame: flatten a column that is a nested list or a list of sets



I am grouping a DataFrame by two keys, key1 and key2. I then collect_list another column, values, which returns a nested list because that column is already a Seq (the same happens if the column is a Set).

I need to flatten the nested list.

import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions._
import spark.implicits._

case class Record(key1: String, key2: String, values: Seq[String])

val ds: Dataset[Record] = spark.createDataset(Seq(
  Record("abc", "bca", Seq("one", "two", "three")),
  Record("abc", "bca", Seq("three", "two", "one")),
  Record("xyz", "xyz", Seq("four", "five", "six"))
))
ds.show(false)
/*
+----+----+-----------------+
|key1|key2|values           |
+----+----+-----------------+
|abc |bca |[one, two, three]|
|abc |bca |[three, two, one]|
|xyz |xyz |[four, five, six]|
+----+----+-----------------+
*/
val firstDf: DataFrame = ds.groupBy($"key1", $"key2").agg(collect_list($"values").as("values"))
firstDf.show(false)
/* Column "value" is a nested list.
+----+----+--------------------------------------+
|key1|key2|values                                |
+----+----+--------------------------------------+
|xyz |xyz |[[four, five, six]]                   |
|abc |bca |[[one, two, three], [three, two, one]]|
+----+----+--------------------------------------+
*/
firstDf.printSchema()
/*
root
 |-- key1: string (nullable = true)
 |-- key2: string (nullable = true)
 |-- values: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)
*/

Expected result

Duplicate values are allowed (list semantics). In some other cases, however, I do not want duplicate entries (set semantics), so let's keep the answer generic.

+----+----+----------------------------------+
|key1|key2|values                            |
+----+----+----------------------------------+
|xyz |xyz |[four, five, six]                 |
|abc |bca |[one, two, three, three, two, one]|
+----+----+----------------------------------+
Or alternatively:
+----+----+-----------------+
|key1|key2|values           |
+----+----+-----------------+
|xyz |xyz |[four, five, six]|
|abc |bca |[one, two, three]|
+----+----+-----------------+

In fact, I am looking for the Spark equivalent of this Pandas solution: flatten a list of lists within a column.

Following some other posts, I tried firstDf.select("values.*"), but I get: org.apache.spark.sql.AnalysisException: Can only star expand struct data types. Attribute: ArrayBuffer(values);

Thanks in advance for your help.

The function you are looking for is called exactly that: flatten (available since Spark 2.4):

val firstDf = ds.groupBy($"key1", $"key2").agg(flatten(collect_list($"values")).as("values"))
firstDf.show(false)
+----+----+----------------------------------+
|key1|key2|values                            |
+----+----+----------------------------------+
|xyz |xyz |[four, five, six]                 |
|abc |bca |[one, two, three, three, two, one]|
+----+----+----------------------------------+

If you only want distinct items, you can use array_distinct:

val firstDf = ds.groupBy($"key1", $"key2").agg(array_distinct(flatten(collect_list($"values"))).as("values"))
firstDf.show(false)
+----+----+-----------------+
|key1|key2|values           |
+----+----+-----------------+
|xyz |xyz |[four, five, six]|
|abc |bca |[one, two, three]|
+----+----+-----------------+

For older versions of Spark (< 2.4), you can achieve the flattening with a UDF:

// UDF that flattens a Seq[Seq[String]] into a Seq[String]
val flattenudf = udf((x: Seq[Seq[String]]) => x.flatten)
// if you want distinct elements:
// val flattenudf = udf((x: Seq[Seq[String]]) => x.flatten.distinct)
val firstDf = ds.groupBy($"key1", $"key2").agg(flattenudf(collect_list($"values")).as("values"))
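Calling show on the result should give the same flattened output as the built-in flatten above (a sketch for the sample data in the question; row order of a groupBy is not guaranteed):

firstDf.show(false)
/*
+----+----+----------------------------------+
|key1|key2|values                            |
+----+----+----------------------------------+
|xyz |xyz |[four, five, six]                 |
|abc |bca |[one, two, three, three, two, one]|
+----+----+----------------------------------+
*/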

If the flatten and array_distinct functions are not available, an alternative approach uses the RDD API:

// Flatten (and deduplicate) the nested list column via the RDD API
val final_df = firstDf.rdd.map {
  r => (r.getString(0), r.getString(1), r.getAs[Seq[Seq[String]]](2).flatten.distinct)
}.toDF("key1", "key2", "values")
final_df.show
//+----+----+-----------------+
//|key1|key2|           values|
//+----+----+-----------------+
//| xyz| xyz|[four, five, six]|
//| abc| bca|[one, two, three]|
//+----+----+-----------------+

Here is another way, using explode (which may be problematic if the data is very large), that avoids flatten and the like:

from pyspark.sql import functions as F

df = (df.select('key1', 'key2', F.explode('values').alias('values'))
        .groupby('key1', 'key2')
        .agg(F.collect_list('values').alias('values')))

To get distinct values, replace collect_list with collect_set.

The code is PySpark, but something similar should work in Scala as well.
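For instance, a minimal Scala sketch of the same explode-based approach, starting from the original ds in the question (the explodedDf variable name is just illustrative):

// Explode the array, then regroup and re-collect the individual elements
val explodedDf = ds
  .select($"key1", $"key2", explode($"values").as("values"))
  .groupBy($"key1", $"key2")
  .agg(collect_list($"values").as("values"))   // use collect_set here for distinct values
explodedDf.show(false)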
