Move a deeply nested field up one level in a PySpark DataFrame



I have a PySpark DataFrame created from XML. Because of how the XML is structured, the DataFrame's schema has an extra, unnecessary level of nesting.

Current DataFrame schema:

root
|-- a: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- movies: struct (nullable = true)
|    |    |    |-- movie: array (nullable = true)
|    |    |    |    |-- element: struct (containsNull = true)
|    |    |    |    |    |-- b: string (nullable = true)
|    |    |    |    |    |-- c: string (nullable = true)
|    |    |    |    |    |-- d: integer (nullable = true)
|    |    |    |    |    |-- e: string (nullable = true)
|    |    |-- f: string (nullable = true)
|    |    |-- g: string (nullable = true)

I am trying to replace the movies struct with the movie array inside it, so that the schema looks like this:

root
|-- a: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- movies: array (nullable = true)
|    |    |    |-- element: struct (containsNull = true)
|    |    |    |    |-- b: string (nullable = true)
|    |    |    |    |-- c: string (nullable = true)
|    |    |    |    |-- d: integer (nullable = true)
|    |    |    |    |-- e: string (nullable = true)
|    |    |-- f: string (nullable = true)
|    |    |-- g: string (nullable = true)

The closest I have gotten is:

from pyspark.sql import functions as F
df.withColumn("a", F.transform('a', lambda x: x.withField("movies_new", F.col("a.movies.movie"))))

which results in:

root
|-- a: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- movies: struct (nullable = true)
|    |    |    |-- movie: array (nullable = true)
|    |    |    |    |-- element: struct (containsNull = true)
|    |    |    |    |    |-- b: string (nullable = true)
|    |    |    |    |    |-- c: string (nullable = true)
|    |    |    |    |    |-- d: integer (nullable = true)
|    |    |    |    |    |-- e: string (nullable = true)
|    |    |-- f: string (nullable = true)
|    |    |-- g: string (nullable = true)
|    |    |-- movies_new: array (nullable = true)
|    |    |    |-- element: array (containsNull = true)
|    |    |    |    |-- element: struct (containsNull = true)
|    |    |    |    |    |-- b: string (nullable = true)
|    |    |    |    |    |-- c: string (nullable = true)
|    |    |    |    |    |-- d: integer (nullable = true)
|    |    |    |    |    |-- e: string (nullable = true)

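I understand why this happens, but I thought that if I never extracted the nested array out of 'a' in the first place, it would not turn into an array of arrays.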

Any suggestions?

The logic is:

  • Explode the array "a".
  • Rebuild the struct as (movies.movie, f, g).
  • Collect "a" back into an array.
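# One row per element of "a"
df = df.withColumn("a", F.explode("a"))
# Rebuild each struct, lifting movies.movie up to "movies"
df = df.withColumn("a", F.struct(
    df.a.movies.getField("movie").alias("movies"),
    df.a.f.alias("f"),
    df.a.g.alias("g")))
# Gather the structs back into an array (no groupBy, so all rows end up in one)
df = df.select(F.collect_list("a").alias("a"))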

Full working code:

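import pyspark.sql.functions as F
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# Sample data shaped like the question's schema (d is a string here)
df = spark.createDataFrame(data=[
    [[(([("b1", "c1", "d1", "e1")],), "f1", "g1")]]
], schema="a array<struct<movies struct<movie array<struct<b string, c string, d string, e string>>>, f string, g string>>")
df.printSchema()
# df.show(truncate=False)
# One row per element of "a"
df = df.withColumn("a", F.explode("a"))
# Rebuild the struct, lifting movies.movie up to "movies"
df = df.withColumn("a", F.struct(
    df.a.movies.getField("movie").alias("movies"),
    df.a.f.alias("f"),
    df.a.g.alias("g")))
# Gather the structs back into one array
df = df.select(F.collect_list("a").alias("a"))
df.printSchema()
# df.show(truncate=False)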

Schema before:

root
|-- a: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- movies: struct (nullable = true)
|    |    |    |-- movie: array (nullable = true)
|    |    |    |    |-- element: struct (containsNull = true)
|    |    |    |    |    |-- b: string (nullable = true)
|    |    |    |    |    |-- c: string (nullable = true)
|    |    |    |    |    |-- d: string (nullable = true)
|    |    |    |    |    |-- e: string (nullable = true)
|    |    |-- f: string (nullable = true)
|    |    |-- g: string (nullable = true)

Schema after:

root
|-- a: array (nullable = false)
|    |-- element: struct (containsNull = false)
|    |    |-- movies: array (nullable = true)
|    |    |    |-- element: struct (containsNull = true)
|    |    |    |    |-- b: string (nullable = true)
|    |    |    |    |-- c: string (nullable = true)
|    |    |    |    |-- d: string (nullable = true)
|    |    |    |    |-- e: string (nullable = true)
|    |    |-- f: string (nullable = true)
|    |    |-- g: string (nullable = true)