我有一个JSON如下:
{"id": 1, "type": "int", "data": {"key0": "val1", "key2": "val2"}}
{"id": 2, "type": "int", "data": {"key2": "val3", "key3": "val4"}}
{"id": 3, "type": "int", "data": {"key1": "val5", "key3": "val6"}}
现在当扁平化使用pyspark时,我需要将所有列设置为key0,key1,key2,key3但是在选择列数据时。Key3或任何其他没有出现在这两个记录中的键作业失败,报错"pyspark.sql.utils。AnalysisException: 'No such struct field"尝试传递模式,但问题仍然存在,并尝试使用when的withColumn方法,但这也失败了。有人遇到类似的问题,并修复了善意的帮助。
下面是我阅读schema的方式:
df_landing = spark.read.format("json").option("multiline", "true").load(input_file)
print(df_landing.printSchema())
下面是结果:
root
|-- data: struct (nullable = true)
| |-- key0: string (nullable = true)
| |-- key2: string (nullable = true)
|-- id: long (nullable = true)
|-- type: string (nullable = true)
您应该删除.option("multiline", "true")
,这是当一个JSON记录跨多行时。
你的数据是JSONL,每行都是有效的JSON,每个JSON不会跨多行。
如果你删除了这个选项,你应该看到这样的模式。
root
|-- data: struct (nullable = true)
| |-- key0: string (nullable = true)
| |-- key1: string (nullable = true)
| |-- key2: string (nullable = true)
| |-- key3: string (nullable = true)
|-- id: long (nullable = true)
|-- type: string (nullable = true)
然后,您可以使用此代码展开结构体。
df_landing = df_landing.select('id', 'type', 'data.*')
# df_landing.show()
+---+----+----+----+----+----+
| id|type|key0|key1|key2|key3|
+---+----+----+----+----+----+
| 1| int|val1|null|val2|null|
| 2| int|null|null|val3|val4|
| 3| int|null|val5|null|val6|
+---+----+----+----+----+----+