以下是我们的pyspark应用程序代码片段。
schema = StructType(
[
StructField('name', StringType(), True),
StructField('version', StringType(), True),
StructField('requestBody', StringType(), True),
StructField('id', StringType(), True),
]
)
df_new = df.withColumn('value', from_json('value', schema))
.where(col('value.version') == '1')
.select(col('value.*'))
.na.drop()
.withColumn('requestBody', decrypt_udf(col('requestBody')))
df_new.show()
+-------+--------+---------------------------------------------+---+
| name| version| requestBody| id|
+-------+--------+---------------------------------------------+---+
|kj-test| 1|{"data": {"score": 130, "group": "silver"}} | 1|
|kj-test| 1|{"data": {"score": 250, "group": "gold"}} | 2|
|kj-test| 1|{"data": {"score": 330, "group": "platinum"}}| 3|
+-------+--------+---------------------------------------------+---+
decrypt_udf UDF函数片段:
@udf(returnType=StringType())
def decrypt_udf(encrypted_string: str):
...
...
return decrypted_json_str
当我将spark数据框写入S3 bucket时,如下所示
df_new.write.mode('overwrite').json(path=s3outputpath)
结果文件的内容如下,这里requestBody
的值被写为String
,因此在双引号中,也转义了内部的双引号。
{"name":"kj-test","version":"1","requestBody":"{"data": {"score": 130, "group": "silver"}}","id":"1"}
{"name":"kj-test","version":"2","requestBody":"{"data": {"score": 250, "group": "gold"}}","id":"1"}
{"name":"kj-test","version":"3","requestBody":"{"data": {"score": 330, "group": "platinum"}}","id":"1"}
然而,我希望requestBody
的值被写为json,如下所示。
{"name":"kj-test","version":"1","requestBody":{"data": {"score": 130, "group": "silver"}},"id":"1"}
我知道我已经在模式StructField('requestBody', StringType(), True)
中将requestBody的类型指定为字符串,因此我看到了这样的输出。我怎样才能达到我所期望的输出?没有JsonType
这种类型
编辑:
请注意,我的requestBody
模式将不总是像这个{"data": {"score": 130, "group": "silver"}}
。对于一个给定的运行,它是固定的,但另一个运行可能有一个完全不同的模式。
本质上,需要一种从json字符串推断模式的方法。找到了一些可能有帮助的SO帖子,将尝试这些:
https://stackoverflow.com/a/45880574/948268
Spark from_json with dynamic schema
试试下面的代码。(我没有测试过)
使用from_json
函数将requestBody
json字符串转换为struct。
schema = StructType(
[
StructField('name', StringType(), True),
StructField('version', StringType(), True),
StructField('requestBody', StringType(), True),
StructField('id', StringType(), True),
]
)
为requestBody
准备schema
requestSchema=StructType(
[
StructField('data', StructType([StructField('group',StringType(),True),StructField('score',LongType(),True)])),
]
)
df_new = df.withColumn('value', from_json('value', schema))
.where(col('value.version') == '1')
.select(col('value.*'))
.withColumn()
.na.drop()
.withColumn('requestBody', from_json('requestBody',requestSchema))
df_new.write.mode('overwrite').json(path=s3outputpath)
在udf中添加以下方法,将python对象转换为JSON字符串:
import json
@udf(returnType=StringType())
def decrypt_udf(encrypted_string: str):
...
...
return json.dumps(decrypted_json_str)
较新的解决方案(我觉得这个比较好)
我们最后使用的另一个聪明的解决方案。在这里,我们定义了一个udfget_combined_json
,它结合了给定Row
的所有列,然后返回一个json字符串。导致我们最终的数据框只有一个列,这样我们就可以把数据框写成一个文本文件,这样整个json字符串就可以不带任何转义地写成。以下是代码片段:
df_new = df.withColumn('value', from_json('value', schema))
.where(col('value.version') == '1')
.select(col('value.*'))
.na.drop()
.withColumn('requestBody', decrypt_udf(col('requestBody')))
df_new.withColumn('combinedColumns', get_combined_json(struct([df_new[x] for x in df_new.columns])))
.select(col('combinedColumns'))
.write.mode('overwrite').text(path=output_s3_bucket_path)
...
@udf(returnType=StringType())
def get_combined_json(row: Row):
return json.dumps({"requestBody": json.loads(row.requestBody),
"name": row.name,
"version": row.version,
"id": row.id})
老解
下面是我们如何从requestBody
json字符串中导出/推断模式的:
request_body_schema = spark_session.read.json(df_new.rdd.map(lambda r: r.requestBody)).schema
然后使用模式来更新数据框架。这是最终有效的代码:
df_new = df.withColumn('value', from_json('value', schema))
.where(col('value.version') == '1')
.select(col('value.*'))
.na.drop()
.withColumn('requestBody', decrypt_udf(col('requestBody')))
request_body_schema = spark_session.read.json(df_new.rdd.map(lambda r: r.requestBody)).schema
df_new = df_new.withColumn('requestBody', from_json(col('requestBody'), request_body_schema))
df_new.write.mode('overwrite').json(path=output_s3_bucket_path)
写入S3桶的输出格式如下:
{"name":"kj-test","version":"1","requestBody":{"data": {"score": 130, "group": "silver"}},"id":"1"}