I have the following DataFrame schema as df.currentSchema and need to get the expected schema shown as df.expectSchema. Is there a way to do this in Spark 2.3?
df.currentSchema:
|-- enqueuedTime: timestamp (nullable = true)
|-- VIN: string (nullable = true)
|-- TT: long (nullable = true)
|-- MSG_TYPE: string (nullable = true)
|-- ADA: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- E: long (nullable = true)
| | |-- V: double (nullable = true)
| | |-- E: long (nullable = true)
| | |-- V: double (nullable = true)
|-- ADW: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- E: long (nullable = true)
| | |-- V: double (nullable = true)
df.expectSchema:
|-- enqueuedTime: timestamp (nullable = true)
|-- VIN: string (nullable = true)
|-- TT: long (nullable = true)
|-- MSG_TYPE: string (nullable = true)
|-- SIGNAL: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- SN: string (nullable = true)
| | |-- E: long (nullable = true)
| | |-- V: double (nullable = true)
| | |-- SN: string (nullable = true)
| | |-- E: long (nullable = true)
| | |-- V: double (nullable = true)
| | |-- SN: string (nullable = true)
| | |-- E: long (nullable = true)
| | |-- V: double (nullable = true)
Sample data:
+-----------------+---+--------+------------------------------------------------+--------------------------+
|vin              |tt |msg_type|ada                                             |adw                       |
+-----------------+---+--------+------------------------------------------------+--------------------------+
|FU7XXXXXXXXXXXXXX|0  |SIGNAL  |[{"E":15XXXXXXXX,"V":2}, {"E":15XXXXXXXX,"V":1}]|null                      |
|FU7XXXXXXXXXXXXXX|0  |SIGNAL  |null                                            |[{"E":15XXXXXXXX,"V":3}]  |
|FU7XXXXXXXXXXXXXX|0  |SIGNAL  |null                                            |[{"E":15XXXXXXXX,"V":4.1}]|
+-----------------+---+--------+------------------------------------------------+--------------------------+
Note: two things need to happen here:
- For each E/V pair inside an element, create a new field SN whose value is the name of the source array. For example, for the first array column (ADA), SN = "ADA".
- Merge the arrays (ADA, ADW) into one outer array (SIGNAL).
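The per-row merging logic described above (tag each E/V pair with the name of the array it came from, then concatenate) can be sketched in plain Scala before wiring it into a Spark 2.3 UDF. This is a minimal sketch; the `EV` and `Signal` case classes and the `mergeSignals` helper are hypothetical names, not part of any Spark API:

```scala
// Hypothetical element type mirroring the ADA/ADW structs.
case class EV(E: Long, V: Double)

// Hypothetical target element type for the merged SIGNAL array.
case class Signal(SN: String, E: Long, V: Double)

// Tag each pair with its source array name, treating a null
// (absent) array as empty, then concatenate ADA before ADW.
def mergeSignals(ada: Seq[EV], adw: Seq[EV]): Seq[Signal] =
  Option(ada).getOrElse(Seq.empty).map(p => Signal("ADA", p.E, p.V)) ++
  Option(adw).getOrElse(Seq.empty).map(p => Signal("ADW", p.E, p.V))

// In Spark 2.3 (which has no higher-order array functions) this
// would be registered as a UDF, roughly:
//   val mergeUdf = udf(mergeSignals _)
//   df.withColumn("SIGNAL", mergeUdf($"ADA", $"ADW")).drop("ADA", "ADW")
```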
The schema you are looking for is not valid and will likely fail when the DataFrame is written. I adjusted the schema as follows:
scala> newDF.printSchema
root
|-- ADA: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- E: string (nullable = true)
| | |-- V: long (nullable = true)
|-- ADW: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- E: string (nullable = true)
| | |-- V: long (nullable = true)
|-- MSG_TYPE: string (nullable = true)
|-- number: long (nullable = true)
|-- tt: long (nullable = true)
|-- vin: string (nullable = true)
|-- sig: struct (nullable = false)
| |-- SN: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- E: string (nullable = true)
| | | |-- V: long (nullable = true)
| |-- SN: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- E: string (nullable = true)
| | | |-- V: long (nullable = true)
If you are happy with this schema, read on for how to build it.
Create dummy data to replicate the schema (you can skip this step):
scala> val vas = """{"df":[ { "vin": "FU7XXXXXXXXXXXXXX", "tt": 0, "MSG_TYPE": "SIGNAL", "number": 123, "ADA": [{"E":"15XXXXXXXX","V":2}, {"E":"15XXXXXXXX","V":1}], "ADW": [{"E":"15XXXXXXXX","V":2}, {"E":"15XXXXXXXX","V":1}] }, { "vin": "FU7XXXXXXXXXXXXXX", "tt": 0, "MSG_TYPE": "SIGNAL", "number": 123, "ADA": [{"E":"15XXXXXXXX","V":2}, {"E":"15XXXXXXXX","V":1}], "ADW": [{"E":"15XXXXXXXX","V":2}, {"E":"15XXXXXXXX","V":1}] }, { "vin": "FU7XXXXXXXXXXXXXX", "tt": 0, "MSG_TYPE": "SIGNAL", "number": 123, "ADA": [{"E":"15XXXXXXXX","V":2}, {"E":"15XXXXXXXX","V":1}], "ADW":[{"E":"15XXXXXXXX","V":2}, {"E":"15XXXXXXXX","V":1}] }] }"""
vas: String = {"df":[ { "vin": "FU7XXXXXXXXXXXXXX", "tt": 0, "MSG_TYPE": "SIGNAL", "number": 123, "ADA": [{"E":"15XXXXXXXX","V":2}, {"E":"15XXXXXXXX","V":1}], "ADW": [{"E":"15XXXXXXXX","V":2}, {"E":"15XXXXXXXX","V":1}] }, { "vin": "FU7XXXXXXXXXXXXXX", "tt": 0, "MSG_TYPE": "SIGNAL", "number": 123, "ADA": [{"E":"15XXXXXXXX","V":2}, {"E":"15XXXXXXXX","V":1}], "ADW": [{"E":"15XXXXXXXX","V":2}, {"E":"15XXXXXXXX","V":1}] }, { "vin": "FU7XXXXXXXXXXXXXX", "tt": 0, "MSG_TYPE": "SIGNAL", "number": 123, "ADA": [{"E":"15XXXXXXXX","V":2}, {"E":"15XXXXXXXX","V":1}], "ADW":[{"E":"15XXXXXXXX","V":2}, {"E":"15XXXXXXXX","V":1}] }] }
scala> val df = spark.read.json(Seq(vas).toDS).toDF.withColumn("arr", explode($"df")).select("arr.*")
df: org.apache.spark.sql.DataFrame = [ADA: array<struct<E:string,V:bigint>>, ADW: array<struct<E:string,V:bigint>> ... 4 more fields]
I assume your data looks like this:
scala> df.show(false)
+--------------------------------+--------------------------------+--------+------+---+-----------------+
|ADA |ADW |MSG_TYPE|number|tt |vin |
+--------------------------------+--------------------------------+--------+------+---+-----------------+
|[[15XXXXXXXX,2], [15XXXXXXXX,1]]|[[15XXXXXXXX,2], [15XXXXXXXX,1]]|SIGNAL |123 |0 |FU7XXXXXXXXXXXXXX|
|[[15XXXXXXXX,2], [15XXXXXXXX,1]]|[[15XXXXXXXX,2], [15XXXXXXXX,1]]|SIGNAL |123 |0 |FU7XXXXXXXXXXXXXX|
|[[15XXXXXXXX,2], [15XXXXXXXX,1]]|[[15XXXXXXXX,2], [15XXXXXXXX,1]]|SIGNAL |123 |0 |FU7XXXXXXXXXXXXXX|
+--------------------------------+--------------------------------+--------+------+---+-----------------+
Steps to achieve the desired output:
scala> val newDF = df.withColumn("sig", struct($"ADA".as("SN"), $"ADW".as("SN")))
newDF: org.apache.spark.sql.DataFrame = [ADA: array<struct<E:string,V:bigint>>, ADW: array<struct<E:string,V:bigint>> ... 5 more fields]
scala> newDF.printSchema
root
|-- ADA: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- E: string (nullable = true)
| | |-- V: long (nullable = true)
|-- ADW: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- E: string (nullable = true)
| | |-- V: long (nullable = true)
|-- MSG_TYPE: string (nullable = true)
|-- number: long (nullable = true)
|-- tt: long (nullable = true)
|-- vin: string (nullable = true)
|-- sig: struct (nullable = false)
| |-- SN: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- E: string (nullable = true)
| | | |-- V: long (nullable = true)
| |-- SN: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- E: string (nullable = true)
| | | |-- V: long (nullable = true)
I tried writing this DataFrame and it works fine:
newDF.write.mode("overwrite").parquet(path + "newDF.parquet")