我有一个换行符分隔的 json 文件,看起来像
{"id":1,"nested_col": {"key1": "val1", "key2": "val2", "key3": ["arr1", "arr2"]}}
{"id":2,"nested_col": {"key1": "val1_2", "key2": "val2_2", "key3": ["arr1_2", "arr2"]}}
一旦我使用 df = spark.read.json(path_to_file)
读取文件,我最终会得到一个数据帧,其架构如下所示:
DataFrame[id: bigint,nested_col:struct<key1:string,key2:string,key3:array<string>>]
我想做的是将nested_col
转换为字符串,而无需将primitivesAsString
设置为 true(因为我实际上有 100+ 列,并且需要推断所有其他列的类型)。我也不知道nested_col
事先是什么样子的。换句话说,我希望我的DataFrame
看起来像
DataFrame[id: bigint,nested_col:string]
我试着做
df.select(df['nested_col'].cast('string')).take(1)`
但它不会返回 JSON 的正确字符串表示形式:
[Row(nested_col=u'[0,2000000004,2800000004,3000000014,316c6176,326c6176,c00000002,3172726100000010,32727261]')]`
而我希望:
[Row(nested_col=u'{"key1": "val1", "key2": "val2", "key3": ["arr1", "arr2"]}')]
有谁知道我如何获得所需的结果(也称为将嵌套的 JSON 字段/StructType
转换为字符串)?
老实说,解析 JSON 并推断模式只是为了将所有内容推回 JSON 听起来有点奇怪,但你在这里:
-
所需导入:
from pyspark.sql import types from pyspark.sql.functions import to_json, concat_ws, concat, struct
-
帮助程序函数:
def jsonify(df): def convert(f): if isinstance(f.dataType, types.StructType): return to_json(f.name).alias(f.name) if isinstance(f.dataType, types.ArrayType): return get_json_object( to_json(struct(f.name)), "$.{0}".format(f.name) ).alias(f.name) return f.name return df.select([convert(f) for f in df.schema.fields])
-
用法示例:
df = sc.parallelize([("a", 1, (2, 3), ["1", "2", "3"])]).toDF() jsonify(df).show()
+---+---+---------------+-------------+ | _1| _2| _3| _4| +---+---+---------------+-------------+ | a| 1|{"_1":2,"_2":3}|["1","2","3"]| +---+---+---------------+-------------+