如何将PySpark Dataframe列的类型指定为JSON



以下是我们的pyspark应用程序代码片段。

schema = StructType(
[
StructField('name', StringType(), True),
StructField('version', StringType(), True),
StructField('requestBody', StringType(), True),
StructField('id', StringType(), True),
]
)
df_new = df.withColumn('value', from_json('value', schema)) 
.where(col('value.version') == '1') 
.select(col('value.*'))
.na.drop() 
.withColumn('requestBody', decrypt_udf(col('requestBody')))
df_new.show()
+-------+--------+---------------------------------------------+---+
|   name| version|                                  requestBody| id|
+-------+--------+---------------------------------------------+---+
|kj-test|       1|{"data": {"score": 130, "group": "silver"}}  |  1|
|kj-test|       1|{"data": {"score": 250, "group": "gold"}}    |  2|
|kj-test|       1|{"data": {"score": 330, "group": "platinum"}}|  3|
+-------+--------+---------------------------------------------+---+

decrypt_udf UDF函数片段:

@udf(returnType=StringType())
def decrypt_udf(encrypted_string: str):
...
...
return decrypted_json_str

当我将spark数据框写入S3 bucket时,如下所示

df_new.write.mode('overwrite').json(path=s3outputpath)

结果文件的内容如下,这里requestBody的值被写为String,因此在双引号中,也转义了内部的双引号。

{"name":"kj-test","version":"1","requestBody":"{"data": {"score": 130, "group": "silver"}}","id":"1"}
{"name":"kj-test","version":"2","requestBody":"{"data": {"score": 250, "group": "gold"}}","id":"1"}
{"name":"kj-test","version":"3","requestBody":"{"data": {"score": 330, "group": "platinum"}}","id":"1"}

然而,我希望requestBody的值被写为json,如下所示。

{"name":"kj-test","version":"1","requestBody":{"data": {"score": 130, "group": "silver"}},"id":"1"}

我知道我已经在模式StructField('requestBody', StringType(), True)中将requestBody的类型指定为字符串,因此我看到了这样的输出。我怎样才能达到我所期望的输出?没有JsonType这种类型


编辑:

请注意,我的requestBody模式将不总是像这个{"data": {"score": 130, "group": "silver"}}。对于一个给定的运行,它是固定的,但另一个运行可能有一个完全不同的模式。

本质上,需要一种从json字符串推断模式的方法。找到了一些可能有帮助的SO帖子,将尝试这些:

https://stackoverflow.com/a/45880574/948268
Spark from_json with dynamic schema

试试下面的代码。(我没有测试过)

使用from_json函数将requestBodyjson字符串转换为struct。

schema = StructType(
[
StructField('name', StringType(), True),
StructField('version', StringType(), True),
StructField('requestBody', StringType(), True),
StructField('id', StringType(), True),
]
)

requestBody准备schema

requestSchema=StructType(
[
StructField('data', StructType([StructField('group',StringType(),True),StructField('score',LongType(),True)])),
]
)
df_new = df.withColumn('value', from_json('value', schema)) 
.where(col('value.version') == '1') 
.select(col('value.*'))
.withColumn()
.na.drop() 
.withColumn('requestBody', from_json('requestBody',requestSchema))
df_new.write.mode('overwrite').json(path=s3outputpath)

在udf中添加以下方法,将python对象转换为JSON字符串:

import json   
@udf(returnType=StringType())
def decrypt_udf(encrypted_string: str):
...
...
return json.dumps(decrypted_json_str)

较新的解决方案(我觉得这个比较好)

我们最后使用的另一个聪明的解决方案。在这里,我们定义了一个udfget_combined_json,它结合了给定Row的所有列,然后返回一个json字符串。导致我们最终的数据框只有一个列,这样我们就可以把数据框写成一个文本文件,这样整个json字符串就可以不带任何转义地写成。以下是代码片段:

df_new = df.withColumn('value', from_json('value', schema)) 
.where(col('value.version') == '1') 
.select(col('value.*'))
.na.drop() 
.withColumn('requestBody', decrypt_udf(col('requestBody')))
df_new.withColumn('combinedColumns', get_combined_json(struct([df_new[x] for x in df_new.columns]))) 
.select(col('combinedColumns'))
.write.mode('overwrite').text(path=output_s3_bucket_path)
...
@udf(returnType=StringType())
def get_combined_json(row: Row):
return json.dumps({"requestBody": json.loads(row.requestBody),
"name": row.name,
"version": row.version,
"id": row.id})



老解

下面是我们如何从requestBodyjson字符串中导出/推断模式的:

request_body_schema = spark_session.read.json(df_new.rdd.map(lambda r: r.requestBody)).schema

然后使用模式来更新数据框架。这是最终有效的代码:

df_new = df.withColumn('value', from_json('value', schema)) 
.where(col('value.version') == '1') 
.select(col('value.*'))
.na.drop() 
.withColumn('requestBody', decrypt_udf(col('requestBody')))
request_body_schema = spark_session.read.json(df_new.rdd.map(lambda r: r.requestBody)).schema
df_new = df_new.withColumn('requestBody', from_json(col('requestBody'), request_body_schema))
df_new.write.mode('overwrite').json(path=output_s3_bucket_path)

写入S3桶的输出格式如下:

{"name":"kj-test","version":"1","requestBody":{"data": {"score": 130, "group": "silver"}},"id":"1"}

最新更新