Reading a complex nested JSON file in PySpark



I have been struggling with this for days.

I have a nested JSON file with a complex schema (arrays inside structs, structs inside arrays), and I need to get the data into a DataFrame.

My input is (as an example):

+-----+----------------+-----------------------------------+---------+
| id  | name           | detail                            | item    |
+-----+----------------+-----------------------------------+---------+
| 100 | Peter Castle   | [[D100A, Credit],[D100B, Debit]]  | [10,31] |
| 101 | Quino Yukimori | [[D101A, Credit],[D101B, Credit]] | [55,49] |
+-----+----------------+-----------------------------------+---------+
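
In case it helps to reproduce this, here is a minimal sketch that builds the same sample DataFrame (the struct field names follow the schema shown in the update further down):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# sample rows matching the table above; struct fields follow the schema in the update
data = [
    ("100", "Peter Castle",
     [("D100A", "Credit"), ("D100B", "Debit")],
     [(10,), (31,)]),
    ("101", "Quino Yukimori",
     [("D101A", "Credit"), ("D101B", "Credit")],
     [(55,), (49,)]),
]
schema = ("id string, name string, "
          "detail array<struct<detail_id:string,type:string>>, "
          "item array<struct<item_qty:long>>")
df = spark.createDataFrame(data, schema)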

I need to read it like this:

+-----+----------------+-----------+--------+-----------+
| id  | name           | detail_id | type   | item_qty  |
+-----+----------------+-----------+--------+-----------+
| 100 | Peter Castle   | D100A     | Credit | 10        |
| 100 | Peter Castle   | D100B     | Debit  | 31        |
| 101 | Quino Yukimori | D101A     | Credit | 55        |
| 101 | Quino Yukimori | D101B     | Credit | 49        |
+-----+----------------+-----------+--------+-----------+

But what I get is:


df.withColumn('detail', explode('detail')).withColumn('item', explode('item'))
+-----+----------------+-----------+--------+-----------+
| id  | name           | detail_id | type   |  item_qty |
+-----+----------------+-----------+--------+-----------+
| 100 | Peter Castle   | D100A     | Credit | 10        |
| 100 | Peter Castle   | D100A     | Credit | 31        |
| 100 | Peter Castle   | D100B     | Debit  | 10        |
| 100 | Peter Castle   | D100B     | Debit  | 31        |
| 101 | Quino Yukimori | D101A     | Credit | 55        |
| 101 | Quino Yukimori | D101A     | Credit | 49        |
| 101 | Quino Yukimori | D101B     | Credit | 55        |
| 101 | Quino Yukimori | D101B     | Credit | 49        |
+-----+----------------+-----------+--------+-----------+

I have tried combining the columns with arrays_zip and then exploding, but the problem is that there are arrays within arrays: if I explode the detail array column and then also explode the item array column, the rows get multiplied.

Any idea how I can achieve this?

Sorry for my English, it is not my native language.

UPDATE

This is my schema; the multiple nested arrays are what make reading it complicated:

|-- id: string (nullable = true)
|-- name: string (nullable = true)
|-- detail: array (nullable = true)
|   |-- element: struct (containsNull = true)
|   |    |-- detail_id: string (nullable = true)
|   |    |-- type: string (nullable = true)
|-- item: array (nullable = true)
|   |-- element: struct (containsNull = true)
|   |    |-- item_qty: long (nullable = true)
|-- deliveryTrack: array (nullable = true)
|   |-- element: struct (containsNull = true)
|   |    |-- date: string (nullable = true)
|   |    |-- track: array (nullable = true)
|   |    |   |-- element: struct (containsNull = true)
|   |    |   |   |-- time: string (nullable = true)
|   |    |   |   |-- driver: string (nullable = true)

Zip the two arrays with arrays_zip and call explode only once. Then use the expr function to pull out the fields.

from pyspark.sql.functions import explode, arrays_zip, col, expr

df1 = (df
    # zip detail and item element-wise, then explode once so the pairs stay aligned
    .withColumn('buffer', explode(arrays_zip(col('detail'), col('item'))))
    .withColumn('detail_id', expr("buffer.detail.detail_id"))
    .withColumn('type', expr("buffer.detail.type"))
    .withColumn('item_qty', expr("buffer.item.item_qty"))
    .drop(*['detail', 'item', 'buffer'])
)
df1.show()
+---+--------------+---------+------+--------+
|id |name          |detail_id|type  |item_qty|
+---+--------------+---------+------+--------+
|100|Peter Castle  |D100A    |Credit|10      |
|100|Peter Castle  |D100B    |Debit |31      |
|101|Quino Yukimori|D101A    |Credit|55      |
|101|Quino Yukimori|D101B    |Credit|49      |
+---+--------------+---------+------+--------+
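
Note that arrays_zip pairs the detail and item elements by position and pads the shorter array with nulls if the lengths differ. The schema in the update also has a deliveryTrack array whose elements contain a nested track array; since those are nested levels of the same array rather than two parallel arrays, they can be flattened with two successive explodes. A rough sketch (the derived column and alias names here are just illustrative):

from pyspark.sql.functions import explode, col

df2 = (df
    # explode the outer deliveryTrack array, then the track array nested in each element
    .withColumn('delivery', explode('deliveryTrack'))
    .withColumn('track_entry', explode('delivery.track'))
    .select(
        'id', 'name',
        col('delivery.date').alias('delivery_date'),
        col('track_entry.time').alias('track_time'),
        col('track_entry.driver').alias('driver'),
    )
)

Do this as a separate projection (or join back on id afterwards) rather than in the same chain as the detail/item explode, otherwise the row multiplication from the question comes back.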
