Using a PySpark DataFrame to look up a value by index in one array and copy it into another array



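This DataFrame has two arrays: discount_applications and line_items. Each element of line_items contains an inner array named discount_allocations, whose elements have a field named discount_application_index. The requirement is to use each discount_application_index value to find the corresponding "type" in discount_applications.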

The type found at that index of discount_applications should be copied into the matching application_type field. The DataFrame:

records = '[{"_c":{"discount_applications":[{"type":"manual0"},{"type":"manual1"},{"type":"manual2"},{"type":"manual3"}],"line_items":[{"discount_allocations":[{"application_type":"","discount_application_index":0}]},{"discount_allocations":[{"application_type":"","discount_application_index":1}]},{"discount_allocations":[{"application_type":"","discount_application_index":2}]},{"discount_allocations":[{"application_type":"","discount_application_index":3}]}]}},{"_c":{"discount_applications":[{"type":"manual0"},{"type":"manual1"},{"type":"manual2"}],"line_items":[{"discount_allocations":[{"application_type":"","discount_application_index":0}]},{"discount_allocations":[{"application_type":"","discount_application_index":1}]},{"discount_allocations":[{"application_type":"","discount_application_index":2}]}]}},{"_c":{"discount_applications":[{"type":"manual0"},{"type":"manual1"},{"type":"manual2"}],"line_items":[{"discount_allocations":[{"application_type":"","discount_application_index":0}]},{"discount_allocations":[{"application_type":"","discount_application_index":1}]},{"discount_allocations":[{"application_type":"","discount_application_index":2}]}]}}]'
# In the pyspark shell, `spark` (SparkSession) and `sc` (SparkContext) are predefined
df = spark.read.json(sc.parallelize([records]))
df.show(truncate=False)
df.printSchema()
root
|-- _c: struct (nullable = true)
|    |-- discount_applications: array (nullable = true)
|    |    |-- element: struct (containsNull = true)
|    |    |    |-- type: string (nullable = true)
|    |-- line_items: array (nullable = true)
|    |    |-- element: struct (containsNull = true)
|    |    |    |-- discount_allocations: array (nullable = true)
|    |    |    |    |-- element: struct (containsNull = true)
|    |    |    |    |    |-- application_type: string (nullable = true)
|    |    |    |    |    |-- discount_application_index: long (nullable = true)
+--------------------------------------------------------------------------------------------+
|_c                                                                                          |
+--------------------------------------------------------------------------------------------+
|[[[manual0], [manual1], [manual2], [manual3]], [[[[, 0]]], [[[, 1]]], [[[, 2]]], [[[, 3]]]]]|
|[[[manual0], [manual1], [manual2]], [[[[, 0]]], [[[, 1]]], [[[, 2]]]]]                      |
|[[[manual0], [manual1], [manual2]], [[[[, 0]]], [[[, 1]]], [[[, 2]]]]]                      |
+--------------------------------------------------------------------------------------------+

After the transformation, the DataFrame should look like this:

+------------------------------------------------------------------------------------------------------------------------+
|_c                                                                                                                      |
+------------------------------------------------------------------------------------------------------------------------+
|[[[manual0], [manual1], [manual2], [manual3]], [[[[manual0, 0]]], [[[manual1, 1]]], [[[manual2, 2]]], [[[manual3, 3]]]]]|
|[[[manual0], [manual1], [manual2]], [[[[manual0, 0]]], [[[manual1, 1]]], [[[manual2, 2]]]]]                             |
|[[[manual0], [manual1], [manual2]], [[[[manual0, 0]]], [[[manual1, 1]]], [[[manual2, 2]]]]]                             |
+------------------------------------------------------------------------------------------------------------------------+
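To pin down the requirement, here is a minimal sketch of the lookup in plain Python over the same sample `records` string (no Spark involved; `expected_allocations` is a hypothetical helper name, not part of the question). It assumes each `discount_application_index` is a 0-based position into the same row's `discount_applications` array, which is what the desired output above implies.

```python
import json

# Same sample data as in the question
records = '[{"_c":{"discount_applications":[{"type":"manual0"},{"type":"manual1"},{"type":"manual2"},{"type":"manual3"}],"line_items":[{"discount_allocations":[{"application_type":"","discount_application_index":0}]},{"discount_allocations":[{"application_type":"","discount_application_index":1}]},{"discount_allocations":[{"application_type":"","discount_application_index":2}]},{"discount_allocations":[{"application_type":"","discount_application_index":3}]}]}},{"_c":{"discount_applications":[{"type":"manual0"},{"type":"manual1"},{"type":"manual2"}],"line_items":[{"discount_allocations":[{"application_type":"","discount_application_index":0}]},{"discount_allocations":[{"application_type":"","discount_application_index":1}]},{"discount_allocations":[{"application_type":"","discount_application_index":2}]}]}},{"_c":{"discount_applications":[{"type":"manual0"},{"type":"manual1"},{"type":"manual2"}],"line_items":[{"discount_allocations":[{"application_type":"","discount_application_index":0}]},{"discount_allocations":[{"application_type":"","discount_application_index":1}]},{"discount_allocations":[{"application_type":"","discount_application_index":2}]}]}}]'


def expected_allocations(records_json):
    """For each row, return the (application_type, discount_application_index)
    pairs the question asks for. The type is looked up by 0-based index in the
    same row's discount_applications array."""
    rows = []
    for row in json.loads(records_json):
        c = row["_c"]
        types = [app["type"] for app in c["discount_applications"]]
        pairs = []
        for item in c["line_items"]:
            for alloc in item["discount_allocations"]:
                idx = alloc["discount_application_index"]
                pairs.append((types[idx], idx))
        rows.append(pairs)
    return rows


for pairs in expected_allocations(records):
    print(pairs)
# [('manual0', 0), ('manual1', 1), ('manual2', 2), ('manual3', 3)]
# [('manual0', 0), ('manual1', 1), ('manual2', 2)]
# [('manual0', 0), ('manual1', 1), ('manual2', 2)]
```

The printed pairs match the (application_type, discount_application_index) values in the desired output table row by row.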

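Clear your head and use `transform` :) Nest one `transform` over `line_items` and another over each item's `discount_allocations`, then look up the type by index in `discount_applications`: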

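import pyspark.sql.functions as F

# Rebuild _c: keep discount_applications unchanged and rebuild line_items with two
# nested transform() calls. The inner lambda y looks up the type by index; in Spark SQL
# arr[i] is 0-based, and int(...) casts the long index to an integer.
# Higher-order functions such as transform() in SQL expressions need Spark 2.4 or later.
df2 = df.withColumn(
    '_c',
    F.expr("""
        struct(
            _c.discount_applications,
            transform(
                _c.line_items,
                x -> struct(
                    transform(
                        x.discount_allocations,
                        y -> struct(
                            _c.discount_applications[int(y.discount_application_index)].type as application_type,
                            y.discount_application_index as discount_application_index
                        )
                    ) as discount_allocations
                )
            ) as line_items
        )
    """)
)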
df2.show(truncate=False)
+------------------------------------------------------------------------------------------------------------------------+
|_c                                                                                                                      |
+------------------------------------------------------------------------------------------------------------------------+
|[[[manual0], [manual1], [manual2], [manual3]], [[[[manual0, 0]]], [[[manual1, 1]]], [[[manual2, 2]]], [[[manual3, 3]]]]]|
|[[[manual0], [manual1], [manual2]], [[[[manual0, 0]]], [[[manual1, 1]]], [[[manual2, 2]]]]]                             |
|[[[manual0], [manual1], [manual2]], [[[[manual0, 0]]], [[[manual1, 1]]], [[[manual2, 2]]]]]                             |
+------------------------------------------------------------------------------------------------------------------------+
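To make the nested `transform` easier to follow, here is a plain-Python mirror of the SQL expression applied to a single `_c` value. It is a sketch, not Spark code: `transform`, `get_item` and `fill_application_types` are hypothetical stand-ins written for illustration. `get_item` assumes Spark's `arr[i]` behaviour with ANSI mode disabled (0-based, NULL when the index is out of range); with ANSI mode enabled Spark raises an error instead.

```python
def transform(arr, f):
    """Stand-in for Spark SQL transform(array, x -> f(x)); a null array stays null."""
    return None if arr is None else [f(x) for x in arr]


def get_item(arr, i):
    """Stand-in for Spark SQL arr[i] with ANSI mode off: 0-based, None when out of range."""
    if arr is None or i is None or i < 0 or i >= len(arr):
        return None
    return arr[i]


def fill_application_types(c):
    """Mirror of the struct(...) expression in the answer for one _c value.
    Builds new dicts; the input is not modified."""
    apps = c["discount_applications"]

    # Inner lambda y: look up the type by index, keep the index itself
    def fill_allocation(y):
        app = get_item(apps, y["discount_application_index"])
        return {
            "application_type": None if app is None else app["type"],
            "discount_application_index": y["discount_application_index"],
        }

    # Outer lambda x: rebuild each line item from its transformed allocations
    def fill_line_item(x):
        return {"discount_allocations": transform(x["discount_allocations"], fill_allocation)}

    return {
        "discount_applications": apps,
        "line_items": transform(c["line_items"], fill_line_item),
    }


# Made-up input: index 1 exists, index 5 is deliberately out of range
sample = {
    "discount_applications": [{"type": "manual0"}, {"type": "manual1"}],
    "line_items": [
        {"discount_allocations": [{"application_type": "", "discount_application_index": 1}]},
        {"discount_allocations": [{"application_type": "", "discount_application_index": 5}]},
    ],
}
result = fill_application_types(sample)
print([a["application_type"] for li in result["line_items"] for a in li["discount_allocations"]])
# ['manual1', None]
```

Because `struct(...)` rebuilds each element only from the fields listed in it, any other field inside `line_items` or `discount_allocations` would be dropped unless it is listed too; the dictionary mirror behaves the same way.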
