Merging two array-of-struct columns based on a key



I have a DataFrame with the following schema:

Input DataFrame

|-- A: string (nullable = true)
|-- B_2020: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- key: string (nullable = true)
|    |    |-- x: double (nullable = true)
|    |    |-- y: double (nullable = true)
|    |    |-- z: double (nullable = true)
|-- B_2019: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- key: string (nullable = true)
|    |    |-- x: double (nullable = true)
|    |    |-- y: double (nullable = true)

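I want to merge the B_2020 and B_2019 columns into a single array-of-structs column, matching elements on their key value.

Desired schema:

Expected output DataFrame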

|-- A: string (nullable = true)
|-- B: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- key: string (nullable = true)
|    |    |-- x_this_year: double (nullable = true)
|    |    |-- y_this_year: double (nullable = true)
|    |    |-- x_last_year: double (nullable = true)
|    |    |-- y_last_year: double (nullable = true)
|    |    |-- z_this_year: double (nullable = true)

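Structs whose keys match should be merged into one element. Note also that if a key exists in only one of the 2019 or 2020 data, the fields for the other year must be null in the merged column.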
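The null-filling rule described above amounts to a full outer join on key. As a minimal plain-Scala sketch of those semantics (no Spark involved; the case classes and the `mergeByKey` function are hypothetical names introduced only for illustration, `None` stands in for SQL null, and keys are assumed to be unique within each array):

```scala
// Hypothetical types mirroring the two struct layouts in the input schema.
final case class ThisYear(key: String, x: Double, y: Double, z: Double)
final case class LastYear(key: String, x: Double, y: Double)

// Merged element; None plays the role of null for a year with no matching key.
final case class Merged(
  key: String,
  xThisYear: Option[Double],
  yThisYear: Option[Double],
  xLastYear: Option[Double],
  yLastYear: Option[Double],
  zThisYear: Option[Double]
)

// Full outer merge on key: every key from either side appears exactly once.
// Assumes keys are unique per side (toMap keeps the last entry otherwise).
def mergeByKey(thisYear: Seq[ThisYear], lastYear: Seq[LastYear]): Seq[Merged] = {
  val t = thisYear.map(e => e.key -> e).toMap
  val l = lastYear.map(e => e.key -> e).toMap
  (t.keySet ++ l.keySet).toSeq.sorted.map { k =>
    Merged(
      k,
      t.get(k).map(_.x), t.get(k).map(_.y),
      l.get(k).map(_.x), l.get(k).map(_.y),
      t.get(k).map(_.z)
    )
  }
}

// Same sample values as the question's data.
val merged = mergeByKey(
  Seq(ThisYear("a", 2, 4, 6), ThisYear("b", 3, 6, 9), ThisYear("c", 1, 2, 3)),
  Seq(LastYear("a", 4, 8), LastYear("d", 3, 4))
)
merged.foreach(println)
```

Key "a" gets values for both years, "b" and "c" have only this-year values, and "d" has only last-year values.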

scala> val df = Seq(
|   ("ABC", 
|   Seq(
|     ("a", 2, 4, 6),
|     ("b", 3, 6, 9),
|     ("c", 1, 2, 3)
|   ),
|   Seq(
|     ("a", 4, 8),
|     ("d", 3, 4)
|   ))
| ).toDF("A", "B_2020", "B_2019").select(
|   $"A",
|   $"B_2020" cast "array<struct<key:string,x:double,y:double,z:double>>",
|   $"B_2019" cast "array<struct<key:string,x:double,y:double>>"
| )
df: org.apache.spark.sql.DataFrame = [A: string, B_2020: array<struct<key:string,x:double,y:double,z:double>> ... 1 more field]
scala> df.printSchema
root
|-- A: string (nullable = true)
|-- B_2020: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- key: string (nullable = true)
|    |    |-- x: double (nullable = true)
|    |    |-- y: double (nullable = true)
|    |    |-- z: double (nullable = true)
|-- B_2019: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- key: string (nullable = true)
|    |    |-- x: double (nullable = true)
|    |    |-- y: double (nullable = true)

scala> df.show(false)
+---+------------------------------------------------------------+------------------------------+
|A  |B_2020                                                      |B_2019                        |
+---+------------------------------------------------------------+------------------------------+
|ABC|[[a, 2.0, 4.0, 6.0], [b, 3.0, 6.0, 9.0], [c, 1.0, 2.0, 3.0]]|[[a, 4.0, 8.0], [d, 3.0, 4.0]]|
+---+------------------------------------------------------------+------------------------------+

scala> val df2020 = df.select($"A", explode($"B_2020") as "this_year").select($"A", 
| $"this_year.key" as "key", $"this_year.x" as "x_this_year", 
| $"this_year.y" as "y_this_year", $"this_year.z" as "z_this_year")
df2020: org.apache.spark.sql.DataFrame = [A: string, key: string ... 3 more fields]

scala> val df2019 = df.select($"A", explode($"B_2019") as "last_year").select($"A", 
| $"last_year.key" as "key", $"last_year.x" as "x_last_year", 
| $"last_year.y" as "y_last_year")
df2019: org.apache.spark.sql.DataFrame = [A: string, key: string ... 2 more fields]

scala> df2020.show(false)
+---+---+-----------+-----------+-----------+
|A  |key|x_this_year|y_this_year|z_this_year|
+---+---+-----------+-----------+-----------+
|ABC|a  |2.0        |4.0        |6.0        |
|ABC|b  |3.0        |6.0        |9.0        |
|ABC|c  |1.0        |2.0        |3.0        |
+---+---+-----------+-----------+-----------+

scala> df2019.show(false)
+---+---+-----------+-----------+
|A  |key|x_last_year|y_last_year|
+---+---+-----------+-----------+
|ABC|a  |4.0        |8.0        |
|ABC|d  |3.0        |4.0        |
+---+---+-----------+-----------+

scala> val outputDF = df2020.join(df2019, Seq("A", "key"),  "outer").select(
|   $"A" as "market_name", 
|   struct($"key", $"x_this_year", $"y_this_year", $"x_last_year", 
|     $"y_last_year", $"z_this_year") as "cancellation_policy_booking")
outputDF: org.apache.spark.sql.DataFrame = [market_name: string, cancellation_policy_booking: struct<key: string, x_this_year: double ... 4 more fields>]
scala> outputDF.printSchema
root
|-- market_name: string (nullable = true)
|-- cancellation_policy_booking: struct (nullable = false)
|    |-- key: string (nullable = true)
|    |-- x_this_year: double (nullable = true)
|    |-- y_this_year: double (nullable = true)
|    |-- x_last_year: double (nullable = true)
|    |-- y_last_year: double (nullable = true)
|    |-- z_this_year: double (nullable = true)

scala> outputDF.show(false)
+-----------+----------------------------+
|market_name|cancellation_policy_booking |
+-----------+----------------------------+
|ABC        |[b, 3.0, 6.0,,, 9.0]        |
|ABC        |[a, 2.0, 4.0, 4.0, 8.0, 6.0]|
|ABC        |[d,,, 3.0, 4.0,]            |
|ABC        |[c, 1.0, 2.0,,, 3.0]        |
+-----------+----------------------------+
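
The result above still has one row per key, whereas the expected schema asks for a single array column B per value of A. One possible way to close that gap, sketched as a continuation of the same spark-shell session (so `outputDF` and the `$` syntax come from the transcript above, and the name `result` is only illustrative), is to group the rows and gather the structs with `collect_list`:

```scala
import org.apache.spark.sql.functions.collect_list

// Continues the spark-shell session above, where `spark.implicits._` is
// already in scope and `outputDF` holds one merged struct per row.
// Group by the first column and collect the structs into one array column.
val result = outputDF
  .groupBy($"market_name")
  .agg(collect_list($"cancellation_policy_booking") as "B")
  .withColumnRenamed("market_name", "A")

result.printSchema
result.show(false)
```

Note that `collect_list` does not guarantee the order of elements inside the array, so the keys may not come out in alphabetical order.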
