我正在尝试将嵌套struct
类型的数据帧列(见下文)扩展到多个列。我正在使用的结构架构看起来像{"foo": 3, "bar": {"baz": 2}}
.
理想情况下,我想将上述内容扩展为两列("foo"
和"bar.baz"
)。但是,当我尝试使用.select("data.*")
(其中data
是结构列)时,我只得到foo
和bar
列,其中bar
仍然是struct
。
有没有办法让我扩展两个层的结构?
data.bar.baz
作为bar.baz
:
df.show()
+-------+
| data|
+-------+
|[3,[2]]|
+-------+
df.printSchema()
root
|-- data: struct (nullable = false)
| |-- foo: long (nullable = true)
| |-- bar: struct (nullable = false)
| | |-- baz: long (nullable = true)
在 pyspark 中:
import pyspark.sql.functions as F
df.select(F.col("data.foo").alias("foo"), F.col("data.bar.baz").alias("bar.baz")).show()
+---+-------+
|foo|bar.baz|
+---+-------+
| 3| 2|
+---+-------+
我最终选择了以下函数,该函数递归地"解包"分层结构体:
从本质上讲,它不断挖掘Struct
字段并保持其他字段不变,并且这种方法消除了当Struct
有很多字段时具有很长的df.select(...)
语句的需要。代码如下:
# Takes in a StructType schema object and return a column selector that flattens the Struct
def flatten_struct(schema, prefix=""):
result = []
for elem in schema:
if isinstance(elem.dataType, StructType):
result += flatten_struct(elem.dataType, prefix + elem.name + ".")
else:
result.append(col(prefix + elem.name).alias(prefix + elem.name))
return result
df = sc.parallelize([Row(r=Row(a=1, b=Row(foo="b", bar="12")))]).toDF()
df.show()
+----------+
| r|
+----------+
|[1,[12,b]]|
+----------+
df_expanded = df.select("r.*")
df_flattened = df_expanded.select(flatten_struct(df_expanded.schema))
df_flattened.show()
+---+-----+-----+
| a|b.bar|b.foo|
+---+-----+-----+
| 1| 12| b|
+---+-----+-----+