Pyspark >具有多个数组列的数据帧分成多行,每行一个值



我们有一个pyspark数据框架,它有几个列,其中包含多个值的数组。我们的目标是将这些列的每个值放在几行中,保持初始的不同列。所以,像这样开始:

data = [
("A", ["a", "c"], ["1", "5"]),
("B", ["a", "b"], None),
("C", [], ["1"]),
]

什么:

+---+------+------+
|id |list_a|list_b|
+---+------+------+
|A  |[a, c]|[1, 5]|
|B  |[a, b]|null  |
|C  |[]    |[1]   |
+---+------+------+

我们希望最终得到:

+---+----+----+
|id |col |col |
+---+----+----+
|A  |a   |null|
|A  |c   |null|
|A  |null|1   |
|A  |null|5   |
|B  |a   |null|
|B  |b   |null|
|C  |null|1   |
+---+----+----+

我们正在考虑几种方法:

  1. 用列指示符前缀每个值,将所有数组合并为一个数组,将其爆炸,并将不同的值重新组织到不同的列
  2. 将数据帧拆分为几个,每个包含其中一个数组列,爆炸数组列,然后连接数据帧

但是所有这些方法闻起来都像是肮脏、复杂、容易出错和效率低下的变通方法。

有人知道如何以优雅的方式解决这个问题吗?

如果list_a和list_b列都为空,我会在数据集中添加第四个大小写

data = [
("A", ["a", "c"], ["1", "5"]),
("B", ["a", "b"], None),
("C", [], ["1"]),
("D", None, None),
]
df = spark.createDataFrame(data,["id","list_a","list_b"])

然后我将原始df分成3(都是null, list_a爆炸和list_b爆炸)并执行unionByName

dfnulls = df.filter(col("list_a").isNull() & col("list_b").isNull())
.withColumn("list_a", lit(None))
.withColumn("list_b", lit(None))
df1 = df
.withColumn("list_a", explode_outer(col("list_a")))
.withColumn("list_b", lit(None))
.filter(~col("list_a").isNull())
df2 = df
.withColumn("list_b", explode_outer(col("list_b")))
.withColumn("list_a", lit(None))
.filter(~col("list_b").isNull())
merged_df = df1.unionByName(df2).unionByName(dfnulls)
merged_df.show()
+---+------+------+
| id|list_a|list_b|
+---+------+------+
|  A|     a|  null|
|  A|     c|  null|
|  B|     a|  null|
|  B|     b|  null|
|  A|  null|     1|
|  A|  null|     5|
|  C|  null|     1|
|  D|  null|  null|
+---+------+------+

下面的方法可能会对你有所帮助,它是基于Scala的

基本上是单独展开各自的列表列,并根据虚拟列连接数据集,以获得所需的结果。

import org.apache.spark.sql.functions.{explode_outer, col, lit, concat}

val df1 = inputDF
.withColumn("list_a", explode_outer(col("list_a")))
.withColumn("random_join_col", concat(col("id"), lit("1")))
.drop("list_b")
val df2 = inputDF
.withColumn("list_b", explode_outer(col("list_b")))
.withColumn("random_join_col", concat(col("id"), lit("2")))
.drop("list_a")

val finalDF = df1.join(df2, Seq("id", "random_join_col"), "full_outer").drop("random_join_col")
// Drop rows, if it got null value on both the list columns
finalDF.na.drop(how = "all", Seq("list_a","list_b")).orderBy("id").show(false)

试试这个动态解决方案。

输入:

data = [
("A", ["a", "c"], ["1", "5"]),
("B", ["a", "b"], None),
("C", [], ["1"]),
]
df=spark.createDataFrame(data,["id","list_a","list_b"])
df.show(truncate=False)
+---+------+------+
|id |list_a|list_b|
+---+------+------+
|A  |[a, c]|[1, 5]|
|B  |[a, b]|null  |
|C  |[]    |[1]   |
+---+------+------+

让我们为df中的每个数组列创建一个dataframe数组。首先用空Dataframe初始化,然后在for循环中重写它。对于每个列,将其展开,对于所有其他列,将数据类型更改为带NULL的字符串。

from pyspark.sql.types import *
array_cols=df.columns[1:]  #just ignoring the ID column
c=0
dfarr=[spark.createDataFrame([],schema=StructType()) for i in array_cols ]
for i in array_cols:
dfarr[c]=df.withColumn(i,explode(col(i)))
for j in array_cols:
if(i!=j):
dfarr[c]=dfarr[c].withColumn(j,expr(" cast(null as string) "))
c=c+1

现在,dfarr是一个模式为

的数据帧数组。
dfarr[0].printSchema()
root
|-- id: string (nullable = true)
|-- list_a: string (nullable = true)
|-- list_b: string (nullable = true)
dfarr[1].show(truncate=False)
+---+------+------+
|id |list_a|list_b|
+---+------+------+
|A  |null  |1     |
|A  |null  |5     |
|C  |null  |1     |
+---+------+------+

不同的数据类型现在都是相似的,所以只需对它们进行并集。为此,我们需要functools

中的reduce函数
from functools import reduce  
from pyspark.sql import DataFrame
def unionAll(*dfs):
return reduce(DataFrame.unionByName, dfs) 

应用于我们的dfarr

combo=unionAll(*dfarr)
combo.show(truncate=False)
+---+------+------+
|id |list_a|list_b|
+---+------+------+
|A  |a     |null  |
|A  |c     |null  |
|B  |a     |null  |
|B  |b     |null  |
|A  |null  |1     |
|A  |null  |5     |
|C  |null  |1     |
+---+------+------+

相关内容

  • 没有找到相关文章

最新更新