我正在使用PySpark在Databricks中使用REST API。API响应返回一个列表,其中列表的每个元素都是一个JSON字符串。当我并行化JSON时,它会生成一个_corrup_record列,其中该列的每个值都是一个JSON字符串:
### API Call
response = requests.get(api_url, headers=api_call_header)
api_json = response.json()
df = spark.read.json(sc.parallelize(api_json))
display(df)
当我将单个值的JSON字符串复制到JSON验证器中时,它就是这样的:
{
'Var1': 'String',
'Var2': {
'Var3': 'String',
'Var4': None,
'Var5': 'String',
'Var6': 'String',
'Var7': 'String',
'Var8': 'String'
},
'Var9': None,
'Var10': 'String'
}
无论出于何种原因,我都无法访问Var2的嵌套Struct对象。当我使用from_json函数和以下从头开始的模式时,它会从Var2开始产生NULL值:
schema = StructType([
StructField('Var1', StringType()),
StructField('Var2',
StructType([
StructField('Var3', StringType()),
StructField('Var4', NullType()),
StructField('Var5', StringType()),
StructField('Var6', StringType()),
StructField('Var7', StringType()),
StructField('Var8', StringType())
])
),
StructField('Var9', NullType()),
StructField('Var10', StringType())
])
这是我试图解析JSON字符串的代码:df = df.withColumn('struct_json', from_json(col('_corrupt_record'), schema))
它解析第一个键:值对,但将列的其余值生成为NULL:
*object:*
Var1: "String"
Var2: NULL
Var3: NULL
Var4: NULL
Var5: NULL
Var6: NULL
Var7: NULL
Var8: NULL
Var9: NULL
Var10: NULL
任何帮助都将不胜感激!
尝试的解决方案:
JSON模式从头开始-如上所述,它产生NULL值。
multiLine=True和allowSingleQuotes=True读取选项-在另一篇StackOverflow文章中发现了这一点,但在使用我的从头开始的JSON模式时,它仍然产生了NULL值。
使用rdd.map方法的JSON模式-我试图使用
json_schema = spark.read.json(df.rdd.map(lambda row: row._corrupt_record)).schema
派生一个模式,但这只是创建了一个单层Struct对象,其中该层由整个JSON字符串组成,没有解析出任何嵌套对象。SQL解析键:值对-嵌套对象和数组太多,无法成功解析,性能太差。
答案简单得令人尴尬:
通过API调用,api_json = response.json()
创建了一个Python字典。这在type(api_json)
中得到了证实。
使用spark.read.json
方法创建DataFrame是不正确的,因为源api_json
数据是字典而不是JSON。
所以修复程序改变了这个:
response = requests.get(api_url, headers=api_call_header)
api_json = response.json()
df = spark.read.json(sc.parallelize(api_json))
display(df)
对此:
response = requests.get(api_url, headers=api_call_header)
api_json = response.json()
df = spark.createDataFrame(api_json, schema=schema)
display(df)
对于模式,我使用了在PySpark中从头开始构建的模式。