Databricks中的_corupt_record列在使用JSON架构(PySpark)时产生NULL值



我正在使用PySpark在Databricks中使用REST API。API响应返回一个列表,其中列表的每个元素都是一个JSON字符串。当我并行化JSON时,它会生成一个_corrup_record列,其中该列的每个值都是一个JSON字符串:

### API Call
response = requests.get(api_url, headers=api_call_header)
api_json = response.json()
df = spark.read.json(sc.parallelize(api_json))
display(df)

当我将单个值的JSON字符串复制到JSON验证器中时,它就是这样的:

{
'Var1': 'String',
'Var2': {
'Var3': 'String',
'Var4': None,
'Var5': 'String',
'Var6': 'String',
'Var7': 'String',
'Var8': 'String'
},
'Var9': None,
'Var10': 'String'
}

无论出于何种原因,我都无法访问Var2的嵌套Struct对象。当我使用from_json函数和以下从头开始的模式时,它会从Var2开始产生NULL值:

schema = StructType([
StructField('Var1', StringType()),
StructField('Var2', 
StructType([
StructField('Var3', StringType()),
StructField('Var4', NullType()),
StructField('Var5', StringType()),
StructField('Var6', StringType()),
StructField('Var7', StringType()),
StructField('Var8', StringType())
])
),
StructField('Var9', NullType()),
StructField('Var10', StringType())
])

这是我试图解析JSON字符串的代码:df = df.withColumn('struct_json', from_json(col('_corrupt_record'), schema))

它解析第一个键:值对,但将列的其余值生成为NULL:

*object:*
Var1: "String"
Var2: NULL
Var3: NULL
Var4: NULL
Var5: NULL
Var6: NULL
Var7: NULL
Var8: NULL
Var9: NULL
Var10: NULL

任何帮助都将不胜感激!

尝试的解决方案:

  1. JSON模式从头开始-如上所述,它产生NULL值。

  2. multiLine=True和allowSingleQuotes=True读取选项-在另一篇StackOverflow文章中发现了这一点,但在使用我的从头开始的JSON模式时,它仍然产生了NULL值。

  3. 使用rdd.map方法的JSON模式-我试图使用json_schema = spark.read.json(df.rdd.map(lambda row: row._corrupt_record)).schema派生一个模式,但这只是创建了一个单层Struct对象,其中该层由整个JSON字符串组成,没有解析出任何嵌套对象。

  4. SQL解析键:值对-嵌套对象和数组太多,无法成功解析,性能太差。

答案简单得令人尴尬:

通过API调用,api_json = response.json()创建了一个Python字典。这在type(api_json)中得到了证实。

使用spark.read.json方法创建DataFrame是不正确的,因为源api_json数据是字典而不是JSON。

所以修复程序改变了这个:

response = requests.get(api_url, headers=api_call_header)
api_json = response.json()
df = spark.read.json(sc.parallelize(api_json))
display(df)

对此:

response = requests.get(api_url, headers=api_call_header)
api_json = response.json()
df = spark.createDataFrame(api_json, schema=schema)
display(df)

对于模式,我使用了在PySpark中从头开始构建的模式。

相关内容

  • 没有找到相关文章

最新更新