{"colA":"FLOAT","colB":"STRING","colC":"FLOAT"}
3 {"colC":3.2,"colX":3.9} {"colC":"FLOAT","colX":"FLOAT"}
4 {"colG":"val1","colH":93.2} {"colG":"STRING","colH":"FLOAT"}
5 {"colG":"val4","colA":4.2,"colJ":93.2,"colM":"val4"} {"colG":"STRING","colA":"FLOAT","colJ":"FLOAT","colM":"STRING"}
我的数据大致如下所示：
您可以先为目标列构建一个 schema 字符串，然后使用 `from_json` 来解析 `values` 字段。
# Create the target schema: join every row's `schema` string with ',' and
# merge the resulting "{...},{...}" into a single "{..., ...}" dict-like string.
# Grouping on a constant literal puts all rows into one group, so a single row
# (and thus a single string) comes back from collect().
# The chain is wrapped in parentheses so the multi-line call is valid Python.
jsonsch = (
    data_sdf
    .groupBy(func.lit(1).alias('dropme'))
    .agg(func.array_join(func.collect_list('schema'), ',').alias('allsch'))
    # regexp_replace takes a Java regex: an unescaped '{' after an atom is read
    # as a repetition quantifier ("Illegal repetition"), so escape both braces.
    .withColumn('allsch', func.regexp_replace('allsch', r'\}\s*,\s*\{', ', '))
    .select('allsch')
    .collect()[0][0]
)
# "{'colA':'FLOAT', 'colB':'STRING', 'colC':'FLOAT', 'colC':'FLOAT', 'colX':'FLOAT', 'colG':'STRING', 'colH':'FLOAT', 'colG':'STRING', 'colA':'FLOAT', 'colJ':'FLOAT', 'colM':'STRING'}"
import json
jsonschema = ', '.join([k[0]+' '+k[1].lower() for k in json.loads(jsonsch.replace("'", '"')).items()])
# "colA float, colB string, colC float, colX float, colG string, colH float, colJ float, colM string"
# Parse the `values` JSON column using the merged target schema, keep only the
# id and the parsed struct, and print it without truncating cell contents.
# In the output shown below, keys a row does not contain come back as null.
# The chain is wrapped in parentheses so the multi-line call is valid Python.
(
    data_sdf
    .withColumn('parsed_val', func.from_json('values', jsonschema))
    .selectExpr('id', 'parsed_val')
    .show(truncate=False)
)
# +---+------------------------------------------------+
# |id |parsed_val |
# +---+------------------------------------------------+
# |2 |{3.2, val2, 3.4, null, null, null, null, null} |
# |3 |{null, null, 3.2, null, null, null, null, 3.9} |
# |4 |{null, null, null, val1, 93.2, null, null, null}|
# |5 |{4.2, null, null, val4, null, 93.2, val4, null} |
# +---+------------------------------------------------+
# root
# |-- id: integer (nullable = true)
# |-- parsed_val: struct (nullable = true)
# | |-- colA: float (nullable = true)
# | |-- colB: string (nullable = true)
# | |-- colC: float (nullable = true)
# | |-- colG: string (nullable = true)
# | |-- colH: float (nullable = true)
# | |-- colJ: float (nullable = true)
# | |-- colM: string (nullable = true)
# | |-- colX: float (nullable = true)