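I have a PySpark DataFrame created from XML. Because of how the XML is structured, the DataFrame's schema has an extra, unnecessary level of nesting.
Current DataFrame schema: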
root
|-- a: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- movies: struct (nullable = true)
| | | |-- movie: array (nullable = true)
| | | | |-- element: struct (containsNull = true)
| | | | | |-- b: string (nullable = true)
| | | | | |-- c: string (nullable = true)
| | | | | |-- d: integer (nullable = true)
| | | | | |-- e: string (nullable = true)
| | |-- f: string (nullable = true)
| | |-- g: string (nullable = true)
I'm trying to replace the movies struct with the movie array nested inside it, like this:
root
|-- a: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- movies: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- b: string (nullable = true)
| | | | |-- c: string (nullable = true)
| | | | |-- d: integer (nullable = true)
| | | | |-- e: string (nullable = true)
| | |-- f: string (nullable = true)
| | |-- g: string (nullable = true)
The closest I've gotten is:
from pyspark.sql import functions as F
df.withColumn("a", F.transform('a', lambda x: x.withField("movies_new", F.col("a.movies.movie"))))
which results in:
root
|-- a: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- movies: struct (nullable = true)
| | | |-- movie: array (nullable = true)
| | | | |-- element: struct (containsNull = true)
| | | | | |-- b: string (nullable = true)
| | | | | |-- c: string (nullable = true)
| | | | | |-- d: integer (nullable = true)
| | | | | |-- e: string (nullable = true)
| | |-- f: string (nullable = true)
| | |-- g: string (nullable = true)
| | |-- movies_new: array (nullable = true)
| | | |-- element: array (containsNull = true)
| | | | |-- element: struct (containsNull = true)
| | | | | |-- b: string (nullable = true)
| | | | | |-- c: string (nullable = true)
| | | | | |-- d: integer (nullable = true)
| | | | | |-- e: string (nullable = true)
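I understand why this happens, but I thought that if I never extracted the nested array out of 'a', it might not become an array of arrays.
Any suggestions?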
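To make the cause of the array of arrays concrete, here is a plain-Python sketch (no Spark involved) that models an array of structs as a list of dicts. The helper `get_field` and the sample data are hypothetical and exist only for illustration. The assumption being modeled is that extracting a field from an array of structs yields an array of that field's values, so `F.col("a.movies.movie")` refers to the whole column rather than to the lambda's element `x`.

```python
# Plain-Python model: an "array of structs" is a list of dicts.
a = [
    {"movies": {"movie": [{"b": "b1"}, {"b": "b2"}]}, "f": "f1", "g": "g1"},
    {"movies": {"movie": [{"b": "b3"}]}, "f": "f2", "g": "g2"},
]


def get_field(value, name):
    """Hypothetical helper: on a list of structs, pull the field out of
    every element; on a single struct, return the field itself."""
    if isinstance(value, list):
        return [item[name] for item in value]
    return value[name]


# Path resolved against the whole column "a": one inner array per element,
# so the result is an array of arrays.
whole_column = get_field(get_field(a, "movies"), "movie")

# What the attempt does: every element receives the same array of arrays.
wrong = [dict(x, movies_new=whole_column) for x in a]

# Path resolved against each element x: every element gets its own flat array.
right = [dict(x, movies_new=get_field(get_field(x, "movies"), "movie")) for x in a]
```

In this model, `wrong[0]["movies_new"]` is a list of lists, while `right[0]["movies_new"]` is the flat list of movie structs that belongs to the first element.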
The logic is:
- Explode the array "a".
- Rebuild the struct as (movies.movie, f, g).
- Collect "a" back into an array.
df = df.withColumn("a", F.explode("a"))
df = df.withColumn("a", F.struct(
df.a.movies.getField("movie").alias("movies"),
df.a.f.alias("f"),
df.a.g.alias("g")))
df = df.select(F.collect_list("a").alias("a"))
Full working code:
import pyspark.sql.functions as F
df = spark.createDataFrame(data=[
[[(([("b1", "c1", "d1", "e1")],), "f1", "g1")]]
], schema="a array<struct<movies struct<movie array<struct<b string, c string, d string, e string>>>, f string, g string>>")
df.printSchema()
# df.show(truncate=False)
df = df.withColumn("a", F.explode("a"))
df = df.withColumn("a", F.struct(
df.a.movies.getField("movie").alias("movies"),
df.a.f.alias("f"),
df.a.g.alias("g")))
df = df.select(F.collect_list("a").alias("a"))
df.printSchema()
# df.show(truncate=False)
Schema before:
root
|-- a: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- movies: struct (nullable = true)
| | | |-- movie: array (nullable = true)
| | | | |-- element: struct (containsNull = true)
| | | | | |-- b: string (nullable = true)
| | | | | |-- c: string (nullable = true)
| | | | | |-- d: string (nullable = true)
| | | | | |-- e: string (nullable = true)
| | |-- f: string (nullable = true)
| | |-- g: string (nullable = true)
Schema after:
root
|-- a: array (nullable = false)
| |-- element: struct (containsNull = false)
| | |-- movies: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- b: string (nullable = true)
| | | | |-- c: string (nullable = true)
| | | | |-- d: string (nullable = true)
| | | | |-- e: string (nullable = true)
| | |-- f: string (nullable = true)
| | |-- g: string (nullable = true)