我几天来一直在努力解决这个问题。
我有一个嵌套的json文件,它有一个复杂的模式(数组在结构中,结构在数组中(,我需要把数据放在数据帧中。
我输入的是(作为一个例子(:
+-----+----------------+-----------------------------------+---------+
| id | name | detail | item |
+-----+----------------+-----------------------------------+---------+
| 100 | Peter Castle | [[D100A, Credit],[D100B, Debit]] | [10,31] |
| 101 | Quino Yukimori | [[D101A, Credit],[D101B, Credit]] | [55,49] |
+-----+----------------+-----------------------------------+---------+
我应该像这个一样阅读
+-----+----------------+-----------+--------+-----------+
| id | name | detail_id | type | item_qty |
+-----+----------------+-----------+--------+-----------+
| 100 | Peter Castle | D100A | Credit | 10 |
| 100 | Peter Castle | D100B | Debit | 31 |
| 101 | Quino Yukimori | D101A | Credit | 55 |
| 101 | Quino Yukimori | D101B | Credit | 49 |
+-----+----------------+-----------+--------+-----------+
但我得到的是:
df.withColumn('detail', explode('detail')).withColumn('item', explode('item'))
+-----+----------------+-----------+--------+-----------+
| id | name | detail_id | type | item_qty |
+-----+----------------+-----------+--------+-----------+
| 100 | Peter Castle | D100A | Credit | 10 |
| 100 | Peter Castle | D100A | Debit | 10 |
| 100 | Peter Castle | D100B | Credit | 31 |
| 100 | Peter Castle | D100B | Debit | 31 |
| 101 | Quino Yukimori | D101A | Credit | 55 |
| 101 | Quino Yukimori | D101A | Credit | 55 |
| 101 | Quino Yukimori | D101B | Credit | 49 |
| 101 | Quino Yukimori | D101B | Credit | 49 |
+-----+----------------+-----------+--------+-----------+
我尝试过将列与arrays_zip组合,然后进行分解,但问题是数组中有数组,如果我分解详细数组列,则项数组列的分解会使数据倍增。
知道我该如何实现吗?
对不起,我的英语不是我的母语。
更新
这是我的模式,它使我为多个嵌套数组读取它变得复杂:
|-- id: string(nullable = true)
|-- name: string(nullable = true)
|-- detail: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- detail_id: string(nullable = true)
| | |-- type: string(nullable = true)
|-- item: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- item_qty : long(nullable = true)
|-- deliveryTrack: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- date: string(nullable = true)
| | |-- track: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- time: string (nullable = true)
| | | | |-- driver: string (nullable = true)
使用arrays_zip
压缩两个数组后,仅使用explode
一次。然后,使用expr
函数获取数据。
from pyspark.sql.functions import explode, arrays_zip, col, expr
df1 = (df
.withColumn('buffer', explode(arrays_zip(col('detail'), col('item'))))
.withColumn('detail_id', expr("buffer.detail.detail_id"))
.withColumn('type', expr("buffer.detail.type"))
.withColumn('item_qty', expr("buffer.item.item_qty"))
.drop(*['detail', 'item', 'buffer'])
)
df1.show()
+---+--------------+---------+------+--------+
|id |name |detail_id|type |item_qty|
+---+--------------+---------+------+--------+
|100|Peter Castle |D100A |Credit|10 |
|100|Peter Castle |D100B |Debit |31 |
|101|Quino Yukimori|D101A |Credit|55 |
|101|Quino Yukimori|D101B |Credit|49 |
+---+--------------+---------+------+--------+