Parsing highly nested JSON in PySpark



I am trying to parse/read the nested JSON below into a PySpark dataframe. It fails both when PySpark infers the schema and when I pass a schema to it explicitly.

I am running this on an AWS EMR cluster.

{
  "coffee": {
    "region": [
      {"id": 1, "name": "John Doe"},
      {"id": 2, "name": "Don Joeh"}
    ],
    "country": {"id": 2, "company": "ACME"}
  },
  "brewing": {
    "region": [
      {"id": 1, "name": "John Doe"},
      {"id": 2, "name": "Don Joeh"}
    ],
    "country": {"id": 2, "company": "ACME"}
  }
}

PySpark cannot infer the schema on its own and raises the following error:

An error occurred while calling o745.json.
: java.lang.UnsupportedOperationException
at org.apache.hadoop.fs.http.AbstractHttpFileSystem.listStatus(AbstractHttpFileSystem.java:91)
at org.apache.hadoop.fs.http.HttpsFileSystem.listStatus(HttpsFileSystem.java:23)
at org.apache.hadoop.fs.Globber.listStatus(Globber.java:77)
at org.apache.hadoop.fs.Globber.doGlob(Globber.java:235)
at org.apache.hadoop.fs.Globber.glob(Globber.java:149)
............
............
............
Traceback (most recent call last):
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 274, in json
return self._df(self._jreader.json(self._spark._sc._jvm.PythonUtils.toSeq(path)))
File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)

I did try passing my own schema; it is below.

代码:

c1_schema = StructType([StructField("id", IntegerType()), StructField("name", StringType())])
region_schema = StructField('region', ArrayType(c1_schema))
country_schema = StructField('country', StructType([StructField("id", IntegerType()), StructField("company", StringType())]))
t_schema = StructType([StructField("coffee", StructType([region_schema, country_schema])), StructField("brewing", StructType([region_schema, country_schema]))])
df3 = spark.read.option("multiline", "true").json(path1, t_schema)

Your code is perfectly fine unless something is wrong with your JSON itself. Here is my version (I ran it locally with Spark 3.1.1):

obj = '''<your entire json above here>'''
# write it to a local file
path1 = 'obj.json'
with open(path1, 'w') as f:
    f.write(obj)
# your exact schema here
c1_schema= StructType([StructField("id",IntegerType()), StructField("name",StringType())])
region_schema= StructField('region',ArrayType(c1_schema))
country_schema= StructField('country',StructType([StructField("id",IntegerType()), StructField("company",StringType())]))
t_schema= StructType([StructField("coffee",StructType([region_schema,country_schema])),StructField("brewing",StructType([region_schema,country_schema]))])
# your exact dataframe here
df3 = spark.read.option("multiline", "true").json(path1,t_schema)

Result:

# df3.printSchema()
root
|-- coffee: struct (nullable = true)
|    |-- region: array (nullable = true)
|    |    |-- element: struct (containsNull = true)
|    |    |    |-- id: integer (nullable = true)
|    |    |    |-- name: string (nullable = true)
|    |-- country: struct (nullable = true)
|    |    |-- id: integer (nullable = true)
|    |    |-- company: string (nullable = true)
|-- brewing: struct (nullable = true)
|    |-- region: array (nullable = true)
|    |    |-- element: struct (containsNull = true)
|    |    |    |-- id: integer (nullable = true)
|    |    |    |-- name: string (nullable = true)
|    |-- country: struct (nullable = true)
|    |    |-- id: integer (nullable = true)
|    |    |-- company: string (nullable = true)
# df3.show(10, False)
+-------------------------------------------+-------------------------------------------+
|coffee                                     |brewing                                    |
+-------------------------------------------+-------------------------------------------+
|{[{1, John Doe}, {2, Don Joeh}], {2, ACME}}|{[{1, John Doe}, {2, Don Joeh}], {2, ACME}}|
+-------------------------------------------+-------------------------------------------+
