我们有一个pyspark数据框架,它有几个列,其中包含多个值的数组。我们的目标是将这些列的每个值放在几行中,保持初始的不同列。所以,像这样开始:
data = [
("A", ["a", "c"], ["1", "5"]),
("B", ["a", "b"], None),
("C", [], ["1"]),
]
什么:
+---+------+------+
|id |list_a|list_b|
+---+------+------+
|A |[a, c]|[1, 5]|
|B |[a, b]|null |
|C |[] |[1] |
+---+------+------+
我们希望最终得到:
+---+----+----+
|id |col |col |
+---+----+----+
|A |a |null|
|A |c |null|
|A |null|1 |
|A |null|5 |
|B |a |null|
|B |b |null|
|C |null|1 |
+---+----+----+
我们正在考虑几种方法:
- 用列指示符前缀每个值,将所有数组合并为一个数组,将其爆炸,并将不同的值重新组织到不同的列
- 将数据帧拆分为几个,每个包含其中一个数组列,爆炸数组列,然后连接数据帧
但是所有这些方法闻起来都像是肮脏、复杂、容易出错和效率低下的变通方法。
有人知道如何以优雅的方式解决这个问题吗?
如果list_a和list_b列都为空,我会在数据集中添加第四个大小写
data = [
("A", ["a", "c"], ["1", "5"]),
("B", ["a", "b"], None),
("C", [], ["1"]),
("D", None, None),
]
df = spark.createDataFrame(data,["id","list_a","list_b"])
然后我将原始df分成3(都是null, list_a爆炸和list_b爆炸)并执行unionByName
dfnulls = df.filter(col("list_a").isNull() & col("list_b").isNull())
.withColumn("list_a", lit(None))
.withColumn("list_b", lit(None))
df1 = df
.withColumn("list_a", explode_outer(col("list_a")))
.withColumn("list_b", lit(None))
.filter(~col("list_a").isNull())
df2 = df
.withColumn("list_b", explode_outer(col("list_b")))
.withColumn("list_a", lit(None))
.filter(~col("list_b").isNull())
merged_df = df1.unionByName(df2).unionByName(dfnulls)
merged_df.show()
+---+------+------+
| id|list_a|list_b|
+---+------+------+
| A| a| null|
| A| c| null|
| B| a| null|
| B| b| null|
| A| null| 1|
| A| null| 5|
| C| null| 1|
| D| null| null|
+---+------+------+
下面的方法可能会对你有所帮助,它是基于Scala的
基本上是单独展开各自的列表列,并根据虚拟列连接数据集,以获得所需的结果。
import org.apache.spark.sql.functions.{explode_outer, col, lit, concat}
val df1 = inputDF
.withColumn("list_a", explode_outer(col("list_a")))
.withColumn("random_join_col", concat(col("id"), lit("1")))
.drop("list_b")
val df2 = inputDF
.withColumn("list_b", explode_outer(col("list_b")))
.withColumn("random_join_col", concat(col("id"), lit("2")))
.drop("list_a")
val finalDF = df1.join(df2, Seq("id", "random_join_col"), "full_outer").drop("random_join_col")
// Drop rows, if it got null value on both the list columns
finalDF.na.drop(how = "all", Seq("list_a","list_b")).orderBy("id").show(false)
试试这个动态解决方案。
输入:
data = [
("A", ["a", "c"], ["1", "5"]),
("B", ["a", "b"], None),
("C", [], ["1"]),
]
df=spark.createDataFrame(data,["id","list_a","list_b"])
df.show(truncate=False)
+---+------+------+
|id |list_a|list_b|
+---+------+------+
|A |[a, c]|[1, 5]|
|B |[a, b]|null |
|C |[] |[1] |
+---+------+------+
让我们为df中的每个数组列创建一个dataframe数组。首先用空Dataframe初始化,然后在for循环中重写它。对于每个列,将其展开,对于所有其他列,将数据类型更改为带NULL的字符串。
from pyspark.sql.types import *
array_cols=df.columns[1:] #just ignoring the ID column
c=0
dfarr=[spark.createDataFrame([],schema=StructType()) for i in array_cols ]
for i in array_cols:
dfarr[c]=df.withColumn(i,explode(col(i)))
for j in array_cols:
if(i!=j):
dfarr[c]=dfarr[c].withColumn(j,expr(" cast(null as string) "))
c=c+1
现在,dfarr是一个模式为
的数据帧数组。dfarr[0].printSchema()
root
|-- id: string (nullable = true)
|-- list_a: string (nullable = true)
|-- list_b: string (nullable = true)
dfarr[1].show(truncate=False)
+---+------+------+
|id |list_a|list_b|
+---+------+------+
|A |null |1 |
|A |null |5 |
|C |null |1 |
+---+------+------+
不同的数据类型现在都是相似的,所以只需对它们进行并集。为此,我们需要functools
中的reduce函数from functools import reduce
from pyspark.sql import DataFrame
def unionAll(*dfs):
return reduce(DataFrame.unionByName, dfs)
应用于我们的dfarr
combo=unionAll(*dfarr)
combo.show(truncate=False)
+---+------+------+
|id |list_a|list_b|
+---+------+------+
|A |a |null |
|A |c |null |
|B |a |null |
|B |b |null |
|A |null |1 |
|A |null |5 |
|C |null |1 |
+---+------+------+