我有一个数据帧,每行包含一个行数组
我想将所有内行聚合到一个数据帧中
以下是我所获得/取得的成就:
这
df.select('*').take(1)
给我这个:
[
Row(
body=[
Row(a=1, b=1),
Row(a=2, b=2)
]
)
]
所以这样做:
df.rdd.flatMap(lambda x: x).collect()
我明白这个:
[[
Row(a=1, b=1)
Row(a=2, b=2)
]]
所以我被迫这样做:
df.rdd.flatMap(lambda x: x).flatMap(lambda x: x)
所以我可以实现以下目标:
[
Row(a=1, b=1)
Row(a=2, b=2)
]
使用上面的结果,我最终可以将其转换为数据帧并保存在某个地方。 这就是我想要的。但是调用 flatMap 两次看起来不对。
我尝试通过使用Reduce进行相同的操作,就像以下代码一样:
flatRdd = df.rdd.flatMap(lambda x: x)
dfMerged = reduce(DataFrame.unionByName, [flatRdd])
reduce的第二个参数必须是可迭代的,所以我被迫添加[flatRdd]。可悲的是,它给了我这个:
[[
Row(a=1, b=1)
Row(a=2, b=2)
]]
当然有更好的方法来实现我想要的。
IIUC,您可以使用.*
语法explode
然后展平生成的Row
。
假设您从以下数据帧开始:
df.show()
#+----------------+
#| body|
#+----------------+
#|[[1, 1], [2, 2]]|
#+----------------+
使用架构:
df.printSchema()
#root
# |-- body: array (nullable = true)
# | |-- element: struct (containsNull = true)
# | | |-- a: long (nullable = true)
# | | |-- b: long (nullable = true)
您可以先explode
body
列:
from pyspark.sql.functions import explode
df = df.select(explode("body").alias("exploded"))
df.show()
#+--------+
#|exploded|
#+--------+
#| [1, 1]|
#| [2, 2]|
#+--------+
现在展平exploded
列:
df = df.select("exploded.*")
df.show()
#+---+---+
#| a| b|
#+---+---+
#| 1| 1|
#| 2| 2|
#+---+---+
现在,如果您要调用collect
,您将获得所需的输出:
print(df.collect())
#[Row(a=1, b=1), Row(a=2, b=2)]
另请参阅:
- 使用复杂类型查询 Spark SQL 数据帧
你不需要在 Row 对象上运行 flatMap((,只需直接用键引用它:
>>> df.rdd.flatMap(lambda x: x.body).collect()
[Row(a=1, b=1), Row(a=2, b=2)]