使用 flatMap / reduce:处理包含行列表的行



我有一个数据帧,每行包含一个行数组

我想将所有内行聚合到一个数据帧中

以下是我所获得/取得的成就:

df.select('*').take(1)

给我这个:

[
   Row(
       body=[
               Row(a=1, b=1), 
               Row(a=2, b=2)
            ]
      )
]

所以这样做:

df.rdd.flatMap(lambda x: x).collect()

我明白这个:

[[
   Row(a=1, b=1)
   Row(a=2, b=2)
]]

所以我被迫这样做:

df.rdd.flatMap(lambda x: x).flatMap(lambda x: x)

所以我可以实现以下目标:

[
  Row(a=1, b=1) 
  Row(a=2, b=2)
]

使用上面的结果,我最终可以将其转换为数据帧并保存在某个地方。 这就是我想要的。但是调用 flatMap 两次看起来不对。

我尝试通过使用Reduce进行相同的操作,就像以下代码一样:

flatRdd = df.rdd.flatMap(lambda x: x)        
dfMerged = reduce(DataFrame.unionByName, [flatRdd])

reduce的第二个参数必须是可迭代的,所以我被迫添加[flatRdd]。可悲的是,它给了我这个:

[[
   Row(a=1, b=1)
   Row(a=2, b=2)
]]

当然有更好的方法来实现我想要的。

IIUC,您可以使用.*语法explode然后展平生成的Row

假设您从以下数据帧开始:

df.show()
#+----------------+
#|            body|
#+----------------+
#|[[1, 1], [2, 2]]|
#+----------------+

使用架构:

df.printSchema()
#root
# |-- body: array (nullable = true)
# |    |-- element: struct (containsNull = true)
# |    |    |-- a: long (nullable = true)
# |    |    |-- b: long (nullable = true)

您可以先explode body列:

from pyspark.sql.functions import explode
df = df.select(explode("body").alias("exploded"))
df.show()
#+--------+
#|exploded|
#+--------+
#|  [1, 1]|
#|  [2, 2]|
#+--------+

现在展平exploded列:

df = df.select("exploded.*")
df.show()
#+---+---+
#|  a|  b|
#+---+---+
#|  1|  1|
#|  2|  2|
#+---+---+

现在,如果您要调用collect,您将获得所需的输出:

print(df.collect())
#[Row(a=1, b=1), Row(a=2, b=2)]

另请参阅:

  • 使用复杂类型查询 Spark SQL 数据帧

你不需要在 Row 对象上运行 flatMap((,只需直接用键引用它:

>>> df.rdd.flatMap(lambda x: x.body).collect()
[Row(a=1, b=1), Row(a=2, b=2)]

最新更新