Selecting columns from highly nested data



For the dataframe below, which was generated from an avro file, I am trying to get the column names as a list (or in some similar form) so that I can use them in a select statement. node1 and node2 have the same elements. For example, I know I can do df.select(col('data.node1.name')), but I am not sure:

  1. how to select all columns at once without hard-coding every column name (see the sketch right after this list for the part I can already do), and
  2. how to handle the nested parts. I think that, for readability, productvalues and porders should be selected into separate individual dataframes/tables?
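
A sketch of the part I can already do: listing a struct's immediate field names programmatically (standard PySpark API; what I cannot see is how to drive the nested selects from this):

df.select("data.node1.*").columns          # ['name', 'productlist']
df.schema["data"].dataType.fieldNames()    # ['node1', 'node2']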

Input schema:

root
 |-- metadata: struct
 |...
 |-- data: struct
 |    |-- node1: struct
 |    |    |-- name: string
 |    |    |-- productlist: array
 |    |    |    |-- element: struct
 |    |    |    |    |-- productvalues: array
 |    |    |    |    |    |-- element: struct
 |    |    |    |    |    |    |-- pname: string
 |    |    |    |    |    |    |-- porders: array
 |    |    |    |    |    |    |    |-- element: struct
 |    |    |    |    |    |    |    |    |-- ordernum: int
 |    |    |    |    |    |    |    |    |-- field: string
 |    |-- node2: struct
 |    |    |-- name: string
 |    |    |-- productlist: array
 |    |    |    |-- element: struct
 |    |    |    |    |-- productvalues: array
 |    |    |    |    |    |-- element: struct
 |    |    |    |    |    |    |-- pname: string
 |    |    |    |    |    |    |-- porders: array
 |    |    |    |    |    |    |    |-- element: struct
 |    |    |    |    |    |    |    |    |-- ordernum: int
 |    |    |    |    |    |    |    |    |-- field: string

Rather than collecting all the data into one table, I would suggest creating a separate table for each list. To get the values out of a list you can use the explode function. For example, to build the productlist table:

from pyspark.sql.functions import col, explode

# explode turns each element of data.node1.productlist into its own row
productlist = df.select(
    col('data.node1.name').alias("name"),
    explode(col('data.node1.productlist')).alias("first_explode")
)

In the next step you can take the productlist dataframe and run the following:

# each element of first_explode.productvalues becomes its own row again
productvalue = productlist.select(
    col('name'),
    explode(col('first_explode.productvalues')).alias("second_explode")
)

And so on. You may also find some help in this link.
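
To spell out the "and so on", here is a sketch of the remaining level in the same style (my continuation of the pattern above, not tested against your data):

# second_explode is a struct<pname, porders>; one more explode flattens porders
porders = productvalue.select(
    col('name'),
    col('second_explode.pname').alias("pname"),
    explode(col('second_explode.porders')).alias("third_explode")
)
orders = porders.select('name', 'pname', 'third_explode.ordernum', 'third_explode.field')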

This way you will not need to hard-code all the struct fields. But you will need to provide the list of columns/fields that have the array-of-structs type. You have 3 such fields, and we will add one more column, so 4 in total.

First, a dataframe similar to yours:

from pyspark.sql import functions as F

df = spark.createDataFrame(
    [(
        ('a', 'b'),
        (
            (
                'name_1',
                [
                    ([
                        (
                            'pname_111',
                            [
                                (1111, 'field_1111'),
                                (1112, 'field_1112')
                            ]
                        ),
                        (
                            'pname_112',
                            [
                                (1121, 'field_1121'),
                                (1122, 'field_1122')
                            ]
                        )
                    ],),
                    ([
                        (
                            'pname_121',
                            [
                                (1211, 'field_1211'),
                                (1212, 'field_1212')
                            ]
                        ),
                        (
                            'pname_122',
                            [
                                (1221, 'field_1221'),
                                (1222, 'field_1222')
                            ]
                        )
                    ],)
                ]
            ),
            (
                'name_2',
                [
                    ([
                        (
                            'pname_211',
                            [
                                (2111, 'field_2111'),
                                (2112, 'field_2112')
                            ]
                        ),
                        (
                            'pname_212',
                            [
                                (2121, 'field_2121'),
                                (2122, 'field_2122')
                            ]
                        )
                    ],),
                    ([
                        (
                            'pname_221',
                            [
                                (2211, 'field_2211'),
                                (2212, 'field_2212')
                            ]
                        ),
                        (
                            'pname_222',
                            [
                                (2221, 'field_2221'),
                                (2222, 'field_2222')
                            ]
                        )
                    ],)
                ]
            )
        ),
    )],
    'metadata:struct<fld1:string,fld2:string>, data:struct<node1:struct<name:string, productlist:array<struct<productvalues:array<struct<pname:string, porders:array<struct<ordernum:int, field:string>>>>>>>, node2:struct<name:string, productlist:array<struct<productvalues:array<struct<pname:string, porders:array<struct<ordernum:int, field:string>>>>>>>>'
)
# df.printSchema()
# root
#  |-- metadata: struct (nullable = true)
#  |    |-- fld1: string (nullable = true)
#  |    |-- fld2: string (nullable = true)
#  |-- data: struct (nullable = true)
#  |    |-- node1: struct (nullable = true)
#  |    |    |-- name: string (nullable = true)
#  |    |    |-- productlist: array (nullable = true)
#  |    |    |    |-- element: struct (containsNull = true)
#  |    |    |    |    |-- productvalues: array (nullable = true)
#  |    |    |    |    |    |-- element: struct (containsNull = true)
#  |    |    |    |    |    |    |-- pname: string (nullable = true)
#  |    |    |    |    |    |    |-- porders: array (nullable = true)
#  |    |    |    |    |    |    |    |-- element: struct (containsNull = true)
#  |    |    |    |    |    |    |    |    |-- ordernum: integer (nullable = true)
#  |    |    |    |    |    |    |    |    |-- field: string (nullable = true)
#  |    |-- node2: struct (nullable = true)
#  |    |    |-- name: string (nullable = true)
#  |    |    |-- productlist: array (nullable = true)
#  |    |    |    |-- element: struct (containsNull = true)
#  |    |    |    |    |-- productvalues: array (nullable = true)
#  |    |    |    |    |    |-- element: struct (containsNull = true)
#  |    |    |    |    |    |    |-- pname: string (nullable = true)
#  |    |    |    |    |    |    |-- porders: array (nullable = true)
#  |    |    |    |    |    |    |    |-- element: struct (containsNull = true)
#  |    |    |    |    |    |    |    |    |-- ordernum: integer (nullable = true)
#  |    |    |    |    |    |    |    |    |-- field: string (nullable = true)

Answer

  • Spark 3.1+

    nodes = df.select("data.*").columns
    for n in nodes:
        df = df.withColumn("data", F.col("data").withField(n, F.struct(F.lit(n).alias("node"), f"data.{n}.*")))
    df = df.withColumn("data", F.array("data.*"))
    for arr_of_struct in ["data", "productlist", "productvalues", "porders"]:
        df = df.select(
            *[c for c in df.columns if c != arr_of_struct],
            F.expr(f"inline({arr_of_struct})")
        )
    
  • Lower Spark versions (without Column.withField, which was added in 3.1) can rebuild each node struct with F.struct instead:

    nodes = df.select("data.*").columns
    for n in nodes:
        df = df.withColumn(
            "data",
            F.struct(
                F.struct(F.lit(n).alias("node"), f"data.{n}.*").alias(n),
                *[f"data.{c}" for c in df.select("data.*").columns if c != n]
            )
        )
    df = df.withColumn("data", F.array("data.*"))
    for arr_of_struct in ["data", "productlist", "productvalues", "porders"]:
        df = df.select(
            *[c for c in df.columns if c != arr_of_struct],
            F.expr(f"inline({arr_of_struct})")
        )
    

Result:

df.printSchema()
# root
#  |-- metadata: struct (nullable = true)
#  |    |-- fld1: string (nullable = true)
#  |    |-- fld2: string (nullable = true)
#  |-- node: string (nullable = false)
#  |-- name: string (nullable = true)
#  |-- pname: string (nullable = true)
#  |-- ordernum: integer (nullable = true)
#  |-- field: string (nullable = true)
df.show()
# +--------+-----+------+---------+--------+----------+
# |metadata| node|  name|    pname|ordernum|     field|
# +--------+-----+------+---------+--------+----------+
# |  {a, b}|node1|name_1|pname_111|    1111|field_1111|
# |  {a, b}|node1|name_1|pname_111|    1112|field_1112|
# |  {a, b}|node1|name_1|pname_112|    1121|field_1121|
# |  {a, b}|node1|name_1|pname_112|    1122|field_1122|
# |  {a, b}|node1|name_1|pname_121|    1211|field_1211|
# |  {a, b}|node1|name_1|pname_121|    1212|field_1212|
# |  {a, b}|node1|name_1|pname_122|    1221|field_1221|
# |  {a, b}|node1|name_1|pname_122|    1222|field_1222|
# |  {a, b}|node2|name_2|pname_211|    2111|field_2111|
# |  {a, b}|node2|name_2|pname_211|    2112|field_2112|
# |  {a, b}|node2|name_2|pname_212|    2121|field_2121|
# |  {a, b}|node2|name_2|pname_212|    2122|field_2122|
# |  {a, b}|node2|name_2|pname_221|    2211|field_2211|
# |  {a, b}|node2|name_2|pname_221|    2212|field_2212|
# |  {a, b}|node2|name_2|pname_222|    2221|field_2221|
# |  {a, b}|node2|name_2|pname_222|    2222|field_2222|
# +--------+-----+------+---------+--------+----------+

Explanation

nodes = df.select("data.*").columns
for n in nodes:
    df = df.withColumn("data", F.col("data").withField(n, F.struct(F.lit(n).alias("node"), f"data.{n}.*")))

With the above, I decided to preserve the node title in case it is needed later. It first takes the list of node names from the fields of the "data" column. Using that list, the for loop creates one extra field inside each node's struct to hold the node title.
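
As a minimal standalone illustration of what withField does here (toy data, not the dataframe above):

d = spark.createDataFrame([((1, 2),)], 'data:struct<a:int,b:int>')
d = d.withColumn('data', F.col('data').withField('node', F.lit('node1')))
d.select('data.*').show()
# +---+---+-----+
# |  a|  b| node|
# +---+---+-----+
# |  1|  2|node1|
# +---+---+-----+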

df = df.withColumn("data", F.array("data.*"))

The above converts the "data" column from a struct into an array, so that in the next step we can easily explode it into columns.
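
At this point the schema should look roughly like this (my sketch of the intermediate state, not printed in the original):

# root
#  |-- metadata: struct
#  |-- data: array
#  |    |-- element: struct
#  |    |    |-- node: string
#  |    |    |-- name: string
#  |    |    |-- productlist: array (as before)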

for arr_of_struct in ["data", "productlist", "productvalues", "porders"]:
    df = df.select(
        *[c for c in df.columns if c != arr_of_struct],
        F.expr(f"inline({arr_of_struct})")
    )

Above, the key line is F.expr(f"inline({arr_of_struct})"). It must be used inside a loop, because it is a generator function and you cannot nest generators within one another in Spark. inline explodes an array of structs into columns. At this step you have 4 arrays of structs, so 4 inline expressions are created.
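
A minimal self-contained example of inline (illustrative only):

tiny = spark.createDataFrame([([(1, 'a'), (2, 'b')],)], 'arr:array<struct<n:int,s:string>>')
tiny.select(F.expr("inline(arr)")).show()
# +---+---+
# |  n|  s|
# +---+---+
# |  1|  a|
# |  2|  b|
# +---+---+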

So, this is not a perfect answer for you, but I hope it gives you some ideas for solving your problem. I know you said you do not want to hard-code the column names, but I could not handle that part for now.

First, I created this sample JSON for testing:

{
  "metadata": {},
  "data": {
    "node1": {
      "name": "Node001",
      "productlist": [
        {
          "productvalues": [
            {
              "pname": "Node001-P001",
              "porders": [
                {"ordernum":  1, "field": "Node001-P001-001"},
                {"ordernum":  2, "field": "Node001-P001-002"}
              ]
            },
            {
              "pname": "Node001-P002",
              "porders": [
                {"ordernum":  3, "field": "Node001-P002-003"},
                {"ordernum":  4, "field": "Node001-P002-004"},
                {"ordernum":  5, "field": "Node001-P002-005"},
                {"ordernum":  6, "field": "Node001-P002-006"}
              ]
            },
            {
              "pname": "Node001-P003",
              "porders": [
                {"ordernum":  7, "field": "Node001-P003-007"}
              ]
            }
          ]
        },
        {
          "productvalues": [
            {
              "pname": "Node001-P004",
              "porders": [
                {"ordernum":  8, "field": "Node001-P004-008"},
                {"ordernum":  9, "field": "Node001-P004-009"},
                {"ordernum": 10, "field": "Node001-P004-010"}
              ]
            },
            {
              "pname": "Node001-P005",
              "porders": [
                {"ordernum": 11, "field": "Node001-P005-011"},
                {"ordernum": 12, "field": "Node001-P005-012"},
                {"ordernum": 13, "field": "Node001-P005-013"}
              ]
            }
          ]
        }
      ]
    },
    "node2": {
      "name": "Node002",
      "productlist": [
        {
          "productvalues": [
            {
              "pname": "Node002-P001",
              "porders": [
                {"ordernum": 14, "field": "Node002-P001-014"}
              ]
            },
            {
              "pname": "Node002-P002",
              "porders": [
                {"ordernum": 15, "field": "Node002-P002-015"}
              ]
            },
            {
              "pname": "Node002-P003",
              "porders": [
                {"ordernum": 16, "field": "Node002-P003-016"}
              ]
            }
          ]
        },
        {
          "productvalues": [
            {
              "pname": "Node002-P004",
              "porders": [
                {"ordernum": 17, "field": "Node002-P004-017"}
              ]
            },
            {
              "pname": "Node002-P005",
              "porders": [
                {"ordernum": 18, "field": "Node002-P005-018"}
              ]
            }
          ]
        }
      ]
    }
  }
}

Now, here is a "dictionary-like" list that you will use in a moment. Each entry describes one level of nesting: 'col' lists the columns to carry along as-is, and 'exp' lists the array columns to explode at that level.

cols_dict = [
    {
        'col': ['data.node1.name'],
        'exp': ['data.node1.productlist'],
    },
    {
        'exp': ['productlist.productvalues'],
    },
    {
        'col': ['productvalues.pname'],
        'exp': ['productvalues.porders'],
    },
    {
        'col': ['porders.ordernum', 'porders.field']
    }
]

Finally, loop through this dictionary, adding a few transformations, to get the final result:

dfx = df
select_col = []
for i in cols_dict:
    # columns exploded in the previous round are now top-level; keep only their last name part
    select_col = [c.split('.')[-1] for c in select_col]
    if i.get('col'):
        select_col += i['col']

    select_exp = []
    if i.get('exp'):
        select_exp += i['exp']

    dfx = dfx.select([F.col(c) for c in select_col] + [F.explode(c).alias(c.split('.')[-1]) for c in select_exp])

dfx.show()
+-------+------------+--------+----------------+
|   name|       pname|ordernum|           field|
+-------+------------+--------+----------------+
|Node001|Node001-P001|       1|Node001-P001-001|
|Node001|Node001-P001|       2|Node001-P001-002|
|Node001|Node001-P002|       3|Node001-P002-003|
|Node001|Node001-P002|       4|Node001-P002-004|
|Node001|Node001-P002|       5|Node001-P002-005|
|Node001|Node001-P002|       6|Node001-P002-006|
|Node001|Node001-P003|       7|Node001-P003-007|
|Node001|Node001-P004|       8|Node001-P004-008|
|Node001|Node001-P004|       9|Node001-P004-009|
|Node001|Node001-P004|      10|Node001-P004-010|
|Node001|Node001-P005|      11|Node001-P005-011|
|Node001|Node001-P005|      12|Node001-P005-012|
|Node001|Node001-P005|      13|Node001-P005-013|
+-------+------------+--------+----------------+
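
Since node2 has the same elements, a hypothetical extension (my addition, assuming the two structures really are identical) is to run the same loop with a second cols_dict pointed at data.node2 and union the two results:

cols_dict_node2 = [
    {'col': ['data.node2.name'], 'exp': ['data.node2.productlist']},
    {'exp': ['productlist.productvalues']},
    {'col': ['productvalues.pname'], 'exp': ['productvalues.porders']},
    {'col': ['porders.ordernum', 'porders.field']}
]
# ... same loop as above over cols_dict_node2, producing dfx2 ...
# combined = dfx.unionByName(dfx2)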
