Adding an element to an array of structs and merging arrays of structs in Spark 2.3



I have a DataFrame with the schema shown below as df.currentSchema, and I need to obtain the schema shown as df.expectSchema. Is there a way to do this in Spark 2.3?

df.currentSchema:

|-- enqueuedTime: timestamp (nullable = true)
|-- VIN: string (nullable = true)
|-- TT: long (nullable = true)
|-- MSG_TYPE: string (nullable = true)
|-- ADA: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- E: long (nullable = true)
|    |    |-- V: double (nullable = true)
|    |    |-- E: long (nullable = true)
|    |    |-- V: double (nullable = true)
|-- ADW: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- E: long (nullable = true)
|    |    |-- V: double (nullable = true)

df.expectSchema:

|-- enqueuedTime: timestamp (nullable = true)
|-- VIN: string (nullable = true)
|-- TT: long (nullable = true)
|-- MSG_TYPE: string (nullable = true)
|-- SIGNAL: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- SN: string (nullable = true)
|    |    |-- E:  long (nullable = true)
|    |    |-- V:  double (nullable = true)
|    |    |-- SN: string (nullable = true) 
|    |    |-- E:  long (nullable = true)
|    |    |-- V:  double (nullable = true)
|    |    |-- SN: string (nullable = true)
|    |    |-- E: long (nullable = true)
|    |    |-- V: double (nullable = true)

Sample data:

+-----------------+---+---------+------------------------------------------------+--------------------------+
|vin              |tt |msg_type |ada                                             |adw                       |
+-----------------+---+---------+------------------------------------------------+--------------------------+
|FU7XXXXXXXXXXXXXX|0  |SIGNAL   |[{"E":15XXXXXXXX,"V":2}, {"E":15XXXXXXXX,"V":1}]|null                      |
|FU7XXXXXXXXXXXXXX|0  |SIGNAL   |null                                            |[{"E":15XXXXXXXX,"V":3}]  |
|FU7XXXXXXXXXXXXXX|0  |SIGNAL   |null                                            |[{"E":15XXXXXXXX,"V":4.1}]|
+-----------------+---+---------+------------------------------------------------+--------------------------+

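Note: two things need to be achieved here:

  1. Create a new field SN for each E, V pair in the elements, whose value is the name of the array. For example, for the first array column (ADA), SN = ADA.
  2. Merge the arrays (ADA, ADW) into a single outer array (SIGNAL).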

The schema you are looking for is not valid and will probably fail when the DataFrame is written. I adjusted the schema as follows:

scala> newDF.printSchema
root
|-- ADA: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- E: string (nullable = true)
|    |    |-- V: long (nullable = true)
|-- ADW: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- E: string (nullable = true)
|    |    |-- V: long (nullable = true)
|-- MSG_TYPE: string (nullable = true)
|-- number: long (nullable = true)
|-- tt: long (nullable = true)
|-- vin: string (nullable = true)
|-- sig: struct (nullable = false)
|    |-- SN: array (nullable = true)
|    |    |-- element: struct (containsNull = true)
|    |    |    |-- E: string (nullable = true)
|    |    |    |-- V: long (nullable = true)
|    |-- SN: array (nullable = true)
|    |    |-- element: struct (containsNull = true)
|    |    |    |-- E: string (nullable = true)
|    |    |    |-- V: long (nullable = true)

If this schema works for you, read on to see how to produce it.

Create dummy data to replicate the schema (this step can be skipped):

scala> val vas = """{"df":[ { "vin": "FU7XXXXXXXXXXXXXX", "tt": 0, "MSG_TYPE": "SIGNAL", "number": 123, "ADA": [{"E":"15XXXXXXXX","V":2}, {"E":"15XXXXXXXX","V":1}], "ADW": [{"E":"15XXXXXXXX","V":2}, {"E":"15XXXXXXXX","V":1}] }, { "vin": "FU7XXXXXXXXXXXXXX", "tt": 0, "MSG_TYPE": "SIGNAL", "number": 123, "ADA": [{"E":"15XXXXXXXX","V":2}, {"E":"15XXXXXXXX","V":1}], "ADW": [{"E":"15XXXXXXXX","V":2}, {"E":"15XXXXXXXX","V":1}] }, { "vin": "FU7XXXXXXXXXXXXXX", "tt": 0, "MSG_TYPE": "SIGNAL", "number": 123, "ADA": [{"E":"15XXXXXXXX","V":2}, {"E":"15XXXXXXXX","V":1}], "ADW":[{"E":"15XXXXXXXX","V":2}, {"E":"15XXXXXXXX","V":1}] }] }"""
vas: String = {"df":[ { "vin": "FU7XXXXXXXXXXXXXX", "tt": 0, "MSG_TYPE": "SIGNAL", "number": 123, "ADA": [{"E":"15XXXXXXXX","V":2}, {"E":"15XXXXXXXX","V":1}], "ADW": [{"E":"15XXXXXXXX","V":2}, {"E":"15XXXXXXXX","V":1}] }, { "vin": "FU7XXXXXXXXXXXXXX", "tt": 0, "MSG_TYPE": "SIGNAL", "number": 123, "ADA": [{"E":"15XXXXXXXX","V":2}, {"E":"15XXXXXXXX","V":1}], "ADW": [{"E":"15XXXXXXXX","V":2}, {"E":"15XXXXXXXX","V":1}] }, { "vin": "FU7XXXXXXXXXXXXXX", "tt": 0, "MSG_TYPE": "SIGNAL", "number": 123, "ADA": [{"E":"15XXXXXXXX","V":2}, {"E":"15XXXXXXXX","V":1}], "ADW":[{"E":"15XXXXXXXX","V":2}, {"E":"15XXXXXXXX","V":1}] }] }
scala> val df = spark.read.json(Seq(vas).toDS).toDF.withColumn("arr", explode($"df")).select("arr.*")
df: org.apache.spark.sql.DataFrame = [ADA: array<struct<E:string,V:bigint>>, ADW: array<struct<E:string,V:bigint>> ... 4 more fields]

I expect your data looks like this:

scala> df.show(false)
+--------------------------------+--------------------------------+--------+------+---+-----------------+
|ADA                             |ADW                             |MSG_TYPE|number|tt |vin              |
+--------------------------------+--------------------------------+--------+------+---+-----------------+
|[[15XXXXXXXX,2], [15XXXXXXXX,1]]|[[15XXXXXXXX,2], [15XXXXXXXX,1]]|SIGNAL  |123   |0  |FU7XXXXXXXXXXXXXX|
|[[15XXXXXXXX,2], [15XXXXXXXX,1]]|[[15XXXXXXXX,2], [15XXXXXXXX,1]]|SIGNAL  |123   |0  |FU7XXXXXXXXXXXXXX|
|[[15XXXXXXXX,2], [15XXXXXXXX,1]]|[[15XXXXXXXX,2], [15XXXXXXXX,1]]|SIGNAL  |123   |0  |FU7XXXXXXXXXXXXXX|
+--------------------------------+--------------------------------+--------+------+---+-----------------+

Steps to produce the required output:

scala> val newDF = df.withColumn("sig", struct($"ADA".as("SN"), $"ADW".as("SN")))
newDF: org.apache.spark.sql.DataFrame = [ADA: array<struct<E:string,V:bigint>>, ADW: array<struct<E:string,V:bigint>> ... 5 more fields]
scala> newDF.printSchema
root
|-- ADA: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- E: string (nullable = true)
|    |    |-- V: long (nullable = true)
|-- ADW: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- E: string (nullable = true)
|    |    |-- V: long (nullable = true)
|-- MSG_TYPE: string (nullable = true)
|-- number: long (nullable = true)
|-- tt: long (nullable = true)
|-- vin: string (nullable = true)
|-- sig: struct (nullable = false)
|    |-- SN: array (nullable = true)
|    |    |-- element: struct (containsNull = true)
|    |    |    |-- E: string (nullable = true)
|    |    |    |-- V: long (nullable = true)
|    |-- SN: array (nullable = true)
|    |    |-- element: struct (containsNull = true)
|    |    |    |-- E: string (nullable = true)
|    |    |    |-- V: long (nullable = true)

I tried writing this DataFrame and it worked fine:

newDF.write.mode("overwrite").parquet(path + "newDF.parquet")
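
The struct built above keeps ADA and ADW as two separate fields that are both named SN. If the goal is instead the single SIGNAL array described in the question, with an SN tag on every element, one possible approach in Spark 2.3 is a UDF, since array `concat` and `transform` only arrived in 2.4. The following is a minimal sketch, not a tested solution for your data: the names `Signal`, `MergeSignals`, `tag`, `mergeSignals` and `withSignal` are hypothetical, and it assumes the question's element types (E: long, V: double, both non-null) rather than the dummy data above, where E is a string and V is a long.

```scala
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.{col, udf}

// Hypothetical element type for the merged SIGNAL array (not from the original post).
case class Signal(SN: String, E: Long, V: Double)

object MergeSignals {
  // Tag each (E, V) struct with the name of the array it came from.
  // A null array or a null element contributes nothing to the result.
  // Assumption: E is a long and V is a double, and neither is null.
  def tag(name: String, rows: Seq[Row]): Seq[Signal] =
    Option(rows).getOrElse(Seq.empty)
      .filter(_ != null)
      .map(r => Signal(name, r.getAs[Long]("E"), r.getAs[Double]("V")))

  // UDF that concatenates the tagged ADA elements and the tagged ADW elements.
  val mergeSignals = udf { (ada: Seq[Row], adw: Seq[Row]) =>
    tag("ADA", ada) ++ tag("ADW", adw)
  }

  // Add the merged SIGNAL column and drop the two source arrays.
  def withSignal(df: DataFrame): DataFrame =
    df.withColumn("SIGNAL", mergeSignals(col("ADA"), col("ADW")))
      .drop("ADA", "ADW")
}
```

With a DataFrame that has the question's schema, `MergeSignals.withSignal(df).printSchema` should show SIGNAL as an array of structs with the fields SN, E and V. A UDF is opaque to the optimizer, so on Spark 2.4 or later the built-in array functions would likely be a better fit.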
